注意
跳转至末尾以下载完整的示例代码。
分布式链接预测
在本教程中,我们将逐步介绍如何为链接预测任务执行分布式 GNN 训练。本教程假设您已经阅读过分布式节点分类和用于链接预测的 GNN 随机训练。总体流程如下图所示。

图划分
在本教程中,我们将使用 OGBL citation2 图作为示例来说明图划分。首先,将图加载到 DGL 图中,并使用 AsLinkPredDataset
将其转换为训练图、验证边和测试边。
import os
os.environ['DGLBACKEND'] = 'pytorch'
import dgl
import torch as th
from ogb.linkproppred import DglLinkPropPredDataset
data = DglLinkPropPredDataset(name='ogbl-citation2')
graph = data[0]
data = dgl.data.AsLinkPredDataset(data, [0.8, 0.1, 0.1])
graph_train = data[0]
dgl.distributed.partition_graph(graph_train, graph_name='ogbl-citation2', num_parts=4,
out_path='4part_data',
balance_edges=True)
然后,我们将验证边和测试边与图划分一同存储。
import pickle
with open('4part_data/val.pkl', 'wb') as f:
pickle.dump(data.val_edges, f)
with open('4part_data/test.pkl', 'wb') as f:
pickle.dump(data.test_edges, f)
分布式训练脚本
分布式链接预测脚本与分布式节点分类脚本非常相似,仅有一些修改。
初始化网络通信
我们首先初始化网络通信和 PyTorch 的分布式通信。
import dgl
import torch as th
dgl.distributed.initialize(ip_config='ip_config.txt')
th.distributed.init_process_group(backend='gloo')
配置文件 ip_config.txt 的格式如下:
ip_addr1 [port1]
ip_addr2 [port2]
每一行代表一台机器。第一列是 IP 地址,第二列是用于连接该机器上 DGL 服务器的端口。端口是可选的,默认端口为 30050。
引用分布式图
DGL 服务器会自动加载图划分。服务器加载划分后,训练器连接到服务器并可以开始引用集群中的分布式图,如下所示。
g = dgl.distributed.DistGraph('ogbl-citation2')
如代码所示,我们通过名称引用分布式图。这个名称基本上是如上面章节所示,传递给 partition_graph 函数的那个名称。
获取训练和验证节点 ID
对于分布式训练,每个训练器都可以运行其自己的训练节点集。我们可以通过调用 node_split 和 edge_split 来获取训练器中当前图的节点 ID 和边 ID。我们还可以通过加载 pickle 文件来获取有效边和测试边。
train_eids = dgl.distributed.edge_split(th.ones((g.num_edges(),), dtype=th.bool), g.get_partition_book(), force_even=True)
train_nids = dgl.distributed.node_split(th.ones((g.num_nodes(),), dtype=th.bool), g.get_partition_book())
with open('4part_data/val.pkl', 'rb') as f:
global_valid_eid = pickle.load(f)
with open('4part_data/test.pkl', 'rb') as f:
global_test_eid = pickle.load(f)
定义 GNN 模型
对于分布式训练,我们定义 GNN 模型的方式与mini-batch 训练或全图训练完全相同。下面的代码定义了 GraphSage 模型。
import torch.nn as nn
import torch.nn.functional as F
import dgl.nn as dglnn
import torch.optim as optim
class SAGE(nn.Module):
def __init__(self, in_feats, n_hidden, n_classes, n_layers):
super().__init__()
self.n_layers = n_layers
self.n_hidden = n_hidden
self.n_classes = n_classes
self.layers = nn.ModuleList()
self.layers.append(dglnn.SAGEConv(in_feats, n_hidden, 'mean'))
for i in range(1, n_layers - 1):
self.layers.append(dglnn.SAGEConv(n_hidden, n_hidden, 'mean'))
self.layers.append(dglnn.SAGEConv(n_hidden, n_classes, 'mean'))
def forward(self, blocks, x):
for l, (layer, block) in enumerate(zip(self.layers, blocks)):
x = layer(block, x)
if l != self.n_layers - 1:
x = F.relu(x)
return x
num_hidden = 256
num_labels = len(th.unique(g.ndata['labels'][0:g.num_nodes()]))
num_layers = 2
lr = 0.001
model = SAGE(g.ndata['feat'].shape[1], num_hidden, num_labels, num_layers)
loss_fcn = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=lr)
对于分布式训练,我们需要使用 PyTorch 的 DistributedDataParallel 将模型转换为分布式模型。
model = th.nn.parallel.DistributedDataParallel(model)
我们还定义了一个边预测器 EdgePredictor
来预测节点表示对的边分数。
from dgl.nn import EdgePredictor
predictor = EdgePredictor('dot')
分布式 mini-batch 采样器
我们可以使用 DistEdgeDataLoader
,它是 EdgeDataLoader
的分布式对应项,来为链接预测创建一个分布式 mini-batch 采样器。
训练循环
分布式训练的训练循环与单进程训练完全相同。
import sklearn.metrics
import numpy as np
epoch = 0
for epoch in range(10):
for step, (input_nodes, pos_graph, neg_graph, mfgs) in enumerate(dataloader):
pos_graph = pos_graph
neg_graph = neg_graph
node_inputs = mfgs[0].srcdata[dgl.NID]
batch_inputs = g.ndata['feat'][node_inputs]
batch_pred = model(mfgs, batch_inputs)
pos_feature = batch_pred
pos_graph.ndata['h'] = batch_pred
pos_src, pos_dst = pos_graph.edges()
pos_score = predictor(pos_feature[pos_src], pos_feature[pos_dst])
neg_feature = batch_pred
neg_graph.ndata['h'] = batch_pred
neg_src, neg_dst = neg_graph.edges()
neg_score = predictor(neg_feature[pos_src], neg_feature[pos_dst])
score = th.cat([pos_score, neg_score])
label = th.cat([th.ones_like(pos_score), th.zeros_like(neg_score)])
loss = F.binary_cross_entropy_with_logits(score, label)
optimizer.zero_grad()
loss.backward()
optimizer.step()
推理
在推理阶段,我们使用训练循环后的模型来获取节点的嵌入。
def inference(model, graph, node_features, args):
with th.no_grad():
sampler = dgl.dataloading.MultiLayerNeighborSampler([25,10])
train_dataloader = dgl.dataloading.DistNodeDataLoader(
graph, th.arange(graph.num_nodes()), sampler,
batch_size=1024,
shuffle=False,
drop_last=False)
result = []
for input_nodes, output_nodes, mfgs in train_dataloader:
node_inputs = mfgs[0].srcdata[dgl.NID]
inputs = node_features[node_inputs]
result.append(model(mfgs, inputs))
return th.cat(result)
node_reprs = inference(model, g, g.ndata['feat'], args)
测试边被编码为 ((positive_edge_src, positive_edge_dst), (negative_edge_src, negative_edge_dst))。因此,我们可以通过正样本对和负样本对获得 ground truth。
test_pos_src = global_test_eid[0][0]
test_pos_dst = global_test_eid[0][1]
test_neg_src = global_test_eid[1][0]
test_neg_dst = global_test_eid[1][1]
test_labels = th.cat([th.ones_like(test_pos_src), th.zeros_like(test_neg_src)]).cpu().numpy()
然后,我们使用点积预测器获取正样本对和负样本测试对的分数,以计算诸如 AUC 等指标。
h_pos_src = node_reprs[test_pos_src]
h_pos_dst = node_reprs[test_pos_dst]
h_neg_src = node_reprs[test_neg_src]
h_neg_dst = node_reprs[test_neg_dst]
score_pos = predictor(h_pos_src, h_pos_dst)
score_neg = predictor(h_neg_src, h_neg_dst)
test_preds = th.cat([score_pos, score_neg]).cpu().numpy()
auc = skm.roc_auc_score(test_labels, test_preds)
设置分布式训练环境
分布式训练环境的设置与分布式节点分类类似。请参阅此处了解更多详情:设置分布式训练环境
脚本总运行时间: (0 minutes 0.000 seconds)