Commit
Merge branch 'develop-dask' of https://github.com/zhanghaoc/solar-data-tools into develop-dask
nimishy committed Mar 28, 2024
2 parents 34114c7 + e988b44 commit 239bb30
Showing 10 changed files with 659 additions and 1,167,612 deletions.
3 changes: 3 additions & 0 deletions .gitignore
@@ -304,3 +304,6 @@ sdt_dask/results/summary_report.csv
sdt_dask/results/dask-report.html
sdt_dask/examples/summary_report.csv
/sdt_dask/dataplugs/spwr_sensor_0
/sdt_dask/dataplugs/example_data
/sdt_dask/results
/sdt_dask/dataplugs/example_data
49 changes: 40 additions & 9 deletions sdt_dask/dask_tool/sdt_dask.py
@@ -26,8 +26,14 @@ def run_pipeline(datahandler, **kwargs):
"""
# TODO: add loss analysis
# TODO: if dataset failed to run, throw python error
datahandler.run_pipeline(**kwargs)
return datahandler
try:
datahandler.run_pipeline(**kwargs)
datahandler.run_loss_factor_analysis()
return datahandler
except Exception as e:
print(f"Error running pipeline: {e}")
return None
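Because the reworked `run_pipeline` catches exceptions and returns `None` instead of a `DataHandler`, downstream code has to guard against missing results. A minimal, hedged usage sketch (the CSV path and the `fix_shifts` keyword are illustrative assumptions, not confirmed by this diff):

```python
# Hedged sketch: run_pipeline (defined above) returns None when the SolarDataTools
# pipeline or the loss-factor analysis raises, so callers must check before reading.
import pandas as pd
from solardatatools import DataHandler

df = pd.read_csv("example_power_data.csv", index_col=0, parse_dates=True)  # hypothetical file
dh = run_pipeline(DataHandler(df), fix_shifts=True)  # fix_shifts is an assumed kwarg
if dh is None:
    print("Pipeline failed for this dataset; skipping report.")
else:
    print(dh.report(return_values=True, verbose=False))
```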


class SDTDask:
"""A class to run the SolarDataTools pipeline on a Dask cluster.
Expand Down Expand Up @@ -60,6 +66,29 @@ def set_up(self, KEYS, **kwargs):

        reports = []
        runtimes = []
        losses = []

        class Data:
            def __init__(self, report, loss_report, runtime):
                self.report = report
                self.loss_report = loss_report
                self.runtime = runtime

        def helper(datahandler, key):
            report = None
            loss_report = None
            runtime = None
            try:
                report = datahandler.report(return_values=True, verbose=False)
                loss_report = datahandler.loss_analysis.report()
                runtime = datahandler.total_time
            except Exception as e:
                print(e)

            return Data(report, loss_report, runtime)

        def helper_data(datas):
            return [data if data is not None else {} for data in datas]

        for key in KEYS:
            # TODO: to check if a key is valid explicitly
@@ -68,16 +97,18 @@ def set_up(self, KEYS, **kwargs):
            dh = delayed(DataHandler)(df)
            dh_run = delayed(run_pipeline)(dh, **kwargs)

            report = dh_run.report
            runtime = dh_run.total_time

            report = delayed(report)(return_values=True, verbose=False)
            runtime = delayed(runtime)
            data = delayed(helper)(dh_run, key)

            reports.append(report)
            runtimes.append(runtime)
            reports.append(data.report)
            losses.append(data.loss_report)
            runtimes.append(data.runtime)

        reports = delayed(helper_data)(reports)
        losses = delayed(helper_data)(losses)
        self.df_reports = delayed(pd.DataFrame)(reports)
        self.loss_reports = delayed(pd.DataFrame)(losses)
        # append losses to the report
        self.df_reports = delayed(pd.concat)([self.df_reports, self.loss_reports], axis=1)
        self.df_reports = delayed(self.df_reports.assign)(runtime=runtimes, keys=KEYS)

    def visualize(self, filename="sdt_graph.png"):
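The reworked `set_up` keeps the whole computation lazy: each per-key result is wrapped with `dask.delayed`, the lazy pieces are collected into lists, and the final report `DataFrame` is itself a delayed object that only materializes when the graph is computed. A standalone sketch of that pattern in plain Dask (not the project's API):

```python
# Minimal illustration of the delayed-graph pattern used in set_up above:
# wrap per-item work in dask.delayed, gather the lazy results, and build a
# single delayed DataFrame that only runs on compute().
import pandas as pd
from dask import delayed

def process(key):
    # stand-in for running the SolarDataTools pipeline on one dataset
    return {"key": key, "length": len(key)}

keys = ["site_a", "site_b", "site_c"]
rows = [delayed(process)(k) for k in keys]   # nothing executes yet
table = delayed(pd.DataFrame)(rows)          # still lazy
print(table.compute())                       # the whole graph runs here
```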
7 changes: 4 additions & 3 deletions sdt_dask/dataplugs/README.md
@@ -11,11 +11,12 @@ To create your own DataPlug, you must provide two key files: a Python module (`y
2. **Define Initialization Parameters**: Customize the `__init__` method to accept parameters specific to your data source. These parameters can vary widely depending on the nature of the data source, such as file paths, API keys, database credentials, or any other configuration necessary for data access.
3. **Implement `get_data` Method**: This method is the core of your DataPlug, tasked with retrieving and cleaning the data before returning a pandas DataFrame. The method should accept a `keys` argument as a tuple, which contains the necessary identifiers or parameters to fetch the specific dataset. This flexible approach allows for a wide range of data retrieval scenarios, accommodating various data sources and user requirements.

The keys tuple might include unique identifiers, timestamps, filenames, or any combination of parameters that your data source requires to locate and retrieve the desired data.
4. **Important - Non-Serializable Objects**:
When distributing tasks across Dask workers, avoid passing pre-initialized instances of objects that maintain state, hold open connections, or own resources that cannot be serialized (e.g., `botocore.client.S3` instances). Such objects cannot safely be pickled and transferred across processes. Instead, create and use them within the scope of each task so that every task manages its own resources independently; this improves process safety and stability and applies to all non-serializable objects used in distributed computing.

5. **(Optional) Additional Methods**: Beyond get_data, you may implement any number of private or public methods to aid in data retrieval, transformation, or cleaning. Examples include methods for parsing file names, performing complex queries on databases, or applying specific data cleaning operations tailored to your data source.

Ensure that the method's input arguments (keys) and the return type (pandas DataFrame) adhere to this specification to maintain compatibility with the SDT Dask tool.

4. **(Optional) Additional Methods**: Beyond get_data, you may implement any number of private or public methods to aid in data retrieval, transformation, or cleaning. Examples include methods for parsing file names, performing complex queries on databases, or applying specific data cleaning operations tailored to your data source.

Example structure:

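Since the repository's own example structure is truncated above, here is a hypothetical minimal skeleton consistent with the steps described; the `DataPlug` import path, the local-CSV source, and every method body below are illustrative assumptions rather than the project's actual example:

```python
# Hypothetical skeleton only: shows the interface the README describes
# (an __init__ that captures configuration and get_data(keys) -> DataFrame).
import pandas as pd
from sdt_dask.dataplugs.dataplug import DataPlug  # assumed base-class location


class LocalCSVPlug(DataPlug):
    def __init__(self, data_dir: str):
        # store configuration only; no open connections or clients are kept here,
        # so instances stay serializable when shipped to Dask workers
        self.data_dir = data_dir

    def get_data(self, keys: tuple) -> pd.DataFrame:
        # keys carries whatever identifies one dataset, e.g. a filename stem
        path = f"{self.data_dir}/{keys[0]}.csv"
        df = pd.read_csv(path, index_col=0, parse_dates=True)
        return self._clean_data(df)

    def _clean_data(self, df: pd.DataFrame) -> pd.DataFrame:
        # optional private helper, as allowed by the (Optional) step above
        return df.dropna()
```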
28 changes: 22 additions & 6 deletions sdt_dask/dataplugs/S3Bucket_plug.py
@@ -7,16 +7,25 @@


class S3Bucket(DataPlug):
"""
""" Dataplug class for retrieving data from an S3 bucket.
aws configurations for the AWS CLI must be set up in local environment
"""
    def __init__(self, bucket_name):
        """Initialize the S3Bucket object with the bucket name.
        :param bucket_name: The name of the S3 bucket. (type: str)
        """
        self.bucket_name = bucket_name

    def _create_s3_client(self):
        # Creating a new session for each call to ensure thread safety
        session = boto3.session.Session()
        s3_client = session.client('s3')
        return s3_client

    def _pull_data(self, key):

        s3_client = boto3.client('s3')
        print(f"Loading file from S3 bucket: {key}...")

        s3_client = self._create_s3_client()
        obj = s3_client.get_object(Bucket=self.bucket_name, Key=key)

        # Assume file is CSV
@@ -32,7 +41,14 @@ def get_data(self, key: tuple[str]) -> pd.DataFrame:
        Users should keep the args and returns as defined here when writing
        their custom dataplugs.
        :param key: Filename (without the extension suffix)--typically designating in tuple
        Note: if this example dataplug is used, the data in the S3 bucket should
            be in CSV format and the format should be consistent across all files: a
            timestamp column and a power column.
        :param key: Filename (which can be retrieved with the _pull_keys method),
            passed inside the tuple
        :return: Returns a pandas DataFrame with a timestamp column and
            a power column
        """
        self._pull_data(key[0])
        self._clean_data()
@@ -47,7 +63,7 @@ def _pull_keys(self) -> list:
"""
KEYS = []

s3_client = boto3.client('s3')
s3_client = self._create_s3_client()
objects = s3_client.list_objects_v2(Bucket=self.bucket_name)

if 'Contents' in objects:
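A short, hedged usage sketch for the revised plug (the module path and bucket name are assumptions, and calling the private `_pull_keys` directly is only for illustration):

```python
# Hedged sketch: every call creates its own boto3 session/client internally,
# which is the thread-safety change this file introduces.
from sdt_dask.dataplugs.S3Bucket_plug import S3Bucket  # assumed module path

plug = S3Bucket("my-solar-data-bucket")   # hypothetical bucket name
keys = plug._pull_keys()                  # list the object keys in the bucket
if keys:
    df = plug.get_data((keys[0],))        # the key is passed inside a tuple
    print(df.head())
```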
