Skip to content

Commit

Permalink
complete csr/csc tests for both sg/mg
Browse files Browse the repository at this point in the history
  • Loading branch information
alexbarghi-nv committed Sep 20, 2023
1 parent c770a17 commit b569563
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -291,8 +291,6 @@ def _mg_call_plc_uniform_neighbor_sample(
if not isinstance(empty_df, (list, tuple)):
empty_df = [empty_df]

print('expected meta:', empty_df)

wait(result)

nout = 1
Expand Down Expand Up @@ -519,7 +517,7 @@ def uniform_neighbor_sample(
)
warnings.warn(warning_msg, FutureWarning)

if (not compress_per_hop) and prior_sources_behavior != 'exclude':
if (compression != 'COO') and (not compress_per_hop) and prior_sources_behavior != 'exclude':
raise ValueError(
'hop-agnostic compression is only supported with'
' the exclude prior sources behavior due to limitations '
Expand Down
2 changes: 1 addition & 1 deletion python/cugraph/cugraph/sampling/uniform_neighbor_sample.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ def uniform_neighbor_sample(
major_col_name = "majors"
minor_col_name = "minors"

if (not compress_per_hop) and prior_sources_behavior != 'exclude':
if (compression != 'COO') and (not compress_per_hop) and prior_sources_behavior != 'exclude':
raise ValueError(
'hop-agnostic compression is only supported with'
' the exclude prior sources behavior due to limitations '
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -894,14 +894,13 @@ def test_uniform_neighbor_sample_csr_csc_global(hops, seed):
@pytest.mark.sg
@pytest.mark.parametrize("seed", [62, 66, 68])
@pytest.mark.parametrize("hops", [[5], [5,5], [5,5,5]])
@pytest.mark.tags("runme")
def test_uniform_neighbor_sample_csr_csc_local(hops, seed):
el = email_Eu_core.get_edgelist(download=True)

G = cugraph.Graph(directed=True)
G.from_cudf_edgelist(el, source="src", destination="dst")

seeds = [49,71] # hardcoded to ensure out-degree is high enough
seeds = cudf.Series([49,71], dtype='int32') # hardcoded to ensure out-degree is high enough

sampling_results, offsets, renumber_map = cugraph.uniform_neighbor_sample(
G,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import pytest

import pandas
import cupy
import cudf
import cugraph
Expand Down Expand Up @@ -745,7 +746,6 @@ def test_uniform_neighbor_sample_batched(dask_client, dataset, input_df, max_bat


@pytest.mark.mg
@pytest.mark.tags("runme")
def test_uniform_neighbor_sample_exclude_sources_basic(dask_client):
df = dask_cudf.from_cudf(
cudf.DataFrame(
Expand Down Expand Up @@ -1006,7 +1006,6 @@ def test_uniform_neighbor_sample_renumber(dask_client, hops):

@pytest.mark.mg
@pytest.mark.parametrize("hops", [[5], [5, 5], [5, 5, 5]])
@pytest.mark.tags("runme")
def test_uniform_neighbor_sample_offset_renumber(dask_client, hops):
el = dask_cudf.from_cudf(email_Eu_core.get_edgelist(), npartitions=4)

Expand Down Expand Up @@ -1049,8 +1048,8 @@ def test_uniform_neighbor_sample_offset_renumber(dask_client, hops):
# can't use compute() since empty batches still get a partition
n_workers = len(dask_client.scheduler_info()["workers"])
for p in range(n_workers):
partition = sampling_results_renumbered.get_partition(p).compute()
if len(partition) > 0:
partition = offsets_renumbered.get_partition(p).compute()
if not pandas.isna(partition.batch_id.iloc[0]):
break

sampling_results_renumbered = sampling_results_renumbered.get_partition(p).compute()
Expand All @@ -1077,7 +1076,127 @@ def test_uniform_neighbor_sample_offset_renumber(dask_client, hops):

assert len(offsets_renumbered) == 2

# TODO add tests for (D)CSR/(D)CSC


@pytest.mark.mg
@pytest.mark.parametrize("hops", [[5], [5, 5], [5, 5, 5]])
@pytest.mark.parametrize("seed", [62, 66, 68])
def test_uniform_neighbor_sample_csr_csc_global(dask_client, hops, seed):
el = dask_cudf.from_cudf(email_Eu_core.get_edgelist(), npartitions=4)

G = cugraph.Graph(directed=True)
G.from_dask_cudf_edgelist(el, source="src", destination="dst")

seeds = G.select_random_vertices(seed, int(0.0001 * len(el)))

sampling_results, offsets, renumber_map = cugraph.dask.uniform_neighbor_sample(
G,
seeds,
hops,
with_replacement=False,
with_edge_properties=True,
with_batch_ids=False,
deduplicate_sources=True,
prior_sources_behavior='exclude', # carryover not valid because C++ sorts on (hop,src)
renumber=True,
return_offsets=True,
random_state=seed,
use_legacy_names=False,
compress_per_hop=False,
compression='CSR',
include_hop_column=False,
keep_batches_together=True,
min_batch_id=0,
max_batch_id=0,
)

# can't use compute() since empty batches still get a partition
n_workers = len(dask_client.scheduler_info()["workers"])
for p in range(n_workers):
partition = offsets.get_partition(p).compute()
if not pandas.isna(partition.batch_id.iloc[0]):
break

sampling_results = sampling_results.get_partition(p).compute()
offsets = offsets.get_partition(p).compute()
renumber_map = renumber_map.get_partition(p).compute()

major_offsets = sampling_results['major_offsets'].dropna().values
majors = cudf.Series(cupy.arange(len(major_offsets) - 1))
majors = majors.repeat(cupy.diff(major_offsets))

minors = sampling_results['minors'].dropna()
assert len(majors) == len(minors)

majors = renumber_map.map.iloc[majors]
minors = renumber_map.map.iloc[minors]

for i in range(len(majors)):
assert 1 == len(el[(el.src==majors.iloc[i]) & (el.dst==minors.iloc[i])])

@pytest.mark.mg
@pytest.mark.parametrize("seed", [62, 66, 68])
@pytest.mark.parametrize("hops", [[5], [5,5], [5,5,5]])
@pytest.mark.tags("runme")
def test_uniform_neighbor_sample_csr_csc_local(dask_client, hops, seed):
el = dask_cudf.from_cudf(email_Eu_core.get_edgelist(), npartitions=4)

G = cugraph.Graph(directed=True)
G.from_dask_cudf_edgelist(el, source="src", destination="dst")

seeds = dask_cudf.from_cudf(cudf.Series([49,71],dtype='int32'),npartitions=1) # hardcoded to ensure out-degree is high enough

sampling_results, offsets, renumber_map = cugraph.dask.uniform_neighbor_sample(
G,
seeds,
hops,
with_replacement=False,
with_edge_properties=True,
with_batch_ids=False,
deduplicate_sources=True,
prior_sources_behavior='carryover',
renumber=True,
return_offsets=True,
random_state=seed,
use_legacy_names=False,
compress_per_hop=True,
compression='CSR',
include_hop_column=False,
keep_batches_together=True,
min_batch_id=0,
max_batch_id=0,
)

# can't use compute() since empty batches still get a partition
n_workers = len(dask_client.scheduler_info()["workers"])
for p in range(n_workers):
partition = offsets.get_partition(p).compute()

if not pandas.isna(partition.batch_id.iloc[0]):
break

sampling_results = sampling_results.get_partition(p).compute()
offsets = offsets.get_partition(p).compute()
renumber_map = renumber_map.get_partition(p).compute()

print(sampling_results)
print(offsets)

for hop in range(len(hops)):
major_offsets = sampling_results['major_offsets'].iloc[
offsets.offsets.iloc[hop] : (offsets.offsets.iloc[hop+1] + 1)
]

minors = sampling_results['minors'].iloc[major_offsets.iloc[0]:major_offsets.iloc[-1]]

majors = cudf.Series(cupy.arange(len(major_offsets) - 1))
majors = majors.repeat(cupy.diff(major_offsets))

majors = renumber_map.map.iloc[majors]
minors = renumber_map.map.iloc[minors]

for i in range(len(majors)):
assert 1 == len(el[(el.src==majors.iloc[i]) & (el.dst==minors.iloc[i])])


# =============================================================================
Expand Down

0 comments on commit b569563

Please sign in to comment.