分布式节点分类

在本教程中,我们将详细介绍如何对节点分类任务执行分布式 GNN 训练。为了理解分布式 GNN 训练,您需要首先阅读多 GPU 训练的教程。本教程是在多 GPU 训练的基础上进行的,提供了图划分、修改训练脚本和设置分布式训练环境的额外步骤。

划分图

在本教程中,我们将以 OGBN products 图为例,演示图划分。首先,我们将图加载到 DGL 图中。在此,我们将节点标签存储为 DGL 图中的节点数据。

import os
os.environ['DGLBACKEND'] = 'pytorch'
import dgl
import torch as th
from ogb.nodeproppred import DglNodePropPredDataset
data = DglNodePropPredDataset(name='ogbn-products')
graph, labels = data[0]
labels = labels[:, 0]
graph.ndata['labels'] = labels

在图划分期间,我们需要将数据分割成训练/验证/测试集。由于这是一个节点分类任务,训练/验证/测试集包含节点 ID。我们建议用户将其转换为布尔数组,其中 True 表示集合中存在该节点 ID。通过这种方式,我们可以将其存储为节点数据。划分后,布尔数组将与图分区一起存储。

splitted_idx = data.get_idx_split()
train_nid, val_nid, test_nid = splitted_idx['train'], splitted_idx['valid'], splitted_idx['test']
train_mask = th.zeros((graph.num_nodes(),), dtype=th.bool)
train_mask[train_nid] = True
val_mask = th.zeros((graph.num_nodes(),), dtype=th.bool)
val_mask[val_nid] = True
test_mask = th.zeros((graph.num_nodes(),), dtype=th.bool)
test_mask[test_nid] = True
graph.ndata['train_mask'] = train_mask
graph.ndata['val_mask'] = val_mask
graph.ndata['test_mask'] = test_mask

然后,我们调用 partition_graph 函数使用 METIS 划分图,并将划分结果保存在指定的文件夹中。注意partition_graph 在单台机器上单线程运行。您可以访问我们的用户指南,了解更多关于分布式图划分的信息。

下面的代码展示了调用划分算法并生成四个分区的示例。划分结果存储在一个名为 4part_data 的文件夹中。在划分图时,我们允许用户指定如何平衡分区。默认情况下,该算法尽可能平衡每个分区的节点数量。然而,这种平衡策略对于分布式 GNN 训练来说不够充分,因为某些分区可能比其他分区包含更多的训练节点,或者某些分区比其他分区包含更多的边。因此,partition_graph 提供了另外两个参数 balance_ntypesbalance_edges 来强制执行更严格的平衡标准。例如,我们可以使用训练掩码来平衡每个分区的训练节点数量,如下面的示例所示。我们还可以启用 balance_edges 标志,以确保所有分区具有大致相同的边数。

dgl.distributed.partition_graph(graph, graph_name='ogbn-products', num_parts=4,
                                out_path='4part_data',
                                balance_ntypes=graph.ndata['train_mask'],
                                balance_edges=True)

在划分图时,DGL 会打乱节点 ID 和边 ID,以便分配给同一分区的节点/边具有连续的 ID。这对于 DGL 维护全局节点/边 ID 和分区 ID 的映射是必需的。如果用户需要将打乱后的节点/边 ID 映射回其原始 ID,他们可以启用 partition_graph 函数的 return_mapping 标志,该标志将返回一个向量用于节点 ID 映射和边 ID 映射。下面展示了如何使用 ID 映射在分布式训练后保存节点嵌入的示例。这是用户希望在下游任务中使用训练好的节点嵌入时的常见用例。下面我们假设训练好的节点嵌入存储在 node_emb 张量中,该张量以打乱后的节点 ID 为索引。我们再次打乱嵌入,并将其存储在 orig_node_emb 张量中,该张量以原始节点 ID 为索引。

nmap, emap = dgl.distributed.partition_graph(graph, graph_name='ogbn-products',
                                             num_parts=4,
                                             out_path='4part_data',
                                             balance_ntypes=graph.ndata['train_mask'],
                                             balance_edges=True,
                                             return_mapping=True)
orig_node_emb = th.zeros(node_emb.shape, dtype=node_emb.dtype)
orig_node_emb[nmap] = node_emb

分布式训练脚本

分布式训练脚本与多 GPU 训练脚本非常相似,只需进行少量修改。它也依赖于 PyTorch 的分布式组件来交换梯度和更新模型参数。分布式训练脚本仅包含训练器的代码。

初始化网络通信

分布式 GNN 训练需要访问已划分的图结构和节点/边特征,并聚合来自多个训练器的模型参数梯度。DGL 的分布式组件负责访问分布式图结构和分布式节点特征及边特征,而 PyTorch 分布式组件负责交换模型参数的梯度。因此,在训练脚本开始时,我们需要初始化 DGL 和 PyTorch 的分布式组件。

