From 4ee227c7c84ef487828ecadd5fe86934f1fce4eb Mon Sep 17 00:00:00 2001
From: Joseph Nke <76006812+jnke2016@users.noreply.github.com>
Date: Thu, 7 Sep 2023 16:39:22 -0500
Subject: [PATCH] Remove the assumption made on the client data's keys (#3835)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

When calling `client.has_what()`, which returns the keys of the data held in
each worker's memory, those keys used to be returned as strings, but a recent
change in `dask` changed their type to tuples:

From `{worker_ip_address: ("('from-delayed-190587f1b2318dc54d5f92a79e59b71a', 0)", "('from-delayed-190587f1b2318dc54d5f92a79e59b71a', 1)")}`
to `{worker_ip_address: (('from-delayed-c3d92b2cc9948634e82a0b2b62453a6c', 0), ('from-delayed-c3d92b2cc9948634e82a0b2b62453a6c', 1))}`.

When mapping workers to persisted data in the function
`get_persisted_df_worker_map`, an assumption about the type of those keys was
made, thereby breaking our MG tests. This PR removes that assumption.

Closes #3834

Authors:
  - Joseph Nke (https://github.com/jnke2016)
  - Alex Barghi (https://github.com/alexbarghi-nv)

Approvers:
  - Alex Barghi (https://github.com/alexbarghi-nv)

URL: https://github.com/rapidsai/cugraph/pull/3835
---
 .../cugraph/cugraph/dask/common/part_utils.py | 20 ++++++++++----------
 1 file changed, 10 insertions(+), 10 deletions(-)

diff --git a/python/cugraph/cugraph/dask/common/part_utils.py b/python/cugraph/cugraph/dask/common/part_utils.py
index fda7e257367..7c0aad6c3ee 100644
--- a/python/cugraph/cugraph/dask/common/part_utils.py
+++ b/python/cugraph/cugraph/dask/common/part_utils.py
@@ -73,7 +73,7 @@ def persist_distributed_data(dask_df, client):
     _keys = dask_df.__dask_keys__()
     worker_dict = {}
     for i, key in enumerate(_keys):
-        worker_dict[str(key)] = tuple([worker_addresses[i]])
+        worker_dict[key] = tuple([worker_addresses[i]])
     persisted = client.persist(dask_df, workers=worker_dict)
     parts = futures_of(persisted)
     return parts
@@ -89,7 +89,7 @@ def get_persisted_df_worker_map(dask_df, client):
     ddf_keys = futures_of(dask_df)
     output_map = {}
     for w, w_keys in client.has_what().items():
-        output_map[w] = [ddf_k for ddf_k in ddf_keys if str(ddf_k.key) in w_keys]
+        output_map[w] = [ddf_k for ddf_k in ddf_keys if ddf_k.key in w_keys]
         if len(output_map[w]) == 0:
             output_map[w] = _create_empty_dask_df_future(dask_df._meta, client, w)
     return output_map
@@ -157,7 +157,7 @@ async def _extract_partitions(
     # NOTE: We colocate (X, y) here by zipping delayed
     # n partitions of them as (X1, y1), (X2, y2)...
     # and asking client to compute a single future for
-    # each tuple in the list
+    # each tuple in the list.
     dela = [np.asarray(d.to_delayed()) for d in dask_obj]

     # TODO: ravel() is causing strange behavior w/ delayed Arrays which are
@@ -167,7 +167,7 @@ async def _extract_partitions(
     parts = client.compute([p for p in zip(*raveled)])
     await wait(parts)

-    key_to_part = [(str(part.key), part) for part in parts]
+    key_to_part = [(part.key, part) for part in parts]
     who_has = await client.who_has(parts)
     return [(first(who_has[key]), part) for key, part in key_to_part]

@@ -229,7 +229,7 @@ def load_balance_func(ddf_, by, client=None):
     wait(parts)

     who_has = client.who_has(parts)
-    key_to_part = [(str(part.key), part) for part in parts]
+    key_to_part = [(part.key, part) for part in parts]
     gpu_fututres = [
         (first(who_has[key]), part.key[1], part) for key, part in key_to_part
     ]
@@ -245,7 +245,7 @@ def load_balance_func(ddf_, by, client=None):
     for cumsum in cumsum_parts:
         num_rows.append(cumsum.iloc[-1])

-    # Calculate current partition divisions
+    # Calculate current partition divisions.
     divisions = [sum(num_rows[0:x:1]) for x in range(0, len(num_rows) + 1)]
     divisions[-1] = divisions[-1] - 1
     divisions = tuple(divisions)
@@ -271,7 +271,7 @@ def load_balance_func(ddf_, by, client=None):

 def concat_dfs(df_list):
     """
-    Concat a list of cudf dataframes
+    Concat a list of cudf dataframes.
     """
     return cudf.concat(df_list)

@@ -279,17 +279,17 @@ def concat_dfs(df_list):
 def get_delayed_dict(ddf):
     """
     Returns a dicitionary with the dataframe tasks as keys and
-    the dataframe delayed objects as values
+    the dataframe delayed objects as values.
     """
     df_delayed = {}
     for delayed_obj in ddf.to_delayed():
-        df_delayed[str(delayed_obj.key)] = delayed_obj
+        df_delayed[delayed_obj.key] = delayed_obj
     return df_delayed


 def concat_within_workers(client, ddf):
     """
-    Concats all partitions within workers without transfers
+    Concats all partitions within workers without transfers.
     """
     df_delayed = get_delayed_dict(ddf)
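
Note for reviewers: a minimal, self-contained sketch of the key-shape change
described in the commit message, using a hypothetical worker address and
delayed-task name. It illustrates why a `str(...)`-based membership test stops
matching once `client.has_what()` reports tuple keys, and why comparing the
future's key as-is (as this patch does) still matches.

```python
# Hypothetical worker address and task name, for illustration only.
worker = "tcp://127.0.0.1:34567"

# Older dask: has_what() reported each key as the string repr of the tuple.
old_has_what = {worker: ("('from-delayed-abc123', 0)", "('from-delayed-abc123', 1)")}

# Newer dask: has_what() reports the tuple keys themselves.
new_has_what = {worker: (("from-delayed-abc123", 0), ("from-delayed-abc123", 1))}

# A partition future's .key is a tuple such as ("from-delayed-abc123", 0).
future_key = ("from-delayed-abc123", 0)

# Old assumption (stringify before the membership test): matches the string
# form but silently finds nothing against the tuple form.
assert str(future_key) in old_has_what[worker]
assert str(future_key) not in new_has_what[worker]

# After this patch: compare the key as-is, which matches the tuple form.
assert future_key in new_has_what[worker]
```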