Skip to content

Commit

Permalink
automatically add the brickflow library to the cluster and flag to op…
Browse files Browse the repository at this point in the history
…t out. Run bundles cli from anywhere within the project root
  • Loading branch information
stikkireddy committed Aug 9, 2023
1 parent 5b8a585 commit d67a230
Show file tree
Hide file tree
Showing 9 changed files with 742 additions and 31 deletions.
92 changes: 91 additions & 1 deletion brickflow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
from __future__ import annotations

import logging
import os
import sys
from enum import Enum
from typing import List, Callable, Any
from typing import List, Callable, Any, Union, Optional

import warnings
import functools
Expand Down Expand Up @@ -70,10 +71,99 @@ class BrickflowEnvVars(Enum):
BRICKFLOW_BUNDLE_NO_DOWNLOAD = "BRICKFLOW_BUNDLE_NO_DOWNLOAD"
BRICKFLOW_BUNDLE_CLI_VERSION = "BRICKFLOW_BUNDLE_CLI_VERSION"
BRICKFLOW_MONOREPO_PATH_TO_BUNDLE_ROOT = "BRICKFLOW_MONOREPO_PATH_TO_BUNDLE_ROOT"

BRICKFLOW_ENABLE_PLUGINS = "BRICKFLOW_ENABLE_PLUGINS"
BRICKFLOW_PROJECT_RUNTIME_VERSION = "BRICKFLOW_PROJECT_RUNTIME_VERSION"
BRICKFLOW_PROJECT_NAME = "BRICKFLOW_PROJECT_NAME"
BRICKFLOW_AUTO_ADD_LIBRARIES = "BRICKFLOW_AUTO_ADD_LIBRARIES"
BRICKFLOW_USE_PROJECT_NAME = "BRICKFLOW_USE_PROJECT_NAME" # for projects which injects project name to cli context


class Empty:
pass


class BrickflowProjectDeploymentSettings:
def __init__(self):
# purely here for type hinting
self.brickflow_env = Empty()
self.brickflow_force_deploy = Empty()
self.brickflow_mode = Empty()
self.brickflow_deployment_mode = Empty()
self.brickflow_git_repo = Empty()
self.brickflow_git_ref = Empty()
self.brickflow_git_provider = Empty()
self.brickflow_databricks_config_profile = Empty()
self.brickflow_deploy_only_workflows = Empty()
self.brickflow_workflow_prefix = Empty()
self.brickflow_workflow_suffix = Empty()
self.brickflow_s3_backend_bucket = Empty()
self.brickflow_s3_backend_key = Empty()
self.brickflow_s3_backend_region = Empty()
self.brickflow_s3_backend_dynamodb_table = Empty()
self.brickflow_interactive_mode = Empty()
self.brickflow_bundle_base_path = Empty()
self.brickflow_bundle_obj_name = Empty()
self.brickflow_bundle_cli_exec = Empty()
self.brickflow_bundle_no_download = Empty()
self.brickflow_bundle_cli_version = Empty()
self.brickflow_monorepo_path_to_bundle_root = Empty()
self.brickflow_enable_plugins = Empty()
self.brickflow_project_runtime_version = Empty()
self.brickflow_project_name = Empty()
self.brickflow_use_project_name = Empty()
self.brickflow_auto_add_libraries = Empty()

@staticmethod
def _possible_string_to_bool(value: Optional[str]) -> Optional[Union[bool, str]]:
"""
https://github.com/python/cpython/blob/878ead1ac1651965126322c1b3d124faf5484dc6/Lib/distutils/util.py#L308-L321
Convert a string representation of truth to true (1) or false (0).
True values are 'y', 'yes', 't', 'true', 'on', and '1'; false values
are 'n', 'no', 'f', 'false', 'off', and '0'. Raises ValueError if
'val' is anything else.
"""
if value is None:
return value
val = value.lower()
if val in ("y", "yes", "t", "true", "on", "1"):
return True
elif val in ("n", "no", "f", "false", "off", "0"):
return False
else:
return value

