对照PyG官方文档,源码和examples对GAT进行了代码学习。
API文档
首先来看一下,PyG官方文档定义的GATConv
API:
源码
from typing import Union, Tuple, Optional
from torch_geometric.typing import (OptPairTensor, Adj, Size, NoneType,
OptTensor)
import torch
from torch import Tensor
import torch.nn.functional as F
from torch.nn import Parameter, Linear
from torch_sparse import SparseTensor, set_diag
from torch_geometric.nn.conv import MessagePassing
from torch_geometric.utils import remove_self_loops, add_self_loops, softmax
from ..inits import glorot, zeros
# 继承自PyG独有的MessagePassing父类,用于实现消息传递。
class GATConv(MessagePassing):
r"""The graph attentional operator from the `"Graph Attention Networks"
<https://arxiv.org/abs/1710.10903>`_ paper
.. math::
\mathbf{x}^{\prime}_i = \alpha_{i,i}\mathbf{\Theta}\mathbf{x}_{i} +
\sum_{j \in \mathcal{N}(i)} \alpha_{i,j}\mathbf{\Theta}\mathbf{x}_{j},
where the attention coefficients :math:`\alpha_{i,j}` are computed as
.. math::
\alpha_{i,j} =
\frac{
\exp\left(\mathrm{LeakyReLU}\left(\mathbf{a}^{\top}
[\mathbf{\Theta}\mathbf{x}_i \, \Vert \, \mathbf{\Theta}\mathbf{x}_j]
\right)\right)}
{\sum_{k \in \mathcal{N}(i) \cup \{ i \}}
\exp\left(\mathrm{LeakyReLU}\left(\mathbf{a}^{\top}
[\mathbf{\Theta}\mathbf{x}_i \, \Vert \, \mathbf{\Theta}\mathbf{x}_k]
\right)\right)}.
Args:
in_channels (int or tuple): Size of each input sample. A tuple
corresponds to the sizes of source and target dimensionalities.
out_channels (int): Size of each output sample.
heads (int, optional): Number of multi-head-attentions.
(default: :obj:`1`)
concat (bool, optional): If set to :obj:`False`, the multi-head
attentions are averaged instead of concatenated.
(default: :obj:`True`)
negative_slope (float, optional): LeakyReLU angle of the negative
slope. (default: :obj:`0.2`)
dropout (float, optional): Dropout probability of the normalized
attention coefficients which exposes each node to a stochastically
sampled neighborhood during training. (default: :obj:`0`)
add_self_loops (bool, optional): If set to :obj:`False`, will not add
self-loops to the input graph. (default: :obj:`True`)
bias (bool, optional): If set to :obj:`False`, the layer will not learn
an additive bias. (default: :obj:`True`)
**kwargs (optional): Additional arguments of
:class:`torch_geometric.nn.conv.MessagePassing`.
"""
_alpha: OptTensor
def __init__(self, in_channels: Union[int, Tuple[int, int]],
out_channels: int, heads: int = 1, concat: bool = True,
negative_slope: float = 0.2, dropout: float = 0.,
add_self_loops: bool = True, bias: bool = True, **kwargs):
kwargs.setdefault('aggr', 'add')
super(GATConv, self).__init__(node_dim=0, **kwargs)
self.in_channels = in_channels
self.out_channels = out_channels
self.heads = heads
self.concat = concat
self.negative_slope = negative_slope
self.dropout = dropout
self.add_self_loops = add_self_loops
#判断输入的in_channels是int类型还是tuple类型
if isinstance(in_channels, int):
#如果是int类型,直接同时对l和r执行输入为in_channels,输出为heads * out_channels的单层MLP
self.lin_l = Linear(in_channels, heads * out_channels, bias=False)
self.lin_r = self.lin_l
else:
self.lin_l = Linear(in_channels[0], heads * out_channels, False)
self.lin_r = Linear(in_channels[1], heads * out_channels, False)
# attention向量
self.att_l = Parameter(torch.Tensor(1, heads, out_channels))
self.att_r = Parameter(torch.Tensor(1, heads, out_channels))
if bias and concat:
#为输出增加转置
self.bias = Parameter(torch.Tensor(heads * out_channels))
elif bias and not concat:
self.bias = Parameter(torch.Tensor(out_channels))
else:
self.register_parameter('bias', None)
self._alpha = None
self.reset_parameters()
def reset_parameters(self):
glorot(self.lin_l.weight)
glorot(self.lin_r.weight)
glorot(self.att_l)
glorot(self.att_r)
zeros(self.bias)
def forward(self, x: Union[Tensor, OptPairTensor], edge_index: Adj,
size: Size = None, return_attention_weights=None):
# type: (Union[Tensor, OptPairTensor], Tensor, Size, NoneType) -> Tensor # noqa
# type: (Union[Tensor, OptPairTensor], SparseTensor, Size, NoneType) -> Tensor # noqa
# type: (Union[Tensor, OptPairTensor], Tensor, Size, bool) -> Tuple[Tensor, Tuple[Tensor, Tensor]] # noqa
# type: (Union[Tensor, OptPairTensor], SparseTensor, Size, bool) -> Tuple[Tensor, SparseTensor] # noqa
r"""
Args:
return_attention_weights (bool, optional): If set to :obj:`True`,
will additionally return the tuple
:obj:`(edge_index, attention_weights)`, holding the computed
attention weights for each edge. (default: :obj:`None`)
"""
H, C = self.heads, self.out_channels
x_l: OptTensor = None
x_r: OptTensor = None
alpha_l: OptTensor = None
alpha_r: OptTensor = None
if isinstance(x, Tensor):
assert x.dim() == 2, 'Static graphs not supported in `GATConv`.'
x_l = x_r = self.lin_l(x).view(-1, H, C)
alpha_l = (x_l * self.att_l).sum(dim=-1)
alpha_r = (x_r * self.att_r).sum(dim=-1)
else:
x_l, x_r = x[0], x[1]
assert x[0].dim() == 2, 'Static graphs not supported in `GATConv`.'
x_l = self.lin_l(x_l).view(-1, H, C)
alpha_l = (x_l * self.att_l).sum(dim=-1)
if x_r is not None:
x_r = self.lin_r(x_r).view(-1, H, C)
alpha_r = (x_r * self.att_r).sum(dim=-1)
assert x_l is not None
assert alpha_l is not None
if self.add_self_loops:
if isinstance(edge_index, Tensor):
num_nodes = x_l.size(0)
if x_r is not None:
num_nodes = min(num_nodes, x_r.size(0))
if size is not None:
num_nodes = min(size[0], size[1])
edge_index, _ = remove_self_loops(edge_index)
edge_index, _ = add_self_loops(edge_index, num_nodes=num_nodes)
elif isinstance(edge_index, SparseTensor):
edge_index = set_diag(edge_index)
# propagate_type: (x: OptPairTensor, alpha: OptPairTensor)
out = self.propagate(edge_index, x=(x_l, x_r),
alpha=(alpha_l, alpha_r), size=size)
alpha = self._alpha
self._alpha = None
if self.concat:
out = out.view(-1, self.heads * self.out_channels)
else:
out = out.mean(dim=1)
if self.bias is not None:
out += self.bias
if isinstance(return_attention_weights, bool):
assert alpha is not None
if isinstance(edge_index, Tensor):
return out, (edge_index, alpha)
elif isinstance(edge_index, SparseTensor):
return out, edge_index.set_value(alpha, layout='coo')
else:
return out
def message(self, x_j: Tensor, alpha_j: Tensor, alpha_i: OptTensor,
index: Tensor, ptr: OptTensor,
size_i: Optional[int]) -> Tensor:
alpha = alpha_j if alpha_i is None else alpha_j + alpha_i
alpha = F.leaky_relu(alpha, self.negative_slope)
alpha = softmax(alpha, index, ptr, size_i)
self._alpha = alpha
alpha = F.dropout(alpha, p=self.dropout, training=self.training)
return x_j * alpha.unsqueeze(-1)
def __repr__(self):
return '{}({}, {}, heads={})'.format(self.__class__.__name__,
self.in_channels,
self.out_channels, self.heads)
propagate()
函数中x=(x_l,x_r)
和alpha=(alpha_l,alpha_r)
,l
表示起点,r
表示终点,表明当一个顶点是起点或者终点时,它的x
和alpha
是不一样的。message()
函数中对注意力系数进行了dropout
操作,这是在论文里面有提到的;
实验:Cora数据集
# -*- coding:utf-8 -*-
import os.path as osp
import torch
import torch.nn.functional as F
from torch_geometric.datasets import Planetoid
import torch_geometric.transforms as T
from torch_geometric.nn import GATConv
dataset = 'Cora'
path = osp.join(osp.dirname(osp.realpath(__file__)), '..', 'data', dataset)
#使用了transform,作用是以行为单位,对特征进行归一化
dataset = Planetoid(path, dataset, transform=T.NormalizeFeatures())
data = dataset[0]
class Net(torch.nn.Module):
def __init__(self):
super(Net, self).__init__()
#下面的配置都是按照原论文的要求配置的
self.conv1 = GATConv(dataset.num_features, 8, heads=8, dropout=0.6)
# On the Pubmed dataset, use heads=8 in conv2.
self.conv2 = GATConv(8 * 8, dataset.num_classes, heads=1, concat=False,
dropout=0.6)
def forward(self):
#在使用F.dropout的时候一定要记得设置training=self.training,否则会在model.eval()时出现问题
x = F.dropout(data.x, p=0.6, training=self.training) #论文中要求的每次进入GATconv之前需要dropout
x = F.elu(self.conv1(x, data.edge_index)) #论文中要求的使用exponential linear unit(ELU)
x = F.dropout(x, p=0.6, training=self.training) #论文中要求的每次进入GATconv之前需要dropout
x = self.conv2(x, data.edge_index)
return F.log_softmax(x, dim=1)
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
model,data = Net().to(device),data.to(device)
optimizer = torch.optim.Adam(model.parameters(),lr=0.005,weight_decay=5e-4)
def train():
model.train()
optimizer.zero_grad()
F.nll_loss(model()[data.train_mask], data.y[data.train_mask]).backward()
optimizer.step()
def test():
model.eval()
logits, accs = model(), []
for _, mask in data('train_mask', 'val_mask', 'test_mask'): #访问类的属性很奇特的方式
pred = logits[mask].max(1)[1]
acc = pred.eq(data.y[mask]).sum().item() / mask.sum().item()
accs.append(acc)
return accs
for epoch in range(1, 201):
train()
log = 'Epoch: {:03d}, Train: {:.4f}, Val: {:.4f}, Test: {:.4f}'
print(log.format(epoch, *test()))
反思
反思1:关于log_softmax()
,nll_loss()
,cross_entropy()
和binary_cross_entropy_with_logits()
torch.nn.functional.log_softmax
(input, dim=None, _stacklevel=3, dtype=None)
- Applies a softmax followed by a logarithm.
- While mathematically equivalent to log(softmax(x)), doing these two operations separately is slower, and numerically unstable. This function uses an alternative formulation to compute the output and gradient correctly.
torch.nn.functional.nll_loss
(input, target, weight=None, size_average=None, ignore_index=-100, reduce=None, reduction=’mean’)
- The negative log likelihood loss.
- The input given through a forward call is expected to contain log-probabilities of each class. inputhas to be a Tensor of size either $(minibatch, C)$ or $(minibatch, C, d_1, d_2, …, d_K)$with $K \geq 1$ for the K-dimensional case (described later).
- Obtaining log-probabilities in a neural network is easily achieved by adding a LogSoftmax layer in the last layer of your network. You may use CrossEntropyLoss instead, if you prefer not to add an extra layer.
- The target that this loss expects should be a class index in the range [0, C-1][0,C−1] where C = number of classes; if ignore_index is specified, this loss also accepts this class index (this index may not necessarily be in the class range).
综上,其实log_softmax()
和nll_loss()
可以直接由cross_entropy()
代替:
- This criterion combines
log_softmax
andnll_loss
in a single function.
注意区分cross_entropy()
和binary_cross_entropy_with_logits()
,两者的区别在于log
里面是sigmoid
还是softmax
函数。
反思2:访问类的属性
类的属性还能这样访问
for _, mask in data('train_mask', 'val_mask', 'test_mask'): #访问类的属性很奇特的方式
反思3:Questions & Help
这个是在PyG的Git repo上面的issue,刚好解决了我的疑惑。看来看issue还是非常有用的。
I have a few questions from a newbie in PyTorch Geometric regarding the GAT model:
1/ In the forward
method, we call propagate
as follows:
out = self.propagate(edge_index, x=(x_l, x_r), alpha=(alpha_l, alpha_r), size=size)
Should I understand that x_l
will map to the source features (x_j
in message
) whereas x_r
will map to the target features (x_i
in message
) ? same thing for alpha
?
2/ In message
, we treat the messages on all edges of the graph (well, the graphs in the mini-batch); I have hard times understanding how we can perform a softmax here; it is as if the message
method was dealing only with the messages to a single target (can I stick to this view though it might not be right?). How is it done under the hood?
3/ What is the difference between a Tensor
and a OptTensor
?
Thanks !
Hi,
- That’s correct. We refer to
x_l
as the source nodes, andx_r
as the target nodes (in a bipartite graph). So for example,GATConv((128, 256), 256)
will aggregate 128-dimensional feature vectors and combine them to 256-dimensional feature vectors. - Usually,
message
can be seen as a method that operates independently for each edge. However, inGATConv
messages are inter-dependent (via the softmax). PyTorch Geometric provides asoftmax
function (torch_geometric.utils.softmax
) that normalizes inputs across the same target nodes. This function is used here. OptTensor
describes an optional tensor, i.e. aTensor
orNone
. This might be useful in same cases, e.g., when you have a bipartite graph for which features only exist for source nodes (and you want to obtain target node features based on aggregating incoming edges). You can then do something like:conv((x_l, None), edge_index)
.
反思4:关于源码中$\vec a^{T}[W \vec h_i || W \vec h_j ]$的实现
源码中实现的实在是太完美了!将$\vec a^{T}$拆分成att_l
和att_r
,这样就能完美解决二部图的问题了!实在是妙啊。我之前都没有意识到。还是没有好好地理解算法呀~
TO DO
- PyG的二部图思想