注意
跳到末尾以下载完整示例代码。
分布式节点分类
在本教程中,我们将详细介绍如何对节点分类任务执行分布式 GNN 训练。为了理解分布式 GNN 训练,您需要首先阅读多 GPU 训练的教程。本教程是在多 GPU 训练的基础上进行的,提供了图划分、修改训练脚本和设置分布式训练环境的额外步骤。
划分图
在本教程中,我们将以 OGBN products 图为例,演示图划分。首先,我们将图加载到 DGL 图中。在此,我们将节点标签存储为 DGL 图中的节点数据。
import os
os.environ['DGLBACKEND'] = 'pytorch'
import dgl
import torch as th
from ogb.nodeproppred import DglNodePropPredDataset
data = DglNodePropPredDataset(name='ogbn-products')
graph, labels = data[0]
labels = labels[:, 0]
graph.ndata['labels'] = labels
在图划分期间,我们需要将数据分割成训练/验证/测试集。由于这是一个节点分类任务,训练/验证/测试集包含节点 ID。我们建议用户将其转换为布尔数组,其中 True 表示集合中存在该节点 ID。通过这种方式,我们可以将其存储为节点数据。划分后,布尔数组将与图分区一起存储。
splitted_idx = data.get_idx_split()
train_nid, val_nid, test_nid = splitted_idx['train'], splitted_idx['valid'], splitted_idx['test']
train_mask = th.zeros((graph.num_nodes(),), dtype=th.bool)
train_mask[train_nid] = True
val_mask = th.zeros((graph.num_nodes(),), dtype=th.bool)
val_mask[val_nid] = True
test_mask = th.zeros((graph.num_nodes(),), dtype=th.bool)
test_mask[test_nid] = True
graph.ndata['train_mask'] = train_mask
graph.ndata['val_mask'] = val_mask
graph.ndata['test_mask'] = test_mask
然后,我们调用 partition_graph 函数使用 METIS 划分图,并将划分结果保存在指定的文件夹中。注意:partition_graph 在单台机器上单线程运行。您可以访问我们的用户指南,了解更多关于分布式图划分的信息。
下面的代码展示了调用划分算法并生成四个分区的示例。划分结果存储在一个名为 4part_data 的文件夹中。在划分图时,我们允许用户指定如何平衡分区。默认情况下,该算法尽可能平衡每个分区的节点数量。然而,这种平衡策略对于分布式 GNN 训练来说不够充分,因为某些分区可能比其他分区包含更多的训练节点,或者某些分区比其他分区包含更多的边。因此,partition_graph 提供了另外两个参数 balance_ntypes 和 balance_edges 来强制执行更严格的平衡标准。例如,我们可以使用训练掩码来平衡每个分区的训练节点数量,如下面的示例所示。我们还可以启用 balance_edges 标志,以确保所有分区具有大致相同的边数。
dgl.distributed.partition_graph(graph, graph_name='ogbn-products', num_parts=4,
out_path='4part_data',
balance_ntypes=graph.ndata['train_mask'],
balance_edges=True)
在划分图时,DGL 会打乱节点 ID 和边 ID,以便分配给同一分区的节点/边具有连续的 ID。这对于 DGL 维护全局节点/边 ID 和分区 ID 的映射是必需的。如果用户需要将打乱后的节点/边 ID 映射回其原始 ID,他们可以启用 partition_graph 函数的 return_mapping 标志,该标志将返回一个向量用于节点 ID 映射和边 ID 映射。下面展示了如何使用 ID 映射在分布式训练后保存节点嵌入的示例。这是用户希望在下游任务中使用训练好的节点嵌入时的常见用例。下面我们假设训练好的节点嵌入存储在 node_emb 张量中,该张量以打乱后的节点 ID 为索引。我们再次打乱嵌入,并将其存储在 orig_node_emb 张量中,该张量以原始节点 ID 为索引。
nmap, emap = dgl.distributed.partition_graph(graph, graph_name='ogbn-products',
num_parts=4,
out_path='4part_data',
balance_ntypes=graph.ndata['train_mask'],
balance_edges=True,
return_mapping=True)
orig_node_emb = th.zeros(node_emb.shape, dtype=node_emb.dtype)
orig_node_emb[nmap] = node_emb
分布式训练脚本
分布式训练脚本与多 GPU 训练脚本非常相似,只需进行少量修改。它也依赖于 PyTorch 的分布式组件来交换梯度和更新模型参数。分布式训练脚本仅包含训练器的代码。
初始化网络通信
分布式 GNN 训练需要访问已划分的图结构和节点/边特征,并聚合来自多个训练器的模型参数梯度。DGL 的分布式组件负责访问分布式图结构和分布式节点特征及边特征,而 PyTorch 分布式组件负责交换模型参数的梯度。因此,在训练脚本开始时,我们需要初始化 DGL 和 PyTorch 的分布式组件。
在分布式训练脚本的最开始,我们需要调用 DGL 的初始化函数来初始化训练器的网络通信并连接 DGL 的服务器。此函数接受集群配置文件路径作为参数。
import dgl
import torch as th
dgl.distributed.initialize(ip_config='ip_config.txt')
配置文件 ip_config.txt 具有以下格式
ip_addr1 [port1]
ip_addr2 [port2]
每行表示一台机器。第一列是 IP 地址,第二列是用于连接该机器上的 DGL 服务器的端口。端口是可选的,默认端口为 30050。
初始化 DGL 的网络通信后,用户可以初始化 PyTorch 的分布式通信。
th.distributed.init_process_group(backend='gloo')
引用分布式图
DGL 的服务器会自动加载图分区。服务器加载分区后,训练器连接到服务器,并可以开始引用集群中的分布式图,如下所示。
g = dgl.distributed.DistGraph('ogbn-products')
如代码所示,我们通过名称引用分布式图。这个名称基本上就是上面一节中传递给 partition_graph 函数的名称。
获取训练和验证节点 ID
对于分布式训练,每个训练器可以运行自己的训练节点集。整个图的训练节点存储在分布式张量中,作为 train_mask 节点数据,这是我们在划分图之前构建的。每个训练器可以对其训练节点集调用 node_split 函数。node_split 函数将完整的训练集均匀分割,并返回训练节点,其中大部分存储在本地分区中,以确保良好的数据局部性。
train_nid = dgl.distributed.node_split(g.ndata['train_mask'])
我们可以像上面一样分割验证节点。在这种情况下,每个训练器会获得一组不同的验证节点。
valid_nid = dgl.distributed.node_split(g.ndata['val_mask'])
定义 GNN 模型
对于分布式训练,我们定义 GNN 模型的方式与mini-batch 训练或full-graph 训练完全相同。下面的代码定义了 GraphSage 模型。
import torch.nn as nn
import torch.nn.functional as F
import dgl.nn as dglnn
import torch.optim as optim
class SAGE(nn.Module):
def __init__(self, in_feats, n_hidden, n_classes, n_layers):
super().__init__()
self.n_layers = n_layers
self.n_hidden = n_hidden
self.n_classes = n_classes
self.layers = nn.ModuleList()
self.layers.append(dglnn.SAGEConv(in_feats, n_hidden, 'mean'))
for i in range(1, n_layers - 1):
self.layers.append(dglnn.SAGEConv(n_hidden, n_hidden, 'mean'))
self.layers.append(dglnn.SAGEConv(n_hidden, n_classes, 'mean'))
def forward(self, blocks, x):
for l, (layer, block) in enumerate(zip(self.layers, blocks)):
x = layer(block, x)
if l != self.n_layers - 1:
x = F.relu(x)
return x
num_hidden = 256
num_labels = len(th.unique(g.ndata['labels'][0:g.num_nodes()]))
num_layers = 2
lr = 0.001
model = SAGE(g.ndata['feat'].shape[1], num_hidden, num_labels, num_layers)
loss_fcn = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=lr)
对于分布式训练,我们需要使用 PyTorch 的 DistributedDataParallel 将模型转换为分布式模型。
model = th.nn.parallel.DistributedDataParallel(model)
分布式 mini-batch 采样器
我们可以使用与 NodeDataLoader
相对应的分布式类 DistNodeDataLoader
来创建用于节点分类的分布式 mini-batch 采样器。
sampler = dgl.dataloading.MultiLayerNeighborSampler([25,10])
train_dataloader = dgl.dataloading.DistNodeDataLoader(
g, train_nid, sampler, batch_size=1024,
shuffle=True, drop_last=False)
valid_dataloader = dgl.dataloading.DistNodeDataLoader(
g, valid_nid, sampler, batch_size=1024,
shuffle=False, drop_last=False)
训练循环
分布式训练的训练循环也与单进程训练完全相同。
import sklearn.metrics
import numpy as np
for epoch in range(10):
# Loop over the dataloader to sample mini-batches.
losses = []
with model.join():
for step, (input_nodes, seeds, blocks) in enumerate(train_dataloader):
# Load the input features as well as output labels
batch_inputs = g.ndata['feat'][input_nodes]
batch_labels = g.ndata['labels'][seeds]
# Compute loss and prediction
batch_pred = model(blocks, batch_inputs)
loss = loss_fcn(batch_pred, batch_labels)
optimizer.zero_grad()
loss.backward()
losses.append(loss.detach().cpu().numpy())
optimizer.step()
# validation
predictions = []
labels = []
with th.no_grad(), model.join():
for step, (input_nodes, seeds, blocks) in enumerate(valid_dataloader):
inputs = g.ndata['feat'][input_nodes]
labels.append(g.ndata['labels'][seeds].numpy())
predictions.append(model(blocks, inputs).argmax(1).numpy())
predictions = np.concatenate(predictions)
labels = np.concatenate(labels)
accuracy = sklearn.metrics.accuracy_score(labels, predictions)
print('Epoch {}: Validation Accuracy {}'.format(epoch, accuracy))
设置分布式训练环境
划分图并准备好训练脚本后,我们现在需要设置分布式训练环境并启动训练作业。基本上,我们需要创建一个机器集群,并将训练脚本和划分好的数据上传到集群中的每台机器。在集群中共享训练脚本和划分数据的推荐解决方案是使用 NFS (网络文件系统)。
对于不熟悉 NFS 的用户,下面是一个在现有集群中设置 NFS 的简短教程。
NFS 服务器端设置(仅限 Ubuntu)
首先,在存储服务器上安装必要的库
sudo apt-get install nfs-kernel-server
下面我们假设用户账户是 ubuntu,并且我们在主目录中创建一个 workspace 目录。
mkdir -p /home/ubuntu/workspace
我们假设所有服务器都处于一个子网下,IP 范围为 192.168.0.0 到 192.168.255.255。我们需要将以下行添加到 /etc/exports 中
/home/ubuntu/workspace 192.168.0.0/16(rw,sync,no_subtree_check)
然后重启 NFS,服务器端设置完成。
sudo systemctl restart nfs-kernel-server
有关配置详情,请参考 NFS ArchWiki (https://wiki.archlinux.org.cn/index.php/NFS)。
NFS 客户端设置(仅限 Ubuntu)
要使用 NFS,客户端也需要安装必要的软件包
sudo apt-get install nfs-common
您可以手动挂载 NFS
mkdir -p /home/ubuntu/workspace
sudo mount -t nfs <nfs-server-ip>:/home/ubuntu/workspace /home/ubuntu/workspace
或者将以下行添加到 /etc/fstab 中,以便该文件夹自动挂载
<nfs-server-ip>:/home/ubuntu/workspace /home/ubuntu/workspace nfs defaults 0 0
然后运行
mount -a
现在进入 /home/ubuntu/workspace 目录,并将训练脚本和划分好的数据保存在该文件夹中。
SSH 访问
启动脚本通过 SSH 访问集群中的机器。用户应按照本文档中的说明在集群中的每台机器上设置无密码 SSH 登录。设置无密码 SSH 后,用户需要对每台机器进行连接认证,并将其密钥指纹添加到 ~/.ssh/known_hosts 文件中。这可以在首次通过 SSH 连接到机器时自动完成。
启动分布式训练作业
一切准备就绪后,我们现在可以使用 DGL 提供的启动脚本在集群中启动分布式训练作业。我们可以在集群中的任何机器上运行此启动脚本。
python3 ~/workspace/dgl/tools/launch.py --workspace ~/workspace/ --num_trainers 1 --num_samplers 0 --num_servers 1 --part_config 4part_data/ogbn-products.json --ip_config ip_config.txt "python3 train_dist.py"
如果我们将图划分为四个分区,如教程开头所示,则集群必须包含四台机器。上面的命令将在集群中的每台机器上启动一个训练器和一个服务器。ip_config.txt 文件列出了集群中所有机器的 IP 地址,格式如下
ip_addr1
ip_addr2
ip_addr3
ip_addr4
使用 GraphBolt 采样邻居
自 DGL 2.0 起,我们引入了一个新的数据加载框架 GraphBolt,与 DGL 先前的实现相比,其采样性能得到了极大提升。因此,我们将 GraphBolt 引入到分布式训练中,以提高分布式采样的性能。此外,图分区可以比以前小得多,这对于分布式训练期间的加载速度和内存使用非常有益。
图划分
为了利用 GraphBolt 进行分布式采样,我们需要将分区从 DGL 格式转换为 GraphBolt 格式。这可以通过 dgl.distributed.dgl_partition_to_graphbolt 函数完成。或者,我们可以使用 dgl.distributed.partition_graph 函数直接生成 GraphBolt 格式的分区。
将分区从 DGL 格式转换为 GraphBolt 格式。
part_config = "4part_data/ogbn-products.json"
dgl.distributed.dgl_partition_to_graphbolt(part_config)
新的分区将存储在与原始分区相同的目录中。
2. 直接生成 GraphBolt 格式的分区。只需在 partition_graph 函数中将 use_graphbolt 标志设置为 True。
dgl.distributed.partition_graph(graph, graph_name='ogbn-products', num_parts=4,
out_path='4part_data',
balance_ntypes=graph.ndata['train_mask'],
balance_edges=True,
use_graphbolt=True)
在训练脚本中启用 GraphBolt 采样
只需在 dgl.distributed.initialize 函数中将 use_graphbolt 标志设置为 True。这是在训练脚本中启用 GraphBolt 采样所需的唯一更改。
dgl.distributed.initialize('ip_config.txt', use_graphbolt=True)
脚本总运行时间: (0 minutes 0.000 seconds)