dgl.distributed

DGL 分布式模块包含用于支持在机器集群上进行分布式图神经网络训练和推理的类和函数。

这包括几个子模块

  • 分布式数据结构,包括分布式图、分布式张量和分布式嵌入。

  • 分布式采样。

  • 运行时分布式工作负载划分。

  • 图分区。

初始化

initialize(ip_config[, max_queue_size, ...])

初始化 DGL 的分布式模块

分布式图

class dgl.distributed.DistGraph(graph_name, gpb=None, part_config=None)[source]

用于访问分布式图的类。

此类提供 DGLGraph API 的一个子集,用于在分布式 GNN 训练和推理中访问分区图数据。因此,其主要用例是与分布式采样 API 配合使用,以生成 mini-batch 并在 mini-batch 上执行前向和反向计算。

该类可以在两种模式下运行:独立模式和分布式模式。

  • 当用户正常运行训练脚本时,DistGraph 将处于独立模式。在此模式下,输入数据必须由 partition_graph() 使用单个分区构建。此模式用于测试和调试目的。在此模式下,用户必须提供 part_config,以便 DistGraph 可以加载输入图。

  • 当用户使用分布式启动脚本运行训练脚本时,DistGraph 将设置为分布式模式。这用于实际的分布式训练。所有分区数据由 DistGraph 服务器加载,这些服务器由 DGL 的启动脚本创建。DistGraph 连接到服务器以访问分区图数据。

目前,在分布式模式下,DistGraph 服务器和客户端在同一组机器上运行。DistGraph 使用共享内存访问本地机器中的分区数据。这为分布式训练提供了最佳性能。

用户可能希望在不同的机器组上运行 DistGraph 服务器和客户端。在这种情况下,用户可以在创建 DistGraphServer 时通过传入 disable_shared_mem=False 来禁用共享内存。禁用共享内存时,用户必须传递一个分区簿。

参数:
  • graph_name (str) – 图的名称。此名称必须与在 dgl.distributed.partition.partition_graph() 中用于划分图的名称相同。

  • gpb (GraphPartitionBook, 可选) – 图划分簿对象。通常,用户无需提供图划分簿。仅当用户希望在不同的机器上运行服务器进程和训练器进程时,此参数才必需。

  • part_config (str, 可选) – dgl.distributed.partition.partition_graph() 生成的分区配置文件路径。在独立模式下使用。

示例

此示例展示了在独立模式下创建 DistGraph

>>> dgl.distributed.partition_graph(g, 'graph_name', 1, num_hops=1, part_method='metis',
...                                 out_path='output/')
>>> g = dgl.distributed.DistGraph('graph_name', part_config='output/graph_name.json')

此示例展示了在分布式模式下创建 DistGraph

>>> g = dgl.distributed.DistGraph('graph-name')

以下代码展示了使用 DistGraph 进行 mini-batch 训练。

>>> def sample(seeds):
...     seeds = th.LongTensor(np.asarray(seeds))
...     frontier = dgl.distributed.sample_neighbors(g, seeds, 10)
...     return dgl.to_block(frontier, seeds)
>>> dataloader = dgl.distributed.DistDataLoader(dataset=nodes, batch_size=1000,
...                                             collate_fn=sample, shuffle=True)
>>> for block in dataloader:
...     feat = g.ndata['features'][block.srcdata[dgl.NID]]
...     labels = g.ndata['labels'][block.dstdata[dgl.NID]]
...     pred = model(block, feat)

注意

默认情况下,DGL 的分布式训练在同一组机器上运行服务器进程和训练器进程。如果用户需要在不同的机器组上运行它们,则需要手动设置服务器和训练器。此设置尚未经过全面测试。

barrier()[source]

所有客户端节点的障碍。

此 API 阻塞当前进程,直到所有客户端调用此 API。请谨慎使用此 API。

property device

获取此图的设备上下文。

示例

以下示例使用 PyTorch 后端。

>>> g = dgl.heterograph({
...     ('user', 'plays', 'game'): ([0, 1, 1, 2], [0, 0, 2, 1])
... })
>>> print(g.device)
device(type='cpu')
>>> g = g.to('cuda:0')
>>> print(g.device)
device(type='cuda', index=0)
返回类型:

设备上下文对象

property edata

返回所有边的数据视图。

返回:

分布式图存储中的数据视图。

返回类型:

EdgeDataView

edge_attr_schemes()[source]

返回边特征方案。

每个特征方案是一个命名元组,存储边特征的形状和数据类型。

返回:

边特征列的方案。

返回类型:

dict,键为 str,值为方案

示例

以下使用 PyTorch 后端。

>>> g.edge_attr_schemes()
{'h': Scheme(shape=(4,), dtype=torch.float32)}

另请参阅

node_attr_schemes

property edges

返回边视图

property etypes

返回此图的边类型列表。

返回类型:

list,元素为 str

示例

>>> g = DistGraph("test")
>>> g.etypes
['_E']
find_edges(edges, etype=None)[source]

给定一个边 ID 数组,返回源节点和目标节点 ID 数组 sds[i]d[i] 分别是边 eid[i] 的源节点和目标节点 ID。

