From 7beba4b7dc4eacf426a2e1336ca147f8b3ee0eb9 Mon Sep 17 00:00:00 2001 From: Alexandria Barghi Date: Fri, 22 Sep 2023 09:40:17 -0700 Subject: [PATCH] style --- cpp/include/cugraph_c/sampling_algorithms.h | 7 +- cpp/src/c_api/uniform_neighbor_sampling.cpp | 64 ++++----- .../sampling_post_processing_impl.cuh | 2 +- .../dask/sampling/uniform_neighbor_sample.py | 127 ++++++++++-------- .../cugraph/gnn/data_loading/bulk_sampler.py | 2 +- .../gnn/data_loading/bulk_sampler_io.py | 107 +++++++++------ .../cugraph/sampling/sampling_utilities.py | 99 ++++++++------ .../sampling/uniform_neighbor_sample.py | 46 ++++--- .../tests/sampling/test_bulk_sampler_io.py | 55 ++++---- .../tests/sampling/test_bulk_sampler_io_mg.py | 4 +- .../sampling/test_uniform_neighbor_sample.py | 76 +++++++---- .../test_uniform_neighbor_sample_mg.py | 73 ++++++---- 12 files changed, 387 insertions(+), 275 deletions(-) diff --git a/cpp/include/cugraph_c/sampling_algorithms.h b/cpp/include/cugraph_c/sampling_algorithms.h index a9a310db7a5..67fdfc6d946 100644 --- a/cpp/include/cugraph_c/sampling_algorithms.h +++ b/cpp/include/cugraph_c/sampling_algorithms.h @@ -238,7 +238,7 @@ void cugraph_sampling_set_renumber_results(cugraph_sampling_options_t* options, /** * @brief Set whether to compress per-hop (True) or globally (False) - * + * * @param options - opaque pointer to the sampling options * @param value - Boolean value to assign to the option */ @@ -262,11 +262,12 @@ void cugraph_sampling_set_return_hops(cugraph_sampling_options_t* options, bool_ /** * @brief Set compression type - * + * * @param options - opaque pointer to the sampling options * @param value - Enum defining the compresion type */ -void cugraph_sampling_set_compression_type(cugraph_sampling_options_t* options, cugraph_compression_type_t value); +void cugraph_sampling_set_compression_type(cugraph_sampling_options_t* options, + cugraph_compression_type_t value); /** * @brief Set prior sources behavior diff --git a/cpp/src/c_api/uniform_neighbor_sampling.cpp b/cpp/src/c_api/uniform_neighbor_sampling.cpp index 907dbada35f..075f79dd857 100644 --- a/cpp/src/c_api/uniform_neighbor_sampling.cpp +++ b/cpp/src/c_api/uniform_neighbor_sampling.cpp @@ -244,12 +244,12 @@ struct uniform_neighbor_sampling_functor : public cugraph::c_api::abstract_funct std::optional> renumber_map_offsets{std::nullopt}; bool src_is_major = (options_.compression_type_ == cugraph::compression_type_t::CSR) || - (options_.compression_type_ == cugraph::compression_type_t::DCSR); + (options_.compression_type_ == cugraph::compression_type_t::DCSR); if (options_.renumber_results_) { if (options_.compression_type_ == cugraph::compression_type_t::COO) { // COO - + rmm::device_uvector output_majors(0, handle_.get_stream()); rmm::device_uvector output_renumber_map(0, handle_.get_stream()); std::tie(output_majors, @@ -275,15 +275,15 @@ struct uniform_neighbor_sampling_functor : public cugraph::c_api::abstract_funct : std::nullopt, src_is_major, do_expensive_check_); - + majors.emplace(std::move(output_majors)); renumber_map.emplace(std::move(output_renumber_map)); } else { // (D)CSC, (D)CSR bool doubly_compress = - (options_.compression_type_ == cugraph::compression_type_t::DCSR) || - (options_.compression_type_ == cugraph::compression_type_t::DCSC); + (options_.compression_type_ == cugraph::compression_type_t::DCSR) || + (options_.compression_type_ == cugraph::compression_type_t::DCSC); rmm::device_uvector output_major_offsets(0, handle_.get_stream()); rmm::device_uvector output_renumber_map(0, 
handle_.get_stream()); @@ -335,18 +335,17 @@ struct uniform_neighbor_sampling_functor : public cugraph::c_api::abstract_funct edge_id ? std::move(edge_id) : std::nullopt, edge_type ? std::move(edge_type) : std::nullopt, hop ? std::make_optional(std::make_tuple(std::move(*hop), fan_out_->size_)) - : std::nullopt, + : std::nullopt, offsets ? std::make_optional(std::make_tuple( - raft::device_span{offsets->data(), offsets->size()}, - edge_label->size())) - : std::nullopt, + raft::device_span{offsets->data(), offsets->size()}, + edge_label->size())) + : std::nullopt, src_is_major, - do_expensive_check_ - ); + do_expensive_check_); majors.emplace(std::move(src)); minors = std::move(dst); - + hop.reset(); offsets.reset(); } @@ -367,9 +366,11 @@ struct uniform_neighbor_sampling_functor : public cugraph::c_api::abstract_funct : nullptr, (wgt) ? new cugraph::c_api::cugraph_type_erased_device_array_t(*wgt, graph_->weight_type_) : nullptr, - (hop) ? new cugraph::c_api::cugraph_type_erased_device_array_t(*hop, INT32) : nullptr, // FIXME get rid of this - (label_hop_offsets) ? new cugraph::c_api::cugraph_type_erased_device_array_t(*label_hop_offsets, SIZE_T) - : nullptr, + (hop) ? new cugraph::c_api::cugraph_type_erased_device_array_t(*hop, INT32) + : nullptr, // FIXME get rid of this + (label_hop_offsets) + ? new cugraph::c_api::cugraph_type_erased_device_array_t(*label_hop_offsets, SIZE_T) + : nullptr, (edge_label) ? new cugraph::c_api::cugraph_type_erased_device_array_t(edge_label.value(), INT32) : nullptr, @@ -406,7 +407,9 @@ extern "C" void cugraph_sampling_set_renumber_results(cugraph_sampling_options_t internal_pointer->renumber_results_ = value; } -extern "C" void cugraph_sampling_set_compress_per_hop(cugraph_sampling_options_t* options, bool_t value) { +extern "C" void cugraph_sampling_set_compress_per_hop(cugraph_sampling_options_t* options, + bool_t value) +{ auto internal_pointer = reinterpret_cast(options); internal_pointer->compress_per_hop_ = value; } @@ -424,26 +427,17 @@ extern "C" void cugraph_sampling_set_return_hops(cugraph_sampling_options_t* opt internal_pointer->return_hops_ = value; } -extern "C" void cugraph_sampling_set_compression_type(cugraph_sampling_options_t* options, cugraph_compression_type_t value) { +extern "C" void cugraph_sampling_set_compression_type(cugraph_sampling_options_t* options, + cugraph_compression_type_t value) +{ auto internal_pointer = reinterpret_cast(options); - switch(value) { - case COO: - internal_pointer->compression_type_ = cugraph::compression_type_t::COO; - break; - case CSR: - internal_pointer->compression_type_ = cugraph::compression_type_t::CSR; - break; - case CSC: - internal_pointer->compression_type_ = cugraph::compression_type_t::CSC; - break; - case DCSR: - internal_pointer->compression_type_ = cugraph::compression_type_t::DCSR; - break; - case DCSC: - internal_pointer->compression_type_ = cugraph::compression_type_t::DCSC; - break; - default: - CUGRAPH_FAIL("Invalid compression type"); + switch (value) { + case COO: internal_pointer->compression_type_ = cugraph::compression_type_t::COO; break; + case CSR: internal_pointer->compression_type_ = cugraph::compression_type_t::CSR; break; + case CSC: internal_pointer->compression_type_ = cugraph::compression_type_t::CSC; break; + case DCSR: internal_pointer->compression_type_ = cugraph::compression_type_t::DCSR; break; + case DCSC: internal_pointer->compression_type_ = cugraph::compression_type_t::DCSC; break; + default: CUGRAPH_FAIL("Invalid compression type"); } } diff --git 
a/cpp/src/sampling/sampling_post_processing_impl.cuh b/cpp/src/sampling/sampling_post_processing_impl.cuh index e8fecf47414..2e48d7598fa 100644 --- a/cpp/src/sampling/sampling_post_processing_impl.cuh +++ b/cpp/src/sampling/sampling_post_processing_impl.cuh @@ -171,7 +171,7 @@ void check_input_edges( "Invlaid input arguments: there should be 1 or more labels if " "edgelist_label_offsets.has_value() is true."); */ - + CUGRAPH_EXPECTS( !edgelist_label_offsets.has_value() || (std::get<0>(*edgelist_label_offsets).size() == std::get<1>(*edgelist_label_offsets) + 1), diff --git a/python/cugraph/cugraph/dask/sampling/uniform_neighbor_sample.py b/python/cugraph/cugraph/dask/sampling/uniform_neighbor_sample.py index 72d869243d2..fc2abea2a5c 100644 --- a/python/cugraph/cugraph/dask/sampling/uniform_neighbor_sample.py +++ b/python/cugraph/cugraph/dask/sampling/uniform_neighbor_sample.py @@ -41,7 +41,7 @@ if TYPE_CHECKING: from cugraph import Graph - + src_n = "sources" dst_n = "destinations" @@ -72,14 +72,20 @@ def create_empty_df(indices_t, weight_t): def create_empty_df_with_edge_props( - indices_t, weight_t, return_offsets=False, renumber=False, use_legacy_names=True, include_hop_column=True, compression='COO' + indices_t, + weight_t, + return_offsets=False, + renumber=False, + use_legacy_names=True, + include_hop_column=True, + compression="COO", ): - if compression != 'COO': - majors_name = 'major_offsets' + if compression != "COO": + majors_name = "major_offsets" else: - majors_name = (src_n if use_legacy_names else 'majors') + majors_name = src_n if use_legacy_names else "majors" - minors_name = (dst_n if use_legacy_names else 'minors') + minors_name = dst_n if use_legacy_names else "minors" if renumber: empty_df_renumber = cudf.DataFrame( @@ -131,6 +137,7 @@ def create_empty_df_with_edge_props( else: return df + def __get_label_to_output_comm_rank(min_batch_id, max_batch_id, n_workers): num_batches = max_batch_id - min_batch_id + 1 num_batches = int(num_batches) @@ -163,7 +170,7 @@ def _call_plc_uniform_neighbor_sample( use_legacy_names=True, include_hop_column=True, compress_per_hop=False, - compression='COO', + compression="COO", ): st_x = st_x[0] start_list_x = st_x[start_col_name] @@ -195,11 +202,14 @@ def _call_plc_uniform_neighbor_sample( renumber=renumber, compression=compression, compress_per_hop=compress_per_hop, - return_dict=True + return_dict=True, ) # have to import here due to circular import issue - from cugraph.sampling.sampling_utilities import sampling_results_from_cupy_array_dict + from cugraph.sampling.sampling_utilities import ( + sampling_results_from_cupy_array_dict, + ) + return sampling_results_from_cupy_array_dict( cupy_array_dict, weight_t, @@ -208,7 +218,7 @@ def _call_plc_uniform_neighbor_sample( return_offsets=return_offsets, renumber=renumber, use_legacy_names=use_legacy_names, - include_hop_column=include_hop_column + include_hop_column=include_hop_column, ) @@ -234,7 +244,7 @@ def _mg_call_plc_uniform_neighbor_sample( use_legacy_names=True, include_hop_column=True, compress_per_hop=False, - compression='COO', + compression="COO", ): n_workers = None if keep_batches_together: @@ -266,8 +276,8 @@ def _mg_call_plc_uniform_neighbor_sample( prior_sources_behavior=prior_sources_behavior, deduplicate_sources=deduplicate_sources, renumber=renumber, - use_legacy_names=use_legacy_names, # remove in 23.12 - include_hop_column=include_hop_column, # remove in 23.12 + use_legacy_names=use_legacy_names, # remove in 23.12 + include_hop_column=include_hop_column, # remove in 
23.12 compress_per_hop=compress_per_hop, compression=compression, allow_other_workers=False, @@ -343,13 +353,13 @@ def uniform_neighbor_sample( random_state: int = None, return_offsets: bool = False, return_hops: bool = True, - include_hop_column: bool = True, # deprecated + include_hop_column: bool = True, # deprecated prior_sources_behavior: str = None, deduplicate_sources: bool = False, renumber: bool = False, - use_legacy_names=True, # deprecated + use_legacy_names=True, # deprecated compress_per_hop=False, - compression='COO', + compression="COO", _multiple_clients: bool = False, ) -> Union[dask_cudf.DataFrame, Tuple[dask_cudf.DataFrame, dask_cudf.DataFrame]]: """ @@ -403,7 +413,7 @@ def uniform_neighbor_sample( Whether to return the sampling results with hop ids corresponding to the hop where the edge appeared. Defaults to True. - + include_hop_column: bool, optional (default=True) Deprecated. Defaults to True. If True, will include the hop column even if @@ -427,7 +437,7 @@ def uniform_neighbor_sample( Whether to renumber on a per-batch basis. If True, will return the renumber map and renumber map offsets as an additional dataframe. - + use_legacy_names: bool, optional (default=True) Whether to use the legacy column names (sources, destinations). If True, will use "sources" and "destinations" as the column names. @@ -517,24 +527,27 @@ def uniform_neighbor_sample( ) warnings.warn(warning_msg, FutureWarning) - if (compression != 'COO') and (not compress_per_hop) and prior_sources_behavior != 'exclude': + if ( + (compression != "COO") + and (not compress_per_hop) + and prior_sources_behavior != "exclude" + ): raise ValueError( - 'hop-agnostic compression is only supported with' - ' the exclude prior sources behavior due to limitations ' - 'of the libcugraph C++ API' + "hop-agnostic compression is only supported with" + " the exclude prior sources behavior due to limitations " + "of the libcugraph C++ API" ) - - if compress_per_hop and prior_sources_behavior != 'carryover': + + if compress_per_hop and prior_sources_behavior != "carryover": raise ValueError( - 'Compressing the edgelist per hop is only supported ' - 'with the carryover prior sources behavior due to limitations' - ' of the libcugraph C++ API' + "Compressing the edgelist per hop is only supported " + "with the carryover prior sources behavior due to limitations" + " of the libcugraph C++ API" ) - - if include_hop_column and compression != 'COO': + + if include_hop_column and compression != "COO": raise ValueError( - 'Including the hop id column is only supported ' - 'with COO compression.' + "Including the hop id column is only supported " "with COO compression." 
) if isinstance(start_list, int): @@ -626,28 +639,28 @@ def uniform_neighbor_sample( ddf = get_persisted_df_worker_map(ddf, client) sample_call_kwargs = { - 'client':client, - 'session_id':session_id, - 'input_graph':input_graph, - 'ddf':ddf, - 'keep_batches_together':keep_batches_together, - 'min_batch_id':min_batch_id, - 'max_batch_id':max_batch_id, - 'fanout_vals':fanout_vals, - 'with_replacement':with_replacement, - 'weight_t':weight_t, - 'indices_t':indices_t, - 'with_edge_properties':with_edge_properties, - 'random_state':random_state, - 'return_offsets':return_offsets, - 'return_hops':return_hops, - 'prior_sources_behavior':prior_sources_behavior, - 'deduplicate_sources':deduplicate_sources, - 'renumber':renumber, - 'use_legacy_names':use_legacy_names, - 'include_hop_column':include_hop_column, - 'compress_per_hop':compress_per_hop, - 'compression':compression, + "client": client, + "session_id": session_id, + "input_graph": input_graph, + "ddf": ddf, + "keep_batches_together": keep_batches_together, + "min_batch_id": min_batch_id, + "max_batch_id": max_batch_id, + "fanout_vals": fanout_vals, + "with_replacement": with_replacement, + "weight_t": weight_t, + "indices_t": indices_t, + "with_edge_properties": with_edge_properties, + "random_state": random_state, + "return_offsets": return_offsets, + "return_hops": return_hops, + "prior_sources_behavior": prior_sources_behavior, + "deduplicate_sources": deduplicate_sources, + "renumber": renumber, + "use_legacy_names": use_legacy_names, + "include_hop_column": include_hop_column, + "compress_per_hop": compress_per_hop, + "compression": compression, } if _multiple_clients: @@ -657,9 +670,7 @@ def uniform_neighbor_sample( lock = Lock("plc_graph_access") if lock.acquire(timeout=100): try: - ddf = _mg_call_plc_uniform_neighbor_sample( - **sample_call_kwargs - ) + ddf = _mg_call_plc_uniform_neighbor_sample(**sample_call_kwargs) finally: lock.release() else: @@ -667,9 +678,7 @@ def uniform_neighbor_sample( "Failed to acquire lock(plc_graph_access) while trying to sampling" ) else: - ddf = _mg_call_plc_uniform_neighbor_sample( - **sample_call_kwargs - ) + ddf = _mg_call_plc_uniform_neighbor_sample(**sample_call_kwargs) if return_offsets: if renumber: diff --git a/python/cugraph/cugraph/gnn/data_loading/bulk_sampler.py b/python/cugraph/cugraph/gnn/data_loading/bulk_sampler.py index 9497b28cd82..dbfcb124ce5 100644 --- a/python/cugraph/cugraph/gnn/data_loading/bulk_sampler.py +++ b/python/cugraph/cugraph/gnn/data_loading/bulk_sampler.py @@ -269,7 +269,7 @@ def flush(self) -> None: with_edge_properties=True, return_offsets=True, renumber=self.__renumber, - #use_legacy_names=False, + # use_legacy_names=False, ) if self.__renumber: diff --git a/python/cugraph/cugraph/gnn/data_loading/bulk_sampler_io.py b/python/cugraph/cugraph/gnn/data_loading/bulk_sampler_io.py index 3e8050c315f..741a7478b58 100644 --- a/python/cugraph/cugraph/gnn/data_loading/bulk_sampler_io.py +++ b/python/cugraph/cugraph/gnn/data_loading/bulk_sampler_io.py @@ -23,7 +23,7 @@ def create_df_from_disjoint_series(series_list: List[cudf.Series]): - series_list.sort(key=lambda s : len(s), reverse=True) + series_list.sort(key=lambda s: len(s), reverse=True) df = cudf.DataFrame() for s in series_list: @@ -31,6 +31,7 @@ def create_df_from_disjoint_series(series_list: List[cudf.Series]): return df + def _write_samples_to_parquet_csr( results: cudf.DataFrame, offsets: cudf.DataFrame, @@ -74,7 +75,7 @@ def _write_samples_to_parquet_csr( # Additional check to skip dummy partitions required 
for CSR format. if isna(offsets.batch_id.iloc[0]): - return cudf.Series(dtype='int64') + return cudf.Series(dtype="int64") # Output: # major_offsets - CSR/CSC row/col pointers @@ -99,76 +100,106 @@ def _write_samples_to_parquet_csr( renumber_map_offsets.dropna(inplace=True) major_offsets_array = results.major_offsets - results.drop(columns='major_offsets', inplace=True) + results.drop(columns="major_offsets", inplace=True) major_offsets_array.dropna(inplace=True) major_offsets_array = major_offsets_array.values minors_array = results.minors - results.drop(columns='minors', inplace=True) + results.drop(columns="minors", inplace=True) minors_array.dropna(inplace=True) minors_array = minors_array.values weight_array = results.weight - results.drop(columns='weight', inplace=True) + results.drop(columns="weight", inplace=True) weight_array.dropna(inplace=True) - weight_array = cupy.array([], dtype='float32') if weight_array.empty else weight_array.values + weight_array = ( + cupy.array([], dtype="float32") if weight_array.empty else weight_array.values + ) edge_id_array = results.edge_id - results.drop(columns='edge_id', inplace=True) + results.drop(columns="edge_id", inplace=True) edge_id_array.dropna(inplace=True) - edge_id_array = cupy.array([], dtype='int64') if edge_id_array.empty else edge_id_array.values + edge_id_array = ( + cupy.array([], dtype="int64") if edge_id_array.empty else edge_id_array.values + ) edge_type_array = results.edge_type - results.drop(columns='edge_type', inplace=True) + results.drop(columns="edge_type", inplace=True) edge_type_array.dropna(inplace=True) - edge_type_array = cupy.array([], dtype='int32') if edge_type_array.empty else edge_type_array.values + edge_type_array = ( + cupy.array([], dtype="int32") + if edge_type_array.empty + else edge_type_array.values + ) del results - + offsets_length = len(label_hop_offsets) - 1 if offsets_length % len(batch_ids) != 0: - raise ValueError('Invalid hop offsets') + raise ValueError("Invalid hop offsets") fanout_length = int(offsets_length / len(batch_ids)) - + for p in range(0, int(ceil(len(batch_ids) / batches_per_partition))): partition_start = p * (batches_per_partition) partition_end = (p + 1) * (batches_per_partition) - label_hop_offsets_current_partition = label_hop_offsets.iloc[partition_start * fanout_length : partition_end * fanout_length + 1].reset_index(drop=True) + label_hop_offsets_current_partition = label_hop_offsets.iloc[ + partition_start * fanout_length : partition_end * fanout_length + 1 + ].reset_index(drop=True) label_hop_offsets_current_partition.name = "label_hop_offsets" - batch_ids_current_partition = batch_ids.iloc[partition_start : partition_end] + batch_ids_current_partition = batch_ids.iloc[partition_start:partition_end] + + ( + major_offsets_start, + major_offsets_end, + ) = label_hop_offsets_current_partition.iloc[ + [0, -1] + ].values # legal since offsets has the 1 extra offset + results_start, results_end = major_offsets_array[ + [major_offsets_start, major_offsets_end] + ] # avoid d2h copy - major_offsets_start, major_offsets_end = label_hop_offsets_current_partition.iloc[[0, -1]].values # legal since offsets has the 1 extra offset - results_start, results_end = major_offsets_array[[major_offsets_start, major_offsets_end]] # avoid d2h copy - # no need to use end batch id, just ensure the batch is labeled correctly start_batch_id = batch_ids_current_partition.iloc[0] - #end_batch_id = batch_ids_current_partition.iloc[-1] + # end_batch_id = batch_ids_current_partition.iloc[-1] # create 
the renumber map offsets - renumber_map_offsets_current_partition = renumber_map_offsets.iloc[partition_start : partition_end + 1].reset_index(drop=True) + renumber_map_offsets_current_partition = renumber_map_offsets.iloc[ + partition_start : partition_end + 1 + ].reset_index(drop=True) renumber_map_offsets_current_partition.name = "renumber_map_offsets" - renumber_map_start, renumber_map_end = renumber_map_offsets_current_partition.iloc[[0, -1]].values # avoid d2h copy + ( + renumber_map_start, + renumber_map_end, + ) = renumber_map_offsets_current_partition.iloc[ + [0, -1] + ].values # avoid d2h copy results_current_partition = create_df_from_disjoint_series( [ - cudf.Series(minors_array[results_start : results_end], name='minors'), - cudf.Series(renumber_map.map.values[renumber_map_start : renumber_map_end], name='map'), + cudf.Series(minors_array[results_start:results_end], name="minors"), + cudf.Series( + renumber_map.map.values[renumber_map_start:renumber_map_end], + name="map", + ), label_hop_offsets_current_partition, - cudf.Series(major_offsets_array[results_start : results_end],name='major_offsets'), - cudf.Series(weight_array[results_start : results_end], name='weight'), - cudf.Series(edge_id_array[results_start : results_end], name='edge_id'), - cudf.Series(edge_type_array[results_start : results_end], name='edge_type'), + cudf.Series( + major_offsets_array[results_start:results_end], name="major_offsets" + ), + cudf.Series(weight_array[results_start:results_end], name="weight"), + cudf.Series(edge_id_array[results_start:results_end], name="edge_id"), + cudf.Series( + edge_type_array[results_start:results_end], name="edge_type" + ), renumber_map_offsets_current_partition, ] ) - filename = f'batch={start_batch_id}-{start_batch_id + len(batch_ids_current_partition) - 1}.parquet' - full_output_path = os.path.join( - output_path, filename - ) + end_batch_id = start_batch_id + len(batch_ids_current_partition) - 1 + filename = f"batch={start_batch_id}-{end_batch_id}.parquet" + full_output_path = os.path.join(output_path, filename) results_current_partition.to_parquet( full_output_path, compression=None, index=False, force_nullable_schema=True @@ -343,19 +374,19 @@ def write_samples( output_path: str The output path (where parquet files should be written to). 
""" - - if ('majors' in results) and ('minors' in results): + + if ("majors" in results) and ("minors" in results): write_fn = _write_samples_to_parquet_coo - + # TODO these names will be deprecated in release 23.12 - elif ('sources' in results) and ('destinations' in results): + elif ("sources" in results) and ("destinations" in results): write_fn = _write_samples_to_parquet_coo - elif ('major_offsets' in results and 'minors' in results): + elif "major_offsets" in results and "minors" in results: write_fn = _write_samples_to_parquet_csr - + else: - raise ValueError('invalid columns') + raise ValueError("invalid columns") if hasattr(results, "compute"): results.map_partitions( diff --git a/python/cugraph/cugraph/sampling/sampling_utilities.py b/python/cugraph/cugraph/sampling/sampling_utilities.py index edf69abd362..50c315129dc 100644 --- a/python/cugraph/cugraph/sampling/sampling_utilities.py +++ b/python/cugraph/cugraph/sampling/sampling_utilities.py @@ -16,7 +16,16 @@ import warnings -def sampling_results_from_cupy_array_dict(cupy_array_dict, weight_t, num_hops, with_edge_properties=False, return_offsets=False, renumber=False, use_legacy_names=True,include_hop_column=True, + +def sampling_results_from_cupy_array_dict( + cupy_array_dict, + weight_t, + num_hops, + with_edge_properties=False, + return_offsets=False, + renumber=False, + use_legacy_names=True, + include_hop_column=True, ): """ Creates a cudf DataFrame from cupy arrays from pylibcugraph wrapper @@ -39,15 +48,15 @@ def sampling_results_from_cupy_array_dict(cupy_array_dict, weight_t, num_hops, w minor_col_name = "minors" if with_edge_properties: - majors = cupy_array_dict['majors'] + majors = cupy_array_dict["majors"] if majors is not None: - results_df['majors'] = majors + results_df["majors"] = majors results_df_cols = [ - 'minors', - 'weight', - 'edge_id', - 'edge_type', + "minors", + "weight", + "edge_id", + "edge_type", ] for col in results_df_cols: @@ -55,25 +64,29 @@ def sampling_results_from_cupy_array_dict(cupy_array_dict, weight_t, num_hops, w # The length of each of these arrays should be the same results_df[col] = array - results_df.rename(columns={'majors':major_col_name, 'minors':minor_col_name},inplace=True) + results_df.rename( + columns={"majors": major_col_name, "minors": minor_col_name}, inplace=True + ) - label_hop_offsets = cupy_array_dict['label_hop_offsets'] - batch_ids = cupy_array_dict['batch_id'] + label_hop_offsets = cupy_array_dict["label_hop_offsets"] + batch_ids = cupy_array_dict["batch_id"] if renumber: - renumber_df = cudf.DataFrame({ - 'map': cupy_array_dict['renumber_map'], - }) + renumber_df = cudf.DataFrame( + { + "map": cupy_array_dict["renumber_map"], + } + ) if not return_offsets: if len(batch_ids) > 0: batch_ids_r = cudf.Series(batch_ids).repeat( - cupy.diff(cupy_array_dict['renumber_map_offsets']) + cupy.diff(cupy_array_dict["renumber_map_offsets"]) ) batch_ids_r.reset_index(drop=True, inplace=True) renumber_df["batch_id"] = batch_ids_r else: - renumber_df['batch_id'] = None + renumber_df["batch_id"] = None if return_offsets: batches_series = cudf.Series( @@ -83,8 +96,8 @@ def sampling_results_from_cupy_array_dict(cupy_array_dict, weight_t, num_hops, w if include_hop_column: # TODO remove this logic in release 23.12 offsets_df = cudf.Series( - label_hop_offsets[cupy.arange(len(batch_ids)+1) * num_hops], - name='offsets', + label_hop_offsets[cupy.arange(len(batch_ids) + 1) * num_hops], + name="offsets", ).to_frame() else: offsets_df = cudf.Series( @@ -94,65 +107,73 @@ def 
sampling_results_from_cupy_array_dict(cupy_array_dict, weight_t, num_hops, w if len(batches_series) > len(offsets_df): # this is extremely rare so the inefficiency is ok - offsets_df = offsets_df.join(batches_series, how='outer').sort_index() + offsets_df = offsets_df.join(batches_series, how="outer").sort_index() else: - offsets_df['batch_id'] = batches_series + offsets_df["batch_id"] = batches_series if renumber: renumber_offset_series = cudf.Series( - cupy_array_dict['renumber_map_offsets'], - name="renumber_map_offsets" + cupy_array_dict["renumber_map_offsets"], name="renumber_map_offsets" ) if len(renumber_offset_series) > len(offsets_df): # this is extremely rare so the inefficiency is ok - offsets_df = offsets_df.join(renumber_offset_series, how='outer').sort_index() + offsets_df = offsets_df.join( + renumber_offset_series, how="outer" + ).sort_index() else: - offsets_df['renumber_map_offsets'] = renumber_offset_series - + offsets_df["renumber_map_offsets"] = renumber_offset_series else: if len(batch_ids) > 0: batch_ids_r = cudf.Series(cupy.repeat(batch_ids, num_hops)) - batch_ids_r = cudf.Series(batch_ids_r).repeat(cupy.diff(label_hop_offsets)) + batch_ids_r = cudf.Series(batch_ids_r).repeat( + cupy.diff(label_hop_offsets) + ) batch_ids_r.reset_index(drop=True, inplace=True) results_df["batch_id"] = batch_ids_r else: - results_df['batch_id'] = None - + results_df["batch_id"] = None + # TODO remove this logic in release 23.12, hops will always returned as offsets if include_hop_column: if len(batch_ids) > 0: hop_ids_r = cudf.Series(cupy.arange(num_hops)) - hop_ids_r = cudf.concat([hop_ids_r] * len(batch_ids),ignore_index=True) + hop_ids_r = cudf.concat([hop_ids_r] * len(batch_ids), ignore_index=True) # generate the hop column - hop_ids_r = cudf.Series(hop_ids_r, name='hop_id').repeat( - cupy.diff(label_hop_offsets) - ).reset_index(drop=True) + hop_ids_r = ( + cudf.Series(hop_ids_r, name="hop_id") + .repeat(cupy.diff(label_hop_offsets)) + .reset_index(drop=True) + ) else: - hop_ids_r = cudf.Series(name='hop_id', dtype='int32') + hop_ids_r = cudf.Series(name="hop_id", dtype="int32") - results_df = results_df.join(hop_ids_r, how='outer').sort_index() + results_df = results_df.join(hop_ids_r, how="outer").sort_index() if major_col_name not in results_df: if use_legacy_names: raise ValueError("Can't use legacy names with major offsets") - major_offsets_series = cudf.Series(cupy_array_dict['major_offsets'], name='major_offsets') + major_offsets_series = cudf.Series( + cupy_array_dict["major_offsets"], name="major_offsets" + ) if len(major_offsets_series) > len(results_df): # this is extremely rare so the inefficiency is ok - results_df = results_df.join(major_offsets_series, how='outer').sort_index() + results_df = results_df.join( + major_offsets_series, how="outer" + ).sort_index() else: - results_df['major_offsets'] = major_offsets_series + results_df["major_offsets"] = major_offsets_series else: # TODO this is deprecated, remove it in 23.12 - results_df[major_col_name] = cupy_array_dict['sources'] - results_df[minor_col_name] = cupy_array_dict['destinations'] - indices = cupy_array_dict['indices'] + results_df[major_col_name] = cupy_array_dict["sources"] + results_df[minor_col_name] = cupy_array_dict["destinations"] + indices = cupy_array_dict["indices"] if indices is None: results_df["indices"] = None diff --git a/python/cugraph/cugraph/sampling/uniform_neighbor_sample.py b/python/cugraph/cugraph/sampling/uniform_neighbor_sample.py index c9741cd1c5e..3b15e1d6050 100644 --- 
a/python/cugraph/cugraph/sampling/uniform_neighbor_sample.py +++ b/python/cugraph/cugraph/sampling/uniform_neighbor_sample.py @@ -67,13 +67,13 @@ def uniform_neighbor_sample( random_state: int = None, return_offsets: bool = False, return_hops: bool = True, - include_hop_column: bool = True, # deprecated + include_hop_column: bool = True, # deprecated prior_sources_behavior: str = None, deduplicate_sources: bool = False, renumber: bool = False, - use_legacy_names=True, # deprecated + use_legacy_names=True, # deprecated compress_per_hop=False, - compression='COO', + compression="COO", ) -> Union[cudf.DataFrame, Tuple[cudf.DataFrame, cudf.DataFrame]]: """ Does neighborhood sampling, which samples nodes from a graph based on the @@ -117,7 +117,7 @@ def uniform_neighbor_sample( Whether to return the sampling results with hop ids corresponding to the hop where the edge appeared. Defaults to True. - + include_hop_column: bool, optional (default=True) Deprecated. Defaults to True. If True, will include the hop column even if @@ -141,14 +141,14 @@ def uniform_neighbor_sample( Whether to renumber on a per-batch basis. If True, will return the renumber map and renumber map offsets as an additional dataframe. - + use_legacy_names: bool, optional (default=True) Whether to use the legacy column names (sources, destinations). If True, will use "sources" and "destinations" as the column names. If False, will use "majors" and "minors" as the column names. Deprecated. Will be removed in release 23.12 in favor of always using the new names "majors" and "minors". - + compress_per_hop: bool, optional (default=False) Whether to compress globally (default), or to produce a separate compressed edgelist per hop. @@ -236,24 +236,27 @@ def uniform_neighbor_sample( major_col_name = "majors" minor_col_name = "minors" - if (compression != 'COO') and (not compress_per_hop) and prior_sources_behavior != 'exclude': + if ( + (compression != "COO") + and (not compress_per_hop) + and prior_sources_behavior != "exclude" + ): raise ValueError( - 'hop-agnostic compression is only supported with' - ' the exclude prior sources behavior due to limitations ' - 'of the libcugraph C++ API' + "hop-agnostic compression is only supported with" + " the exclude prior sources behavior due to limitations " + "of the libcugraph C++ API" ) - - if compress_per_hop and prior_sources_behavior != 'carryover': + + if compress_per_hop and prior_sources_behavior != "carryover": raise ValueError( - 'Compressing the edgelist per hop is only supported ' - 'with the carryover prior sources behavior due to limitations' - ' of the libcugraph C++ API' + "Compressing the edgelist per hop is only supported " + "with the carryover prior sources behavior due to limitations" + " of the libcugraph C++ API" ) - - if include_hop_column and compression != 'COO': + + if include_hop_column and compression != "COO": raise ValueError( - 'Including the hop id column is only supported ' - 'with COO compression.' + "Including the hop id column is only supported " "with COO compression." 
) if with_edge_properties: @@ -319,7 +322,6 @@ def uniform_neighbor_sample( start_list = G.lookup_internal_vertex_id(start_list, columns) start_list = start_list.rename(columns={columns[0]: start_col_name}) - sampling_result_array_dict = pylibcugraph_uniform_neighbor_sample( resource_handle=ResourceHandle(), input_graph=G._plc_graph, @@ -349,7 +351,7 @@ def uniform_neighbor_sample( return_offsets=return_offsets, renumber=renumber, use_legacy_names=use_legacy_names, - include_hop_column=include_hop_column + include_hop_column=include_hop_column, ) if G.renumbered and not renumber: @@ -358,5 +360,5 @@ def uniform_neighbor_sample( if len(dfs) > 1: return dfs - + return dfs[0] diff --git a/python/cugraph/cugraph/tests/sampling/test_bulk_sampler_io.py b/python/cugraph/cugraph/tests/sampling/test_bulk_sampler_io.py index f8ba624b264..5eafe89ea83 100644 --- a/python/cugraph/cugraph/tests/sampling/test_bulk_sampler_io.py +++ b/python/cugraph/cugraph/tests/sampling/test_bulk_sampler_io.py @@ -144,7 +144,9 @@ def test_bulk_sampler_io_empty_batch(scratch_dir): assert len(results) == 20 # some batches are missing - offsets = cudf.DataFrame({"offsets": [0, 8, 12, 16, 20], "batch_id": [0, 3, 4, 10, None]}) + offsets = cudf.DataFrame( + {"offsets": [0, 8, 12, 16, 20], "batch_id": [0, 3, 4, 10, None]} + ) samples_path = os.path.join(scratch_dir, "test_bulk_sampler_io_empty_batch") create_directory_with_overwrite(samples_path) @@ -180,40 +182,43 @@ def test_bulk_sampler_io_mock_csr(scratch_dir): renumber_map_offsets = cudf.Series([0, 10]) results_df = cudf.DataFrame() - results_df['minors'] = minors_array - results_df['major_offsets'] = major_offsets_array - results_df['edge_id'] = edge_ids - results_df['edge_type'] = None - results_df['weight'] = None + results_df["minors"] = minors_array + results_df["major_offsets"] = major_offsets_array + results_df["edge_id"] = edge_ids + results_df["edge_type"] = None + results_df["weight"] = None offsets_df = cudf.DataFrame() - offsets_df['offsets'] = label_hop_offsets - offsets_df['renumber_map_offsets'] = renumber_map_offsets - offsets_df['batch_id'] = cudf.Series([0]) + offsets_df["offsets"] = label_hop_offsets + offsets_df["renumber_map_offsets"] = renumber_map_offsets + offsets_df["batch_id"] = cudf.Series([0]) renumber_df = cudf.DataFrame() - renumber_df['map'] = renumber_map + renumber_df["map"] = renumber_map samples_path = os.path.join(scratch_dir, "test_bulk_sampler_io_mock_csr") create_directory_with_overwrite(samples_path) - write_samples( - results_df, - offsets_df, - renumber_df, - 1, - samples_path - ) + write_samples(results_df, offsets_df, renumber_df, 1, samples_path) - result = cudf.read_parquet( - os.path.join(samples_path, 'batch=0-0.parquet') - ) + result = cudf.read_parquet(os.path.join(samples_path, "batch=0-0.parquet")) - assert result.minors.dropna().values_host.tolist() == minors_array.values_host.tolist() - assert result.major_offsets.dropna().values_host.tolist() == major_offsets_array.values_host.tolist() + assert ( + result.minors.dropna().values_host.tolist() == minors_array.values_host.tolist() + ) + assert ( + result.major_offsets.dropna().values_host.tolist() + == major_offsets_array.values_host.tolist() + ) assert result.edge_id.dropna().values_host.tolist() == edge_ids.values_host.tolist() - assert result.renumber_map_offsets.dropna().values_host.tolist() == renumber_map_offsets.values_host.tolist() + assert ( + result.renumber_map_offsets.dropna().values_host.tolist() + == renumber_map_offsets.values_host.tolist() + ) assert 
result.map.dropna().values_host.tolist() == renumber_map.values_host.tolist() - assert result.label_hop_offsets.dropna().values_host.tolist() == label_hop_offsets.values_host.tolist() + assert ( + result.label_hop_offsets.dropna().values_host.tolist() + == label_hop_offsets.values_host.tolist() + ) - shutil.rmtree(samples_path) \ No newline at end of file + shutil.rmtree(samples_path) diff --git a/python/cugraph/cugraph/tests/sampling/test_bulk_sampler_io_mg.py b/python/cugraph/cugraph/tests/sampling/test_bulk_sampler_io_mg.py index ca0b4a7ae35..638cccbdcaa 100644 --- a/python/cugraph/cugraph/tests/sampling/test_bulk_sampler_io_mg.py +++ b/python/cugraph/cugraph/tests/sampling/test_bulk_sampler_io_mg.py @@ -153,7 +153,9 @@ def test_bulk_sampler_io_empty_batch(scratch_dir): ) # some batches are missing - offsets = cudf.DataFrame({"offsets": [0, 8, 12, 0, 4, 8], "batch_id": [0, 3, None, 4, 10, None]}) + offsets = cudf.DataFrame( + {"offsets": [0, 8, 12, 0, 4, 8], "batch_id": [0, 3, None, 4, 10, None]} + ) offsets = dask_cudf.from_cudf(offsets, npartitions=1).repartition( divisions=[0, 3, 5] ) diff --git a/python/cugraph/cugraph/tests/sampling/test_uniform_neighbor_sample.py b/python/cugraph/cugraph/tests/sampling/test_uniform_neighbor_sample.py index 5edb8fb2e95..206898088ab 100644 --- a/python/cugraph/cugraph/tests/sampling/test_uniform_neighbor_sample.py +++ b/python/cugraph/cugraph/tests/sampling/test_uniform_neighbor_sample.py @@ -256,10 +256,7 @@ def test_uniform_neighbor_sample_tree(directed): fanout_vals = [4, 1, 3] with_replacement = True result_nbr = uniform_neighbor_sample( - G, - start_list, - fanout_vals, - with_replacement=with_replacement + G, start_list, fanout_vals, with_replacement=with_replacement ) result_nbr = result_nbr.drop_duplicates() @@ -344,7 +341,7 @@ def test_uniform_neighbor_sample_edge_properties(return_offsets, include_hop_col with_edge_properties=True, with_batch_ids=True, return_offsets=return_offsets, - include_hop_column=include_hop_column + include_hop_column=include_hop_column, ) if return_offsets: sampling_results, sampling_offsets = sampling_results @@ -368,16 +365,28 @@ def test_uniform_neighbor_sample_edge_properties(return_offsets, include_hop_col ) if include_hop_column: - assert sampling_results["hop_id"].values_host.tolist() == ([0, 0, 1, 1, 1, 1] * 2) + assert sampling_results["hop_id"].values_host.tolist() == ( + [0, 0, 1, 1, 1, 1] * 2 + ) else: - assert 'hop_id' not in sampling_results + assert "hop_id" not in sampling_results if return_offsets: assert sampling_offsets["batch_id"].dropna().values_host.tolist() == [0, 1] if include_hop_column: - assert sampling_offsets["offsets"].dropna().values_host.tolist() == [0, 6, 12] + assert sampling_offsets["offsets"].dropna().values_host.tolist() == [ + 0, + 6, + 12, + ] else: - assert sampling_offsets["offsets"].dropna().values_host.tolist() == [0, 2, 6, 8, 12] + assert sampling_offsets["offsets"].dropna().values_host.tolist() == [ + 0, + 2, + 6, + 8, + 12, + ] else: assert sampling_results["batch_id"].values_host.tolist() == ([0] * 6 + [1] * 6) @@ -802,7 +811,10 @@ def test_uniform_neighbor_sample_offset_renumber(hops): seeds = G.select_random_vertices(62, int(0.0001 * len(el))) - sampling_results_unrenumbered, offsets_unrenumbered = cugraph.uniform_neighbor_sample( + ( + sampling_results_unrenumbered, + offsets_unrenumbered, + ) = cugraph.uniform_neighbor_sample( G, seeds, hops, @@ -815,7 +827,11 @@ def test_uniform_neighbor_sample_offset_renumber(hops): random_state=62, ) - 
sampling_results_renumbered, offsets_renumbered, renumber_map = cugraph.uniform_neighbor_sample( + ( + sampling_results_renumbered, + offsets_renumbered, + renumber_map, + ) = cugraph.uniform_neighbor_sample( G, seeds, hops, @@ -840,7 +856,7 @@ def test_uniform_neighbor_sample_offset_renumber(hops): assert sorted(expected_renumber_map.values_host.tolist()) == sorted( renumber_map.map[0 : len(expected_renumber_map)].values_host.tolist() ) - + renumber_map_offsets = offsets_renumbered.renumber_map_offsets.dropna() assert len(renumber_map_offsets) == 2 assert renumber_map_offsets.iloc[0] == 0 @@ -868,39 +884,43 @@ def test_uniform_neighbor_sample_csr_csc_global(hops, seed): with_edge_properties=True, with_batch_ids=False, deduplicate_sources=True, - prior_sources_behavior='exclude', # carryover not valid because C++ sorts on (hop,src) + # carryover not valid because C++ sorts on (hop,src) + prior_sources_behavior="exclude", renumber=True, return_offsets=True, random_state=seed, use_legacy_names=False, compress_per_hop=False, - compression='CSR', + compression="CSR", include_hop_column=False, ) - major_offsets = sampling_results['major_offsets'].dropna().values + major_offsets = sampling_results["major_offsets"].dropna().values majors = cudf.Series(cupy.arange(len(major_offsets) - 1)) majors = majors.repeat(cupy.diff(major_offsets)) - - minors = sampling_results['minors'].dropna() + + minors = sampling_results["minors"].dropna() assert len(majors) == len(minors) majors = renumber_map.map.iloc[majors] minors = renumber_map.map.iloc[minors] for i in range(len(majors)): - assert 1 == len(el[(el.src==majors.iloc[i]) & (el.dst==minors.iloc[i])]) + assert 1 == len(el[(el.src == majors.iloc[i]) & (el.dst == minors.iloc[i])]) + @pytest.mark.sg @pytest.mark.parametrize("seed", [62, 66, 68]) -@pytest.mark.parametrize("hops", [[5], [5,5], [5,5,5]]) +@pytest.mark.parametrize("hops", [[5], [5, 5], [5, 5, 5]]) def test_uniform_neighbor_sample_csr_csc_local(hops, seed): el = email_Eu_core.get_edgelist(download=True) G = cugraph.Graph(directed=True) G.from_cudf_edgelist(el, source="src", destination="dst") - seeds = cudf.Series([49,71], dtype='int32') # hardcoded to ensure out-degree is high enough + seeds = cudf.Series( + [49, 71], dtype="int32" + ) # hardcoded to ensure out-degree is high enough sampling_results, offsets, renumber_map = cugraph.uniform_neighbor_sample( G, @@ -910,22 +930,24 @@ def test_uniform_neighbor_sample_csr_csc_local(hops, seed): with_edge_properties=True, with_batch_ids=False, deduplicate_sources=True, - prior_sources_behavior='carryover', + prior_sources_behavior="carryover", renumber=True, return_offsets=True, random_state=seed, use_legacy_names=False, compress_per_hop=True, - compression='CSR', + compression="CSR", include_hop_column=False, ) for hop in range(len(hops)): - major_offsets = sampling_results['major_offsets'].iloc[ - offsets.offsets.iloc[hop] : (offsets.offsets.iloc[hop+1] + 1) + major_offsets = sampling_results["major_offsets"].iloc[ + offsets.offsets.iloc[hop] : (offsets.offsets.iloc[hop + 1] + 1) ] - minors = sampling_results['minors'].iloc[major_offsets.iloc[0]:major_offsets.iloc[-1]] + minors = sampling_results["minors"].iloc[ + major_offsets.iloc[0] : major_offsets.iloc[-1] + ] majors = cudf.Series(cupy.arange(len(major_offsets) - 1)) majors = majors.repeat(cupy.diff(major_offsets)) @@ -934,7 +956,7 @@ def test_uniform_neighbor_sample_csr_csc_local(hops, seed): minors = renumber_map.map.iloc[minors] for i in range(len(majors)): - assert 1 == 
len(el[(el.src==majors.iloc[i]) & (el.dst==minors.iloc[i])]) + assert 1 == len(el[(el.src == majors.iloc[i]) & (el.dst == minors.iloc[i])]) @pytest.mark.sg @@ -942,11 +964,13 @@ def test_uniform_neighbor_sample_csr_csc_local(hops, seed): def test_uniform_neighbor_sample_dcsr_dcsc_global(): raise NotImplementedError + @pytest.mark.sg @pytest.mark.skip(reason="needs to be written!") def test_uniform_neighbor_sample_dcsr_dcsc_local(): raise NotImplementedError + @pytest.mark.sg @pytest.mark.skip(reason="needs to be written!") def test_multi_client_sampling(): diff --git a/python/cugraph/cugraph/tests/sampling/test_uniform_neighbor_sample_mg.py b/python/cugraph/cugraph/tests/sampling/test_uniform_neighbor_sample_mg.py index 6cecf5c6e9c..feff4fd3576 100644 --- a/python/cugraph/cugraph/tests/sampling/test_uniform_neighbor_sample_mg.py +++ b/python/cugraph/cugraph/tests/sampling/test_uniform_neighbor_sample_mg.py @@ -229,7 +229,9 @@ def test_mg_uniform_neighbor_sample_tree(dask_client, directed): start_list = cudf.Series([0, 0], dtype="int32") fanout_vals = [4, 1, 3] with_replacement = True - result_nbr = uniform_neighbor_sample(G, start_list, fanout_vals, with_replacement=with_replacement) + result_nbr = uniform_neighbor_sample( + G, start_list, fanout_vals, with_replacement=with_replacement + ) result_nbr = result_nbr.drop_duplicates() @@ -388,7 +390,10 @@ def test_uniform_neighbor_sample_edge_properties(dask_client, return_offsets): batches_found[1] += 1 assert offsets_p.batch_id.dropna().values_host.tolist() == [1] - assert offsets_p.offsets.dropna().values_host.tolist() == [0, len(dfp)] + assert offsets_p.offsets.dropna().values_host.tolist() == [ + 0, + len(dfp), + ] assert sorted(dfp.sources.values_host.tolist()) == ( [1, 1, 3, 3, 4, 4] @@ -400,7 +405,10 @@ def test_uniform_neighbor_sample_edge_properties(dask_client, return_offsets): batches_found[0] += 1 assert offsets_p.batch_id.dropna().values_host.tolist() == [0] - assert offsets_p.offsets.dropna().values_host.tolist() == [0, len(dfp)] + assert offsets_p.offsets.dropna().values_host.tolist() == [ + 0, + len(dfp), + ] assert sorted(dfp.sources.values_host.tolist()) == ( [0, 0, 0, 1, 1, 2, 2, 2, 4, 4] @@ -1004,6 +1012,7 @@ def test_uniform_neighbor_sample_renumber(dask_client, hops): ).nunique() ) + @pytest.mark.mg @pytest.mark.parametrize("hops", [[5], [5, 5], [5, 5, 5]]) def test_uniform_neighbor_sample_offset_renumber(dask_client, hops): @@ -1014,7 +1023,10 @@ def test_uniform_neighbor_sample_offset_renumber(dask_client, hops): seeds = G.select_random_vertices(62, int(0.0001 * len(el))) - sampling_results_unrenumbered, offsets_unrenumbered = cugraph.dask.uniform_neighbor_sample( + ( + sampling_results_unrenumbered, + offsets_unrenumbered, + ) = cugraph.dask.uniform_neighbor_sample( G, seeds, hops, @@ -1029,7 +1041,11 @@ def test_uniform_neighbor_sample_offset_renumber(dask_client, hops): sampling_results_unrenumbered = sampling_results_unrenumbered.compute() offsets_unrenumbered = offsets_unrenumbered.compute() - sampling_results_renumbered, offsets_renumbered, renumber_map = cugraph.dask.uniform_neighbor_sample( + ( + sampling_results_renumbered, + offsets_renumbered, + renumber_map, + ) = cugraph.dask.uniform_neighbor_sample( G, seeds, hops, @@ -1051,7 +1067,7 @@ def test_uniform_neighbor_sample_offset_renumber(dask_client, hops): partition = offsets_renumbered.get_partition(p).compute() if not pandas.isna(partition.batch_id.iloc[0]): break - + sampling_results_renumbered = sampling_results_renumbered.get_partition(p).compute() 
offsets_renumbered = offsets_renumbered.get_partition(p).compute() renumber_map = renumber_map.get_partition(p).compute() @@ -1068,7 +1084,7 @@ def test_uniform_neighbor_sample_offset_renumber(dask_client, hops): assert sorted(expected_renumber_map.values_host.tolist()) == sorted( renumber_map.map[0 : len(expected_renumber_map)].values_host.tolist() ) - + renumber_map_offsets = offsets_renumbered.renumber_map_offsets.dropna() assert len(renumber_map_offsets) == 2 assert renumber_map_offsets.iloc[0] == 0 @@ -1077,7 +1093,6 @@ def test_uniform_neighbor_sample_offset_renumber(dask_client, hops): assert len(offsets_renumbered) == 2 - @pytest.mark.mg @pytest.mark.parametrize("hops", [[5], [5, 5], [5, 5, 5]]) @pytest.mark.parametrize("seed", [62, 66, 68]) @@ -1097,13 +1112,14 @@ def test_uniform_neighbor_sample_csr_csc_global(dask_client, hops, seed): with_edge_properties=True, with_batch_ids=False, deduplicate_sources=True, - prior_sources_behavior='exclude', # carryover not valid because C++ sorts on (hop,src) + # carryover not valid because C++ sorts on (hop,src) + prior_sources_behavior="exclude", renumber=True, return_offsets=True, random_state=seed, use_legacy_names=False, compress_per_hop=False, - compression='CSR', + compression="CSR", include_hop_column=False, keep_batches_together=True, min_batch_id=0, @@ -1116,27 +1132,28 @@ def test_uniform_neighbor_sample_csr_csc_global(dask_client, hops, seed): partition = offsets.get_partition(p).compute() if not pandas.isna(partition.batch_id.iloc[0]): break - + sampling_results = sampling_results.get_partition(p).compute() offsets = offsets.get_partition(p).compute() renumber_map = renumber_map.get_partition(p).compute() - major_offsets = sampling_results['major_offsets'].dropna().values + major_offsets = sampling_results["major_offsets"].dropna().values majors = cudf.Series(cupy.arange(len(major_offsets) - 1)) majors = majors.repeat(cupy.diff(major_offsets)) - - minors = sampling_results['minors'].dropna() + + minors = sampling_results["minors"].dropna() assert len(majors) == len(minors) majors = renumber_map.map.iloc[majors] minors = renumber_map.map.iloc[minors] for i in range(len(majors)): - assert 1 == len(el[(el.src==majors.iloc[i]) & (el.dst==minors.iloc[i])]) + assert 1 == len(el[(el.src == majors.iloc[i]) & (el.dst == minors.iloc[i])]) + @pytest.mark.mg @pytest.mark.parametrize("seed", [62, 66, 68]) -@pytest.mark.parametrize("hops", [[5], [5,5], [5,5,5]]) +@pytest.mark.parametrize("hops", [[5], [5, 5], [5, 5, 5]]) @pytest.mark.tags("runme") def test_uniform_neighbor_sample_csr_csc_local(dask_client, hops, seed): el = dask_cudf.from_cudf(email_Eu_core.get_edgelist(), npartitions=4) @@ -1144,7 +1161,9 @@ def test_uniform_neighbor_sample_csr_csc_local(dask_client, hops, seed): G = cugraph.Graph(directed=True) G.from_dask_cudf_edgelist(el, source="src", destination="dst") - seeds = dask_cudf.from_cudf(cudf.Series([49,71],dtype='int32'),npartitions=1) # hardcoded to ensure out-degree is high enough + seeds = dask_cudf.from_cudf( + cudf.Series([49, 71], dtype="int32"), npartitions=1 + ) # hardcoded to ensure out-degree is high enough sampling_results, offsets, renumber_map = cugraph.dask.uniform_neighbor_sample( G, @@ -1154,13 +1173,13 @@ def test_uniform_neighbor_sample_csr_csc_local(dask_client, hops, seed): with_edge_properties=True, with_batch_ids=False, deduplicate_sources=True, - prior_sources_behavior='carryover', + prior_sources_behavior="carryover", renumber=True, return_offsets=True, random_state=seed, use_legacy_names=False, 
compress_per_hop=True, - compression='CSR', + compression="CSR", include_hop_column=False, keep_batches_together=True, min_batch_id=0, @@ -1171,10 +1190,10 @@ def test_uniform_neighbor_sample_csr_csc_local(dask_client, hops, seed): n_workers = len(dask_client.scheduler_info()["workers"]) for p in range(n_workers): partition = offsets.get_partition(p).compute() - + if not pandas.isna(partition.batch_id.iloc[0]): break - + sampling_results = sampling_results.get_partition(p).compute() offsets = offsets.get_partition(p).compute() renumber_map = renumber_map.get_partition(p).compute() @@ -1183,11 +1202,13 @@ def test_uniform_neighbor_sample_csr_csc_local(dask_client, hops, seed): print(offsets) for hop in range(len(hops)): - major_offsets = sampling_results['major_offsets'].iloc[ - offsets.offsets.iloc[hop] : (offsets.offsets.iloc[hop+1] + 1) + major_offsets = sampling_results["major_offsets"].iloc[ + offsets.offsets.iloc[hop] : (offsets.offsets.iloc[hop + 1] + 1) ] - minors = sampling_results['minors'].iloc[major_offsets.iloc[0]:major_offsets.iloc[-1]] + minors = sampling_results["minors"].iloc[ + major_offsets.iloc[0] : major_offsets.iloc[-1] + ] majors = cudf.Series(cupy.arange(len(major_offsets) - 1)) majors = majors.repeat(cupy.diff(major_offsets)) @@ -1196,7 +1217,7 @@ def test_uniform_neighbor_sample_csr_csc_local(dask_client, hops, seed): minors = renumber_map.map.iloc[minors] for i in range(len(majors)): - assert 1 == len(el[(el.src==majors.iloc[i]) & (el.dst==minors.iloc[i])]) + assert 1 == len(el[(el.src == majors.iloc[i]) & (el.dst == minors.iloc[i])]) @pytest.mark.mg @@ -1204,11 +1225,13 @@ def test_uniform_neighbor_sample_csr_csc_local(dask_client, hops, seed): def test_uniform_neighbor_sample_dcsr_dcsc_global(): raise NotImplementedError + @pytest.mark.mg @pytest.mark.skip(reason="needs to be written!") def test_uniform_neighbor_sample_dcsr_dcsc_local(): raise NotImplementedError + # ============================================================================= # Benchmarks # =============================================================================
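
Three sketches follow for reviewers; none are part of the diff itself. First, a minimal sketch of the hop-agnostic (global) CSR path exercised by test_uniform_neighbor_sample_csr_csc_global above, assuming the email_Eu_core test dataset and a working cugraph/cudf/cupy install (the hardcoded seeds are borrowed from the local test). Per the checks added in uniform_neighbor_sample.py, global compression requires prior_sources_behavior="exclude", and include_hop_column must be False for any non-COO compression:

import cudf
import cupy
import cugraph
from cugraph.datasets import email_Eu_core

el = email_Eu_core.get_edgelist(download=True)

G = cugraph.Graph(directed=True)
G.from_cudf_edgelist(el, source="src", destination="dst")

# hardcoded to ensure out-degree is high enough, as in the tests
seeds = cudf.Series([49, 71], dtype="int32")

sampling_results, offsets, renumber_map = cugraph.uniform_neighbor_sample(
    G,
    seeds,
    [5, 5],
    with_replacement=False,
    with_edge_properties=True,
    with_batch_ids=False,
    deduplicate_sources=True,
    prior_sources_behavior="exclude",  # carryover not valid; C++ sorts on (hop, src)
    renumber=True,
    return_offsets=True,
    random_state=62,
    use_legacy_names=False,
    compress_per_hop=False,
    compression="CSR",
    include_hop_column=False,
)

# Expand the CSR output back to COO: majors are implicit in major_offsets,
# minors are explicit, and both index into the per-batch renumber map.
major_offsets = sampling_results["major_offsets"].dropna().values
majors = cudf.Series(cupy.arange(len(major_offsets) - 1))
majors = majors.repeat(cupy.diff(major_offsets))
minors = sampling_results["minors"].dropna()

majors = renumber_map.map.iloc[majors]
minors = renumber_map.map.iloc[minors]

With multiple batches, the slice of renumber_map belonging to each batch is delimited by the renumber_map_offsets column of the offsets dataframe.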
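Second, the per-hop variant, mirroring test_uniform_neighbor_sample_csr_csc_local: compress_per_hop=True must be paired with prior_sources_behavior="carryover", and offsets.offsets then delimits each hop's span of major_offsets entries. Same imports, graph, and seeds as the sketch above:

sampling_results, offsets, renumber_map = cugraph.uniform_neighbor_sample(
    G,
    seeds,
    [5, 5],
    with_replacement=False,
    with_edge_properties=True,
    with_batch_ids=False,
    deduplicate_sources=True,
    prior_sources_behavior="carryover",
    renumber=True,
    return_offsets=True,
    random_state=62,
    use_legacy_names=False,
    compress_per_hop=True,
    compression="CSR",
    include_hop_column=False,
)

for hop in range(2):
    # the trailing +1 keeps the closing offset of this hop's CSR block
    major_offsets = sampling_results["major_offsets"].iloc[
        offsets.offsets.iloc[hop] : (offsets.offsets.iloc[hop + 1] + 1)
    ]
    minors = sampling_results["minors"].iloc[
        major_offsets.iloc[0] : major_offsets.iloc[-1]
    ]
    majors = cudf.Series(cupy.arange(len(major_offsets) - 1))
    majors = majors.repeat(cupy.diff(major_offsets))

    majors = renumber_map.map.iloc[majors]
    minors = renumber_map.map.iloc[minors]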
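Finally, the input layout the new CSR writer in bulk_sampler_io.py expects, following test_bulk_sampler_io_mock_csr above. The toy values here are hypothetical but chosen to be mutually consistent (label_hop_offsets indexes into major_offsets, which indexes into minors/edge_id), and the import path is inferred from this diff's module layout:

import os
import cudf
from cugraph.gnn.data_loading.bulk_sampler_io import write_samples

# one batch, two hops (all values hypothetical)
minors_array = cudf.Series([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], dtype="int32")
major_offsets_array = cudf.Series([0, 2, 4, 7, 10], dtype="int64")
edge_ids = cudf.Series(range(10), dtype="int64")
label_hop_offsets = cudf.Series([0, 2, 4], dtype="int64")
renumber_map = cudf.Series([10, 11, 12, 13, 14, 15, 16, 17, 18, 19], dtype="int32")
renumber_map_offsets = cudf.Series([0, 10], dtype="int64")

# columns of unequal length are aligned on index; the writer drops the nulls
results_df = cudf.DataFrame()
results_df["minors"] = minors_array
results_df["major_offsets"] = major_offsets_array
results_df["edge_id"] = edge_ids
results_df["edge_type"] = None
results_df["weight"] = None

offsets_df = cudf.DataFrame()
offsets_df["offsets"] = label_hop_offsets
offsets_df["renumber_map_offsets"] = renumber_map_offsets
offsets_df["batch_id"] = cudf.Series([0])

renumber_df = cudf.DataFrame()
renumber_df["map"] = renumber_map

write_samples(results_df, offsets_df, renumber_df, 1, "/tmp/csr_samples")
print(cudf.read_parquet(os.path.join("/tmp/csr_samples", "batch=0-0.parquet")))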