DistributedItemSampler
- class dgl.graphbolt.DistributedItemSampler(item_set: ~dgl.graphbolt.itemset.ItemSet | ~dgl.graphbolt.itemset.HeteroItemSet, batch_size: int, minibatcher: ~typing.Callable | None = <function minibatcher_default>, drop_last: bool | None = False, shuffle: bool | None = False, drop_uneven_inputs: bool | None = False, seed: int | None = None)[source]
Bases: ItemSampler
A sampler that iterates over the input items and creates subsets distributedly.
This sampler creates a distributed subset of items from the given data set, which can be used for training with PyTorch's Distributed Data Parallel (DDP). The items can be node IDs, node pairs with or without labels, node pairs with negative sources/destinations, DGLGraphs, or heterogeneous counterparts. The original item set is split such that each replica (process) receives an exclusive subset of the items.
Note: The items are first split onto each replica, then shuffled (if enabled) and batched. Therefore, each replica always gets the same set of items.
Note: The class DistributedItemSampler is deliberately not decorated with torch.utils.data.functional_datapipe. This means it does not support function-like calls. However, any iterable datapipe from torch.utils.data.datapipes can be further appended, as sketched below.
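For instance, since no functional form such as .map(...) is registered on the sampler, a downstream datapipe has to be constructed explicitly. A minimal sketch, assuming a sampler named item_sampler was built as in the examples below; Mapper is the iterable datapipe from torch.utils.data.datapipes, and tag_minibatch is a hypothetical transform used purely for illustration:

from torch.utils.data.datapipes.iter import Mapper

# Hypothetical post-processing hook, for illustration only.
def tag_minibatch(minibatch):
    # Inspect or transform the emitted MiniBatch here.
    return minibatch

# item_sampler has no functional .map(...) form, but any iterable
# datapipe can wrap it explicitly.
datapipe = Mapper(item_sampler, tag_minibatch)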
- Parameters:
item_set (Union[ItemSet, HeteroItemSet]) – Data to be sampled.
batch_size (int) – The size of each batch.
minibatcher (Optional[Callable]) – A callable that takes in a list of items and returns a MiniBatch.
drop_last (bool) – Option to drop the last batch if it's not full.
shuffle (bool) – Option to shuffle before sampling.
num_replicas (int) – The number of model replicas being created during Distributed Data Parallel (DDP) training. It should be the same as the real world size, otherwise it could cause errors. By default, it is retrieved from the current distributed group.
drop_uneven_inputs (bool) – Option to make sure the numbers of batches for each replica are the same. If some replicas have more batches than the others, the excess batches of those replicas will be dropped. If the drop_last parameter is also set to True, the last batch will be dropped before the excess batches are dropped. Note: When using Distributed Data Parallel (DDP) training, the program may hang or error if a replica has fewer inputs. It is recommended to use the Join Context Manager provided by PyTorch to solve this problem; please refer to https://pytorch.ac.cn/tutorials/advanced/generic_join.html. However, this option can be used if the Join Context Manager is not helpful for any reason (see the sketch after this parameter list).
seed (int) – The random seed to use for reproducible shuffling. If None, a random seed will be generated.
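When replicas end up with different numbers of batches and drop_uneven_inputs is left False, the Join Context Manager referenced above is the usual remedy. A minimal sketch of its use, assuming model, optimizer, and data_loader have already been built inside each worker process; the forward call producing a loss is illustrative only:

import torch.distributed as dist
from torch.distributed.algorithms.join import Join
from torch.nn.parallel import DistributedDataParallel as DDP

# Assumption: `model`, `optimizer`, and `data_loader` were already
# created inside this worker process.
ddp_model = DDP(model)

# Join keeps replicas that exhaust their (uneven) inputs early
# participating in DDP's collective communications, so the other
# replicas do not hang.
with Join([ddp_model]):
    for minibatch in data_loader:
        optimizer.zero_grad()
        loss = ddp_model(minibatch)  # hypothetical forward returning a loss
        loss.backward()
        optimizer.step()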
Examples
0. Preparation: DistributedItemSampler needs a multi-processing environment to work. You need to spawn subprocesses and initialize the process group before executing the following examples. Due to randomness, the output is not always the same as listed below.
>>> import torch
>>> import torch.multiprocessing as mp
>>> from dgl import graphbolt as gb
>>> item_set = gb.ItemSet(torch.arange(15))
>>> num_replicas = 4
>>> batch_size = 2
>>> mp.spawn(...)
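A fuller sketch of this preparation step, under the assumption of a single-machine, CPU-only setup with the gloo backend; the worker function, its name, and the rendezvous address are illustrative, not prescribed by the API:

import torch
import torch.distributed as dist
import torch.multiprocessing as mp
from dgl import graphbolt as gb

def worker(proc_id, num_replicas):
    # Hypothetical worker body: each spawned process joins the same
    # default process group, from which DistributedItemSampler reads
    # its rank and world size.
    dist.init_process_group(
        backend="gloo",  # assumption: CPU-only, single machine
        init_method="tcp://127.0.0.1:12345",  # illustrative rendezvous address
        world_size=num_replicas,
        rank=proc_id,
    )
    item_set = gb.ItemSet(torch.arange(15))
    item_sampler = gb.DistributedItemSampler(
        item_set, batch_size=2, shuffle=False, drop_last=False
    )
    data_loader = gb.DataLoader(item_sampler)
    print(f"Replica#{proc_id}: {list(data_loader)}")
    dist.destroy_process_group()

if __name__ == "__main__":
    num_replicas = 4
    mp.spawn(worker, args=(num_replicas,), nprocs=num_replicas)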
1. shuffle = False, drop_last = False, drop_uneven_inputs = False.
>>> item_sampler = gb.DistributedItemSampler(
>>>     item_set, batch_size=2, shuffle=False, drop_last=False,
>>>     drop_uneven_inputs=False
>>> )
>>> data_loader = gb.DataLoader(item_sampler)
>>> print(f"Replica#{proc_id}: {list(data_loader)}")
Replica#0: [tensor([0, 1]), tensor([2, 3])]
Replica#1: [tensor([4, 5]), tensor([6, 7])]
Replica#2: [tensor([8, 9]), tensor([10, 11])]
Replica#3: [tensor([12, 13]), tensor([14])]
2. shuffle = False, drop_last = True, drop_uneven_inputs = False.
>>> item_sampler = gb.DistributedItemSampler(
>>>     item_set, batch_size=2, shuffle=False, drop_last=True,
>>>     drop_uneven_inputs=False
>>> )
>>> data_loader = gb.DataLoader(item_sampler)
>>> print(f"Replica#{proc_id}: {list(data_loader)}")
Replica#0: [tensor([0, 1]), tensor([2, 3])]
Replica#1: [tensor([4, 5]), tensor([6, 7])]
Replica#2: [tensor([8, 9]), tensor([10, 11])]
Replica#3: [tensor([12, 13])]
3. shuffle = False, drop_last = False, drop_uneven_inputs = True.
>>> item_sampler = gb.DistributedItemSampler(
>>>     item_set, batch_size=2, shuffle=False, drop_last=False,
>>>     drop_uneven_inputs=True
>>> )
>>> data_loader = gb.DataLoader(item_sampler)
>>> print(f"Replica#{proc_id}: {list(data_loader)}")
Replica#0: [tensor([0, 1]), tensor([2, 3])]
Replica#1: [tensor([4, 5]), tensor([6, 7])]
Replica#2: [tensor([8, 9]), tensor([10, 11])]
Replica#3: [tensor([12, 13]), tensor([14])]
4. shuffle = False, drop_last = True, drop_uneven_inputs = True.
>>> item_sampler = gb.DistributedItemSampler(
>>>     item_set, batch_size=2, shuffle=False, drop_last=True,
>>>     drop_uneven_inputs=True
>>> )
>>> data_loader = gb.DataLoader(item_sampler)
>>> print(f"Replica#{proc_id}: {list(data_loader)}")
Replica#0: [tensor([0, 1])]
Replica#1: [tensor([4, 5])]
Replica#2: [tensor([8, 9])]
Replica#3: [tensor([12, 13])]
5. shuffle = True, drop_last = True, drop_uneven_inputs = False.
>>> item_sampler = gb.DistributedItemSampler(
>>>     item_set, batch_size=2, shuffle=True, drop_last=True,
>>>     drop_uneven_inputs=False
>>> )
>>> data_loader = gb.DataLoader(item_sampler)
>>> print(f"Replica#{proc_id}: {list(data_loader)}")
(One possible output:)
Replica#0: [tensor([3, 2]), tensor([0, 1])]
Replica#1: [tensor([6, 5]), tensor([7, 4])]
Replica#2: [tensor([8, 10])]
Replica#3: [tensor([14, 12])]
6. shuffle = True, drop_last = True, drop_uneven_inputs = True.
>>> item_sampler = gb.DistributedItemSampler(
>>>     item_set, batch_size=2, shuffle=True, drop_last=True,
>>>     drop_uneven_inputs=True
>>> )
>>> data_loader = gb.DataLoader(item_sampler)
>>> print(f"Replica#{proc_id}: {list(data_loader)}")
(One possible output:)
Replica#0: [tensor([1, 3])]
Replica#1: [tensor([7, 5])]
Replica#2: [tensor([11, 9])]
Replica#3: [tensor([13, 14])]