参数:
  • edges (Int Tensor) –

    每个元素都是一个 ID。张量必须具有相同的设备类型

    和 ID 数据类型与图一致。

  • etype (str(str, str, str), 可选) –

    边的类型名称。允许的类型名称格式为:

    • (str, str, str) 表示源节点类型、边类型和目标节点类型。

    • 或者一个 str 边类型名称,如果该名称在图中可以唯一标识三元组格式。

    如果图只有一种边类型,则可以省略。

返回:

  • tensor – 源节点 ID 数组。

  • tensor – 目标节点 ID 数组。

get_edge_partition_policy(etype)[source]

获取边类型的划分策略。

创建新的分布式张量时,我们需要提供一个划分策略,该策略指示如何在机器集群中分布分布式张量的数据。当我们在集群中加载分布式图时,我们为每个节点类型和每个边类型预定义了划分策略。通过提供边类型,我们可以引用该边类型的预定义划分策略。

参数:

etype (str(str, str, str)) – 边类型

返回:

该边类型的划分策略。

返回类型:

PartitionPolicy

get_etype_id(etype)[source]

返回给定边类型的 ID。

etype 也可以是 None。如果是这样,图中应该只有一种边类型。

参数:

etype (strtuple,元素为 str) – 边类型

返回类型:

int

get_node_partition_policy(ntype)[source]

获取节点类型的划分策略。

创建新的分布式张量时,我们需要提供一个划分策略,该策略指示如何在机器集群中分布分布式张量的数据。当我们在集群中加载分布式图时,我们为每个节点类型和每个边类型预定义了划分策略。通过提供节点类型,我们可以引用该节点类型的预定义划分策略。

参数:

ntype (str) – 节点类型

返回:

该节点类型的划分策略。

返回类型:

PartitionPolicy

get_ntype_id(ntype)[source]

返回给定节点类型的 ID。

ntype 也可以是 None。如果是这样,图中应该只有一种节点类型。

参数:

ntype (str) – 节点类型

返回类型:

int

get_partition_book()[source]

获取划分信息。

返回:

存储所有图划分信息的对象。

返回类型:

GraphPartitionBook

property idtype

图索引的数据类型

返回:

th.int32/th.int64 或 tf.int32/tf.int64 等。

返回类型:

后端数据类型对象

另请参阅

long, int

in_degrees(v='__ALL__')[source]

返回给定节点的入度。

它计算入度。目前尚不支持异构图。

参数:

v (节点 ID) –

节点 ID。允许的格式为:

  • int: 单个节点。

  • Int Tensor: 每个元素都是一个节点 ID。张量必须具有相同的设备类型和 ID 数据类型与图一致。

  • iterable[int]: 每个元素都是一个节点 ID。

如果未给定,返回所有节点的入度。

返回:

Tensor 中节点(群)的入度。第 i 个元素是第 i 个输入节点的入度。如果 vint,也返回一个 int

返回类型:

int 或 Tensor

示例

以下示例使用 PyTorch 后端。

>>> import dgl
>>> import torch

查询所有节点。

>>> g.in_degrees()
tensor([0, 2, 1, 1])

查询节点 1 和 2。

>>> g.in_degrees(torch.tensor([1, 2]))
tensor([2, 1])

另请参阅

out_degrees

property local_partition

返回客户端上的本地分区

DistGraph 提供分布式图的全局视图。在内部,如果它与服务器位于同一位置,则可能包含图的一个分区。当服务器和客户端在不同的机器组上运行时,此项返回 None。

返回:

本地分区

返回类型:

DGLGraph

property ndata

返回所有节点的数据视图。

返回:

分布式图存储中的数据视图。

返回类型:

NodeDataView

node_attr_schemes()[source]

返回节点特征方案。

每个特征方案是一个命名元组,存储节点特征的形状和数据类型。

返回:

节点特征列的方案。

返回类型:

dict,键为 str,值为方案

示例

以下使用 PyTorch 后端。

>>> g.node_attr_schemes()
{'h': Scheme(shape=(4,), dtype=torch.float32)}

另请参阅

edge_attr_schemes

property nodes

返回节点视图

property ntypes

返回此图的节点类型列表。

返回类型:

list,元素为 str

示例

>>> g = DistGraph("test")
>>> g.ntypes
['_U']
num_edges(etype=None)[source]

返回分布式图中的边总数。

参数:

etype (str(str, str, str), 可选) –

边的类型名称。允许的类型名称格式为:

  • (str, str, str) 表示源节点类型、边类型和目标节点类型。

  • 或者一个 str 边类型名称,如果该名称在图中可以唯一标识三元组格式。

如果未提供,则返回图中所有类型的边总数。

返回:

边的数量

返回类型:

int

示例

>>> g = dgl.distributed.DistGraph('ogb-product')
>>> print(g.num_edges())
123718280
num_nodes(ntype=None)[source]

返回分布式图中的节点总数。

参数:

ntype (str, 可选) – 节点类型名称。如果给定,返回该类型节点的数量。如果未给定(默认),则返回所有类型节点的总数。

返回:

节点的数量

返回类型:

int

示例

>>> g = dgl.distributed.DistGraph('ogb-product')
>>> print(g.num_nodes())
2449029
number_of_edges(etype=None)[source]

