feat: added cluster ids from jobs system tables
kartikay-bagla committed Jul 23, 2024
1 parent 9ce898e commit 5389539
Showing 1 changed file with 26 additions and 5 deletions.
databricks/sdk/chaosgenius/data_puller.py: 26 additions & 5 deletions
@@ -7,6 +7,7 @@
import pandas as pd
from pyspark.sql.session import SparkSession
from pyspark.sql import DataFrame as SparkDataFrame
from pyspark.sql.functions import explode
from databricks.sdk import WorkspaceClient
from databricks.sdk.service import sql as databricks_sql

@@ -53,23 +54,43 @@ def __init__(

logger.info("Getting cluster list")
self._cluster_list = [i for i in self._workspace_client.clusters.list()]
cl_id_list = [i.cluster_id for i in self._cluster_list]
logger.info(f"Total clusters: {len(self._cluster_list)}")
logger.info("Getting instance pools list")
self._ip_list = [i for i in self._workspace_client.instance_pools.list()]
logger.info(f"Total pools: {len(self._ip_list)}")
logger.info("Getting warehouses list")
self._wh_list = [i for i in self._workspace_client.warehouses.list()]
logger.info(f"Total warehouses: {len(self._wh_list)}")
logger.info("Getting jobs list")
self._job_list = [
i for i in self._workspace_client.jobs.list(expand_tasks=True)
]
logger.info(f"Total jobs: {len(self._job_list)}")
# Not being used currently
# logger.info("Getting jobs list")
# self._job_list = [
# i for i in self._workspace_client.jobs.list(expand_tasks=True)
# ]
# logger.info(f"Total jobs: {len(self._job_list)}")
logger.info("Getting users list")
self._user_list = [i for i in self._workspace_client.users.list()]
logger.info(f"Total users: {len(self._user_list)}")

logger.info("Getting compute IDs from job system tables.")
# Pull the compute IDs attached to job task runs in the pull window.
compute_id_df = spark_session.sql(
    "select compute_ids from system.workflow.job_task_run_timeline "
    f"where period_start_time >= from_unixtime({self._start_time // 1000}) "
    f"and period_start_time < from_unixtime({self._end_time // 1000}) "
)
compute_id_list = (
    compute_id_df.select(explode(compute_id_df.compute_ids).alias("compute_id"))
    .distinct()
    .toPandas()["compute_id"]
    .tolist()
)
# Cluster IDs have three dash-separated parts (e.g. "0723-123456-abc12345");
# keep only IDs of that shape that are not already in the known cluster list.
ci_list = [
    i for i in compute_id_list if len(i.split("-")) == 3 and i not in cl_id_list
]
logger.info(f"Found {len(ci_list)} new clusters from job tables.")
logger.info("Adding cluster info for these clusters.")
self._cluster_list += [self._workspace_client.clusters.get(i) for i in ci_list]
logger.info("Done adding cluster info for job clusters.")

logger.info("Starting data pull")
# TODO(KB): refactor into multiple files by clusters, wh, etc
results = self.get_all()
success = True
for res in results:

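A minimal, self-contained sketch of the explode-and-distinct pattern the new block relies on. This is illustrative only: it assumes a local SparkSession and made-up compute IDs rather than the real system.workflow.job_task_run_timeline table.

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode

spark = SparkSession.builder.getOrCreate()

# Toy stand-in for the system table: each row carries an array of compute
# IDs, with duplicates possible across rows.
df = spark.createDataFrame(
    [(["0723-123456-abc12345", "0723-123456-abc12345"],),
     (["0724-654321-def67890"],)],
    ["compute_ids"],
)

# Flatten the arrays to one ID per row, deduplicate, then collect to the
# driver as a plain Python list.
ids = (
    df.select(explode(df.compute_ids).alias("compute_id"))
    .distinct()
    .toPandas()["compute_id"]
    .tolist()
)

# Cluster IDs have three dash-separated parts; keep only IDs of that shape.
print([i for i in ids if len(i.split("-")) == 3])

Running distinct() before toPandas() keeps the deduplication on the Spark executors, so only the unique IDs are shipped to the driver.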