在分布式训练脚本的最开始,我们需要调用 DGL 的初始化函数来初始化训练器的网络通信并连接 DGL 的服务器。此函数接受集群配置文件路径作为参数。

import dgl
import torch as th
dgl.distributed.initialize(ip_config='ip_config.txt')

配置文件 ip_config.txt 具有以下格式

ip_addr1 [port1]
ip_addr2 [port2]

每行表示一台机器。第一列是 IP 地址,第二列是用于连接该机器上的 DGL 服务器的端口。端口是可选的,默认端口为 30050。

初始化 DGL 的网络通信后,用户可以初始化 PyTorch 的分布式通信。

th.distributed.init_process_group(backend='gloo')

引用分布式图

DGL 的服务器会自动加载图分区。服务器加载分区后,训练器连接到服务器,并可以开始引用集群中的分布式图,如下所示。

g = dgl.distributed.DistGraph('ogbn-products')

如代码所示,我们通过名称引用分布式图。这个名称基本上就是上面一节中传递给 partition_graph 函数的名称。

获取训练和验证节点 ID

对于分布式训练,每个训练器可以运行自己的训练节点集。整个图的训练节点存储在分布式张量中,作为 train_mask 节点数据,这是我们在划分图之前构建的。每个训练器可以对其训练节点集调用 node_split 函数。node_split 函数将完整的训练集均匀分割,并返回训练节点,其中大部分存储在本地分区中,以确保良好的数据局部性。

train_nid = dgl.distributed.node_split(g.ndata['train_mask'])

我们可以像上面一样分割验证节点。在这种情况下,每个训练器会获得一组不同的验证节点。

valid_nid = dgl.distributed.node_split(g.ndata['val_mask'])

定义 GNN 模型

对于分布式训练,我们定义 GNN 模型的方式与mini-batch 训练full-graph 训练完全相同。下面的代码定义了 GraphSage 模型。

import torch.nn as nn
import torch.nn.functional as F
import dgl.nn as dglnn
import torch.optim as optim

class SAGE(nn.Module):
    def __init__(self, in_feats, n_hidden, n_classes, n_layers):
        super().__init__()
        self.n_layers = n_layers
        self.n_hidden = n_hidden
        self.n_classes = n_classes
        self.layers = nn.ModuleList()
        self.layers.append(dglnn.SAGEConv(in_feats, n_hidden, 'mean'))
        for i in range(1, n_layers - 1):
            self.layers.append(dglnn.SAGEConv(n_hidden, n_hidden, 'mean'))
        self.layers.append(dglnn.SAGEConv(n_hidden, n_classes, 'mean'))

    def forward(self, blocks, x):
        for l, (layer, block) in enumerate(zip(self.layers, blocks)):
            x = layer(block, x)
            if l != self.n_layers - 1:
                x = F.relu(x)
        return x

num_hidden = 256
num_labels = len(th.unique(g.ndata['labels'][0:g.num_nodes()]))
num_layers = 2
lr = 0.001
model = SAGE(g.ndata['feat'].shape[1], num_hidden, num_labels, num_layers)
loss_fcn = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=lr)

对于分布式训练,我们需要使用 PyTorch 的 DistributedDataParallel 将模型转换为分布式模型。

model = th.nn.parallel.DistributedDataParallel(model)

分布式 mini-batch 采样器

我们可以使用与 NodeDataLoader 相对应的分布式类 DistNodeDataLoader 来创建用于节点分类的分布式 mini-batch 采样器。

sampler = dgl.dataloading.MultiLayerNeighborSampler([25,10])
train_dataloader = dgl.dataloading.DistNodeDataLoader(
                             g, train_nid, sampler, batch_size=1024,
                             shuffle=True, drop_last=False)
valid_dataloader = dgl.dataloading.DistNodeDataLoader(
                             g, valid_nid, sampler, batch_size=1024,
                             shuffle=False, drop_last=False)

训练循环

分布式训练的训练循环也与单进程训练完全相同。

import sklearn.metrics
import numpy as np

for epoch in range(10):
    # Loop over the dataloader to sample mini-batches.
    losses = []
    with model.join():
        for step, (input_nodes, seeds, blocks) in enumerate(train_dataloader):
            # Load the input features as well as output labels
            batch_inputs = g.ndata['feat'][input_nodes]
            batch_labels = g.ndata['labels'][seeds]

            # Compute loss and prediction
            batch_pred = model(blocks, batch_inputs)
            loss = loss_fcn(batch_pred, batch_labels)
            optimizer.zero_grad()
            loss.backward()
            losses.append(loss.detach().cpu().numpy())
            optimizer.step()

    # validation
    predictions = []
    labels = []
    with th.no_grad(), model.join():
        for step, (input_nodes, seeds, blocks) in enumerate(valid_dataloader):
            inputs = g.ndata['feat'][input_nodes]
            labels.append(g.ndata['labels'][seeds].numpy())
            predictions.append(model(blocks, inputs).argmax(1).numpy())
        predictions = np.concatenate(predictions)
        labels = np.concatenate(labels)
        accuracy = sklearn.metrics.accuracy_score(labels, predictions)
        print('Epoch {}: Validation Accuracy {}'.format(epoch, accuracy))

