NEW: in case of column mismatch, the dataset will be created again #3

Closed · wants to merge 3 commits
1 change: 1 addition & 0 deletions .gitignore
@@ -4,3 +4,4 @@ package/
 plio-rds-to-bigquery.zip
 
 .DS_Store
+venv/
64 changes: 57 additions & 7 deletions lambda_function.py
@@ -4,6 +4,7 @@
 import boto3
 import os
 import psycopg2
+import csv
 from google.cloud import bigquery
 from google.cloud.exceptions import NotFound
 
@@ -24,19 +25,68 @@ def lambda_handler(event, context):
         # create dataset if NotFound error
         client.create_dataset(dataset_id)
 
-    # process tables in public schema
-    table_names = get_tables_in_schema(public_mode=True)
-    process_tables(client, dataset_ref, "public", table_names)
-
-    # process tables in organization schema
+    # get list of tables in public schema and organization schema
+    public_table_names = get_tables_in_schema(public_mode=True)
     schema = os.getenv("DB_SCHEMA_NAME", None)
-    table_names = get_tables_in_schema(public_mode=False)
+    schema_table_names = get_tables_in_schema(public_mode=False)
+
+    # check if the columns of all tables are consistent
+    # across the s3 tables and bigquery tables
+    are_public_columns_consistent = verify_total_columns(
+        client, dataset_ref, "public", public_table_names
+    )
+    are_schema_columns_consistent = verify_total_columns(
+        client, dataset_ref, schema, schema_table_names
+    )
+
+    # if the columns are not consistent, delete the dataset and create again
+    if not are_public_columns_consistent or not are_schema_columns_consistent:
+        client.delete_dataset(dataset_id, delete_contents=True)
+        client.create_dataset(dataset_id)
Comment on lines +43 to +45

Contributor

I have slightly different thoughts here. Instead of dropping the whole dataset, shouldn't we just drop and recreate at the table level?

If we do it at the table level, I think we can then easily call this function inside the process_tables function and avoid duplicating things like making the BigQuery connection and downloading the S3 files.
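A minimal sketch of the table-level alternative the reviewer describes, assuming the same BigQuery client, dataset_ref, and table schema already available in lambda_function.py; the helper name recreate_table_if_mismatched and its parameters are hypothetical and not part of this PR:

from google.cloud import bigquery
from google.cloud.exceptions import NotFound


def recreate_table_if_mismatched(client, dataset_ref, table_name, expected_num_columns, schema):
    """Hypothetical helper: drop and recreate a single BigQuery table when its
    column count no longer matches the source CSV, instead of recreating the dataset."""
    table_ref = dataset_ref.table(table_name)
    try:
        table = client.get_table(table_ref)
    except NotFound:
        # table missing entirely: create it from the expected schema
        client.create_table(bigquery.Table(table_ref, schema=schema))
        return

    if len(table.schema) != expected_num_columns:
        # column mismatch: drop and recreate only this table
        client.delete_table(table_ref, not_found_ok=True)
        client.create_table(bigquery.Table(table_ref, schema=schema))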
+
+    # process tables in public and organization schema
+    process_tables(client, dataset_ref, "public", public_table_names)
     if schema is not None:
-        process_tables(client, dataset_ref, schema, table_names)
+        process_tables(client, dataset_ref, schema, schema_table_names)
 
     return {"statusCode": 200, "body": "All done!"}


+def verify_total_columns(client, dataset_ref, schema, table_names):
+    """Verifies if the columns in s3 tables and bigQuery tables match"""
+    bucket_name = os.getenv("S3_BUCKET_NAME")
+    s3_directory = os.getenv("S3_DIRECTORY", "")
+    s3 = boto3.client("s3")
+
+    if schema is None:
+        return True
+
+    for table_name in table_names:
+        # iterating over tables
+        table_ref = dataset_ref.table(table_name)
+        try:
+            # table found in bigQuery
+            bigquery_table = client.get_table(table_ref)
+        except NotFound:
+            # table not found in bigQuery
+            return False
+
+        # download s3 file into lambda /tmp/ directory
+        file = s3_directory + table_name + ".csv"
+        local_file_name = "/tmp/" + table_name + ".csv"
+        s3.download_file(bucket_name, file, local_file_name)
Contributor

Is it possible to re-use these files in the process_tables function as well and avoid the download there? That may help reduce the run time for the lambda.
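One way to act on this suggestion might be to download each CSV into /tmp/ at most once and let both verify_total_columns and process_tables reuse the local copy; this is only a sketch under that assumption, and the helper name download_table_csv_once is hypothetical:

import os

import boto3

s3 = boto3.client("s3")


def download_table_csv_once(bucket_name, s3_directory, table_name):
    """Hypothetical helper: download the table's CSV from S3 into /tmp/ only if it is
    not already there, so the column check and the table load share one local copy."""
    local_file_name = "/tmp/" + table_name + ".csv"
    if not os.path.exists(local_file_name):
        s3.download_file(bucket_name, s3_directory + table_name + ".csv", local_file_name)
    return local_file_name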
+        # calculate total columns of the s3 table
+        reader = csv.reader(open(local_file_name, "r"), delimiter=",")
+        list_reader = list(reader)
+        if len(list_reader) != 0:
+            s3_table_cols = len(list_reader[0])
+            # compare the number of columns in both the tables
+            if s3_table_cols != len(bigquery_table.schema):
+                return False
+    return True


 def get_table_schema(table_name, schema):
     """Returns the schema for the specified database table as per BigQuery table schema format."""
     host = os.getenv("DB_HOST")