From 2bb208ba30359214e34c680f825db94a7ea58fb3 Mon Sep 17 00:00:00 2001 From: Xiaohan Zhang Date: Fri, 1 Dec 2023 21:45:18 -0800 Subject: [PATCH] update --- scripts/data_prep/convert_delta_to_json.py | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/scripts/data_prep/convert_delta_to_json.py b/scripts/data_prep/convert_delta_to_json.py index d6e0c2fc27..300c04fe9d 100644 --- a/scripts/data_prep/convert_delta_to_json.py +++ b/scripts/data_prep/convert_delta_to_json.py @@ -19,11 +19,11 @@ - python scripts/data_prep/convert_delta_to_json.py --delta_table_name 'main.streaming.dummy_table' --json_output_path /tmp/delta2json2 --debug False --http_path 'sql/protocolv1/o/7395834863327820/1116-234530-6seh113n' - - python scripts/data_prep/convert_delta_to_json.py --delta_table_name 'main.streaming.dummy_table' --json_output_path /tmp/delta2json2 --debug False --http_path /sql/1.0/warehouses/7e083095329f3ca5 --DATABRICKS_HOST e2-dogfood.staging.cloud.databricks.com --DATABRICKS_TOKEN dapi18a0a6fa53b5fb1afbf1101c93eee31f + - python scripts/data_prep/convert_delta_to_json.py --delta_table_name 'main.streaming.dummy_table' --json_output_path /tmp/delta2json2 --debug False --http_path {http_path} --DATABRICKS_HOST {your host} --DATABRICKS_TOKEN {your token} """ -def stream_delta_to_json(connection, tablename, json_output_folder, key = 'name', batch_size=3): +def stream_delta_to_json(connection, tablename, json_output_folder, key = 'name', batch_size=1<<20): cursor = connection.cursor() cursor.execute(f"USE CATALOG main;") @@ -32,16 +32,17 @@ def stream_delta_to_json(connection, tablename, json_output_folder, key = 'name' ans = cursor.fetchall() total_rows = [ row.asDict() for row in ans ][0].popitem()[1] - print('total_rows = ', total_rows) + log.info(f'total_rows = {total_rows}') cursor.execute(f"SHOW COLUMNS IN {tablename}") ans = cursor.fetchall() order_by = [ row.asDict() for row in ans ][0].popitem()[1] - print('order by column ', order_by) + log.info(f'order by column {order_by}') for start in range(0, total_rows, batch_size): end = min(start + batch_size, total_rows) + query = f""" WITH NumberedRows AS ( SELECT @@ -53,6 +54,7 @@ def stream_delta_to_json(connection, tablename, json_output_folder, key = 'name' SELECT * FROM NumberedRows WHERE rn BETWEEN {start+1} AND {end}""" + cursor.execute(query) ans = cursor.fetchall() @@ -78,7 +80,7 @@ def stream_delta_to_json(connection, tablename, json_output_folder, key = 'name' server_hostname = args.DATABRICKS_HOST if args.DATABRICKS_HOST else os.getenv("DATABRICKS_HOST") access_token = args.DATABRICKS_TOKEN if args.DATABRICKS_TOKEN else os.getenv("DATABRICKS_TOKEN") - http_path= args.http_path # " + http_path= args.http_path try: connection = sql.connect( @@ -89,9 +91,9 @@ def stream_delta_to_json(connection, tablename, json_output_folder, key = 'name' except Exception as e: raise RuntimeError("Failed to create sql connection to db workspace. Check {server_hostname} and {http_path} and access token!") from exc - #if os.path.exists(args.json_output_path): - # if not os.path.isdir(args.json_output_path) or os.listdir(args.json_output_path): - # raise RuntimeError(f"A file or a folder {args.json_output_path} already exists and is not empty. Remove it and retry!") + if os.path.exists(args.json_output_path): + if not os.path.isdir(args.json_output_path) or os.listdir(args.json_output_path): + raise RuntimeError(f"A file or a folder {args.json_output_path} already exists and is not empty. Remove it and retry!") os.makedirs(args.json_output_path, exist_ok=True) log.info(f"Directory {args.json_output_path} created.")