Skip to content

Commit

Permalink
Collapsed create dask client args to single arg
Browse files Browse the repository at this point in the history
  • Loading branch information
wenneman committed Oct 5, 2023
1 parent 11da9eb commit 1c33ca1
Showing 1 changed file with 27 additions and 70 deletions.
97 changes: 27 additions & 70 deletions src/tape/ensemble.py
Original file line number Diff line number Diff line change
Expand Up @@ -1598,8 +1598,7 @@ def sf2(self, sf_method="basic", argument_container=None, use_map=True):
def read_pandas_dataframe(
source_frame,
object_frame=None,
create_client=True,
dask_client=None,
dask_client=True,
column_mapper=None,
sync_tables=True,
npartitions=None,
Expand All @@ -1614,7 +1613,7 @@ def read_pandas_dataframe(
A pandas dataframe that contains source information to be read into the ensemble
object_frame: 'pandas.Dataframe', optional
If not specified, the object frame is generated from the source frame
client: `dask.distributed.client` or `bool`, optional
dask_client: `dask.distributed.client` or `bool`, optional
Accepts an existing `dask.distributed.Client`, or creates one if
`dask_client=True`, passing any additional kwargs to a
dask.distributed.Client constructor call. If `dask_client=False`, the
Expand Down Expand Up @@ -1646,7 +1645,6 @@ def read_pandas_dataframe(
return read_dask_dataframe(
source_frame=source,
object_frame=object,
create_client=create_client,
dask_client=dask_client,
column_mapper=column_mapper,
sync_tables=sync_tables,
Expand All @@ -1659,8 +1657,7 @@ def read_pandas_dataframe(
def read_dask_dataframe(
source_frame,
object_frame=None,
create_client=True,
dask_client=None,
dask_client=True,
column_mapper=None,
sync_tables=True,
npartitions=None,
Expand All @@ -1675,16 +1672,11 @@ def read_dask_dataframe(
A Dask dataframe that contains source information to be read into the ensemble
object_frame: 'dask.Dataframe', optional
If not specified, the object frame is generated from the source frame
create_client: `bool`, optional
Creates a `dask.distributed.Client` if `client=True`, passing any additional kwargs to a
dask.distributed.Client constructor call. If `create_client=False`, the
Ensemble is created without a distributed client.
dask_client: `dask.distributed.client`, optional
dask_client: `dask.distributed.client` or `bool`, optional
Accepts an existing `dask.distributed.Client`, or creates one if
`create_client=True`, passing any additional kwargs to a
dask.distributed.Client constructor call.
If 'dask_client=None' and `create_client=False`,
the Ensemble is created without a distributed client.
`dask_client=True`, passing any additional kwargs to a
dask.distributed.Client constructor call. If `dask_client=False`, the
Ensemble is created without a distributed client.
column_mapper: 'ColumnMapper' object
If provided, the ColumnMapper is used to populate relevant column
information mapped from the input dataset.
Expand All @@ -1705,9 +1697,6 @@ def read_dask_dataframe(
ensemble: `tape.ensemble.Ensemble`
The ensemble object with the Dask dataframe data loaded.
"""
if dask_client is None:
dask_client = create_client

new_ens = Ensemble(dask_client, **kwargs)

new_ens.from_dask_dataframe(
Expand All @@ -1727,8 +1716,7 @@ def read_parquet(
source_file,
object_file=None,
column_mapper=None,
create_client=True,
dask_client=None,
dask_client=True,
provenance_label="survey_1",
sync_tables=True,
additional_cols=True,
Expand All @@ -1750,16 +1738,11 @@ def read_parquet(
column_mapper: 'ColumnMapper' object
If provided, the ColumnMapper is used to populate relevant column
information mapped from the input dataset.
create_client: `bool`, optional
Creates a `dask.distributed.Client` if `client=True`, passing any additional kwargs to a
dask.distributed.Client constructor call. If `create_client=False`, the
Ensemble is created without a distributed client.
dask_client: `dask.distributed.client`, optional
dask_client: `dask.distributed.client` or `bool`, optional
Accepts an existing `dask.distributed.Client`, or creates one if
`create_client=True`, passing any additional kwargs to a
dask.distributed.Client constructor call.
If 'dask_client=None' and `create_client=False`,
the Ensemble is created without a distributed client.
`dask_client=True`, passing any additional kwargs to a
dask.distributed.Client constructor call. If `dask_client=False`, the
Ensemble is created without a distributed client.
provenance_label: 'str', optional
Determines the label to use if a provenance column is generated
sync_tables: 'bool', optional
Expand All @@ -1784,9 +1767,6 @@ def read_parquet(
The ensemble object with parquet data loaded
"""

if dask_client is None:
dask_client = create_client

new_ens = Ensemble(dask_client, **kwargs)

new_ens.from_parquet(
Expand All @@ -1809,8 +1789,7 @@ def read_hipscat(
source_subdir="source",
object_subdir="object",
column_mapper=None,
create_client=True,
dask_client=None,
dask_client=True,
**kwargs,
):
"""Read in parquet files from a hipscat-formatted directory structure
Expand All @@ -1826,16 +1805,11 @@ def read_hipscat(
column_mapper: 'ColumnMapper' object
If provided, the ColumnMapper is used to populate relevant column
information mapped from the input dataset.
create_client: `bool`, optional
Creates a `dask.distributed.Client` if `client=True`, passing any additional kwargs to a
dask.distributed.Client constructor call. If `create_client=False`, the
Ensemble is created without a distributed client.
dask_client: `dask.distributed.client`, optional
dask_client: `dask.distributed.client` or `bool`, optional
Accepts an existing `dask.distributed.Client`, or creates one if
`create_client=True`, passing any additional kwargs to a
dask.distributed.Client constructor call.
If 'dask_client=None' and `create_client=False`,
the Ensemble is created without a distributed client.
`dask_client=True`, passing any additional kwargs to a
dask.distributed.Client constructor call. If `dask_client=False`, the
Ensemble is created without a distributed client.
**kwargs:
keyword arguments passed along to
`tape.ensemble.Ensemble.from_parquet`
Expand All @@ -1846,9 +1820,6 @@ def read_hipscat(
The ensemble object with parquet data loaded
"""

if dask_client is None:
dask_client = create_client

new_ens = Ensemble(dask_client, **kwargs)

new_ens.from_hipscat(
Expand All @@ -1863,7 +1834,7 @@ def read_hipscat(


def read_source_dict(
source_dict, column_mapper=None, npartitions=1, create_client=True, dask_client=None, **kwargs
source_dict, column_mapper=None, npartitions=1, dask_client=True, **kwargs
):
"""Load the sources into an ensemble from a dictionary.
Expand All @@ -1877,26 +1848,18 @@ def read_source_dict(
npartitions: `int`, optional
If specified, attempts to repartition the ensemble to the specified
number of partitions
create_client: `bool`, optional
Creates a `dask.distributed.Client` if `client=True`, passing any additional kwargs to a
dask.distributed.Client constructor call. If `create_client=False`, the
Ensemble is created without a distributed client.
dask_client: `dask.distributed.client`, optional
dask_client: `dask.distributed.client` or `bool`, optional
Accepts an existing `dask.distributed.Client`, or creates one if
`create_client=True`, passing any additional kwargs to a
dask.distributed.Client constructor call.
If 'dask_client=None' and `create_client=False`,
the Ensemble is created without a distributed client.
`dask_client=True`, passing any additional kwargs to a
dask.distributed.Client constructor call. If `dask_client=False`, the
Ensemble is created without a distributed client.
Returns
----------
ensemble: `tape.ensemble.Ensemble`
The ensemble object with dictionary data loaded
"""

if dask_client is None:
dask_client = create_client

new_ens = Ensemble(dask_client, **kwargs)

new_ens.from_source_dict(
Expand All @@ -1906,23 +1869,18 @@ def read_source_dict(
return new_ens


def read_dataset(dataset, create_client=True, dask_client=None, **kwargs):
def read_dataset(dataset, dask_client=True, **kwargs):
"""Load the ensemble from a TAPE dataset.
Parameters
----------
dataset: 'str'
The name of the dataset to import
create_client: `bool`, optional
Creates a `dask.distributed.Client` if `client=True`, passing any additional kwargs to a
dask.distributed.Client constructor call. If `create_client=False`, the
Ensemble is created without a distributed client.
dask_client: `dask.distributed.client`, optional
dask_client: `dask.distributed.client` or `bool`, optional
Accepts an existing `dask.distributed.Client`, or creates one if
`create_client=True`, passing any additional kwargs to a
dask.distributed.Client constructor call.
If 'dask_client=None' and `create_client=False`,
the Ensemble is created without a distributed client.
`dask_client=True`, passing any additional kwargs to a
dask.distributed.Client constructor call. If `dask_client=False`, the
Ensemble is created without a distributed client.
Returns
-------
Expand Down Expand Up @@ -1951,7 +1909,6 @@ def read_dataset(dataset, create_client=True, dask_client=None, **kwargs):
object_file=dataset_info["object_file"],
column_mapper=col_map,
provenance_label=dataset,
create_client=create_client,
dask_client=dask_client,
**kwargs,
)

0 comments on commit 1c33ca1

Please sign in to comment.