def __getattr__(self, attr: str) -> Union[str, bool]:
upper_attr = attr.upper()
if upper_attr in BrickflowEnvVars.__members__:
value = config(upper_attr, default=None)
log.info("Getting attr: %s which has value: %s", upper_attr, value)
return self._possible_string_to_bool(value)
else:
raise AttributeError(
f"Attribute {upper_attr} is not a valid BrickflowEnvVars"
)

def __setattr__(self, attr: str, value: Union[str, bool, Empty]) -> None:
if isinstance(value, Empty):
return
upper_attr = attr.upper()
if upper_attr in BrickflowEnvVars.__members__:
log.info("Configuring attr: %s with value: %s", upper_attr, value)
if value is None:
os.environ.pop(upper_attr, None)
elif isinstance(value, bool):
os.environ[upper_attr] = str(value).lower()
else:
os.environ[upper_attr] = value
else:
raise AttributeError(
f"Attribute {upper_attr} is not a valid BrickflowEnvVars"
)


class BrickflowDefaultEnvs(Enum):
LOCAL = "local"
DEV = "dev"
Expand Down
83 changes: 72 additions & 11 deletions brickflow/cli/projects.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
BrickflowDefaultEnvs,
BrickflowEnvVars,
_ilog,
BrickflowProjectDeploymentSettings,
ctx,
)
from brickflow.cli.bundles import (
bundle_deploy,
Expand All @@ -31,6 +33,7 @@
bind_env_var,
)
from brickflow.cli.constants import INTERACTIVE_MODE, BrickflowDeployMode
from brickflow.resolver import get_notebook_ws_path

DEFAULT_BRICKFLOW_VERSION_MODE = "auto"

Expand All @@ -41,6 +44,7 @@ class BrickflowProject(BaseModel):
path_project_root_to_workflows_dir: str # used for repos with multiple batches of workflows
deployment_mode: str = BrickflowDeployMode.BUNDLE.value
brickflow_version: str = DEFAULT_BRICKFLOW_VERSION_MODE
enable_plugins: bool = False

class Config:
extra = "forbid"
Expand Down Expand Up @@ -103,10 +107,12 @@ def __init__(
else {}
)

def __new__(cls) -> "MultiProjectManager":
def __new__(cls, *args: Any, **kwargs: Any) -> "MultiProjectManager":
# singleton
if not hasattr(cls, "instance"):
cls.instance = super(MultiProjectManager, cls).__new__(cls)
if "config_file_name" in kwargs:
cls.instance._config_file = Path(kwargs["config_file_name"])
return cls.instance # noqa

def _config_exists(self) -> bool:
Expand Down Expand Up @@ -140,6 +146,9 @@ def _load_roots(self) -> Dict[str, BrickflowRootProjectConfig]:
)
return root_dict

def root(self) -> Path:
return self._config_file.parent

def add_project(self, project: BrickflowProject) -> None:
if self.project_exists(project) is True:
raise ValueError(f"Project with name {project.name} already exists")
Expand Down Expand Up @@ -173,6 +182,13 @@ def update_project(self, project: BrickflowProject) -> None:
project.name
] = project

@staticmethod
def set_current_project_settings(project: BrickflowProject) -> None:
settings = BrickflowProjectDeploymentSettings()
settings.brickflow_project_runtime_version = project.brickflow_version
settings.brickflow_enable_plugins = project.enable_plugins
settings.brickflow_project_name = project.name

def list_project_names(self) -> List[str]:
return list(self._brickflow_multi_project_config.project_roots.keys())

Expand Down Expand Up @@ -207,7 +223,28 @@ def save(self) -> None:
yaml.dump(self._brickflow_multi_project_config.dict(), f)


multi_project_manager = MultiProjectManager()
class BrickflowRootNotFound(Exception):
pass


