Commit

Merge branch 'master' into 0.8.x
BarclayII committed Mar 7, 2022
2 parents 5834604 + 44638b9 commit 3800da2
Showing 18 changed files with 67 additions and 32 deletions.
2 changes: 0 additions & 2 deletions Jenkinsfile
@@ -345,8 +345,6 @@ pipeline {
}
stage('Tutorial test') {
steps {
- sh 'ls -l /tmp/dataset/*'
- sh 'ls -l /tmp/dataset/'
tutorial_test_linux('pytorch')
}
}
4 changes: 3 additions & 1 deletion conda/dgl/conda_build_config.yaml
@@ -1,4 +1,6 @@
 python:
-  - 3.5
   - 3.6
   - 3.7
+  - 3.8
+  - 3.9
+  - 3.10
2 changes: 1 addition & 1 deletion conda/dgl/meta.yaml
@@ -1,6 +1,6 @@
package:
name: dgl{{ environ.get('DGL_PACKAGE_SUFFIX', '') }}
version: "0.8.0"
version: "0.8.0post1"

source:
git_rev: 0.8.x
Expand Down
10 changes: 5 additions & 5 deletions docker/pods/ci-cpu.yaml
@@ -12,14 +12,14 @@ spec:
 requests:
   cpu: 16
 volumeMounts:
-- name: persistent-storage
-  mountPath: /tmp/dataset
+# - name: persistent-storage
+#   mountPath: /tmp/dataset
 - name: dshm
   mountPath: /dev/shm
 volumes:
-- name: persistent-storage
-  persistentVolumeClaim:
-    claimName: ogb-efs-claim
+# - name: persistent-storage
+#   persistentVolumeClaim:
+#     claimName: ogb-efs-claim
 - name: dshm
   emptyDir:
     medium: Memory
2 changes: 1 addition & 1 deletion examples/pytorch/graphsage/advanced/train_lightning.py
@@ -11,7 +11,7 @@
import glob
import os
import sys
- sys.path.append('../')
+ sys.path.append(os.path.join(os.path.dirname(__file__), '..'))
from load_graph import load_reddit, inductive_split, load_ogb

from torchmetrics import Accuracy
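
The same import-path fix is applied to several of the GraphSAGE example scripts below: the parent directory is resolved from the script's own location instead of the current working directory, so "from load_graph import ..." keeps working no matter where the script is launched from. A minimal standalone sketch of the difference (illustrative only, not part of the commit):

    import os
    import sys

    # Where '../' points depends on the current working directory,
    # while a path derived from __file__ tracks the script's location.
    print(os.path.abspath('../'))
    print(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))

    # The pattern the commit adopts: make the sibling directory importable
    # regardless of which directory the script is run from.
    sys.path.append(os.path.join(os.path.dirname(__file__), '..'))
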
@@ -18,7 +18,7 @@
from pytorch_lightning import LightningDataModule, LightningModule, Trainer
from model import SAGE, compute_acc_unsupervised as compute_acc
import sys
- sys.path.append('../')
+ sys.path.append(os.path.join(os.path.dirname(__file__), '..'))
from load_graph import load_reddit, inductive_split, load_ogb

class CrossEntropyLoss(nn.Module):
@@ -15,7 +15,7 @@
from model import SAGE, compute_acc_unsupervised as compute_acc
from negative_sampler import NegativeSampler
import sys
- sys.path.append('../')
+ sys.path.append(os.path.join(os.path.dirname(__file__), '..'))
from load_graph import load_reddit, load_ogb

class CrossEntropyLoss(nn.Module):
3 changes: 2 additions & 1 deletion examples/pytorch/graphsage/dist/partition_graph.py
@@ -4,7 +4,8 @@
import argparse
import time
import sys
- sys.path.append('../')
+ import os
+ sys.path.append(os.path.join(os.path.dirname(__file__), '..'))
from load_graph import load_reddit, load_ogb

if __name__ == '__main__':
2 changes: 1 addition & 1 deletion examples/pytorch/graphsage/distgnn/partition_graph.py
@@ -19,7 +19,7 @@
import random
import time
import argparse
- sys.path.append('../')
+ sys.path.append(os.path.join(os.path.dirname(__file__), '..'))
from load_graph import load_ogb
import dgl
from dgl.data import load_data
4 changes: 2 additions & 2 deletions examples/pytorch/graphsage/multi_gpu_node_classification.py
@@ -36,7 +36,7 @@ def inference(self, g, device, batch_size, num_workers, buffer_device=None):
# example is that the intermediate results can also benefit from prefetching.
g.ndata['h'] = g.ndata['feat']
sampler = dgl.dataloading.MultiLayerFullNeighborSampler(1, prefetch_node_feats=['h'])
- dataloader = dgl.dataloading.NodeDataLoader(
+ dataloader = dgl.dataloading.DataLoader(
g, torch.arange(g.num_nodes()).to(g.device), sampler, device=device,
batch_size=1000, shuffle=False, drop_last=False, num_workers=num_workers,
persistent_workers=(num_workers > 0))
@@ -77,7 +77,7 @@ def train(rank, world_size, graph, num_classes, split_idx):
graph, train_idx, sampler,
device='cuda', batch_size=1000, shuffle=True, drop_last=False,
num_workers=0, use_ddp=True, use_uva=True)
- valid_dataloader = dgl.dataloading.NodeDataLoader(
+ valid_dataloader = dgl.dataloading.DataLoader(
graph, valid_idx, sampler, device='cuda', batch_size=1024, shuffle=True,
drop_last=False, num_workers=0, use_uva=True)

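In DGL 0.8 the node- and edge-wise loaders were folded into a single dgl.dataloading.DataLoader class, and this commit moves the remaining NodeDataLoader call sites in the example over to it. A small hedged sketch of the call pattern, using a toy random graph rather than the OGB data the example actually loads:

    import dgl
    import torch

    g = dgl.rand_graph(1000, 5000)    # toy graph standing in for the OGB graph
    train_nids = torch.arange(100)    # toy training node IDs
    sampler = dgl.dataloading.MultiLayerFullNeighborSampler(1)

    dataloader = dgl.dataloading.DataLoader(
        g, train_nids, sampler, device='cpu',
        batch_size=32, shuffle=True, drop_last=False, num_workers=0)

    for input_nodes, output_nodes, blocks in dataloader:
        pass  # a model forward/backward pass would go here
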
2 changes: 1 addition & 1 deletion include/dgl/runtime/c_runtime_api.h
@@ -33,7 +33,7 @@
#endif

// DGL version
- #define DGL_VERSION "0.8.0"
+ #define DGL_VERSION "0.8.0post1"


// DGL Runtime is DLPack compatible.
2 changes: 1 addition & 1 deletion python/dgl/_ffi/libinfo.py
@@ -90,4 +90,4 @@ def find_lib_path(name=None, search_path=None, optional=False):
# We use the version of the incoming release for code
# that is under development.
# The following line is set by dgl/python/update_version.py
__version__ = "0.8.0"
__version__ = "0.8.0post1"
12 changes: 7 additions & 5 deletions python/dgl/dataloading/dataloader.py
@@ -169,8 +169,10 @@ class DDPTensorizedDataset(torch.utils.data.IterableDataset):
def __init__(self, indices, batch_size, drop_last, ddp_seed):
if isinstance(indices, Mapping):
self._mapping_keys = list(indices.keys())
+ len_indices = sum(len(v) for v in indices.values())
else:
self._mapping_keys = None
+ len_indices = len(indices)

self.rank = dist.get_rank()
self.num_replicas = dist.get_world_size()
@@ -179,17 +181,17 @@ def __init__(self, indices, batch_size, drop_last, ddp_seed):
self.batch_size = batch_size
self.drop_last = drop_last

- if self.drop_last and len(indices) % self.num_replicas != 0:
- self.num_samples = math.ceil((len(indices) - self.num_replicas) / self.num_replicas)
+ if self.drop_last and len_indices % self.num_replicas != 0:
+ self.num_samples = math.ceil((len_indices - self.num_replicas) / self.num_replicas)
else:
- self.num_samples = math.ceil(len(indices) / self.num_replicas)
+ self.num_samples = math.ceil(len_indices / self.num_replicas)
self.total_size = self.num_samples * self.num_replicas
# If drop_last is True, we create a shared memory array larger than the number
# of indices since we will need to pad it after shuffling to make it evenly
# divisible before every epoch. If drop_last is False, we create an array
# with the same size as the indices so we can trim it later.
- self.shared_mem_size = self.total_size if not self.drop_last else len(indices)
- self.num_indices = len(indices)
+ self.shared_mem_size = self.total_size if not self.drop_last else len_indices
+ self.num_indices = len_indices

if isinstance(indices, Mapping):
self._device = next(iter(indices.values())).device
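
The hunk above fixes DDPTensorizedDataset for the heterogeneous case: when indices is a mapping from node/edge type to index tensors, len(indices) only counts the number of types, so the per-replica sample count must be computed from the total number of indices across all types. A small standalone sketch of the corrected arithmetic (the helper name and example sizes are illustrative, not from the library):

    import math

    def num_samples_per_replica(indices, num_replicas, drop_last):
        # `indices` is either a sequence of IDs (homogeneous graph) or a
        # mapping from node/edge type to IDs (heterogeneous graph).
        if isinstance(indices, dict):
            len_indices = sum(len(v) for v in indices.values())
        else:
            len_indices = len(indices)
        if drop_last and len_indices % num_replicas != 0:
            return math.ceil((len_indices - num_replicas) / num_replicas)
        return math.ceil(len_indices / num_replicas)

    # 1000 'user' + 2000 'item' indices split across 4 replicas -> 750 each,
    # instead of the ceil(2 / 4) = 1 that len(indices) would have produced.
    print(num_samples_per_replica({'user': range(1000), 'item': range(2000)}, 4, False))
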
11 changes: 11 additions & 0 deletions python/dgl/distributed/nn/pytorch/sparse_emb.py
@@ -130,6 +130,17 @@ def name(self):
"""
return self._tensor.tensor_name

+    @property
+    def data_name(self):
+        """Return the data name of the embeddings
+
+        Returns
+        -------
+        str
+            The data name of the embeddings
+        """
+        return self._tensor._name
+
@property
def kvstore(self):
"""Return the kvstore client
2 changes: 1 addition & 1 deletion python/dgl/distributed/optim/pytorch/sparse_optim.py
@@ -74,7 +74,7 @@ def step(self):
# will send grad to each corresponding trainer
if self._world_size > 1:
# get idx split from kvstore
- idx_split = kvstore.get_partid(name, idics)
+ idx_split = kvstore.get_partid(emb.data_name, idics)
idx_split_size = []
idics_list = []
grad_list = []
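
These two changes go together: DistEmbedding now exposes the name under which its tensor is registered in the kvstore, and the sparse optimizer asks for partition ownership using that data name instead of a local name variable. A rough sketch of the relationship with simplified stand-in classes (the real ones live in dgl.distributed; everything named "Fake" here is hypothetical):

    class FakeDistTensor:
        """Stand-in for the distributed tensor that backs an embedding."""
        def __init__(self, name):
            self._name = name

    class FakeDistEmbedding:
        def __init__(self, tensor):
            self._tensor = tensor

        @property
        def data_name(self):
            # Mirrors the new property: the kvstore data name of the embedding.
            return self._tensor._name

    class FakeKVStore:
        def get_partid(self, name, ids):
            # The real client maps each ID to the partition owning `name`;
            # here we only show that the lookup is keyed by the data name.
            print('partition lookup for', name, 'on', len(ids), 'ids')
            return [0] * len(ids)

    emb = FakeDistEmbedding(FakeDistTensor('emb-data'))
    idx_split = FakeKVStore().get_partid(emb.data_name, [3, 7, 42])
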
14 changes: 7 additions & 7 deletions python/dgl/distributed/partition.py
@@ -241,7 +241,7 @@ def _get_orig_ids(g, sim_g, reshuffle, orig_nids, orig_eids):
-------
tensor or dict of tensors, tensor or dict of tensors
'''
- is_hetero = len(g.etypes) > 1 or len(g.ntypes) > 1
+ is_hetero = not g.is_homogeneous
if reshuffle and is_hetero:
# Get the type IDs
orig_ntype = F.gather_row(sim_g.ndata[NTYPE], orig_nids)
@@ -275,7 +275,7 @@ def _set_trainer_ids(g, sim_g, node_parts):
node_parts : tensor
The node partition ID for each node in `sim_g`.
'''
- if len(g.etypes) == 1:
+ if g.is_homogeneous:
g.ndata['trainer_id'] = node_parts
# An edge is assigned to a partition based on its destination node.
g.edata['trainer_id'] = F.gather_row(node_parts, g.edges()[1])
@@ -497,7 +497,7 @@ def partition_graph(g, graph_name, num_parts, out_path, num_hops=1, part_method=
... 'output/test.json', 0)
'''
def get_homogeneous(g, balance_ntypes):
- if len(g.etypes) == 1:
+ if g.is_homogeneous:
sim_g = to_homogeneous(g)
if isinstance(balance_ntypes, dict):
assert len(balance_ntypes) == 1
@@ -612,7 +612,7 @@ def get_homogeneous(g, balance_ntypes):
# NTYPE: the node type.
# orig_id: the global node IDs in the homogeneous version of input graph.
# NID: the global node IDs in the reshuffled homogeneous version of the input graph.
- if len(g.etypes) > 1:
+ if not g.is_homogeneous:
if reshuffle:
for name in parts:
orig_ids = parts[name].ndata['orig_id']
@@ -778,7 +778,7 @@ def get_homogeneous(g, balance_ntypes):
inner_edge_mask = _get_inner_edge_mask(part, etype_id)
# This is global edge IDs.
local_edges = F.boolean_mask(part.edata[edata_name], inner_edge_mask)
- if len(g.etypes) > 1:
+ if not g.is_homogeneous:
local_edges = F.gather_row(sim_g.edata[EID], local_edges)
print('part {} has {} edges of type {} and {} are inside the partition'.format(
part_id, F.as_scalar(F.sum(part.edata[ETYPE] == etype_id, 0)),
@@ -813,7 +813,7 @@ def get_homogeneous(g, balance_ntypes):
else:
node_feats[ntype + '/' + name] = g.nodes[ntype].data[name]
for etype in g.etypes:
- if reshuffle and len(g.etypes) > 1:
+ if reshuffle and not g.is_homogeneous:
edata_name = 'orig_id'
etype_id = g.get_etype_id(etype)
inner_edge_mask = _get_inner_edge_mask(part, etype_id)
@@ -831,7 +831,7 @@ def get_homogeneous(g, balance_ntypes):
else:
edge_feats[etype + '/' + name] = g.edges[etype].data[name]
# Some adjustment for heterogeneous graphs.
- if len(g.etypes) > 1:
+ if not g.is_homogeneous:
part.ndata['orig_id'] = F.gather_row(sim_g.ndata[NID], part.ndata['orig_id'])
part.edata['orig_id'] = F.gather_row(sim_g.edata[EID], part.edata['orig_id'])

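The partition code previously used len(g.etypes) == 1 as a homogeneity test, which mislabels a heterogeneous graph that happens to have a single edge type, such as the user-item graph exercised by the new test below; DGLGraph.is_homogeneous also accounts for node types. A quick illustration, assuming DGL and PyTorch are installed:

    import dgl
    import torch

    # One edge type but two node types: len(hg.etypes) == 1, yet the graph
    # is not homogeneous, so the old check picked the wrong code path.
    hg = dgl.heterograph({
        ('user', 'like', 'item'): (torch.tensor([0, 1]), torch.tensor([1, 2]))})
    print(len(hg.etypes), hg.is_homogeneous)   # 1 False

    g = dgl.graph((torch.tensor([0, 1]), torch.tensor([1, 2])))
    print(len(g.etypes), g.is_homogeneous)     # 1 True
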
2 changes: 1 addition & 1 deletion python/update_version.py
@@ -11,7 +11,7 @@
# current version
# We use the version of the incoming release for code
# that is under development
__version__ = "0.8.0"
__version__ = "0.8.0post1"
print(__version__)

# Implementations
21 changes: 21 additions & 0 deletions tests/distributed/test_partition.py
@@ -11,6 +11,7 @@
import unittest
import pickle
import random
+ import tempfile

def _get_inner_node_mask(graph, ntype_id):
if dgl.NTYPE in graph.ndata:
@@ -374,6 +375,24 @@ def check_partition(g, part_method, reshuffle, num_parts=4, num_trainers_per_mac
assert F.dtype(eid2pid) in (F.int32, F.int64)
assert np.all(F.asnumpy(eid2pid) == edge_map)

+def check_hetero_partition_single_etype(num_trainers):
+    user_ids = np.arange(1000)
+    item_ids = np.arange(2000)
+    num_edges = 3 * 1000
+    src_ids = np.random.choice(user_ids, size=num_edges)
+    dst_ids = np.random.choice(item_ids, size=num_edges)
+    hg = dgl.heterograph({('user', 'like', 'item'): (src_ids, dst_ids)})
+
+    with tempfile.TemporaryDirectory() as test_dir:
+        orig_nids, orig_eids = partition_graph(
+            hg, 'test', 2, test_dir, num_trainers_per_machine=num_trainers, return_mapping=True)
+        assert len(orig_nids) == len(hg.ntypes)
+        assert len(orig_eids) == len(hg.etypes)
+        for ntype in hg.ntypes:
+            assert len(orig_nids[ntype]) == hg.number_of_nodes(ntype)
+        for etype in hg.etypes:
+            assert len(orig_eids[etype]) == hg.number_of_edges(etype)
+
@unittest.skipIf(os.name == 'nt', reason='Do not support windows yet')
def test_partition():
g = create_random_graph(1000)
@@ -387,6 +406,8 @@ def test_partition():
@unittest.skipIf(os.name == 'nt', reason='Do not support windows yet')
@unittest.skipIf(dgl.backend.backend_name == "tensorflow", reason="TF doesn't support some of operations in DistGraph")
def test_hetero_partition():
+    check_hetero_partition_single_etype(1)
+    check_hetero_partition_single_etype(4)
hg = create_random_hetero()
check_hetero_partition(hg, 'metis')
check_hetero_partition(hg, 'metis', 1, 8)
