7.3 编程 API

(中文版)

本节介绍训练脚本中常用的核心 Python 组件。DGL 提供了三种分布式数据结构以及用于初始化、分布式采样和工作负载分割的各种 API。

  • 使用 DistGraph 访问分布式存储图的结构和特征。

  • 使用 DistTensor 访问跨机器分区的节点/边特征张量。

  • 使用 DistEmbedding 访问跨机器分区的可学习节点/边嵌入张量。

DGL 分布式模块的初始化

dgl.distributed.initialize() 初始化分布式模块。如果由训练器调用,此 API 会创建采样进程并与图服务器建立连接;如果由图服务器调用,此 API 会启动一个服务循环来监听训练器/采样器请求。此 API 必须torch.distributed.init_process_group() 和任何其他 dgl.distributed API 之前调用,顺序如下所示

dgl.distributed.initialize('ip_config.txt')
th.distributed.init_process_group(backend='gloo')

注意

如果训练脚本包含必须在服务器上调用的用户自定义函数 (UDF)(更多详细信息请参见 DistTensor 和 DistEmbedding 部分),则这些 UDF 必须在 initialize() 之前声明。

分布式图

DistGraph 是一个 Python 类,用于在机器集群中访问图结构和节点/边特征。每台机器负责且只负责一个分区。它加载分区数据(分区中的图结构、节点数据和边数据),并使其可供集群中的所有训练器访问。DistGraph 提供 DGLGraph API 的一小部分子集用于数据访问。

分布式模式与独立模式

DistGraph 可以运行在两种模式下:分布式模式独立模式。当用户在 Python 命令行或 Jupyter Notebook 中执行训练脚本时,它运行在独立模式下。也就是说,它在单个进程中运行所有计算,并且不与其他任何进程通信。因此,独立模式要求输入图只有一个分区。此模式主要用于开发和测试(例如,在 Jupyter Notebook 中开发和运行代码)。当用户使用启动脚本执行训练脚本时(参见启动脚本部分),DistGraph 运行在分布式模式下。启动工具在后台启动服务器(节点/边特征访问和图采样),并自动在每台机器上加载分区数据。DistGraph 连接到机器集群中的服务器,并通过网络访问它们。

DistGraph 创建

在分布式模式下,创建 DistGraph 需要图划分时指定的图名称。图名称用于标识集群中加载的图。

import dgl
g = dgl.distributed.DistGraph('graph_name')

在独立模式下运行时,它会在本地机器中加载图数据。因此,用户需要提供分区配置文件,该文件包含有关输入图的所有信息。

import dgl
g = dgl.distributed.DistGraph('graph_name', part_config='data/graph_name.json')

注意

DGL 只允许一个 DistGraph 对象。销毁 DistGraph 并创建一个新对象的行为是未定义的。

访问图结构

DistGraph 提供一组 API 来访问图结构。目前,大多数 API 提供图信息,例如节点数和边数。DistGraph 的主要用例是运行采样 API 以支持 mini-batch 训练(参见分布式采样)。

print(g.num_nodes())

访问节点/边数据

DGLGraph 类似,DistGraph 提供 ndataedata 来访问节点和边中的数据。不同之处在于,DistGraph 中的 ndata/edata 返回 DistTensor,而不是底层框架的张量。用户还可以将新的 DistTensor 赋值给 DistGraph 作为节点数据或边数据。

g.ndata['train_mask']  # <dgl.distributed.dist_graph.DistTensor at 0x7fec820937b8>
g.ndata['train_mask'][0]  # tensor([1], dtype=torch.uint8)

分布式张量

如前所述,DGL 将节点/边特征分片并将其存储在机器集群中。DGL 提供分布式张量,具有类似张量的接口,用于访问集群中分区的节点/边特征。在分布式设置中,DGL 只支持密集(dense)节点/边特征。

DistTensor 管理在多台机器中分区和存储的密集张量。目前,分布式张量必须与图的节点或边相关联。换句话说,DistTensor 中的行数必须与图中的节点数或边数相同。以下代码创建了一个分布式张量。除了张量的形状和 dtype 之外,用户还可以提供一个唯一的张量名称。如果用户想引用一个持久的分布式张量(即使 DistTensor 对象消失,该张量仍存在于集群中),此名称非常有用。

tensor = dgl.distributed.DistTensor((g.num_nodes(), 10), th.float32, name='test')

注意

DistTensor 创建是一个同步操作。所有训练器都必须调用创建操作,并且只有当所有训练器都调用时创建才会成功。

用户可以将一个 DistTensor 添加到一个 DistGraph 对象中,作为节点数据或边数据之一。

g.ndata['feat'] = tensor

注意

节点数据名称和张量名称不必相同。前者(在训练器进程中)用于标识来自 DistGraph 的节点数据,而后者用于标识 DGL 服务器中的分布式张量。

DistTensor 具有与常规张量相同的 API 来访问其元数据,例如形状和 dtype。它还支持索引读写,但不支持计算操作符,例如求和和求平均。

data = g.ndata['feat'][[1, 2, 3]]
print(data)
g.ndata['feat'][[3, 4, 5]] = data

注意

目前,当一台机器运行多个服务器时,DGL 不提供多训练器并发写入的保护。这可能导致数据损坏。避免对同一行数据进行并发写入的一种方法是在一台机器上只运行一个服务器进程。

分布式 DistEmbedding