num_edges() 的别名

number_of_nodes(ntype=None)[source]

num_nodes() 的别名

out_degrees(u='__ALL__')[source]

返回给定节点的出度。

它计算出度。目前尚不支持异构图。

参数:

u (节点 ID) –

节点 ID。允许的格式为:

  • int: 单个节点。

  • Int Tensor: 每个元素都是一个节点 ID。张量必须具有相同的设备类型和 ID 数据类型与图一致。

  • iterable[int]: 每个元素都是一个节点 ID。

如果未给定,返回所有节点的入度。

返回:

Tensor 中节点(群)的出度。第 i 个元素是第 i 个输入节点的出度。如果 uint,也返回一个 int

返回类型:

int 或 Tensor

示例

以下示例使用 PyTorch 后端。

>>> import dgl
>>> import torch

查询所有节点。

>>> g.out_degrees()
tensor([2, 2, 0, 0])

查询节点 1 和 2。

>>> g.out_degrees(torch.tensor([1, 2]))
tensor([2, 0])

另请参阅

in_degrees

rank()[source]

当前 DistGraph 的秩。

这返回一个唯一数字,用于标识所有客户端进程中的 DistGraph 对象。

返回:

当前 DistGraph 的秩。

返回类型:

int

分布式张量

class dgl.distributed.DistTensor(shape, dtype, name=None, init_func=None, part_policy=None, persistent=False, is_gdata=True, attach=True)[source]

分布式张量。

DistTensor 引用存储在机器集群中并已分片的分布式张量。它具有与 Pytorch Tensor 相同的接口来访问其元数据(例如,形状和数据类型)。要访问分布式张量中的数据,它支持行切片和向行写入数据。它不支持任何深度学习框架的操作符,例如加法和乘法。

目前,分布式张量设计用于存储分布式图的节点数据和边数据。因此,它们的第一个维度必须是图中的节点数或边数。张量根据节点或边的划分策略在第一个维度上进行分片。创建分布式张量时,如果未提供划分策略,则根据第一个维度自动确定划分策略。如果第一个维度与某种节点类型的节点数匹配,则 DistTensor 将使用该特定节点类型的划分策略;如果第一个维度与某种边类型的边数匹配,则 DistTensor 将使用该特定边类型的划分策略。如果 DGL 无法自动确定划分策略(例如,多个节点类型或边类型具有相同的节点数或边数),用户必须明确提供划分策略。

分布式张量可以是有名称的或匿名的。当分布式张量有名称时,如果 persistent=True,则张量可以是持久的。通常,当 DistTensor 对象消失时,DGL 会销毁系统中的分布式张量。然而,即使 DistTenor 对象在训练器进程中消失,持久张量也会在系统中存在。持久张量与 DGL 服务器具有相同的生命周期。DGL 不允许匿名张量成为持久张量。

创建 DistTensor 对象时,它可能引用现有的分布式张量或创建一个新的。分布式张量由传递给构造函数的名称标识。如果名称存在,DistTensor 将引用现有张量。在这种情况下,形状和数据类型必须与现有张量匹配。如果名称不存在,将在 kvstore 中创建一个新张量。

创建分布式张量时,其值初始化为零。用户可以定义一个初始化函数来控制值的初始化方式。初始化函数有两个输入参数:形状和数据类型,并返回一个张量。下面是一个初始化函数的示例:

def init_func(shape, dtype):
    return torch.ones(shape=shape, dtype=dtype)
参数:
  • shape (tuple) – 张量的形状。第一个维度必须是分布式图的节点数或边数。

  • dtype (dtype) – 张量的数据类型。数据类型必须是深度学习框架中的类型。

  • name (string, 可选) – 嵌入的名称。名称可以在系统中唯一标识嵌入,以便另一个 DistTensor 对象可以引用该分布式张量。

  • init_func (可调用, 可选) – 初始化张量中数据的函数。如果未提供初始化函数,嵌入的值将初始化为零。

  • part_policy (PartitionPolicy, 可选) – 张量的行在集群中不同机器上的划分策略。目前,仅支持节点划分策略或边划分策略。系统会自动确定正确的划分策略。

  • persistent (bool) – 创建的张量在 DistTensor 对象被销毁后是否继续存在。

  • is_gdata (bool) – 创建的张量是否为 ndata/edata。

  • attach (bool) – 是否将组 ID 附加到名称中以使其全局唯一。

示例

>>> init = lambda shape, dtype: th.ones(shape, dtype=dtype)
>>> arr = dgl.distributed.DistTensor((g.num_nodes(), 2), th.int32, init_func=init)
>>> print(arr[0:3])
tensor([[1, 1],
        [1, 1],
        [1, 1]], dtype=torch.int32)
>>> arr[0:3] = th.ones((3, 2), dtype=th.int32) * 2
>>> print(arr[0:3])
tensor([[2, 2],
        [2, 2],
        [2, 2]], dtype=torch.int32)

注意

DistTensor 的创建是一个同步操作。当一个训练器进程试图创建 DistTensor 对象时,只有当所有训练器进程都这样做时,创建才会成功。

property dtype

