Skip to content

Commit

Permalink
Resolved code review comments from PR 256
Browse files Browse the repository at this point in the history
  • Loading branch information
wenneman committed Oct 5, 2023
1 parent 7bd3615 commit 909f39d
Show file tree
Hide file tree
Showing 5 changed files with 399 additions and 380 deletions.
1 change: 1 addition & 0 deletions src/tape/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from .analysis import * # noqa
from .ensemble import * # noqa
from .timeseries import * # noqa
from .ensemble_readers import * # noqa
325 changes: 1 addition & 324 deletions src/tape/ensemble.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
import os
import warnings
import requests

import dask.dataframe as dd
import numpy as np
import pandas as pd

from dask.distributed import Client

from .analysis.base import AnalysisFunction
Expand Down Expand Up @@ -1587,326 +1587,3 @@ def sf2(self, sf_method="basic", argument_container=None, use_map=True):
result = self.batch(calc_sf2, use_map=use_map, argument_container=argument_container)

return result


"""
The following package-level methods can be used to create a new Ensemble object
by reading in the given data source.
"""


def read_pandas_dataframe(
    source_frame,
    object_frame=None,
    dask_client=True,
    column_mapper=None,
    sync_tables=True,
    npartitions=None,
    partition_size=None,
    **kwargs,
):
    """Read in Pandas dataframe(s) and return an Ensemble object.

    Parameters
    ----------
    source_frame: `pandas.DataFrame`
        A Pandas dataframe that contains source information to be read into
        the ensemble.
    object_frame: `pandas.DataFrame`, optional
        If not specified, the object frame is generated from the source frame.
    dask_client: `dask.distributed.Client` or `bool`, optional
        Accepts an existing `dask.distributed.Client`, or creates one if
        `client=True`, passing any additional kwargs to a
        dask.distributed.Client constructor call. If `client=False`, the
        Ensemble is created without a distributed client.
    column_mapper: `ColumnMapper` object, optional
        If provided, the ColumnMapper is used to populate relevant column
        information mapped from the input dataset.
    sync_tables: `bool`, optional
        In the case where an `object_frame` is provided, determines whether an
        initial sync is performed between the object and source tables. If
        not performed, dynamic information like the number of observations
        may be out of date until a sync is performed internally.
    npartitions: `int`, optional
        If specified, attempts to repartition the ensemble to the specified
        number of partitions.
    partition_size: `int`, optional
        If specified, attempts to repartition the ensemble to partitions
        of size `partition_size`.

    Returns
    -------
    ensemble: `tape.ensemble.Ensemble`
        The ensemble object with the Pandas dataframe data loaded.
    """
    # dask.dataframe.from_pandas requires an explicit partition count
    # (passing npartitions=None raises a ValueError), so fall back to a
    # single partition when the caller did not specify one. Any requested
    # repartitioning is still applied downstream by read_dask_dataframe.
    initial_npartitions = 1 if npartitions is None else npartitions

    # Construct Dask DataFrames of the source and object tables.
    # NOTE: avoid naming the local `object` -- that shadows the builtin.
    source = dd.from_pandas(source_frame, npartitions=initial_npartitions)
    object_ddf = (
        None
        if object_frame is None
        else dd.from_pandas(object_frame, npartitions=initial_npartitions)
    )

    return read_dask_dataframe(
        source_frame=source,
        object_frame=object_ddf,
        dask_client=dask_client,
        column_mapper=column_mapper,
        sync_tables=sync_tables,
        npartitions=npartitions,
        partition_size=partition_size,
        **kwargs,
    )


