Skip to content

Commit

Permalink
Add RenderConfig.airflow_vars_to_purge_cache
Browse files Browse the repository at this point in the history
  • Loading branch information
tatiana committed Jun 11, 2024
1 parent c265814 commit 8efae2f
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 7 deletions.
1 change: 1 addition & 0 deletions cosmos/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ class RenderConfig:
dbt_ls_path: Path | None = None
project_path: Path | None = field(init=False)
enable_mock_profile: bool = True
airflow_vars_to_purge_cache: list[str] = field(default_factory=list)

def __post_init__(self, dbt_project_path: str | Path | None) -> None:
if self.env_vars:
Expand Down
13 changes: 9 additions & 4 deletions cosmos/dbt/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ def load_via_dbt_ls_cache(self) -> bool:

cache_dict = self.get_cache()
if not cache_dict:
logger.warning(f"Cosmos performance: Cache miss for {self.cache_identifier}")
return False

cache_version = cache_dict.get("version")
Expand All @@ -229,7 +230,9 @@ def load_via_dbt_ls_cache(self) -> bool:
nodes = parse_dbt_ls_output(project_path=project_path, ls_stdout=dbt_ls_cache)
self.nodes = nodes
self.filtered_nodes = nodes
logger.warning(f"Cosmos performance: Cache hit for {self.cache_identifier}")
return True
logger.warning(f"Cosmos performance: Cache miss for {self.cache_identifier}")
return False

def get_dbt_ls_args(self) -> list[str]:
Expand All @@ -248,6 +251,7 @@ def get_dbt_ls_args(self) -> list[str]:

if not self.project.partial_parse:
args.append("--no-partial-parse")

return args

def get_project_path(self) -> Path:
Expand All @@ -269,6 +273,10 @@ def cache_key_args(self) -> list[str]:
if env_vars:
envvars_str = json.dumps(env_vars, sort_keys=True)
args.append(envvars_str)
if self.render_config.airflow_vars_to_purge_cache:
airflow_vars = [Variable.get(var_name, "") for var_name in self.render_config.airflow_vars_to_purge_cache]
args.extend(airflow_vars)

return args

def save_cache(self, dbt_ls_output: str) -> None:
Expand All @@ -286,10 +294,7 @@ def get_cache(self) -> dict[str, str]:
try:
cache_dict = Variable.get(self.cache_identifier, deserialize_json=True)
except (json.decoder.JSONDecodeError, KeyError):
logger.info(f"Cosmos performance: Cache miss for {self.cache_identifier}")

else:
logger.info(f"Cosmos performance: Cache hit for {self.cache_identifier}")
cache_dict = {}
return cache_dict

def run_dbt_ls(
Expand Down
9 changes: 6 additions & 3 deletions dev/dags/basic_cosmos_task_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,13 @@ def basic_cosmos_task_group() -> None:

customers = DbtTaskGroup(
group_id="customers",
project_config=ProjectConfig(
(DBT_ROOT_PATH / "jaffle_shop").as_posix(),
project_config=ProjectConfig((DBT_ROOT_PATH / "jaffle_shop").as_posix(), dbt_vars={"var": "2"}),
render_config=RenderConfig(
select=["path:seeds/raw_customers.csv"],
enable_mock_profile=False,
env_vars={"PURGE": os.getenv("PURGE", "0")},
airflow_vars_to_purge_cache=["purge"],
),
render_config=RenderConfig(select=["path:seeds/raw_customers.csv"], enable_mock_profile=False),
execution_config=shared_execution_config,
operator_args={"install_deps": True},
profile_config=profile_config,
Expand Down

0 comments on commit 8efae2f

Please sign in to comment.