返回分布式张量的数据类型。

返回:

张量的数据类型。

返回类型:

dtype

property name

返回分布式张量的名称

返回:

张量的名称。

返回类型:

str

property part_policy

返回划分策略

返回:

分布式张量的划分策略。

返回类型:

PartitionPolicy

property shape

返回分布式张量的形状。

返回:

分布式张量的形状。

返回类型:

tuple

分布式节点嵌入

class dgl.distributed.DistEmbedding(num_embeddings, embedding_dim, name=None, init_func=None, part_policy=None)[source]

分布式节点嵌入。

DGL 提供分布式嵌入,以支持需要可学习嵌入的模型。DGL 的分布式嵌入主要用于学习图模型的节点嵌入。由于分布式嵌入是模型的一部分,因此通过 mini-batch 进行更新。分布式嵌入必须由 DGL 的优化器更新,而不是深度学习框架(例如 PyTorch 和 MXNet)提供的优化器。

为了支持在具有许多节点的图上进行高效训练,嵌入支持稀疏更新。也就是说,只更新参与 mini-batch 计算的嵌入。有关 DGL 中可用的优化器,请参阅分布式优化器

分布式嵌入以与 dgl.distributed.DistTensor 相同的方式在机器集群中进行分片和存储,但分布式嵌入是可训练的。由于分布式嵌入与分布式图的节点和边以相同的方式进行分片,因此通常比深度学习框架提供的稀疏嵌入更有效率地访问。

参数:
  • num_embeddings (int) – 嵌入的数量。目前,嵌入的数量必须与节点数或边数相同。

  • embedding_dim (int) – 嵌入的维度大小。

  • name (str, 可选) – 嵌入的名称。名称可以在系统中唯一标识嵌入,以便另一个 DistEmbedding 对象可以引用相同的嵌入。

  • init_func (可调用, 可选) – 创建初始数据的函数。如果未提供初始化函数,嵌入的值将初始化为零。

  • part_policy (PartitionPolicy, 可选) – 将嵌入分配给集群中不同机器的划分策略。目前,仅支持节点划分策略或边划分策略。系统会自动确定正确的划分策略。

示例

>>> def initializer(shape, dtype):
        arr = th.zeros(shape, dtype=dtype)
        arr.uniform_(-1, 1)
        return arr
>>> emb = dgl.distributed.DistEmbedding(g.num_nodes(), 10, init_func=initializer)
>>> optimizer = dgl.distributed.optim.SparseAdagrad([emb], lr=0.001)
>>> for blocks in dataloader:
...     feats = emb(nids)
...     loss = F.sum(feats + 1, 0)
...     loss.backward()
...     optimizer.step()

注意

在正向计算中使用 DistEmbedding 对象时,用户之后必须调用 step()。否则,可能会出现内存泄漏。

分布式嵌入优化器

class dgl.distributed.optim.SparseAdagrad(params, lr, eps=1e-10)[source]

使用 Adagrad 算法的分布式节点嵌入优化器。

此优化器实现了 Adagrad 算法的分布式稀疏版本,用于优化 dgl.distributed.DistEmbedding。稀疏意味着它只更新其梯度有更新的嵌入,这通常只占总嵌入的一小部分。

Adagrad 为嵌入中的每个参数维护一个 \(G_{t,i,j}\),其中 \(G_{t,i,j}=G_{t-1,i,j} + g_{t,i,j}^2\)\(g_{t,i,j}\) 是步骤 \(t\) 时嵌入 \(i\) 的维度 \(j\) 的梯度。

注意:稀疏 Adagrad 优化器的支持尚处于实验阶段。

参数:
load(f)

从每个秩上的文件加载优化器的本地状态。

注意:这需要在所有秩上调用。

参数:

f (Union[str, os.PathLike]) – 要从中加载的文件路径。

另请参阅

save

save(f)

将本地 state_dict 保存到每个秩上的磁盘。

保存的字典包含 2 部分

  • ‘params’:优化器的超参数。

  • ‘emb_states’:部分优化器状态,每个嵌入包含 2 个项
    1. `ids`: 存储在此秩中的节点/边的全局 ID。

    2. `states`: 与 `ids` 对应的状态数据。

注意:这需要在所有秩上调用。

参数:

f (Union[str, os.PathLike]) – 要保存到的文件路径。

另请参阅

load

step()

步进函数。

步进函数在每个批次结束时调用,将 mini-batch 中涉及的嵌入的梯度推送到 DGL 的服务器并更新嵌入。

class dgl.distributed.optim.SparseAdam(params, lr, betas=(0.9, 0.999), eps=1e-08)[source]

使用 Adam 算法的分布式节点嵌入优化器。

此优化器实现了 Adam 算法的分布式稀疏版本,用于优化 dgl.distributed.DistEmbedding。稀疏意味着它只更新其梯度有更新的嵌入,这通常只占总嵌入的一小部分。

