Added clustering and partitioning
- Also added pylint configuration file
H-Max committed Jul 2, 2024
1 parent 44cc169 commit 11bfc7d
Showing 3 changed files with 80 additions and 24 deletions.
16 changes: 16 additions & 0 deletions .pylintrc
@@ -0,0 +1,16 @@
[MAIN]
ignore=test
jobs=0
fail-under=8.0

[MESSAGES CONTROL]
disable=
    missing-docstring,
    too-few-public-methods,
    import-error,

output-format=colorized
score=yes

[FORMAT]
max-line-length=120
19 changes: 11 additions & 8 deletions README.md
@@ -55,20 +55,23 @@ bq2dbt myproject.mydataset

# CLI arguments

| Option | Description |
|-----------------------|---------------------------------------------------------------------------|
| `-l`, `--lower` | Output type names as lowercase in YAML file |
| `--snake` | Convert field names to snake_case (SQL and YAML) |
| `--prefix`            | Prefix to add to column names (default: None)                              |
| `--suffix` | Suffix to add to column names (default: None) |
| `--output` | Destination folder for scripts. By default target/bq2dbt |
| `--empty_description` | Add empty description property to YAML file if field description is empty |
| Option | Description |
|-----------------------|-----------------------------------------------------------------------------------------|
| `-l`, `--lower` | Output type names as lowercase in YAML file |
| `--snake` | Convert field names to snake_case |
| `--prefix`             | Prefix to add to column names (default: None)                                             |
| `--suffix` | Suffix to add to column names (default: None) |
| `--output`             | Destination folder for scripts (default: target/bq2dbt)                                   |
| `--empty_description` | Add empty description property to YAML file if field description is empty (placeholder) |
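
For example, a hypothetical invocation combining several of these flags: `bq2dbt myproject.mydataset --snake --lower --prefix raw` generates files for every table in the dataset, converts field names to snake_case, lowercases type names, and joins the `raw` prefix to each column name with an underscore.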

# TODO

- [ ] Error handling
- [ ] Unit testing
- [ ] Merging with existing yaml definition files
- [x] Generate the files for a complete dataset rather than a single table
- [x] Support for clustering
- [x] Support for time partitioning
- [ ] Support for range partitioning
- [ ] Option to output to stdout
  - [ ] With the option to select SQL or YAML file only
69 changes: 53 additions & 16 deletions bq2dbt/bq2dbt.py
@@ -8,7 +8,9 @@
import argparse
import logging
import os
import sys
import re
import math
import yaml

from google.cloud import bigquery
@@ -20,7 +22,7 @@

logger = logging.getLogger(__name__)
case_convert_regex = re.compile(r'(?<!^)(?=[A-Z])')
sql_indent = "\t"
SQL_INDENTATION = "\t"


def convert_to_snake_case(input_string: str) -> str:
@@ -36,37 +38,43 @@ def convert_to_snake_case(input_string: str) -> str:
    return case_convert_regex.sub('_', input_string).lower()
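
A standalone sketch of the conversion above, with hypothetical input strings; note that runs of capitals such as acronyms are split letter by letter:

```python
import re

case_convert_regex = re.compile(r'(?<!^)(?=[A-Z])')

def convert_to_snake_case(input_string: str) -> str:
    # Insert "_" before each uppercase letter that is not at the start,
    # then lowercase the result.
    return case_convert_regex.sub('_', input_string).lower()

print(convert_to_snake_case("CamelCaseField"))  # camel_case_field
print(convert_to_snake_case("userID"))          # user_i_d (every capital splits)
```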


def bq2dbt():
    """
    Main function. Parse arguments and do the job
    """
def parse_command_line():
    parser = argparse.ArgumentParser(description="Generate YAML and SQL output for a BigQuery table.")
    parser.add_argument("target", help="Complete BigQuery dataset or table ID (project.dataset[.table])")
    parser.add_argument("-l", "--lower", action="store_true", help="Lowercase type names in YAML file")
    parser.add_argument("--snake", action="store_true", help="Convert field names to snake_case")
    parser.add_argument("--empty_description", action="store_true", help="Include empty description property in YAML file")
    parser.add_argument("--empty_description", action="store_true",
                        help="Include empty description property in YAML file")
    parser.add_argument("--prefix", help="Prefix to add to column names", default=None)
    parser.add_argument("--suffix", help="Suffix to add to column names", default=None)
    parser.add_argument("--output", help="Output folder of scripts. By default 'target/bq2dbt'",
                        default='target/bq2dbt')
    args = parser.parse_args()
    return parser.parse_args()


def bq2dbt():
    """
    Main function. Parse arguments and do the job
    """
    args = parse_command_line()

    tables = []
    dataset_id = None
    target_split = args.target.split(".")
    if len(target_split) == 3:
    if len(target_split) == 3:  # Complete project.dataset.table
        project_id = target_split[0]
        dataset_id = target_split[1]
        tables.append(target_split[2])
    elif len(target_split) == 2:
    elif len(target_split) == 2:  # project.dataset only
        project_id = target_split[0]
        dataset_id = target_split[1]
    else:
        logger.error("Invalid BigQuery dataset or table ID.")
        exit(1)
        sys.exit(1)

    if not dataset_id:
        logger.error("Dataset ID cannot be empty.")
        exit(1)
        sys.exit(1)
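
For reference: a three-part target such as `myproject.mydataset.mytable` selects a single table, while a two-part `myproject.mydataset` leaves `tables` empty so the rest of the script (elided here) can enumerate every table in the dataset.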

    client = bigquery.Client(project=project_id)

@@ -108,6 +116,7 @@ def bq2dbt():

        # Run the query to get column information and fetch the results
        fields = client.query(fields_query).result()
        field_types = {}

        # Get table description
        table = client.get_table(f"{project_id}.{dataset_id}.{table_id}")
@@ -133,7 +142,7 @@ def bq2dbt():
        # Add primary key constraint to table if there is any field declared as such in table
        # Note : this is done in respect of fields order in the primary key declaration
        primary_key = list(client.query(pk_query).result())
        if len(primary_key):
        if primary_key:
            yaml_data['models'][0]['constraints'] = [
                {
                    "type": "primary_key",
@@ -148,13 +157,15 @@ def bq2dbt():
        # Iterate through the query results and add them to the YAML data
        for field in fields:
            data_type = field.data_type.split('<')[0]
            data_type = data_type.lower() if args.lower else data_type
            field_types[field.field_path] = data_type

            destination = convert_to_snake_case(field.field_path) if args.snake else field.field_path
            destination = "_".join(filter(None, [prefix, destination, suffix]))

            field_info = {
                "name": destination,
                "data_type": data_type.lower() if args.lower else data_type,
                "data_type": data_type
            }
            if field.description or empty_description:
                field_info['description'] = field.description or ""
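
For a hypothetical `TIMESTAMP` column `createdAt`, run with `--snake` and `--lower`, the loop above would produce a column entry roughly like this:

```python
field_info = {
    "name": "created_at",      # field path after convert_to_snake_case
    "data_type": "timestamp",  # lowercased because of --lower
    "description": "",         # only present if described or --empty_description is set
}
```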
Expand All @@ -175,13 +186,39 @@ def bq2dbt():
            else:
                sql_columns.append(f"`{field.field_path}`")

        # Support for time partitioning retrieval
        table_time_partitioning = table.time_partitioning
        table_partition_expiration = table_time_partitioning.expiration_ms if table_time_partitioning else None
        table_require_partition_filter = table.require_partition_filter
        logger.info("Time partition : %s", table_time_partitioning)
        logger.info("Partition expiration : %s", table_partition_expiration)
        logger.info("Require partition filter : %s", table_require_partition_filter)
        if table_time_partitioning:
            yaml_data['models'][0]['config']['partition_by'] = {
                "field": table_time_partitioning.field,
                "granularity": table_time_partitioning.type_,
                "data_type": field_types[table_time_partitioning.field]
            }

        if table_require_partition_filter:
            yaml_data['models'][0]['config']['require_partition_filter'] = True

        if table_partition_expiration:
            yaml_data['models'][0]['config']['partition_expiration_days'] \
                = math.floor(table_partition_expiration / (1000 * 60 * 60 * 24))

        table_clustering_fields = table.clustering_fields
        if table_clustering_fields:
            yaml_data['models'][0]['config']['clustering'] = table_clustering_fields
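
Putting the new block together — assuming a hypothetical table partitioned by `DAY` on an `event_date` column, with a 90-day expiration (7,776,000,000 ms / 86,400,000 ms per day = 90), a required partition filter, and clustering on `customer_id` — the model config would come out roughly as:

```python
yaml_data['models'][0]['config'] = {
    "partition_by": {
        "field": "event_date",          # TimePartitioning.field
        "granularity": "DAY",           # TimePartitioning.type_
        "data_type": "date",            # resolved from field_types
    },
    "require_partition_filter": True,
    "partition_expiration_days": 90,    # math.floor(7_776_000_000 / 86_400_000)
    "clustering": ["customer_id"],      # Table.clustering_fields
}
```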

        # Generate the YAML output
        yaml_output = yaml.dump(yaml_data, default_flow_style=False, sort_keys=False)

        # Generate the SQL output
        sql_columns_statement = f"\n{sql_indent}, ".join(sql_columns)
        sql_from_statement = f"\n{sql_indent}`{project_id}.{dataset_id}.{table_id}`"
        sql_output = f"SELECT\n{sql_indent}{sql_columns_statement}\nFROM{sql_from_statement} -- Replace this with ref() or source() macro"
        sql_columns_statement = f"\n{SQL_INDENTATION}, ".join(sql_columns)
        sql_from_statement = f"\n{SQL_INDENTATION}`{project_id}.{dataset_id}.{table_id}`"
        sql_output = (f"SELECT\n{SQL_INDENTATION}{sql_columns_statement}\nFROM{sql_from_statement}"
                      f" -- Replace this with ref() or source() macro")

        output_path = f"./{output_folder}/{project_id}/{dataset_id}"
        os.makedirs(output_path, exist_ok=True)
