Skip to content

Commit

Permalink
propodate workflow tags to cluster tags (#160)
Browse files Browse the repository at this point in the history
  • Loading branch information
pariksheet authored Sep 17, 2024
1 parent 63011dc commit 4a4beb6
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 7 deletions.
12 changes: 11 additions & 1 deletion brickflow/codegen/databricks_bundle.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,17 @@ def _mutate_jobs(self, resource: Resources, ci: CodegenInterface) -> Resources:
for job in resource.jobs.values():
# set correct names
job.name = self._rewrite_name(job.name)
# set tags

if job.job_clusters:
# update cluster tags
for cluster in job.job_clusters:
if cluster.new_cluster:
cluster.new_cluster.custom_tags = {
**self._get_default_tags(ci),
**self._get_runtime_tags(),
**(cluster.new_cluster.custom_tags or {}),
}
# update job tags
job.tags = {
**self._get_default_tags(ci),
**self._get_runtime_tags(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,30 @@
- "metric": "RUN_DURATION_SECONDS"
"op": "GREATER_THAN"
"value": 7200.0
"job_clusters": []
"job_clusters":
- "job_cluster_key": "sample_job_cluster"
"new_cluster":
"aws_attributes": null
"custom_tags":
"brickflow_deployment_mode": "Databricks Asset Bundles"
"brickflow_project_name": "test-project"
"brickflow_version": "1.0.0"
"deployed_at": "1704067200000"
"deployed_by": "test_user"
"environment": "local"
"data_security_mode": "SINGLE_USER"
"driver_instance_pool_id": null
"driver_node_type_id": null
"enable_elastic_disk": null
"init_scripts": null
"instance_pool_id": null
"node_type_id": "m6gd.xlarge"
"num_workers": 1.0
"policy_id": null
"runtime_engine": null
"spark_conf": null
"spark_env_vars": null
"spark_version": "13.3.x-scala2.12"
"max_concurrent_runs": 1.0
"name": "test_user_wf-test-2"
"notification_settings": null
Expand All @@ -34,14 +57,14 @@
"brickflow_deployment_mode": "Databricks Asset Bundles"
"brickflow_project_name": "test-project"
"brickflow_version": "1.0.0"
"deployed_by": "test_user"
"deployed_at": "1704067200000"
"deployed_by": "test_user"
"environment": "local"
"test": "test2"
"tasks":
- "depends_on": []
"email_notifications": {}
"existing_cluster_id": "existing_cluster_id"
"job_cluster_key": "sample_job_cluster"
"libraries": []
"max_retries": null
"min_retry_interval_millis": null
Expand Down Expand Up @@ -76,4 +99,4 @@
"file_path": "/Users/${workspace.current_user.userName}/.brickflow_bundles/test-project/local/files"
"root_path": "/Users/${workspace.current_user.userName}/.brickflow_bundles/test-project/local"
"state_path": "/Users/${workspace.current_user.userName}/.brickflow_bundles/test-project/local/state"
"workspace": {}
"workspace": {}
9 changes: 8 additions & 1 deletion tests/codegen/sample_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,9 +243,16 @@ def custom_python_task_push():
pass


job_cluster = Cluster(
name="sample_job_cluster",
node_type_id="m6gd.xlarge",
spark_version="13.3.x-scala2.12",
num_workers=1,
)

wf2 = Workflow(
"wf-test-2",
default_cluster=Cluster.from_existing_cluster("existing_cluster_id"),
default_cluster=job_cluster,
schedule_continuous=JobsContinuous(pause_status="PAUSED"),
permissions=WorkflowPermissions(
owner=User("[email protected]"),
Expand Down
1 change: 0 additions & 1 deletion tests/codegen/test_databricks_bundle.py
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,6 @@ def test_schedule_continuous(

actual = read_yaml_file(BUNDLE_FILE_NAME)
expected = get_expected_bundle_yaml("local_bundle_continuous_schedule.yml")
bf_version_mock.assert_called_once()
assert_equal_dicts(actual, expected)
if os.path.exists(BUNDLE_FILE_NAME):
os.remove(BUNDLE_FILE_NAME)
Expand Down

0 comments on commit 4a4beb6

Please sign in to comment.