def read_dask_dataframe(
    source_frame,
    object_frame=None,
    dask_client=True,
    column_mapper=None,
    sync_tables=True,
    npartitions=None,
    partition_size=None,
    **kwargs,
):
    """Read in Dask dataframe(s) and return an Ensemble object.

    Parameters
    ----------
    source_frame: `dask.DataFrame`
        A Dask dataframe that contains source information to be read into
        the ensemble.
    object_frame: `dask.DataFrame`, optional
        If not specified, the object frame is generated from the source frame.
    dask_client: `dask.distributed.Client` or `bool`, optional
        Accepts an existing `dask.distributed.Client`, or creates one if
        `client=True`, passing any additional kwargs to a
        dask.distributed.Client constructor call. If `client=False`, the
        Ensemble is created without a distributed client.
    column_mapper: `ColumnMapper` object, optional
        If provided, the ColumnMapper is used to populate relevant column
        information mapped from the input dataset.
    sync_tables: `bool`, optional
        In the case where an `object_frame` is provided, determines whether an
        initial sync is performed between the object and source tables. If
        not performed, dynamic information like the number of observations
        may be out of date until a sync is performed internally.
    npartitions: `int`, optional
        If specified, attempts to repartition the ensemble to the specified
        number of partitions.
    partition_size: `int`, optional
        If specified, attempts to repartition the ensemble to partitions
        of size `partition_size`.

    Returns
    -------
    ensemble: `tape.ensemble.Ensemble`
        The ensemble object with the Dask dataframe data loaded.
    """
    # Build an empty Ensemble (sets up the Dask client), then delegate the
    # actual table loading to the instance method.
    ensemble = Ensemble(dask_client, **kwargs)
    ensemble.from_dask_dataframe(
        source_frame=source_frame,
        object_frame=object_frame,
        column_mapper=column_mapper,
        sync_tables=sync_tables,
        npartitions=npartitions,
        partition_size=partition_size,
        **kwargs,
    )
    return ensemble


def read_parquet(
    source_file,
    object_file=None,
    column_mapper=None,
    dask_client=True,
    provenance_label="survey_1",
    sync_tables=True,
    additional_cols=True,
    npartitions=None,
    partition_size=None,
    **kwargs,
):
    """Read in parquet file(s) into an Ensemble object.

    Parameters
    ----------
    source_file: `str`
        Path to a parquet file, or multiple parquet files, that contain
        source information to be read into the ensemble.
    object_file: `str`, optional
        Path to a parquet file, or multiple parquet files, that contain
        object information. If not specified, it is generated from the
        source table.
    column_mapper: `ColumnMapper` object, optional
        If provided, the ColumnMapper is used to populate relevant column
        information mapped from the input dataset.
    dask_client: `dask.distributed.Client` or `bool`, optional
        Accepts an existing `dask.distributed.Client`, or creates one if
        `client=True`, passing any additional kwargs to a
        dask.distributed.Client constructor call. If `client=False`, the
        Ensemble is created without a distributed client.
    provenance_label: `str`, optional
        Determines the label to use if a provenance column is generated.
    sync_tables: `bool`, optional
        In the case where object files are loaded in, determines whether an
        initial sync is performed between the object and source tables. If
        not performed, dynamic information like the number of observations
        may be out of date until a sync is performed internally.
    additional_cols: `bool`, optional
        Boolean to indicate whether to carry in columns beyond the
        critical columns; True will, while False will only load the columns
        containing the critical quantities (id, time, flux, err, band).
    npartitions: `int`, optional
        If specified, attempts to repartition the ensemble to the specified
        number of partitions.
    partition_size: `int`, optional
        If specified, attempts to repartition the ensemble to partitions
        of size `partition_size`.

    Returns
    -------
    ensemble: `tape.ensemble.Ensemble`
        The ensemble object with parquet data loaded.
    """
    # Create the Ensemble shell, then hand loading off to its parquet reader.
    ensemble = Ensemble(dask_client, **kwargs)
    ensemble.from_parquet(
        source_file=source_file,
        object_file=object_file,
        column_mapper=column_mapper,
        provenance_label=provenance_label,
        sync_tables=sync_tables,
        additional_cols=additional_cols,
        npartitions=npartitions,
        partition_size=partition_size,
        **kwargs,
    )
    return ensemble