设置分布式训练环境

划分图并准备好训练脚本后,我们现在需要设置分布式训练环境并启动训练作业。基本上,我们需要创建一个机器集群,并将训练脚本和划分好的数据上传到集群中的每台机器。在集群中共享训练脚本和划分数据的推荐解决方案是使用 NFS (网络文件系统)。

对于不熟悉 NFS 的用户,下面是一个在现有集群中设置 NFS 的简短教程。

NFS 服务器端设置(仅限 Ubuntu)

首先,在存储服务器上安装必要的库

sudo apt-get install nfs-kernel-server

下面我们假设用户账户是 ubuntu,并且我们在主目录中创建一个 workspace 目录。

mkdir -p /home/ubuntu/workspace

我们假设所有服务器都处于一个子网下,IP 范围为 192.168.0.0 到 192.168.255.255。我们需要将以下行添加到 /etc/exports

/home/ubuntu/workspace  192.168.0.0/16(rw,sync,no_subtree_check)

然后重启 NFS,服务器端设置完成。

sudo systemctl restart nfs-kernel-server

有关配置详情,请参考 NFS ArchWiki (https://wiki.archlinux.org.cn/index.php/NFS)。

NFS 客户端设置(仅限 Ubuntu)

要使用 NFS,客户端也需要安装必要的软件包

sudo apt-get install nfs-common

您可以手动挂载 NFS

mkdir -p /home/ubuntu/workspace
sudo mount -t nfs <nfs-server-ip>:/home/ubuntu/workspace /home/ubuntu/workspace

或者将以下行添加到 /etc/fstab 中,以便该文件夹自动挂载

<nfs-server-ip>:/home/ubuntu/workspace   /home/ubuntu/workspace   nfs   defaults    0 0

然后运行

mount -a

现在进入 /home/ubuntu/workspace 目录,并将训练脚本和划分好的数据保存在该文件夹中。

SSH 访问

启动脚本通过 SSH 访问集群中的机器。用户应按照本文档中的说明在集群中的每台机器上设置无密码 SSH 登录。设置无密码 SSH 后,用户需要对每台机器进行连接认证,并将其密钥指纹添加到 ~/.ssh/known_hosts 文件中。这可以在首次通过 SSH 连接到机器时自动完成。

启动分布式训练作业

一切准备就绪后,我们现在可以使用 DGL 提供的启动脚本在集群中启动分布式训练作业。我们可以在集群中的任何机器上运行此启动脚本。

python3 ~/workspace/dgl/tools/launch.py   --workspace ~/workspace/   --num_trainers 1   --num_samplers 0   --num_servers 1   --part_config 4part_data/ogbn-products.json   --ip_config ip_config.txt   "python3 train_dist.py"

如果我们将图划分为四个分区,如教程开头所示,则集群必须包含四台机器。上面的命令将在集群中的每台机器上启动一个训练器和一个服务器。ip_config.txt 文件列出了集群中所有机器的 IP 地址,格式如下

ip_addr1
ip_addr2
ip_addr3
ip_addr4

使用 GraphBolt 采样邻居

自 DGL 2.0 起,我们引入了一个新的数据加载框架 GraphBolt,与 DGL 先前的实现相比,其采样性能得到了极大提升。因此,我们将 GraphBolt 引入到分布式训练中,以提高分布式采样的性能。此外,图分区可以比以前小得多,这对于分布式训练期间的加载速度和内存使用非常有益。

图划分

为了利用 GraphBolt 进行分布式采样,我们需要将分区从 DGL 格式转换为 GraphBolt 格式。这可以通过 dgl.distributed.dgl_partition_to_graphbolt 函数完成。或者,我们可以使用 dgl.distributed.partition_graph 函数直接生成 GraphBolt 格式的分区。

  1. 将分区从 DGL 格式转换为 GraphBolt 格式。

part_config = "4part_data/ogbn-products.json"
dgl.distributed.dgl_partition_to_graphbolt(part_config)

新的分区将存储在与原始分区相同的目录中。

2. 直接生成 GraphBolt 格式的分区。只需在 partition_graph 函数中将 use_graphbolt 标志设置为 True

dgl.distributed.partition_graph(graph, graph_name='ogbn-products', num_parts=4,
                                out_path='4part_data',
                                balance_ntypes=graph.ndata['train_mask'],
                                balance_edges=True,
                                use_graphbolt=True)

在训练脚本中启用 GraphBolt 采样

只需在 dgl.distributed.initialize 函数中将 use_graphbolt 标志设置为 True。这是在训练脚本中启用 GraphBolt 采样所需的唯一更改。

dgl.distributed.initialize('ip_config.txt', use_graphbolt=True)

脚本总运行时间: (0 minutes 0.000 seconds)

由 Sphinx-Gallery 生成的图库