Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaohanzhan-db committed Dec 2, 2023
1 parent 1349944 commit 2bb208b
Showing 1 changed file with 10 additions and 8 deletions.
18 changes: 10 additions & 8 deletions scripts/data_prep/convert_delta_to_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@
- python scripts/data_prep/convert_delta_to_json.py --delta_table_name 'main.streaming.dummy_table' --json_output_path /tmp/delta2json2 --debug False --http_path 'sql/protocolv1/o/7395834863327820/1116-234530-6seh113n'
- python scripts/data_prep/convert_delta_to_json.py --delta_table_name 'main.streaming.dummy_table' --json_output_path /tmp/delta2json2 --debug False --http_path /sql/1.0/warehouses/7e083095329f3ca5 --DATABRICKS_HOST e2-dogfood.staging.cloud.databricks.com --DATABRICKS_TOKEN {your token}
- python scripts/data_prep/convert_delta_to_json.py --delta_table_name 'main.streaming.dummy_table' --json_output_path /tmp/delta2json2 --debug False --http_path {http_path} --DATABRICKS_HOST {your host} --DATABRICKS_TOKEN {your token}
"""

def stream_delta_to_json(connection, tablename, json_output_folder, key = 'name', batch_size=3):
def stream_delta_to_json(connection, tablename, json_output_folder, key = 'name', batch_size=1<<20):

cursor = connection.cursor()
cursor.execute(f"USE CATALOG main;")
Expand All @@ -32,16 +32,17 @@ def stream_delta_to_json(connection, tablename, json_output_folder, key = 'name'
ans = cursor.fetchall()

total_rows = [ row.asDict() for row in ans ][0].popitem()[1]
print('total_rows = ', total_rows)
log.info(f'total_rows = {total_rows}')

cursor.execute(f"SHOW COLUMNS IN {tablename}")
ans = cursor.fetchall()

order_by = [ row.asDict() for row in ans ][0].popitem()[1]
print('order by column ', order_by)
log.info(f'order by column {order_by}')

for start in range(0, total_rows, batch_size):
end = min(start + batch_size, total_rows)

query = f"""
WITH NumberedRows AS (
SELECT
Expand All @@ -53,6 +54,7 @@ def stream_delta_to_json(connection, tablename, json_output_folder, key = 'name'
SELECT *
FROM NumberedRows
WHERE rn BETWEEN {start+1} AND {end}"""

cursor.execute(query)
ans = cursor.fetchall()

Expand All @@ -78,7 +80,7 @@ def stream_delta_to_json(connection, tablename, json_output_folder, key = 'name'

server_hostname = args.DATABRICKS_HOST if args.DATABRICKS_HOST else os.getenv("DATABRICKS_HOST")
access_token = args.DATABRICKS_TOKEN if args.DATABRICKS_TOKEN else os.getenv("DATABRICKS_TOKEN")
http_path= args.http_path # "
http_path= args.http_path

try:
connection = sql.connect(
Expand All @@ -89,9 +91,9 @@ def stream_delta_to_json(connection, tablename, json_output_folder, key = 'name'
except Exception as e:
raise RuntimeError("Failed to create sql connection to db workspace. Check {server_hostname} and {http_path} and access token!") from exc

#if os.path.exists(args.json_output_path):
# if not os.path.isdir(args.json_output_path) or os.listdir(args.json_output_path):
# raise RuntimeError(f"A file or a folder {args.json_output_path} already exists and is not empty. Remove it and retry!")
if os.path.exists(args.json_output_path):
if not os.path.isdir(args.json_output_path) or os.listdir(args.json_output_path):
raise RuntimeError(f"A file or a folder {args.json_output_path} already exists and is not empty. Remove it and retry!")

os.makedirs(args.json_output_path, exist_ok=True)
log.info(f"Directory {args.json_output_path} created.")
Expand Down

0 comments on commit 2bb208b

Please sign in to comment.