DGL

入门

  • 安装与设置
  • DGL 闪电入门

进阶资料

  • 🆕 使用 GraphBolt 进行 GNN 的随机训练
  • 用户指南
  • 用户指南【包含过时信息】
  • 사용자 가이드[시대에 뒤쳐진]
  • 🆕 教程:图 Transformer
  • 教程:dgl.sparse
  • 在 CPU 上训练
  • 在多块 GPU 上训练
  • 分布式训练
    • 分布式节点分类
    • 分布式链接预测
  • DGL 论文复现

API 参考

  • dgl
  • dgl.data
  • dgl.dataloading
  • dgl.DGLGraph
  • dgl.distributed
  • dgl.function
  • dgl.geometry
  • 🆕 dgl.graphbolt
  • dgl.nn (PyTorch)
  • dgl.nn.functional
  • dgl.ops
  • dgl.optim
  • dgl.sampling
  • dgl.sparse
  • dgl.multiprocessing
  • dgl.transforms
  • 用户自定义函数

注解

  • 贡献 DGL
  • DGL 外部函数接口 (FFI)
  • 性能基准测试

其他

  • 常见问题解答 (FAQ)
  • 环境变量
  • 资源
DGL
  • 分布式训练
  • 分布式链接预测
  • 查看页面源码

注意

跳转至末尾以下载完整的示例代码。

分布式链接预测

在本教程中,我们将逐步介绍如何为链接预测任务执行分布式 GNN 训练。本教程假设您已经阅读过分布式节点分类和用于链接预测的 GNN 随机训练。总体流程如下图所示。

Imgur

图划分

在本教程中,我们将使用 OGBL citation2 图作为示例来说明图划分。首先,将图加载到 DGL 图中,并使用 AsLinkPredDataset 将其转换为训练图、验证边和测试边。

import os
os.environ['DGLBACKEND'] = 'pytorch'
import dgl
import torch as th
from ogb.linkproppred import DglLinkPropPredDataset
data = DglLinkPropPredDataset(name='ogbl-citation2')
graph = data[0]
data = dgl.data.AsLinkPredDataset(data, [0.8, 0.1, 0.1])
graph_train = data[0]
dgl.distributed.partition_graph(graph_train, graph_name='ogbl-citation2', num_parts=4,
                            out_path='4part_data',
                            balance_edges=True)

然后,我们将验证边和测试边与图划分一同存储。

import pickle
with open('4part_data/val.pkl', 'wb') as f:
    pickle.dump(data.val_edges, f)
with open('4part_data/test.pkl', 'wb') as f:
    pickle.dump(data.test_edges, f)

分布式训练脚本

分布式链接预测脚本与分布式节点分类脚本非常相似,仅有一些修改。

初始化网络通信

我们首先初始化网络通信和 PyTorch 的分布式通信。

import dgl
import torch as th
dgl.distributed.initialize(ip_config='ip_config.txt')
th.distributed.init_process_group(backend='gloo')

配置文件 ip_config.txt 的格式如下:

ip_addr1 [port1]
ip_addr2 [port2]

每一行代表一台机器。第一列是 IP 地址,第二列是用于连接该机器上 DGL 服务器的端口。端口是可选的,默认端口为 30050。

引用分布式图

DGL 服务器会自动加载图划分。服务器加载划分后,训练器连接到服务器并可以开始引用集群中的分布式图,如下所示。

g = dgl.distributed.DistGraph('ogbl-citation2')

如代码所示,我们通过名称引用分布式图。这个名称基本上是如上面章节所示,传递给 partition_graph 函数的那个名称。

获取训练和验证节点 ID

对于分布式训练,每个训练器都可以运行其自己的训练节点集。我们可以通过调用 node_split 和 edge_split 来获取训练器中当前图的节点 ID 和边 ID。我们还可以通过加载 pickle 文件来获取有效边和测试边。

train_eids = dgl.distributed.edge_split(th.ones((g.num_edges(),), dtype=th.bool), g.get_partition_book(), force_even=True)
train_nids = dgl.distributed.node_split(th.ones((g.num_nodes(),), dtype=th.bool), g.get_partition_book())
with open('4part_data/val.pkl', 'rb') as f:
    global_valid_eid = pickle.load(f)
with open('4part_data/test.pkl', 'rb') as f:
    global_test_eid = pickle.load(f)

定义 GNN 模型

对于分布式训练,我们定义 GNN 模型的方式与mini-batch 训练或全图训练完全相同。下面的代码定义了 GraphSage 模型。

import torch.nn as nn
import torch.nn.functional as F
import dgl.nn as dglnn
import torch.optim as optim