Adam 为嵌入中的每个参数维护一个 \(Gm_{t,i,j}\)Gp_{t,i,j},其中 \(Gm_{t,i,j}=beta1 * Gm_{t-1,i,j} + (1-beta1) * g_{t,i,j}\)\(Gp_{t,i,j}=beta2 * Gp_{t-1,i,j} + (1-beta2) * g_{t,i,j}^2\)\(g_{t,i,j} = lr * Gm_{t,i,j} / (1 - beta1^t) / \sqrt{Gp_{t,i,j} / (1 - beta2^t)}\)\(g_{t,i,j}\) 是步骤 \(t\) 时嵌入 \(i\) 的维度 \(j\) 的梯度。

注意:稀疏 Adam 优化器的支持尚处于实验阶段。

参数:
  • params (list[dgl.distributed.DistEmbedding]) – dgl.distributed.DistEmbedding 的列表。

  • lr (float) – 学习率。

  • betas (tuple[float, float], 可选) – 用于计算梯度及其平方的移动平均值的系数。默认值:(0.9, 0.999)

  • eps (float, 可选) – 添加到分母中以提高数值稳定性的项 默认值:1e-8

load(f)

从每个秩上的文件加载优化器的本地状态。

注意:这需要在所有秩上调用。

参数:

f (Union[str, os.PathLike]) – 要从中加载的文件路径。

另请参阅

save

save(f)

将本地 state_dict 保存到每个秩上的磁盘。

保存的字典包含 2 部分

  • ‘params’:优化器的超参数。

  • ‘emb_states’:部分优化器状态,每个嵌入包含 2 个项
    1. `ids`: 存储在此秩中的节点/边的全局 ID。

    2. `states`: 与 `ids` 对应的状态数据。

注意:这需要在所有秩上调用。

参数:

f (Union[str, os.PathLike]) – 要保存到的文件路径。

另请参阅

load

step()

步进函数。

步进函数在每个批次结束时调用,将 mini-batch 中涉及的嵌入的梯度推送到 DGL 的服务器并更新嵌入。

分布式工作负载划分

node_split(nodes[, partition_book, ntype, ...])

划分节点并为本地秩返回一个子集。

edge_split(edges[, partition_book, etype, ...])

划分边并为本地秩返回一个子集。

分布式采样

分布式数据加载器

class dgl.distributed.NodeCollator(g, nids, graph_sampler)[source]

DGL 校对器,用于在具有邻域采样的单个图上进行节点分类或回归训练时,在一个 minibatch 内组合节点及其计算依赖关系。

参数:

示例

在同构图上对节点集 train_nid 进行 3 层 GNN 训练以进行节点分类,其中每个节点从所有邻居接收消息(假设后端是 PyTorch)

>>> sampler = dgl.dataloading.MultiLayerNeighborSampler([15, 10, 5])
>>> collator = dgl.dataloading.NodeCollator(g, train_nid, sampler)
>>> dataloader = torch.utils.data.DataLoader(
...     collator.dataset, collate_fn=collator.collate,
...     batch_size=1024, shuffle=True, drop_last=False, num_workers=4)
>>> for input_nodes, output_nodes, blocks in dataloader:
...     train_on(input_nodes, output_nodes, blocks)

备注

有关 MFG 的概念,请参阅 用户指南第 6 节Minibatch 训练教程

class dgl.distributed.EdgeCollator(g, eids, graph_sampler, g_sampling=None, exclude=None, reverse_eids=None, reverse_etypes=None, negative_sampler=None)[source]

DGL 校对器,用于在具有邻域采样的单个图上进行边分类、边回归或链接预测训练时,在一个 minibatch 内组合边及其计算依赖关系。

给定一组边,校对函数将返回:

  • 计算边上表示所需的输入节点的张量,或者节点类型名称和此类张量的字典。

  • 一个子图,仅包含 minibatch 中的边及其关联节点。注意,该子图与原始图具有相同的元图。

  • 如果提供了负采样器,则还会返回另一个图,其中包含“负边”,连接从给定负采样器产生的源节点和目标节点。

  • 计算 minibatch 中边的关联节点的表示所需的 MFG 列表。

参数:
  • g (DGLGraph) – 从中以 minibatches 方式迭代边并生成子图的图。

  • eids (Tensordict[etype, Tensor]) – 图 g 中用于计算输出的边集。

  • graph_sampler (dgl.dataloading.BlockSampler) – 邻域采样器。

  • g_sampling (DGLGraph, 可选) –

    执行邻域采样和消息传递的图。

    注意,这不一定与 g 相同。

    如果为 None,则假定与 g 相同。

  • exclude (str, 可选) –

    是否以及如何排除 minibatch 中与采样边相关的依赖关系。可能的值为:

    • None,不排除任何内容。

    • 'self',排除采样边本身,但不排除其他任何内容。

    • 'reverse_id',排除采样边的反向边。所述反向边与采样边具有相同的边类型。仅适用于源节点类型与其目标节点类型相同的边类型。

    • 'reverse_types',排除采样边的反向边。所述反向边与采样边具有不同的边类型。

    如果给定了 g_sampling,则忽略 exclude,且始终为 None

  • reverse_eids (Tensordict[etype, Tensor], 可选) –

    反向边 ID 映射的张量。第 i 个元素指示第 i 条边的反向边的 ID。

    如果图是异构的,此参数需要一个字典,包含边类型和反向边 ID 映射张量。

    exclude 设置为 reverse_id 时必需且仅使用。

    对于异构图,这将是一个边类型和边 ID 的字典。请注意,仅需要源节点类型与目标节点类型相同的边类型。

  • reverse_etypes (dict[etype, etype], 可选) –

    从边类型到其反向边类型的映射。

    exclude 设置为 reverse_types 时必需且仅使用。

  • negative_sampler (可调用, 可选) –

    负采样器。如果不需要负采样,可以省略。

    负采样器必须是一个可调用对象,接受以下参数:

    • 原始(异构)图。

    • minibatch 中采样边的 ID 数组,如果图是异构的,则为边类型和 minibatch 中采样边的 ID 数组的字典。

    它应该返回:

    • 作为负样本的一对源节点和目标节点 ID 数组,如果图是异构的,则为边类型和此类对的字典。

    负采样模块 中提供了一组内置的负采样器。

