Commit

Merge branch 'develop-dask' into develop-dask

pluflou authored Mar 29, 2024
2 parents 850519a + 8fda18d commit 03ce74d
Showing 30 changed files with 4,644 additions and 1,168,519 deletions.
13 changes: 13 additions & 0 deletions .gitignore
@@ -323,3 +323,16 @@ tags
[._]*.un~

# End of https://www.toptal.com/developers/gitignore/api/vim
/sdt_dask/dataplugs/example_data
sdt_dask/examples/summary_report_fargate.csv
sdt_dask/results/dask-report-fargate-or.html
sdt_dask/results/summary_report.csv
sdt_dask/results/dask-report.html
sdt_dask/examples/summary_report.csv
/sdt_dask/dataplugs/spwr_sensor_0
/sdt_dask/dataplugs/example_data
/sdt_dask/results
/sdt_dask/dataplugs/example_data
/sdt_dask/results
/sdt_dask/results

8 changes: 8 additions & 0 deletions .idea/.gitignore


12 changes: 12 additions & 0 deletions .idea/solar-data-tools.iml


114 changes: 69 additions & 45 deletions sdt_dask/clients/aws/fargate.py
@@ -1,45 +1,69 @@
"""Note: this module is not yet implemented.
This can be written as an example of how to create a Dask client (given some resources)
that will integrate seamlessly with our SDT Dask tool.
We should determine if this module/class would be helpful/is needed.
"""

from dask.distributed import Client
from dask_cloudprovider.aws import FargateCluster
"""
Optional:
tags: only use this if your organization enforces tag policies
Required:
image: should be a dockerhub public image. Please customize your image if needed
"""


def _init_fargate_cluster(**kwargs) -> FargateCluster:
    # Unpack the keyword arguments; passing the dict positionally is an error
    cluster = FargateCluster(**kwargs)
    return cluster


def _init_dask_client(cluster: FargateCluster) -> Client:
client = Client(cluster)
return client


def get_fargate_cluster(
        tags={},
        image="",
        vpc="",
        region_name="",
        environment={},
        n_workers=10
) -> Client:

    # Forward the caller's arguments rather than empty literals
    cluster = _init_fargate_cluster(
        tags=tags,
        image=image,
        vpc=vpc,
        region_name=region_name,
        environment=environment,
        n_workers=n_workers
    )

    return _init_dask_client(cluster)
# TODO: Change all documentation to sphinx
try:
    # Import checks
    import os
    from sdt_dask.clients.clients import Clients
    from dask_cloudprovider.aws import FargateCluster
    from dask.distributed import Client

# Raises an exception if the modules aren't installed in the environment
except ModuleNotFoundError as error:
    packages = "\tos\n\tdask.distributed\n\tdask_cloudprovider"
    msg = f"{error}\n[!] Check or reinstall the following packages\n{packages}"
    raise ModuleNotFoundError(msg)


class Fargate(Clients):

    def __init__(self,
                 image: str = "",
                 tags: dict = {},  # optional
                 vpc: str = "",
                 region_name: str = "",
                 environment: dict = {},
                 n_workers: int = 10,
                 threads_per_worker: int = 2
                 ):
        self.image = image
        self.tags = tags
        self.vpc = vpc
        self.region_name = region_name
        self.environment = environment
        self.n_workers = n_workers
        self.threads_per_worker = threads_per_worker

    def _check_versions(self):
        data = self.client.get_versions(check=True)
        scheduler_pkgs = data['scheduler']['packages']
        client_pkgs = data['client']['packages']

        # Compare packages by name; zipping the two dicts would pair
        # unrelated packages if the key orders ever differ
        for pkg, c_ver in client_pkgs.items():
            s_ver = scheduler_pkgs.get(pkg)
            if c_ver != s_ver:
                msg = 'Please update the client version to match the scheduler version'
                raise EnvironmentError(f"{pkg} version mismatch:\n\tScheduler: {s_ver} vs Client: {c_ver}\n{msg}")

    def init_client(self) -> tuple:
        print("[i] Initializing Fargate Cluster ...")

        cluster = FargateCluster(
            tags=self.tags,
            image=self.image,
            vpc=self.vpc,
            region_name=self.region_name,
            environment=self.environment,
            n_workers=self.n_workers,
            worker_nthreads=self.threads_per_worker
        )

        print("[i] Initialized Fargate Cluster")
        print("[i] Initializing Dask Client ...")

        self.client = Client(cluster)

        self._check_versions()

        print(f"[>] Dask Dashboard: {self.client.dashboard_link}")

        return self.client, cluster
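
For reference, a minimal usage sketch of the new Fargate client; the image, VPC, and region values below are placeholders, not values from this repository:

# Hypothetical usage sketch -- all argument values are placeholders
from sdt_dask.clients.aws.fargate import Fargate

fargate = Fargate(
    image="myorg/sdt-dask-worker:latest",  # placeholder public Docker Hub image
    vpc="vpc-0123456789abcdef0",           # placeholder VPC ID
    region_name="us-west-2",               # placeholder AWS region
    n_workers=4,
)
client, cluster = fargate.init_client()    # returns (Client, FargateCluster)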
2 changes: 1 addition & 1 deletion sdt_dask/clients/aws/requirements.txt
@@ -1,4 +1,4 @@
solar-data-tools @ git+https://github.com/slacgismo/solar-data-tools@develop-dask
solar-data-tools @ git+https://github.com/zhanghaoc/solar-data-tools@develop-dask
accessible-pygments==0.0.4
aiobotocore==2.4.2
aiohttp==3.8.3
29 changes: 29 additions & 0 deletions sdt_dask/clients/clients.py
@@ -0,0 +1,29 @@
"""
Class for clients to be used with SDT Dask Tool
"""
from dask.distributed import Client
class Clients:
"""
Clients class for configuring dask client on local and cloud services.
It's recommended that the user-created client inherit from this class to
ensure compatibility.
The initialization argument for each class will be different depending on
the client service. The main requiremnet is to keep the
``Clients.init_client`` method, and make sure the args and returns are as
defined here.
"""
def __init__(self):
pass


def init_client(self, **kwargs) -> Client:
"""
This is the main function that the Dask Tool will create the clusters
and the clients. Users should keep the args and returns as defined here
when writing their custom clients.
:return: Returns a initialized dask client with the user designed
configuration
"""
pass
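
For illustration, a minimal sketch of a user-defined client that follows this contract; the subclass name and its use of LocalCluster are hypothetical, not part of this commit:

# Hypothetical subclass -- shows the expected inheritance and init_client shape
from dask.distributed import Client, LocalCluster
from sdt_dask.clients.clients import Clients

class MyLocalClient(Clients):
    def __init__(self, n_workers: int = 2):
        self.n_workers = n_workers

    def init_client(self) -> Client:
        # Keep the return type as defined in the base class
        cluster = LocalCluster(n_workers=self.n_workers)
        self.client = Client(cluster)
        return self.client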
80 changes: 80 additions & 0 deletions sdt_dask/clients/local.py
@@ -0,0 +1,80 @@
"""
TODO: Change documentation to sphinx format
"""
try:
import os, platform, psutil, dask.config
from dask.distributed import Client
from sdt_dask.clients.clients import Clients

except ModuleNotFoundError as error:
# Could work on installing modules from the code
# Or just raise an error like so
packages = "\tos\n\tplatfrom\n\tpsutil\n\tdask.distributed"
raise ModuleNotFoundError(f"{error}\n[!] Check or reinstall the following packages\n{packages}")

finally:

class Local(Clients):
def __init__(self, n_workers: int = 2, threads_per_worker: int = 2, memory_per_worker: int = 5, verbose: bool = False):
self.verbose = verbose
self.n_workers = n_workers
self.threads_per_worker = threads_per_worker
self.memory_per_worker = memory_per_worker
self.dask_config = dask.config

def _get_sys_var(self):
self.system = platform.system().lower()
self.cpu_count = os.cpu_count()
self.memory = int((psutil.virtual_memory().total / (1024.**3)))

def _config_init(self):
tmp_dir = dask.config.get('temporary_directory')
if not tmp_dir:
self.dask_config.set({'distributed.worker.memory.spill': False})
self.dask_config.set({'distributed.worker.memory.pause': False})
self.dask_config.set({'distributed.worker.memory.target': 0.8})

def _check(self):
self._get_sys_var()
# workers and threads need to be less than cpu core count
# memory per worker >= 5 GB but total memory use should be less than the system memory available
if self.n_workers * self.threads_per_worker > self.cpu_count:
raise Exception(f"workers and threads exceed local resources, {self.cpu_count} cores present")
elif self.memory_per_worker < 5:
raise Exception(f"memory per worker too small, minimum memory size per worker 5 GB")
if self.n_workers * self.memory_per_worker > self.memory:
self.dask_config.set({'distributed.worker.memory.spill': True})
print(f"[!] memory per worker exceeds system memory ({self.memory} GB), activating memory spill fraction\n")

def init_client(self) -> Client:
self._config_init()
self._check()

if self.system == "windows":
self.dask_config.set({'distributed.worker.memory.terminate': False})
self.client = Client(processes=True,
n_workers=self.n_workers,
threads_per_worker=self.threads_per_worker,
memory_limit=f"{self.memory_per_worker:.2f}GiB"
)
else:
self.client = Client(processes=True,
n_workers=self.n_workers,
threads_per_worker=self.threads_per_worker,
memory_limit=f"{self.memory_per_worker:.2f}GiB"
)

if self.verbose:
print(f"[i] System: {self.system}")
print(f"[i] CPU Count: {self.cpu_count}")
print(f"[i] System Memory: {self.memory}")
print(f"[i] Workers: {self.n_workers}")
print(f"[i] Threads per Worker: {self.threads_per_worker}")
print(f"[i] Memory per Worker: {self.memory_per_worker}")
print(f"[i] Dask worker config:")
for key, value in self.dask_config.get('distributed.worker').items():
print(f"{key} : {value}")

print(f"\n[>] Dask Dashboard: {self.client.dashboard_link}\n")

return self.client
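
A minimal usage sketch of the Local client; the parameter values are illustrative only:

# Hypothetical usage sketch -- parameter values are illustrative
from sdt_dask.clients.local import Local

local = Local(n_workers=2, threads_per_worker=2, memory_per_worker=5, verbose=True)
client = local.init_client()  # prints system info and the dashboard link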
Empty file added sdt_dask/dask_tool/__init__.py
Empty file.