class SAGE(nn.Module):
    def __init__(self, in_feats, n_hidden, n_classes, n_layers):
        super().__init__()
        self.n_layers = n_layers
        self.n_hidden = n_hidden
        self.n_classes = n_classes
        self.layers = nn.ModuleList()
        self.layers.append(dglnn.SAGEConv(in_feats, n_hidden, 'mean'))
        for i in range(1, n_layers - 1):
            self.layers.append(dglnn.SAGEConv(n_hidden, n_hidden, 'mean'))
        self.layers.append(dglnn.SAGEConv(n_hidden, n_classes, 'mean'))

    def forward(self, blocks, x):
        for l, (layer, block) in enumerate(zip(self.layers, blocks)):
            x = layer(block, x)
            if l != self.n_layers - 1:
                x = F.relu(x)
        return x

num_hidden = 256
num_labels = len(th.unique(g.ndata['labels'][0:g.num_nodes()]))
num_layers = 2
lr = 0.001
model = SAGE(g.ndata['feat'].shape[1], num_hidden, num_labels, num_layers)
loss_fcn = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=lr)

对于分布式训练,我们需要使用 PyTorch 的 DistributedDataParallel 将模型转换为分布式模型。

model = th.nn.parallel.DistributedDataParallel(model)

我们还定义了一个边预测器 EdgePredictor 来预测节点表示对的边分数。

from dgl.nn import EdgePredictor
predictor = EdgePredictor('dot')

分布式 mini-batch 采样器

我们可以使用 DistEdgeDataLoader,它是 EdgeDataLoader 的分布式对应项,来为链接预测创建一个分布式 mini-batch 采样器。

训练循环

分布式训练的训练循环与单进程训练完全相同。

import sklearn.metrics
import numpy as np

epoch = 0
for epoch in range(10):
    for step, (input_nodes, pos_graph, neg_graph, mfgs) in enumerate(dataloader):
        pos_graph = pos_graph
        neg_graph = neg_graph
        node_inputs = mfgs[0].srcdata[dgl.NID]
        batch_inputs = g.ndata['feat'][node_inputs]

        batch_pred = model(mfgs, batch_inputs)
        pos_feature = batch_pred
        pos_graph.ndata['h'] = batch_pred
        pos_src, pos_dst = pos_graph.edges()
        pos_score = predictor(pos_feature[pos_src], pos_feature[pos_dst])

        neg_feature = batch_pred
        neg_graph.ndata['h'] = batch_pred
        neg_src, neg_dst = neg_graph.edges()
        neg_score = predictor(neg_feature[pos_src], neg_feature[pos_dst])

        score = th.cat([pos_score, neg_score])
        label = th.cat([th.ones_like(pos_score), th.zeros_like(neg_score)])
        loss = F.binary_cross_entropy_with_logits(score, label)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

推理

在推理阶段,我们使用训练循环后的模型来获取节点的嵌入。

def inference(model, graph, node_features, args):
    with th.no_grad():
        sampler = dgl.dataloading.MultiLayerNeighborSampler([25,10])
        train_dataloader = dgl.dataloading.DistNodeDataLoader(
            graph, th.arange(graph.num_nodes()), sampler,
            batch_size=1024,
            shuffle=False,
            drop_last=False)

        result = []
        for input_nodes, output_nodes, mfgs in train_dataloader:
            node_inputs = mfgs[0].srcdata[dgl.NID]
            inputs = node_features[node_inputs]
            result.append(model(mfgs, inputs))

        return th.cat(result)

node_reprs = inference(model, g, g.ndata['feat'], args)

测试边被编码为 ((positive_edge_src, positive_edge_dst), (negative_edge_src, negative_edge_dst))。因此,我们可以通过正样本对和负样本对获得 ground truth。

test_pos_src = global_test_eid[0][0]
test_pos_dst = global_test_eid[0][1]
test_neg_src = global_test_eid[1][0]
test_neg_dst = global_test_eid[1][1]
test_labels = th.cat([th.ones_like(test_pos_src), th.zeros_like(test_neg_src)]).cpu().numpy()

然后,我们使用点积预测器获取正样本对和负样本测试对的分数,以计算诸如 AUC 等指标。

h_pos_src = node_reprs[test_pos_src]
h_pos_dst = node_reprs[test_pos_dst]
h_neg_src = node_reprs[test_neg_src]
h_neg_dst = node_reprs[test_neg_dst]
score_pos = predictor(h_pos_src, h_pos_dst)
score_neg = predictor(h_neg_src, h_neg_dst)

test_preds = th.cat([score_pos, score_neg]).cpu().numpy()
auc = skm.roc_auc_score(test_labels, test_preds)

设置分布式训练环境

分布式训练环境的设置与分布式节点分类类似。请参阅此处了解更多详情:设置分布式训练环境

脚本总运行时间: (0 minutes 0.000 seconds)

下载 Jupyter notebook: 2_link_prediction.ipynb

下载 Python 源代码: 2_link_prediction.py

下载 压缩包: 2_link_prediction.zip

由 Sphinx-Gallery 生成的图库

上一页 下一页

© 版权所有 2018, DGL 团队。

使用 Sphinx 构建,并使用 Read the Docs 提供的一个主题。
阅读文档 v: latest
版本
下载
在 Read the Docs 上
项目主页
构建