示例

以下示例展示了如何在同构无向图上对边集 train_eid 进行 3 层 GNN 训练以进行边分类。每个节点从所有邻居接收消息。

假设您有一个源节点 ID 数组 src 和一个目标节点 ID 数组 dst。可以通过添加另一组从 dst 连接到 src 的边来使其成为双向的。

>>> g = dgl.graph((torch.cat([src, dst]), torch.cat([dst, src])))

然后可以知道边与其反向边的 ID 差是 |E|,其中 |E| 是源/目标数组的长度。反向边映射可以通过以下方式获得:

>>> E = len(src)
>>> reverse_eids = torch.cat([torch.arange(E, 2 * E), torch.arange(0, E)])

注意,采样边及其反向边将从关联节点的计算依赖关系中移除。这是避免信息泄露的常用技巧。

>>> sampler = dgl.dataloading.MultiLayerNeighborSampler([15, 10, 5])
>>> collator = dgl.dataloading.EdgeCollator(
...     g, train_eid, sampler, exclude='reverse_id',
...     reverse_eids=reverse_eids)
>>> dataloader = torch.utils.data.DataLoader(
...     collator.dataset, collate_fn=collator.collate,
...     batch_size=1024, shuffle=True, drop_last=False, num_workers=4)
>>> for input_nodes, pair_graph, blocks in dataloader:
...     train_on(input_nodes, pair_graph, blocks)

在同构图上对边集 train_eid 进行 3 层 GNN 训练以进行链接预测,其中每个节点从所有邻居接收消息(假设后端是 PyTorch),每个边统一选择 5 个负样本

>>> sampler = dgl.dataloading.MultiLayerNeighborSampler([15, 10, 5])
>>> neg_sampler = dgl.dataloading.negative_sampler.Uniform(5)
>>> collator = dgl.dataloading.EdgeCollator(
...     g, train_eid, sampler, exclude='reverse_id',
...     reverse_eids=reverse_eids, negative_sampler=neg_sampler)
>>> dataloader = torch.utils.data.DataLoader(
...     collator.dataset, collate_fn=collator.collate,
...     batch_size=1024, shuffle=True, drop_last=False, num_workers=4)
>>> for input_nodes, pos_pair_graph, neg_pair_graph, blocks in dataloader:
...     train_on(input_nodse, pair_graph, neg_pair_graph, blocks)

对于异构图,边的反向边可能与原始边具有不同的边类型。例如,假设您有一个用户-项目点击数组,由用户数组 user 和项目数组 item 表示。您可能想构建一个具有用户-点击-项目关系和项目-被用户点击关系(即反向关系)的异构图。

>>> g = dgl.heterograph({
...     ('user', 'click', 'item'): (user, item),
...     ('item', 'clicked-by', 'user'): (item, user)})

在边类型为 click 的边集 train_eid 上进行 3 层 GNN 训练以进行边分类,您可以编写:

>>> sampler = dgl.dataloading.MultiLayerNeighborSampler([15, 10, 5])
>>> collator = dgl.dataloading.EdgeCollator(
...     g, {'click': train_eid}, sampler, exclude='reverse_types',
...     reverse_etypes={'click': 'clicked-by', 'clicked-by': 'click'})
>>> dataloader = torch.utils.data.DataLoader(
...     collator.dataset, collate_fn=collator.collate,
...     batch_size=1024, shuffle=True, drop_last=False, num_workers=4)
>>> for input_nodes, pair_graph, blocks in dataloader:
...     train_on(input_nodes, pair_graph, blocks)

在边类型为 click 的边集 train_eid 上进行 3 层 GNN 训练以进行链接预测,您可以编写:

>>> sampler = dgl.dataloading.MultiLayerNeighborSampler([15, 10, 5])
>>> neg_sampler = dgl.dataloading.negative_sampler.Uniform(5)
>>> collator = dgl.dataloading.EdgeCollator(
...     g, train_eid, sampler, exclude='reverse_types',
...     reverse_etypes={'click': 'clicked-by', 'clicked-by': 'click'},
...     negative_sampler=neg_sampler)
>>> dataloader = torch.utils.data.DataLoader(
...     collator.dataset, collate_fn=collator.collate,
...     batch_size=1024, shuffle=True, drop_last=False, num_workers=4)
>>> for input_nodes, pos_pair_graph, neg_pair_graph, blocks in dataloader:
...     train_on(input_nodes, pair_graph, neg_pair_graph, blocks)