DGL 提供 DistEmbedding 来支持需要节点嵌入的直推式模型。创建分布式嵌入与创建分布式张量非常相似。

def initializer(shape, dtype):
    arr = th.zeros(shape, dtype=dtype)
    arr.uniform_(-1, 1)
    return arr
emb = dgl.distributed.DistEmbedding(g.num_nodes(), 10, init_func=initializer)

在内部,分布式嵌入构建在分布式张量之上,因此与分布式张量有非常相似的行为。例如,创建嵌入时,它们被分片并存储在集群中的所有机器上。它可以通过名称唯一标识。

注意

初始化器函数在服务器进程中调用。因此,它必须在 dgl.distributed.initialize 之前声明。

由于嵌入是模型的一部分,用户必须将它们附加到优化器上进行 mini-batch 训练。目前,DGL 提供了一个稀疏 Adagrad 优化器 SparseAdagrad(DGL 稍后会为稀疏嵌入添加更多优化器)。用户需要从模型中收集所有分布式嵌入,并将它们传递给稀疏优化器。如果模型既有节点嵌入又有常规的密集模型参数,并且用户想要对嵌入执行稀疏更新,他们需要创建两个优化器,一个用于节点嵌入,另一个用于密集模型参数,如下面的代码所示

sparse_optimizer = dgl.distributed.SparseAdagrad([emb], lr=lr1)
optimizer = th.optim.Adam(model.parameters(), lr=lr2)
feats = emb(nids)
loss = model(feats)
loss.backward()
optimizer.step()
sparse_optimizer.step()

注意

DistEmbedding 不继承 torch.nn.Module,因此我们建议在您自己的神经网络模块之外使用它。

分布式采样

DGL 提供了两级 API 用于采样节点和边以生成 mini-batch(参见 mini-batch 训练部分)。低级 API 要求用户编写代码明确定义如何采样一层节点(例如,使用 dgl.sampling.sample_neighbors())。高级采样 API 实现了几种流行的节点分类和链接预测任务的采样算法(例如,NodeDataLoaderEdgeDataLoader)。

分布式采样模块遵循相同的设计,并提供了两级采样 API。对于低级采样 API,它提供了 sample_neighbors() 用于在 DistGraph 上进行分布式邻居采样。此外,DGL 提供了一个分布式 DataLoader (DistDataLoader ) 用于分布式采样。分布式 DataLoader 的接口与 PyTorch DataLoader 相同,只是用户在创建 dataloader 时不能指定工作进程的数量。工作进程在 dgl.distributed.initialize() 中创建。

注意

当在 DistGraph 上运行 dgl.distributed.sample_neighbors() 时,采样器不能在具有多个工作进程的 PyTorch DataLoader 中运行。主要原因是 PyTorch DataLoader 在每个 epoch 中都会创建新的采样工作进程,这会导致多次创建和销毁 DistGraph 对象。

使用低级 API 时,采样代码与单进程采样类似。唯一的区别是用户需要使用 dgl.distributed.sample_neighbors()DistDataLoader

def sample_blocks(seeds):
    seeds = th.LongTensor(np.asarray(seeds))
    blocks = []
    for fanout in [10, 25]:
        frontier = dgl.distributed.sample_neighbors(g, seeds, fanout, replace=True)
        block = dgl.to_block(frontier, seeds)
        seeds = block.srcdata[dgl.NID]
        blocks.insert(0, block)
        return blocks
    dataloader = dgl.distributed.DistDataLoader(dataset=train_nid,
                                                batch_size=batch_size,
                                                collate_fn=sample_blocks,
                                                shuffle=True)
    for batch in dataloader:
        ...

高级采样 API(NodeDataLoaderEdgeDataLoader)有对应的分布式版本(DistNodeDataLoaderDistEdgeDataLoader)。除此之外,代码与单进程采样完全相同。

sampler = dgl.sampling.MultiLayerNeighborSampler([10, 25])
dataloader = dgl.distributed.DistNodeDataLoader(g, train_nid, sampler,
                                             batch_size=batch_size, shuffle=True)
for batch in dataloader:
    ...

分割工作负载

为了训练模型,用户首先需要将数据集分割成训练集、验证集和测试集。对于分布式训练,这一步通常在我们调用 dgl.distributed.partition_graph() 划分图之前完成。我们建议将数据分割存储为布尔数组,作为节点数据或边数据。对于节点分类任务,这些布尔数组的长度与图中的节点数相同,它们的每个元素表示一个节点是否存在于训练集/验证集/测试集中。链接预测任务也应使用类似的布尔数组。dgl.distributed.partition_graph() 根据图划分结果分割这些布尔数组(因为它们存储为图的节点数据或边数据),并将它们与图分区一起存储。

在分布式训练期间,用户需要为每个训练器分配训练节点/边。类似地,我们也需要以相同的方式分割验证集和测试集。DGL 提供 node_split()edge_split(),用于在运行时为分布式训练分割训练集、验证集和测试集。这两个函数以图划分前构建的布尔数组作为输入,对其进行分割并返回一部分给本地训练器。默认情况下,它们确保所有部分具有相同数量的节点/边。这对于同步 SGD 很重要,因为它假设每个训练器具有相同数量的 mini-batch。

以下示例分割训练集,并返回本地进程的节点子集。

train_nids = dgl.distributed.node_split(g.ndata['train_mask'])