windows process is true, revised code
nimishy committed Mar 29, 2024
1 parent 239bb30 commit 1403206
Showing 4 changed files with 372 additions and 1 deletion.
2 changes: 2 additions & 0 deletions .gitignore
@@ -307,3 +307,5 @@ sdt_dask/examples/summary_report.csv
/sdt_dask/dataplugs/example_data
/sdt_dask/results
/sdt_dask/dataplugs/example_data
/sdt_dask/results
/sdt_dask/results
5 changes: 4 additions & 1 deletion sdt_dask/clients/local.py
@@ -40,6 +40,8 @@ def _check(self):
        # memory per worker >= 5 GB but total memory use should be less than the system memory available
        if self.n_workers * self.threads_per_worker > self.cpu_count:
            raise Exception(f"workers and threads exceed local resources, {self.cpu_count} cores present")
        elif self.memory_per_worker < 5:
            raise Exception(f"memory per worker too small, minimum memory size per worker 5 GB")
        if self.n_workers * self.memory_per_worker > self.memory:
            self.dask_config.set({'distributed.worker.memory.spill': True})
            print(f"[!] memory per worker exceeds system memory ({self.memory} GB), activating memory spill fraction\n")
@@ -49,7 +51,8 @@ def init_client(self) -> Client:
        self._check()

        if self.system == "windows":
-           self.client = Client(processes=False,
+           self.dask_config.set({'distributed.worker.memory.terminate': False})
+           self.client = Client(processes=True,
                                 n_workers=self.n_workers,
                                 threads_per_worker=self.threads_per_worker,
                                 memory_limit=f"{self.memory_per_worker:.2f}GiB"
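For orientation, here is a minimal usage sketch of the revised Local client. The constructor arguments mirror the call in revised_baseline.py below; treat the exact signature as an assumption based on that example rather than on the full class definition, which is not shown in this diff.

from sdt_dask.clients.local import Local

# Mirrors the configuration used in revised_baseline.py: with this commit,
# init_client() starts process-based workers on Windows (processes=True), and
# _check() rejects memory_per_worker below 5 GB or n_workers * threads_per_worker
# above the available CPU count.
config_client = Local(
    n_workers=4,
    threads_per_worker=2,
    memory_per_worker=5,   # GiB per worker
    verbose=True,
)
client = config_client.init_client()
print(client)
client.shutdown()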
174 changes: 174 additions & 0 deletions sdt_dask/examples/revised_baseline.py
@@ -0,0 +1,174 @@
import glob, os, sys, logging, argparse

import numpy as np
import pandas as pd
from time import gmtime, strftime
from dask import delayed, compute
from dask.distributed import performance_report
from solardatatools import DataHandler
from sdt_dask.dataplugs.csv_plug import LocalFiles
from sdt_dask.clients.local import Local

os.system('cls')
time_stamp = strftime("%Y%m%d-%H%M%S", gmtime())

parser = argparse.ArgumentParser()
parser.add_argument(
    "-log",
    "--log",
    default="warning",
    help=(
        "Provide logging level. "
        "Example: --log debug (default: warning)"),
)

parser.add_argument(
    "-workers",
    "--workers",
    default=4,
    help=(
        "Declare number of workers. "
        "Example: --workers 3 (default: 4)"),
)

parser.add_argument(
    "-threads",
    "--threads",
    default=2,
    help=(
        "Declare number of threads per worker. "
        "Example: --threads 3 (default: 2)"),
)

parser.add_argument(
    "-memory",
    "--memory",
    default=5,
    help=(
        "Declare memory limit per worker. "
        "Example: --memory 5 (default: 5)"),
)
options = parser.parse_args()
levels = {
    'debug': logging.DEBUG,
    'info': logging.INFO,
    'warning': logging.WARNING,
    'error': logging.ERROR,
    'critical': logging.CRITICAL,
    'warn': logging.WARN
}
level = levels[options.log.lower()]
log_file = f'../results/rev_win_{options.workers}w-{options.threads}t-{time_stamp}.log'

WORKERS = int(options.workers)
THREADS_PER_WORKER = int(options.threads)
MEMORY = float(options.memory)

def _init_logger(level):
    logger = logging.getLogger(__name__)
    # logger.setLevel(level=level)
    logging.basicConfig(filename=log_file,
                        format='%(asctime)s:%(levelname)s:%(name)s:%(module)s: %(message)s',
                        encoding='utf-8',
                        level=level)
    handler = logging.StreamHandler(sys.stdout)

    formatter = logging.Formatter('%(asctime)s:%(levelname)s:%(name)s:%(module)s: %(message)s')
    handler.setFormatter(formatter)
    logger.addHandler(handler)

_init_logger(level)
__logger__ = logging.getLogger(__name__)
__logger__.info('Code started in %s', os.getcwd())
__logger__.info('Saving Logs to %s', log_file)


path = "C:\\Users\\Zander\\Documents\\spw_sensor_0\\"
__logger__.info('Grabbing files from path: %s', path)
data_plug = LocalFiles(path_to_files=path)
KEYS = [os.path.basename(fname)[:-4] for fname in glob.glob(path + '*')]
__logger__.info('Grabbed %s files from %s', len(KEYS), path)

def run_pipeline(datahandler, key, fix_shifts, verbose=False):
    try:
        datahandler.run_pipeline(fix_shifts=fix_shifts, verbose=verbose)
        __logger__.info('%s:: run_pipeline Successful', key)
        return datahandler
    except Exception as e:
        __logger__.exception('%s:: run_pipeline: Exception: %s', key, e)
        return datahandler

@delayed
def get_report(datahandler, key):
    if datahandler._ran_pipeline:
        report = datahandler.report
        report = report(return_values=True, verbose=False)
        __logger__.debug("%s:: Report: Successful", key)
        return report
    else:
        __logger__.warning('%s:: Report: failed to run_pipeline', key)
        return {}

@delayed
def get_runtime(datahandler, key):
    if datahandler._ran_pipeline:
        runtime = datahandler.total_time
        __logger__.debug("%s:: Runtime: Successful", key)
        return runtime
    else:
        __logger__.warning('%s:: Runtime: failed to run_pipeline', key)
        return None

reports = []
runtimes = []

if __name__ == '__main__':
    for key in KEYS:
        df = delayed(data_plug.get_data)((key,))
        dh = delayed(DataHandler)(df)
        dh_run = delayed(run_pipeline)(dh, fix_shifts=True, verbose=False, key=key)
        reports.append(get_report(dh_run, key))
        runtimes.append(get_runtime(dh_run, key))

    df_reports = delayed(pd.DataFrame)(reports)
    df_reports = delayed(df_reports.assign)(runtime=runtimes, keys=KEYS)

    # Visualizing the graph
    df_reports.visualize()
    try:
        config_client = Local(
            n_workers = WORKERS,
            threads_per_worker = THREADS_PER_WORKER,
            memory_per_worker = MEMORY,
            verbose = True
        )
        client = config_client.init_client()
        __logger__.info('Local Dask Client Initialized with %s worker(s), %s thread(s) and %s GiB memory per worker',
                        WORKERS, THREADS_PER_WORKER, MEMORY)

        with performance_report(filename=f"../results/dask-report-windows-rev-{WORKERS}w-{THREADS_PER_WORKER}t-{MEMORY}g-{time_stamp}.html"):
            __logger__.info('Starting Computation')
            summary_table = client.compute(df_reports)
            df = summary_table.result()

        scheduler_logs = client.get_scheduler_logs()
        __logger__.info('Scheduler Logs:')
        for log in scheduler_logs:
            __logger__.info('%s', log)
        worker_logs = client.get_worker_logs()
        __logger__.info('Worker Logs:')
        for items, keys in worker_logs.items():
            __logger__.info('%s', items)
            for key in keys:
                __logger__.info('%s', key)

        filename = f'../results/summary_report_windows_rev_{WORKERS}w_{THREADS_PER_WORKER}t_{MEMORY}g_{time_stamp}.csv'
        df.sort_values(by=['keys'], inplace=True, ascending=True)
        df.to_csv(filename)
        __logger__.info('Creating summary report %s', filename)

        client.shutdown()
        __logger__.info('Dask Client Shutdown')
    except Exception as e:
        __logger__.exception('%s', e)
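As a usage note, the baseline script above is driven entirely by its argparse flags; an illustrative invocation from sdt_dask/examples would be python revised_baseline.py --workers 4 --threads 2 --memory 5 --log info (the numeric values match the script defaults; the ../results directory must already exist so the log file, Dask performance report, and summary CSV can be written).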
192 changes: 192 additions & 0 deletions sdt_dask/examples/revised_fargate.py
@@ -0,0 +1,192 @@
import glob, os, sys, logging, argparse

import numpy as np
import pandas as pd
from time import gmtime, strftime
from dask import delayed, compute
from dask.distributed import performance_report
from solardatatools import DataHandler
from sdt_dask.dataplugs.S3Bucket_plug import S3Bucket
from sdt_dask.clients.aws.fargate import Fargate

os.system('cls')
time_stamp = strftime("%Y%m%d-%H%M%S", gmtime())

parser = argparse.ArgumentParser()
parser.add_argument(
    "-log",
    "--log",
    default="warning",
    help=(
        "Provide logging level. "
        "Example: --log debug (default: warning)"),
)

parser.add_argument(
    "-workers",
    "--workers",
    default=4,
    help=(
        "Declare number of workers. "
        "Example: --workers 3 (default: 4)"),
)

parser.add_argument(
    "-threads",
    "--threads",
    default=2,
    help=(
        "Declare number of threads per worker. "
        "Example: --threads 3 (default: 2)"),
)

options = parser.parse_args()
levels = {
    'debug': logging.DEBUG,
    'info': logging.INFO,
    'warning': logging.WARNING,
    'error': logging.ERROR,
    'critical': logging.CRITICAL,
    'warn': logging.WARN
}
level = levels[options.log.lower()]

log_file = f'../results/rev_far_{options.workers}w-{options.threads}t-{time_stamp}.log'


def _init_logger(level):
    logger = logging.getLogger(__name__)
    logging.basicConfig(filename=log_file,
                        format='%(asctime)s:%(levelname)s:%(name)s:%(module)s: %(message)s',
                        encoding='utf-8',
                        level=level)
    handler = logging.StreamHandler(sys.stdout)

    formatter = logging.Formatter('%(asctime)s:%(levelname)s:%(name)s:%(module)s: %(message)s')
    handler.setFormatter(formatter)
    logger.addHandler(handler)


_init_logger(level)
__logger__ = logging.getLogger(__name__)

__logger__.info('Code started in %s', os.getcwd())
__logger__.info('Saving Logs to %s', log_file)

PA_NUMBER = os.getenv("project-pa-number")
TAGS = {
    "project-pa-number": PA_NUMBER,
    "project": "pvinsight"
}
VPC = "vpc-ab2ff6d3" # for us-west-2
IMAGE = "nimishy/sdt-windows:latest"

AWS_DEFAULT_REGION = os.getenv('AWS_DEFAULT_REGION')
ENVIRONMENT = {
    'AWS_ACCESS_KEY_ID': os.getenv('AWS_ACCESS_KEY_ID'),
    'AWS_SECRET_ACCESS_KEY': os.getenv('AWS_SECRET_ACCESS_KEY')
}

WORKERS = int(options.workers)
THREADS_PER_WORKER = int(options.threads)

bucket = "pvinsight-dask-baseline"
__logger__.info('Grabbing files from S3 bucket: %s', bucket)
data_plug = S3Bucket(bucket)
KEYS = data_plug._pull_keys()
__logger__.info('Grabbed %s files from %s', len(KEYS), bucket)


def run_pipeline(datahandler, key, fix_shifts, verbose=False):
    try:
        datahandler.run_pipeline(fix_shifts=fix_shifts, verbose=verbose)
        __logger__.debug('%s:: run_pipeline Successful', key)
        return datahandler
    except Exception as e:
        __logger__.exception('%s:: run_pipeline: Exception: %s', key, e)
        return datahandler


@delayed
def get_report(datahandler, key):
    if datahandler._ran_pipeline:
        report = datahandler.report
        report = report(return_values=True, verbose=False)
        __logger__.debug("%s:: Report: Successful", key)
        return report
    else:
        __logger__.warning('%s:: Report: failed to run_pipeline', key)
        return {}


@delayed
def get_runtime(datahandler, key):
    if datahandler._ran_pipeline:
        runtime = datahandler.total_time
        __logger__.debug("%s:: Runtime: Successful", key)
        return runtime
    else:
        __logger__.warning('%s:: Runtime: failed to run_pipeline', key)
        return None


reports = []
runtimes = []

for key in KEYS:
    df = delayed(data_plug.get_data)((key,))
    dh = delayed(DataHandler)(df)
    dh_run = delayed(run_pipeline)(dh, fix_shifts=True, verbose=False, key=key)
    reports.append(get_report(dh_run, key))
    runtimes.append(get_runtime(dh_run, key))

df_reports = delayed(pd.DataFrame)(reports)
df_reports = delayed(df_reports.assign)(runtime=runtimes, keys=KEYS)

# Visualizing the graph
df_reports.visualize()

try:
    config_client = Fargate(image=IMAGE,
                            tags=TAGS,
                            vpc=VPC,
                            region_name=AWS_DEFAULT_REGION,
                            environment=ENVIRONMENT,
                            n_workers=WORKERS,
                            threads_per_worker=THREADS_PER_WORKER
                            )
    client, cluster = config_client.init_client()
    __logger__.info('Fargate Dask Client Initialized with %s worker(s) and %s thread(s)', WORKERS, THREADS_PER_WORKER)

    with performance_report(
            filename=f"../results/dask-report-fargate-rev-{WORKERS}w-{THREADS_PER_WORKER}t-{time_stamp}.html"):
        __logger__.info('Starting Computation')
        summary_table = client.compute(df_reports)
        df = summary_table.result()

    scheduler_logs = client.get_scheduler_logs()
    __logger__.info('Scheduler Logs:')
    for log in scheduler_logs:
        __logger__.info('%s', log)
    worker_logs = client.get_worker_logs()
    __logger__.info('Worker Logs:')
    for items, keys in worker_logs.items():
        __logger__.info('%s', items)
        for key in keys:
            __logger__.info('%s', key)

    __logger__.info('Generating Report')
    filename = f'../results/summary_report_fargate_rev_{WORKERS}w_{THREADS_PER_WORKER}t_{time_stamp}.csv'
    df.sort_values(by=['keys'], inplace=True, ascending=True)
    df.to_csv(filename)
    __logger__.info('Creating summary report %s', filename)

    client.shutdown()
    __logger__.info('Dask Client Shutdown')
    cluster.close()
    __logger__.info('Fargate Cluster Closed')

except Exception as e:
    __logger__.exception('%s', e)
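The Fargate example reads its credentials, region, and project tag from the environment via os.getenv. A small pre-flight check along the following lines (a hypothetical helper, not part of this commit) makes that dependency explicit before a cluster is requested:

import os

# Environment variables read by revised_fargate.py via os.getenv; the check
# itself is illustrative only.
REQUIRED = [
    "AWS_DEFAULT_REGION",
    "AWS_ACCESS_KEY_ID",
    "AWS_SECRET_ACCESS_KEY",
    "project-pa-number",
]

missing = [name for name in REQUIRED if not os.getenv(name)]
if missing:
    raise SystemExit(f"Missing environment variables: {', '.join(missing)}")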

