From 93249f3ead662f7e5cb070e343ae261937f363a3 Mon Sep 17 00:00:00 2001
From: Vipul Rai
Date: Sat, 23 Mar 2024 16:35:05 +0100
Subject: [PATCH 1/2] poc to use sqlalchemy: wip

---
Review notes (ignored by git-am, not part of the commit message):

* Line breaks and indentation of this series were reconstructed; check the
  setup.cfg context lines against the tree before applying.
* The "index" header lines were dropped because the blob ids no longer match
  the revised content (plain "git apply" / "git am" do not need them).
* setup.cfg: the new requirements are pinned with "~=" like their neighbours.
* params.yaml is read with yaml.safe_load() and an explicit encoding.
* The connection string is built with snowflake.sqlalchemy.URL so that
  special characters in the credentials are escaped.
* The module-level "connection = engine.connect()" was never used or closed
  and has been removed.
* Engine.execute() (removed in SQLAlchemy 2.0) is replaced by a connection
  opened in a "with" block plus text().
* "parmas" is renamed to "credentials"; the new file ends with a newline.

 setup.cfg                                  |  2 +
 src/diepvries/template_sql/hub_link_dml.py | 88 ++++++++++++++++++++++
 2 files changed, 90 insertions(+)
 create mode 100644 src/diepvries/template_sql/hub_link_dml.py

diff --git a/setup.cfg b/setup.cfg
--- a/setup.cfg
+++ b/setup.cfg
@@ -23,6 +23,8 @@ include_package_data = true
 package_dir =
     = src
 install_requires =
     snowflake-connector-python~=3.0
+    snowflake-sqlalchemy~=1.5
+    pyyaml~=6.0
 test_requires =
     pytest~=6.2
diff --git a/src/diepvries/template_sql/hub_link_dml.py b/src/diepvries/template_sql/hub_link_dml.py
new file mode 100644
--- /dev/null
+++ b/src/diepvries/template_sql/hub_link_dml.py
@@ -0,0 +1,88 @@
+"""Work-in-progress proof of concept: hub/link DML through SQLAlchemy."""
+
+import yaml
+from snowflake.sqlalchemy import URL
+from sqlalchemy import create_engine, text
+
+# NOTE: still a script - importing this module reads params.yaml from the
+# current working directory and queries Snowflake.
+with open("params.yaml", "r", encoding="utf-8") as yamlfile:
+    # safe_load builds plain Python objects only, which is all a credentials
+    # file needs.
+    credentials = yaml.safe_load(yamlfile)
+
+# URL() escapes special characters in the credentials (for example "@" or
+# "%" in a password) that would otherwise corrupt the connection string.
+engine = create_engine(
+    URL(
+        account=credentials["account"],
+        user=credentials["user"],
+        password=credentials["password"],
+        database="diepvries_tutorial",
+    )
+)
+
+# try:
+#     connection = engine.connect()
+#     results = connection.execute("select current_version()").fetchone()
+#     print(results[0])
+# finally:
+#     connection.close()
+#     engine.dispose()
+
+
+# def set_minimum_timestamp(engine):
+#     min_timestamp = engine.execute(
+#         """SELECT
+#           DATEADD(HOUR, -4, COALESCE(MIN(target.{record_start_timestamp}), CURRENT_TIMESTAMP()))
+#         FROM {staging_schema}.{staging_table} AS staging
+#           INNER JOIN {target_schema}.{target_table} AS target
+#                      ON (staging.{source_hashkey_field} = target.{target_hashkey_field})
+#         )"""
+#     )
+#     return min_timestamp
+
+
+def fetch_timestamp(engine):
+    """Return the first ``create_ts`` row of ``dv_extract.order_customer``.
+
+    Args:
+        engine: SQLAlchemy engine to run the query on.
+
+    Returns:
+        The first row of the result, or ``None`` if there are no rows.
+    """
+    # Engine.execute() was removed in SQLAlchemy 2.0. A connection opened in
+    # a ``with`` block works on 1.4 and 2.0 alike and is always released.
+    with engine.connect() as connection:
+        return connection.execute(
+            text("SELECT create_ts from dv_extract.order_customer")
+        ).fetchone()
+
+
+print(fetch_timestamp(engine=engine))
+
+
+# def merge_SQL(min_timestamp):
+#     SQL = """MERGE INTO {target_schema}.{target_table} AS target
+#     USING (
+#         SELECT DISTINCT
+#           {source_hashkey_field},
+#           -- If multiple sources for the same hashkey are received, their values
+#           -- are concatenated using a comma.
+#           LISTAGG(DISTINCT {record_source_field}, ',')
+#                   WITHIN GROUP (ORDER BY {record_source_field})
+#                   OVER (PARTITION BY {source_hashkey_field}) AS {record_source_field},
+#           {source_fields}
+#         FROM {staging_schema}.{staging_table}
+#     ) AS staging ON (target.{target_hashkey_field} = staging.{source_hashkey_field}
+#         AND target.{record_start_timestamp} >= min_timestamp)
+#     WHEN NOT MATCHED THEN INSERT ({target_fields})
+#     VALUES ({staging_source_fields})"""
+
+
+# sql_load_statement = (
+#     (TEMPLATES_DIR / "hub_link_dml.sql")
+#     .read_text()
+#     .format(**self.sql_placeholders)
+# )
From b9ff5c3373d7bb678977c5bdbd6a32f9c4c919a6 Mon Sep 17 00:00:00 2001
From: Ignas Krikstaponis
Date: Sat, 23 Mar 2024 17:31:15 +0100
Subject: [PATCH 2/2] placeholder function

---
Review notes (ignored by git-am, not part of the commit message):

* Rebased onto the revised patch 1/2, hence the different hunk position.
* The query runs on a connection opened in a "with" block and is wrapped in
  text() instead of calling Engine.execute().
* The table name is still formatted into the statement; a comment flags that
  it has to come from trusted metadata.
* "params= params" is now spelled "params=params".

 src/diepvries/template_sql/hub_link_dml.py | 21 ++++++++++++++++++++-
 1 file changed, 20 insertions(+), 1 deletion(-)

diff --git a/src/diepvries/template_sql/hub_link_dml.py b/src/diepvries/template_sql/hub_link_dml.py
--- a/src/diepvries/template_sql/hub_link_dml.py
+++ b/src/diepvries/template_sql/hub_link_dml.py
@@ -60,7 +60,26 @@ def fetch_timestamp(engine):
         ).fetchone()
 
 
+def fetch_timestamp_placeholder(engine, params):
+    """Return the first ``create_ts`` row of a ``dv_extract`` table.
+
+    Args:
+        engine: SQLAlchemy engine to run the query on.
+        params: Placeholder values for the statement; ``table`` names the
+            table to read from.
+
+    Returns:
+        The first row of the result, or ``None`` if there are no rows.
+    """
+    # NOTE: ``table`` is formatted into the statement as-is (identifiers
+    # cannot be bound as parameters), so it must come from trusted metadata.
+    statement = """SELECT create_ts from dv_extract.{table}""".format(**params)
+    with engine.connect() as connection:
+        return connection.execute(text(statement)).fetchone()
+
+
-print(fetch_timestamp(engine=engine))
+params = {"table": "order_customer"}
+print(fetch_timestamp_placeholder(engine=engine, params=params))
 
 
 # def merge_SQL(min_timestamp):