def read_hipscat(
    dir,  # NOTE(review): shadows the builtin `dir`, but renaming would break keyword callers
    source_subdir="source",
    object_subdir="object",
    column_mapper=None,
    dask_client=True,
    **kwargs,
):
    """Read in parquet files from a hipscat-formatted directory structure.

    Parameters
    ----------
    dir: `str`
        Path to the directory structure.
    source_subdir: `str`
        Path to the subdirectory which contains source files.
    object_subdir: `str`
        Path to the subdirectory which contains object files; if None then
        files will only be read from the source_subdir.
    column_mapper: `ColumnMapper` object, optional
        If provided, the ColumnMapper is used to populate relevant column
        information mapped from the input dataset.
    dask_client: `dask.distributed.Client` or `bool`, optional
        Accepts an existing `dask.distributed.Client`, or creates one if
        `client=True`, passing any additional kwargs to a
        dask.distributed.Client constructor call. If `client=False`, the
        Ensemble is created without a distributed client.
    **kwargs:
        Keyword arguments passed along to
        `tape.ensemble.Ensemble.from_parquet`.

    Returns
    -------
    ensemble: `tape.ensemble.Ensemble`
        The ensemble object with parquet data loaded.
    """
    # Build the Ensemble, then let the instance method walk the hipscat tree.
    ensemble = Ensemble(dask_client, **kwargs)
    ensemble.from_hipscat(
        dir=dir,
        source_subdir=source_subdir,
        object_subdir=object_subdir,
        column_mapper=column_mapper,
        **kwargs,
    )
    return ensemble


def read_source_dict(source_dict, column_mapper=None, npartitions=1, dask_client=True, **kwargs):
    """Load the sources into an Ensemble from a dictionary.

    Parameters
    ----------
    source_dict: `dict`
        The dictionary containing the source information.
    column_mapper: `ColumnMapper` object, optional
        If provided, the ColumnMapper is used to populate relevant column
        information mapped from the input dataset.
    npartitions: `int`, optional
        If specified, attempts to repartition the ensemble to the specified
        number of partitions.
    dask_client: `dask.distributed.Client` or `bool`, optional
        Accepts an existing `dask.distributed.Client`, or creates one if
        `client=True`, passing any additional kwargs to a
        dask.distributed.Client constructor call. If `client=False`, the
        Ensemble is created without a distributed client.

    Returns
    -------
    ensemble: `tape.ensemble.Ensemble`
        The ensemble object with dictionary data loaded.
    """
    # Instantiate an Ensemble and populate it from the provided dictionary.
    ensemble = Ensemble(dask_client, **kwargs)
    ensemble.from_source_dict(
        source_dict=source_dict,
        column_mapper=column_mapper,
        npartitions=npartitions,
        **kwargs,
    )
    return ensemble


def read_dataset(dataset, dask_client=True, **kwargs):
    """Load the ensemble from a TAPE dataset.

    Parameters
    ----------
    dataset: `str`
        The name of the dataset to import.
    dask_client: `dask.distributed.Client` or `bool`, optional
        Accepts an existing `dask.distributed.Client`, or creates one if
        `client=True`, passing any additional kwargs to a
        dask.distributed.Client constructor call. If `client=False`, the
        Ensemble is created without a distributed client.

    Returns
    -------
    ensemble: `tape.ensemble.Ensemble`
        The ensemble object with the dataset loaded.

    Raises
    ------
    KeyError
        If `dataset` is not one of the names listed in the remote
        datasets registry.
    requests.HTTPError
        If the datasets registry could not be fetched.
    """
    # Fetch the dataset registry. A timeout prevents the call from hanging
    # indefinitely, and raise_for_status surfaces HTTP failures explicitly
    # instead of letting them appear as a confusing JSON decode error.
    req = requests.get(
        "https://github.com/lincc-frameworks/tape_benchmarking/blob/main/data/datasets.json?raw=True",
        timeout=30,
    )
    req.raise_for_status()
    datasets_file = req.json()

    try:
        dataset_info = datasets_file[dataset]
    except KeyError:
        # Re-raise as KeyError (same type as before) but with a helpful
        # message listing the valid dataset names.
        raise KeyError(
            f"Unknown dataset {dataset!r}; available datasets: {sorted(datasets_file)}"
        ) from None

    # Make column map from dataset
    dataset_map = dataset_info["column_map"]
    col_map = ColumnMapper(
        id_col=dataset_map["id"],
        time_col=dataset_map["time"],
        flux_col=dataset_map["flux"],
        err_col=dataset_map["error"],
        band_col=dataset_map["band"],
    )

    return read_parquet(
        source_file=dataset_info["source_file"],
        object_file=dataset_info["object_file"],
        column_mapper=col_map,
        provenance_label=dataset,
        dask_client=dask_client,
        **kwargs,
    )
Loading

0 comments on commit 909f39d

Please sign in to comment.