def get_brickflow_root(current_path: Optional[Path] = None) -> Path:
current_dir = Path(current_path or get_notebook_ws_path(ctx.dbutils) or os.getcwd())
potential_config_file_path = (
current_dir
/ BrickflowProjectConstants.DEFAULT_MULTI_PROJECT_CONFIG_FILE_NAME.value
)
if potential_config_file_path.exists():
return potential_config_file_path
elif current_dir.parent == current_dir:
# Reached the filesystem root, return just raw file value
return Path(
BrickflowProjectConstants.DEFAULT_MULTI_PROJECT_CONFIG_FILE_NAME.value
)
else:
return get_brickflow_root(current_dir.parent)


multi_project_manager = MultiProjectManager(config_file_name=str(get_brickflow_root()))


def initialize_project_entrypoint(
Expand Down Expand Up @@ -257,7 +294,7 @@ def initialize_project_entrypoint(
@click.option(
"-bfv",
"--brickflow-version",
default=get_brickflow_version(),
default=DEFAULT_BRICKFLOW_VERSION_MODE,
type=str,
prompt=INTERACTIVE_MODE,
)
Expand Down Expand Up @@ -301,14 +338,19 @@ def projects() -> None:

@contextlib.contextmanager
def use_project(
name: str, project_root_dir: Optional[str] = None
name: str,
project: BrickflowProject,
brickflow_root: Optional[Path] = None,
project_root_dir: Optional[str] = None,
) -> Generator[None, None, None]:
# if no directory is provided do nothing
if project_root_dir is not None:
if brickflow_root is not None and project_root_dir is not None:
current_directory = os.getcwd()
_ilog.info("Changed to directory: %s", project_root_dir)
os.chdir(project_root_dir)
project_path = str(brickflow_root / project_root_dir)
_ilog.info("Changed to directory: %s", project_path)
os.chdir(brickflow_root / project_root_dir)
os.environ[BrickflowEnvVars.BRICKFLOW_PROJECT_NAME.value] = name
multi_project_manager.set_current_project_settings(project)
try:
yield
finally:
Expand Down Expand Up @@ -391,7 +433,12 @@ def add(
_create_gitignore_if_not_exists()
_update_gitignore()

with use_project(name, project.path_from_repo_root_to_project_root):
with use_project(
name,
project,
multi_project_manager.root(),
project.path_from_repo_root_to_project_root,
):
if skip_entrypoint is False:
initialize_project_entrypoint(
project.name,
Expand Down Expand Up @@ -471,6 +518,14 @@ def apply_bundles_deployment_options(
default=False,
help="Auto approve brickflow pipeline without being prompted to approve.",
),
"skip-libraries": click.option(
"--skip-libraries",
type=bool,
is_flag=True,
show_default=True,
default=False,
help="Skip automatically adding brickflow libraries.",
),
"force-acquire-lock": click.option(
"--force-acquire-lock",
type=bool,
Expand Down Expand Up @@ -501,7 +556,9 @@ def destroy_project(project: str, **kwargs: Any) -> None:
"""Destroy projects in the brickflow-multi-project.yml file"""
bf_project = multi_project_manager.get_project(project)
dir_to_change = multi_project_manager.get_project_ref(project).root_yaml_rel_path
with use_project(project, dir_to_change):
if kwargs.get("skip_libraries", None) is True:
BrickflowProjectDeploymentSettings().brickflow_auto_add_libraries = False
with use_project(project, bf_project, multi_project_manager.root(), dir_to_change):
bundle_destroy(
workflows_dir=bf_project.path_project_root_to_workflows_dir, **kwargs
)
Expand All @@ -524,7 +581,9 @@ def synth_bundles_for_project(project: str, **kwargs: Any) -> None:
"""Synth the bundle.yml for project"""
bf_project = multi_project_manager.get_project(project)
dir_to_change = multi_project_manager.get_project_ref(project).root_yaml_rel_path
with use_project(project, dir_to_change):
if kwargs.get("skip_libraries", None) is True:
BrickflowProjectDeploymentSettings().brickflow_auto_add_libraries = False
with use_project(project, bf_project, multi_project_manager.root(), dir_to_change):
# wf dir is required for generating the bundle.yml in the workflows dir
project_synth(
workflows_dir=bf_project.path_project_root_to_workflows_dir, **kwargs
Expand All @@ -537,7 +596,9 @@ def deploy_project(project: str, **kwargs: Any) -> None:
"""Deploy projects in the brickflow-multi-project.yml file"""
bf_project = multi_project_manager.get_project(project)
dir_to_change = multi_project_manager.get_project_ref(project).root_yaml_rel_path
with use_project(project, dir_to_change):
if kwargs.get("skip_libraries", None) is True:
BrickflowProjectDeploymentSettings().brickflow_auto_add_libraries = False
with use_project(project, bf_project, multi_project_manager.root(), dir_to_change):
bundle_deploy(
workflows_dir=bf_project.path_project_root_to_workflows_dir, **kwargs
)
31 changes: 24 additions & 7 deletions brickflow/codegen/databricks_bundle.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import yaml
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.pipelines import GetPipelineResponse
from decouple import config
from pydantic import BaseModel

Expand Down Expand Up @@ -49,10 +50,18 @@
handle_mono_repo_path,
DatabricksDefaultClusterTagKeys,
)
from brickflow.engine.task import TaskLibrary, DLTPipeline, TaskSettings
from brickflow.engine.task import (
TaskLibrary,
DLTPipeline,
TaskSettings,
filter_bf_related_libraries,
get_brickflow_libraries,
)

if typing.TYPE_CHECKING:
from brickflow.engine.project import _Project # noqa
from brickflow.engine.project import (
_Project,
) # noqa


class DatabricksBundleResourceMutator(abc.ABC):
Expand Down Expand Up @@ -158,10 +167,13 @@ def belongs_to_current_project(
ctx.current_project is not None and resource_project == ctx.current_project
)
_ilog.info(
"Handling if resource %s: %s belongs to current project: %s",
"Checking if resource %s: %s belongs to current project: %s; "
"handle project validation mode is %s, and the resource belongs to project: %s",
ref.type_,
ref.name,
resource_project,
handle_project_validation,
belongs_to_project,
)
return handle_project_validation is True and belongs_to_project is True

Expand Down Expand Up @@ -240,16 +252,17 @@ def _resolve(self, ref: ResourceReference) -> List[ImportBlock]:
for pipeline in self.databricks_client.pipelines.list_pipelines(
filter=f"name LIKE '{ref.name}'"
):
pipeline_details = self.databricks_client.pipelines.get(
pipeline_id=pipeline.pipeline_id
pipeline_details: GetPipelineResponse = (
self.databricks_client.pipelines.get(pipeline_id=pipeline.pipeline_id)
)
if pipeline_details.clusters is None:

if pipeline_details.spec.clusters is None:
continue # no clusters no way to identify if pipeline belongs to project
project_tag = DatabricksDefaultClusterTagKeys.BRICKFLOW_PROJECT_NAME.value
pipeline_belongs_to_current_project = any(
cluster.custom_tags is not None
and cluster.custom_tags.get(project_tag, None) == ctx.current_project
for cluster in pipeline_details.clusters
for cluster in pipeline_details.spec.clusters
)
if pipeline_belongs_to_current_project is True:
blocks.append(ImportBlock(to=ref.reference, id_=pipeline.pipeline_id))
Expand Down Expand Up @@ -467,6 +480,10 @@ def workflow_obj_to_tasks(
libraries = TaskLibrary.unique_libraries(
task.libraries + (self.project.libraries or [])
)
if workflow.enable_plugins is True:
libraries = filter_bf_related_libraries(libraries)
libraries += get_brickflow_libraries(workflow.enable_plugins)

task_libraries = [
JobsTasksLibraries(**library.dict) for library in libraries
]
Expand Down
Loading

0 comments on commit d67a230

Please sign in to comment.