DistributedItemSampler

class dgl.graphbolt.DistributedItemSampler(item_set: ~dgl.graphbolt.itemset.ItemSet | ~dgl.graphbolt.itemset.HeteroItemSet, batch_size: int, minibatcher: ~typing.Callable | None = <function minibatcher_default>, drop_last: bool | None = False, shuffle: bool | None = False, drop_uneven_inputs: bool | None = False, seed: int | None = None)[source]

Bases: ItemSampler

A sampler that iterates over input items and creates subsets distributedly.

This sampler creates a distributed subset of items from the given data set, which can be used for training with PyTorch's Distributed Data Parallel (DDP). The items can be node IDs, node pairs with or without labels, node pairs with negative sources/destinations, DGLGraphs, or heterogeneous counterparts. The original item set is split such that each replica (process) receives an exclusive subset.

Note: The items will first be split onto each replica, then get shuffled (if needed) and batched. Therefore, each replica will always get the same set of items.

Note: This class DistributedItemSampler is not decorated with torch.utils.data.functional_datapipe on purpose. This indicates it does not support function-like calls. However, any iterable datapipes from torch.utils.data.datapipes can be further appended to it.
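For instance, since the sampler is an IterDataPipe, a generic datapipe stage can be chained after it. A minimal sketch, assuming an item_sampler built as in the examples below; the identity transform is purely illustrative:

>>> from torch.utils.data.datapipes.iter import Mapper
>>> # Append a generic datapipe stage after the sampler; Mapper applies
>>> # the given function to every element flowing through the pipe.
>>> pipe = Mapper(item_sampler, lambda minibatch: minibatch)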

Parameters:
  • item_set (Union[ItemSet, HeteroItemSet]) – Data to be sampled.

  • batch_size (int) – The size of each batch.

  • minibatcher (Optional[Callable]) – A callable that takes in a list of items and returns a MiniBatch.

  • drop_last (bool) – Option to drop the last batch if it is not full.

  • shuffle (bool) – Option to shuffle the items before sampling.

  • num_replicas (int) – The number of model replicas being created during Distributed Data Parallel (DDP) training. It should be the same as the real world size, otherwise it could cause errors. By default, it is retrieved from the current distributed group.

  • drop_uneven_inputs (bool) – Option to make sure the numbers of batches for each replica are the same. If some replicas have more batches than the others, the excess batches of those replicas will be dropped. If the drop_last parameter is also set to True, the last batch will be dropped before the excess batches are dropped. Note: When using Distributed Data Parallel (DDP) training, the program may hang or error if a replica has fewer inputs. It is recommended to use the Join Context Manager provided by PyTorch to solve this problem; see https://pytorch.ac.cn/tutorials/advanced/generic_join.html and the sketch after this parameter list. However, this option can be used if the Join Context Manager is not helpful for any reason.

  • seed (int) – The seed for reproducible random shuffling. If None, a random seed will be generated.
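A minimal sketch of the Join Context Manager mentioned under drop_uneven_inputs, assuming model is a DistributedDataParallel-wrapped module and data_loader is built as in the examples below; the loop body is illustrative:

>>> from torch.distributed.algorithms.join import Join
>>> # Replicas that run out of batches early keep participating in the
>>> # collective communication instead of hanging the other replicas.
>>> with Join([model]):
>>>     for minibatch in data_loader:
>>>         ...  # forward/backward/step as usual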

Examples

0. Preparation: DistributedItemSampler needs a multi-process environment to work. You need to spawn subprocesses and initialize the process group before executing the following examples. Due to randomness, the output is not always the same as listed below.

>>> import torch
>>> import torch.multiprocessing as mp
>>> from dgl import graphbolt as gb
>>> item_set = gb.ItemSet(torch.arange(15))
>>> num_replicas = 4
>>> batch_size = 2
>>> mp.spawn(...)
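The elided mp.spawn call above might look like the following sketch; the worker function, backend, address, and port are illustrative assumptions, not part of the DGL API:

>>> import torch.distributed as dist
>>> def run(proc_id, nprocs):
>>>     # Initialize the default process group; the gloo backend suffices
>>>     # for a CPU-only demonstration.
>>>     dist.init_process_group(
>>>         backend="gloo",
>>>         init_method="tcp://127.0.0.1:12345",
>>>         world_size=nprocs,
>>>         rank=proc_id,
>>>     )
>>>     ...  # build the sampler and data loader as in the examples below
>>> mp.spawn(run, args=(num_replicas,), nprocs=num_replicas)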
  1. shuffle = False, drop_last = False, drop_uneven_inputs = False.

>>> item_sampler = gb.DistributedItemSampler(
>>>     item_set, batch_size=2, shuffle=False, drop_last=False,
>>>     drop_uneven_inputs=False
>>> )
>>> data_loader = gb.DataLoader(item_sampler)
>>> print(f"Replica#{proc_id}: {list(data_loader)}")
Replica#0: [tensor([0, 1]), tensor([2, 3])]
Replica#1: [tensor([4, 5]), tensor([6, 7])]
Replica#2: [tensor([8, 9]), tensor([10, 11])]
Replica#3: [tensor([12, 13]), tensor([14])]
  2. shuffle = False, drop_last = True, drop_uneven_inputs = False.

>>> item_sampler = gb.DistributedItemSampler(
>>>     item_set, batch_size=2, shuffle=False, drop_last=True,
>>>     drop_uneven_inputs=False
>>> )
>>> data_loader = gb.DataLoader(item_sampler)
>>> print(f"Replica#{proc_id}: {list(data_loader)}")
Replica#0: [tensor([0, 1]), tensor([2, 3])]
Replica#1: [tensor([4, 5]), tensor([6, 7])]
Replica#2: [tensor([8, 9]), tensor([10, 11])]
Replica#3: [tensor([12, 13])]
  3. shuffle = False, drop_last = False, drop_uneven_inputs = True.

>>> item_sampler = gb.DistributedItemSampler(
>>>     item_set, batch_size=2, shuffle=False, drop_last=False,
>>>     drop_uneven_inputs=True
>>> )
>>> data_loader = gb.DataLoader(item_sampler)
>>> print(f"Replica#{proc_id}: {list(data_loader)}")
Replica#0: [tensor([0, 1]), tensor([2, 3])]
Replica#1: [tensor([4, 5]), tensor([6, 7])]
Replica#2: [tensor([8, 9]), tensor([10, 11])]
Replica#3: [tensor([12, 13]), tensor([14])]
  4. shuffle = False, drop_last = True, drop_uneven_inputs = True. (drop_last first drops Replica#3's incomplete batch, leaving it a single full batch; drop_uneven_inputs then trims the other replicas down to that minimum of one batch.)

>>> item_sampler = gb.DistributedItemSampler(
>>>     item_set, batch_size=2, shuffle=False, drop_last=True,
>>>     drop_uneven_inputs=True
>>> )
>>> data_loader = gb.DataLoader(item_sampler)
>>> print(f"Replica#{proc_id}: {list(data_loader)}")
Replica#0: [tensor([0, 1])]
Replica#1: [tensor([4, 5])]
Replica#2: [tensor([8, 9])]
Replica#3: [tensor([12, 13])]
  5. shuffle = True, drop_last = True, drop_uneven_inputs = False.

>>> item_sampler = gb.DistributedItemSampler(
>>>     item_set, batch_size=2, shuffle=True, drop_last=True,
>>>     drop_uneven_inputs=False
>>> )
>>> data_loader = gb.DataLoader(item_sampler)
>>> print(f"Replica#{proc_id}: {list(data_loader)}")
(One possible output:)
Replica#0: [tensor([3, 2]), tensor([0, 1])]
Replica#1: [tensor([6, 5]), tensor([7, 4])]
Replica#2: [tensor([8, 10])]
Replica#3: [tensor([14, 12])]
  6. shuffle = True, drop_last = True, drop_uneven_inputs = True.

>>> item_sampler = gb.DistributedItemSampler(
>>>     item_set, batch_size=2, shuffle=True, drop_last=True,
>>>     drop_uneven_inputs=True
>>> )
>>> data_loader = gb.DataLoader(item_sampler)
>>> print(f"Replica#{proc_id}: {list(data_loader)}")
(One possible output:)
Replica#0: [tensor([1, 3])]
Replica#1: [tensor([7, 5])]
Replica#2: [tensor([11, 9])]
Replica#3: [tensor([13, 14])]