备注

有关 MFG 的概念,请参阅 用户指南第 6 节Minibatch 训练教程

class dgl.distributed.DistDataLoader(dataset, batch_size, shuffle=False, collate_fn=None, drop_last=False, queue_size=None)[source]

DGL 自定义多进程数据加载器。

DistDataLoader 提供了与 PyTorch 的 DataLoader 相似的接口,用于使用多进程生成 mini-batches。它利用 dgl.distributed.initialize() 创建的工作进程来并行化采样。

参数:
  • dataset (一个张量) – 节点 ID 或边 ID 的张量。

  • batch_size (int) – 每批次要加载的样本数量。

  • shuffle (bool, 可选) – 设置为 True 可在每个 epoch 对数据进行重新洗牌(默认值:False)。

  • collate_fn (可调用, 可选) – 此函数通常用于采样批次中节点的邻居或批次中边的端点节点。

  • drop_last (bool, 可选) – 如果数据集大小不能被批次大小整除,设置为 True 则丢弃最后一个不完整的批次。如果设置为 False 且数据集大小不能被批次大小整除,则最后一个批次将较小。(默认值:False

  • queue_size (int, 可选) – 多进程队列的大小

示例

>>> g = dgl.distributed.DistGraph('graph-name')
>>> def sample(seeds):
...     seeds = th.LongTensor(np.asarray(seeds))
...     frontier = dgl.distributed.sample_neighbors(g, seeds, 10)
...     return dgl.to_block(frontier, seeds)
>>> dataloader = dgl.distributed.DistDataLoader(dataset=nodes, batch_size=1000,
                                                collate_fn=sample, shuffle=True)
>>> for block in dataloader:
...     feat = g.ndata['features'][block.srcdata[dgl.NID]]
...     labels = g.ndata['labels'][block.dstdata[dgl.NID]]
...     pred = model(block, feat)

注意

在使用 DGL 的多进程分布式采样时,用户必须使用此类而不是 PyTorch 的 DataLoader,因为 DGL 的 RPC 要求所有进程在调用任何 DGL 的分布式 API 之前与服务器建立连接。因此,此数据加载器使用在 dgl.distributed.initialize() 中创建的工作进程。

注意

此数据加载器不保证迭代顺序。例如,如果 dataset = [1, 2, 3, 4],batch_size = 2 且 shuffle = False,则 [1, 2] 和 [3, 4] 的顺序不保证。

class dgl.distributed.DistNodeDataLoader(g, nids, graph_sampler, device=None, **kwargs)[source]

分布式图存储上基于节点的采样图数据加载器。

它包装了对节点集的迭代器,在分布式图上生成消息流图 (MFG) 列表,作为所述 minibatch 的计算依赖关系。

所有参数的含义与单机对应的 dgl.dataloading.DataLoader 相同,除了第一个参数 g 必须是 dgl.distributed.DistGraph

参数:
class dgl.distributed.DistEdgeDataLoader(g, eids, graph_sampler, device=None, **kwargs)[source]

用于分布式图存储的基于边的采样图数据加载器。

它包装了一个对一组边进行迭代的对象,生成消息流图(MFG)列表,作为分布式图上边分类、边回归和链接预测所述小批次的计算依赖。

所有参数的含义与单机对应的 dgl.dataloading.DataLoader 相同,除了第一个参数 g 必须是 dgl.distributed.DistGraph

参数:

分布式图采样操作符

sample_neighbors(g, nodes, fanout[, ...])

从分布式图中给定节点的邻居进行采样。

sample_etype_neighbors(g, nodes, fanout[, ...])

从分布式图中给定节点的邻居进行采样。

find_edges(g, edge_ids)

给定边 ID 数组,从分布式图中返回源节点和目标节点 ID 数组 sd

in_subgraph(g, nodes)

返回由给定节点的入边诱导的子图。

分区

图分区簿

class dgl.distributed.GraphPartitionBook[source]

图分区簿的基类。

对于分布式训练,图被划分为多个部分并加载到多台机器中。分区簿包含定位集群中节点和边的所有必要信息。

分区簿包含各种分区信息,包括

  • 分区的数量,

  • 节点或边所属的分区 ID,

  • 一个分区拥有的节点 ID 和边 ID。

  • 分区中节点和边的本地 ID。

目前,只有一个实现 GraphPartitionBook 的类:RangePartitionBook。由于节点/边已被重新标记,使得同一分区的 ID 落在一个连续的 ID 范围内,因此它基于一些小型元数据计算节点/边 ID 和分区 ID 之间的映射。

图被分区时会自动构造图分区簿。加载图分区时,也会加载图分区簿。更多详情请参阅 partition_graph()load_partition()load_partition_book()

property canonical_etypes

获取规范边类型列表

返回:

规范边类型列表

返回类型:

list[(str, str, str)]

eid2localeid(eids, partid, etype)[source]

获取给定分区内的本地边 ID。

参数:
  • eids (张量) – 全局边 ID

  • partid (int) – 分区 ID

  • etype (str(str, str, str)) – 边类型

返回:

本地边 ID

返回类型:

张量

eid2partid(eids, etype)[source]

从全局边 ID 到分区 ID

参数:
  • eids (张量) – 全局边 ID

  • etype (str(str, str, str)) – 边类型

返回:

分区 ID

返回类型:

张量

map_to_homo_eid(ids, etype)[source]

将按类型划分的边 ID 和类型 ID 映射到齐次边 ID。

参数:
  • ids (张量) – 按类型划分的边 ID

  • etype (str(str, str, str)) – 边类型

返回:

齐次边 ID。

返回类型:

张量

map_to_homo_nid(ids, ntype)[source]

将按类型划分的节点 ID 和类型 ID 映射到齐次节点 ID。

参数:
  • ids (张量) – 按类型划分的节点 ID

  • ntype (str) – 节点类型

返回:

齐次节点 ID。

返回类型:

张量

map_to_per_etype(ids)[source]

将齐次边 ID 映射到按类型划分的 ID 和边类型。

参数:

ids (张量) – 齐次边 ID。

返回:

边类型 ID 和按类型划分的边 ID。

返回类型:

(张量, 张量)

map_to_per_ntype(ids)[source]

将齐次节点 ID 映射到按类型划分的 ID 和节点类型。

参数:

ids (张量) – 齐次节点 ID。

返回:

节点类型 ID 和按类型划分的节点 ID。

返回类型:

(张量, 张量)

metadata()[source]

返回分区元数据。

元数据包括

  • 机器 ID。

  • 每个分区的节点数和边数。

示例

>>> print(g.get_partition_book().metadata())
>>> [{'machine_id' : 0, 'num_nodes' : 3000, 'num_edges' : 5000},
...  {'machine_id' : 1, 'num_nodes' : 2000, 'num_edges' : 4888},
...  ...]
返回:

每个分区的元数据。

返回类型:

list[dict[str, any]]

nid2localnid(nids, partid, ntype)[source]

获取给定分区内的本地节点 ID。

参数:
  • nids (张量) – 全局节点 ID

  • partid (int) – 分区 ID

  • ntype (str) – 节点类型

返回:

本地节点 ID

返回类型:

张量

nid2partid(nids, ntype)[source]

从全局节点 ID 到分区 ID

参数:
  • nids (张量) – 全局节点 ID

  • ntype (str) – 节点类型

返回:

分区 ID

返回类型:

张量

num_partitions()[source]

返回分区的数量。

返回:

分区数量

返回类型:

int

property partid

获取当前分区 ID

返回:

当前机器的分区 ID

返回类型:

int

partid2eids(partid, etype)[source]

从分区 ID 到全局边 ID

参数:
返回:

边 ID

返回类型:

张量

partid2nids(partid, ntype)[source]

从分区 ID 到全局节点 ID

参数:
  • partid (int) – 分区 ID

  • ntype (str) – 节点类型

返回:

节点 ID

返回类型:

张量

shared_memory(graph_name)[source]

将分区簿移动到共享内存。

参数:

graph_name (str) – 图的名称。此名称将用于在另一个进程中从共享内存读取分区簿。

class dgl.distributed.PartitionPolicy(policy_str, partition_book)[source]

这定义了分布式张量或分布式嵌入的分区策略。

当 DGL 将张量分片并存储在机器集群中时,需要分区策略将张量的行映射到集群中的机器。

虽然可以定义任意分区策略,但 DGL 目前支持两种用于将节点和边映射到机器的分区策略。要从图分区簿定义分区策略,用户需要指定策略名称('node' 或 'edge')。

参数:
  • policy_str (str) – 分区策略名称,例如 ‘edge~_N:_E:_N’ 或 ‘node~_N’。

  • partition_book (GraphPartitionBook) – 图分区簿

get_part_size()[source]

获取当前分区的数据大小。

返回:

数据大小

返回类型:

int

get_size()[source]

获取数据的完整大小。

返回:

数据大小

返回类型:

int

property part_id

获取分区 ID

返回:

分区 ID

返回类型:

int

property partition_book

获取分区簿

返回:

图分区簿

返回类型:

GraphPartitionBook

property policy_str

获取策略名称

返回:

分区策略的名称。

返回类型:

str

to_local(id_tensor)[source]

将全局 ID 映射到本地 ID。

参数:

id_tensor (张量) – 全局 ID 张量

返回:

本地 ID 张量

返回类型:

张量

to_partid(id_tensor)[source]

将全局 ID 映射到分区 ID。

参数:

id_tensor (张量) – 全局 ID 张量

返回:

分区 ID

返回类型:

张量

划分和加载分区

load_partition(part_config, part_id[, ...])

从数据路径加载分区数据。

load_partition_feats(part_config, part_id[, ...])

从分区加载节点/边特征数据。

load_partition_book(part_config, part_id[, ...])

从分区配置文件加载图分区簿。

partition_graph(g, graph_name, num_parts, ...)

对图进行分区以进行分布式训练,并将分区存储到文件中。

dgl_partition_to_graphbolt(part_config, *[, ...])

将 DGL 分区转换为 GraphBolt 的 FusedCSCSamplingGraph。