Skip to content

Commit

Permalink
Feat/demo data catalog changes (#44)
Browse files Browse the repository at this point in the history
* Change explore_nodes to take a list of nodes as an argument

* Add CatalogExploreCommand abstraction

* Rename DbtCloudBaseModel as CLIBaseModel

* Refactor explore method

* Rename CLIBaseModel as ClickBaseModel

Co-authored-by: Simo Tumelius <[email protected]>
  • Loading branch information
stumelius and datamie-simo authored Feb 13, 2022
1 parent c08fb00 commit fceb666
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 99 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1035,9 +1035,9 @@ An interactive CLI application for exploring `catalog.json` artifacts.
##
###

[?] Select attribute to explore: sources
> sources
nodes
[?] Select node type to explore: source
> source
node
```

## Acknowledgements
Expand Down
4 changes: 2 additions & 2 deletions dbt_cloud/command/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def translate_click_options(**kwargs) -> dict:
return kwargs_translated


class DbtCloudBaseModel(BaseModel):
class ClickBaseModel(BaseModel):
@classmethod
def click_options(cls, function, key_prefix: str = ""):
for key, field in reversed(cls.__fields__.items()):
Expand Down Expand Up @@ -73,7 +73,7 @@ def get_description(cls) -> str:
return cls.__doc__.strip()


class DbtCloudCommand(DbtCloudBaseModel):
class DbtCloudCommand(ClickBaseModel):
api_token: str = API_TOKEN_FIELD
account_id: int = ACCOUNT_ID_FIELD
dbt_cloud_host: str = DBT_CLOUD_HOST_FIELD
Expand Down
12 changes: 6 additions & 6 deletions dbt_cloud/command/job/create.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from enum import Enum
from typing import Optional, List
from pydantic import Field
from dbt_cloud.command.command import DbtCloudCommand, DbtCloudBaseModel
from dbt_cloud.command.command import DbtCloudCommand, ClickBaseModel


class DateTypeEnum(Enum):
Expand All @@ -16,13 +16,13 @@ class TimeTypeEnum(Enum):
AT_EXACT_HOURS = "at_exact_hours"


class DbtCloudJobTriggers(DbtCloudBaseModel):
class DbtCloudJobTriggers(ClickBaseModel):
github_webhook: bool = Field(default=False)
schedule: bool = Field(default=False)
custom_branch_only: bool = Field(default=False)


class DbtCloudJobSettings(DbtCloudBaseModel):
class DbtCloudJobSettings(ClickBaseModel):
threads: int = Field(
default=1,
description="The maximum number of models to run in parallel in a single dbt run.",
Expand All @@ -33,16 +33,16 @@ class DbtCloudJobSettings(DbtCloudBaseModel):
)


class DbtCloudJobScheduleDate(DbtCloudBaseModel):
class DbtCloudJobScheduleDate(ClickBaseModel):
type: DateTypeEnum = Field(default="every_day", description=None)


class DbtCloudJobScheduleTime(DbtCloudBaseModel):
class DbtCloudJobScheduleTime(ClickBaseModel):
type: TimeTypeEnum = Field(default="every_hour", description=None)
interval: int = Field(default=1)


class DbtCloudJobSchedule(DbtCloudBaseModel):
class DbtCloudJobSchedule(ClickBaseModel):
cron: str = Field(
default="0 * * * *", description="Cron-syntax schedule for the job."
)
Expand Down
190 changes: 102 additions & 88 deletions dbt_cloud/demo/catalog.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import click
from enum import Enum
from pathlib import Path
from typing import Optional, Dict, Any
from pydantic import Field, BaseModel
from dbt_cloud.command.command import DbtCloudBaseModel
from pydantic import BaseModel, Field
from dbt_cloud.command.command import ClickBaseModel


class Stats(DbtCloudBaseModel):
class Stats(BaseModel):
"""Represent node stats in the Catalog."""

id: str
Expand All @@ -17,7 +19,7 @@ def __str__(self):
return f"{self.label}: {self.value}"


class Column(DbtCloudBaseModel):
class Column(BaseModel):
"""Represents a column in the Catalog."""

type: str
Expand All @@ -29,7 +31,7 @@ def __str__(self):
return f"{self.name} (type: {self.type}, index: {self.index}, comment: {self.comment})"


class Node(DbtCloudBaseModel):
class Node(BaseModel):
"""Represents a node in the Catalog."""

unique_id: str
Expand All @@ -53,21 +55,17 @@ def schema(self):
def type(self):
return self.metadata["type"]

@property
def owner(self):
return self.metadata.get("owner")

def __str__(self):
return f"{self.name} (type: {self.type}, schema: {self.schema}, database: {self.database})"

def __gt__(self, other):
return self.name > other.name

def __lt__(self, other):
return self.name < other.name

def __str__(self):
return f"{self.name} (type: {self.type}, schema: {self.schema}, database: {self.database})"


class Catalog(DbtCloudBaseModel):
class Catalog(BaseModel):
"""Represents a dbt catalog.json artifact."""

metadata: Dict
Expand All @@ -76,78 +74,94 @@ class Catalog(DbtCloudBaseModel):
errors: Optional[Dict]


def explore_nodes(nodes: Dict[str, Node], node_type: str = "node"):
    """Interactively pick a database, a schema and a node, then print the node.

    Each round narrows the candidates by database, then by schema, lets the
    user pick one node, and echoes that node's columns followed by its stats
    (the ``has_stats`` entry is skipped). Rounds repeat until the user
    declines the confirmation prompt.

    Args:
        nodes: Mapping whose values are the Node objects to choose from.
        node_type: Word used in the "Explore another ...?" confirmation.
    """
    import inquirer

    def ask(key, message, choices):
        # Show a single list question and return the selected entry.
        question = inquirer.List(key, message=message, choices=choices)
        return inquirer.prompt([question])[key]

    keep_exploring = True
    while keep_exploring:
        # Step 1: narrow the candidates to a single database.
        database = ask(
            "database",
            "Select database",
            sorted({entry.database for entry in nodes.values()}),
        )
        in_database = {
            key: entry for key, entry in nodes.items() if entry.database == database
        }

        # Step 2: narrow further to a single schema within that database.
        schema = ask(
            "schema",
            "Select schema",
            sorted({entry.schema for entry in in_database.values()}),
        )
        in_schema = {
            key: entry for key, entry in in_database.items() if entry.schema == schema
        }

        # Step 3: pick the node itself and print its details.
        node = ask("node", "Select node", sorted(in_schema.values()))
        click.echo(f"{node.name} columns:")
        for column in node.columns.values():
            click.echo(f"- {column}")
        click.echo("")
        for stats in node.stats.values():
            if stats.id != "has_stats":
                click.echo(stats)

        keep_exploring = click.confirm(f"Explore another {node_type}?")


@click.command(help="An interactive application for exploring catalog artifacts.")
@click.option(
    "-f",
    "--file",
    default="catalog.json",
    type=str,
    help="Catalog file path.",
)
def data_catalog(file):
    """Explore the sources and nodes of a catalog file interactively.

    Parses ``file`` into a Catalog, prints an ASCII-art title, then repeatedly
    asks which attribute ("sources" or "nodes") to explore until the user
    declines to continue.

    Args:
        file: Path to the catalog file to load.
    """
    import inquirer
    from art import tprint

    catalog = Catalog.parse_file(file)
    tprint("Data Catalog", font="rand-large")
    while True:
        attribute_options = [
            inquirer.List(
                "attribute",
                message="Select attribute to explore",
                choices=["sources", "nodes"],
            )
        ]
        attribute = inquirer.prompt(attribute_options)["attribute"]

        if attribute == "nodes":
            # Pass the catalog's own mapping, exactly as is done for sources.
            # Re-keying by node.name would silently drop every node whose
            # name also occurs in another schema or database.
            explore_nodes(catalog.nodes)
        elif attribute == "sources":
            explore_nodes(catalog.sources, node_type="source")
        if not click.confirm("Explore another attribute?"):
            break
class NodeType(Enum):
    """Kinds of catalog entries the user can choose to explore."""

    # The values are the strings shown as choices in the interactive prompt.
    SOURCE = "source"
    NODE = "node"


class CatalogExploreCommand(ClickBaseModel):
    """An interactive application for exploring catalog artifacts."""

    # NOTE: the class docstring above is passed to click as the command's help
    # text (via get_description()), so keep it a single user-facing sentence.

    file: Path = Field(default="catalog.json", description="Catalog file path.")
    title: str = Field(
        default="Data Catalog", description="ASCII art title for the app."
    )
    title_font: str = Field(
        default="rand-large",
        description="ASCII art title font (see https://github.com/sepandhaghighi/art#try-art-in-your-browser for a list of available fonts)",
    )

    def get_catalog(self) -> Catalog:
        """Parse the catalog file at ``self.file`` into a Catalog."""
        return Catalog.parse_file(self.file)

    def print_title(self):
        """Print ``self.title`` as ASCII art in the ``self.title_font`` font."""
        from art import tprint

        tprint(self.title, font=self.title_font)

    @staticmethod
    def _prompt_choice(name: str, message: str, choices: list):
        """Ask a single list question and return the selected choice.

        Raises:
            click.Abort: If the prompt returns no answers (the user cancelled),
                so the command stops cleanly instead of failing on a None
                subscript.
        """
        import inquirer

        answers = inquirer.prompt(
            [inquirer.List(name, message=message, choices=choices)]
        )
        if answers is None:
            raise click.Abort()
        return answers[name]

    def execute(self):
        """Run the app: print the title, then explore node types until done."""
        self.print_title()

        while True:
            selected = self._prompt_choice(
                "node_type",
                "Select node type to explore",
                [node_type.value for node_type in NodeType],
            )
            self.explore(node_type=NodeType(selected))
            if not click.confirm("Explore another node type?"):
                break

    def explore(self, node_type: NodeType):
        """Interactively select entries of ``node_type`` and print their details.

        The user narrows the candidates by database, then by schema, then picks
        one entry; its columns and stats are echoed (the ``has_stats`` entry is
        skipped). This repeats until the user declines to continue.

        Args:
            node_type: Whether to explore the catalog's sources or its nodes.
        """
        catalog = self.get_catalog()
        if node_type == NodeType.SOURCE:
            nodes = list(catalog.sources.values())
        else:
            nodes = list(catalog.nodes.values())

        if not nodes:
            # A list prompt with no choices cannot be answered, so report the
            # empty set and hand control back to the node-type menu.
            click.echo(f"No {node_type.value}s found in the catalog.")
            return

        while True:
            databases = sorted({node.database for node in nodes})
            database = self._prompt_choice("database", "Select database", databases)
            nodes_filtered = [node for node in nodes if node.database == database]

            schemas = sorted({node.schema for node in nodes_filtered})
            schema = self._prompt_choice("schema", "Select schema", schemas)
            nodes_filtered = [node for node in nodes_filtered if node.schema == schema]

            node = self._prompt_choice("node", "Select node", sorted(nodes_filtered))
            click.echo(f"{node.name} columns:")
            for column in node.columns.values():
                click.echo(f"- {column}")
            click.echo("")
            for stats in node.stats.values():
                if stats.id == "has_stats":
                    continue
                click.echo(stats)
            if not click.confirm(f"Explore another {node_type.value}?"):
                break


@click.command(help=CatalogExploreCommand.get_description())
@CatalogExploreCommand.click_options
def data_catalog(**kwargs):
    """Build a CatalogExploreCommand from the parsed click options and run it."""
    CatalogExploreCommand.from_click_options(**kwargs).execute()

0 comments on commit fceb666

Please sign in to comment.