Skip to content

Commit

Permalink
perf(ingest/redshift): limit copy lineage (datahub-project#11662)
Browse files Browse the repository at this point in the history
Co-authored-by: Mayuri Nehate <[email protected]>
  • Loading branch information
hsheth2 and mayurinehate authored Nov 20, 2024
1 parent 8638bf9 commit 524ef8c
Showing 1 changed file with 29 additions and 18 deletions.
47 changes: 29 additions & 18 deletions metadata-ingestion/src/datahub/ingestion/source/redshift/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
# We use 290 instead instead of the standard 320, because escape characters can add to the length.
_QUERY_SEQUENCE_LIMIT = 290

_MAX_COPY_ENTRIES_PER_TABLE = 20


class RedshiftCommonQuery:
CREATE_TEMP_TABLE_CLAUSE = "create temp table"
Expand Down Expand Up @@ -293,28 +295,37 @@ def alter_table_rename_query(
def list_copy_commands_sql(
db_name: str, start_time: datetime, end_time: datetime
) -> str:
return """
select
distinct
"schema" as target_schema,
"table" as target_table,
c.file_name as filename
from
SYS_QUERY_DETAIL as si
join SYS_LOAD_DETAIL as c on
si.query_id = c.query_id
join SVV_TABLE_INFO sti on
sti.table_id = si.table_id
where
database = '{db_name}'
and si.start_time >= '{start_time}'
and si.start_time < '{end_time}'
order by target_schema, target_table, si.start_time asc
""".format(
return """\
SELECT DISTINCT
target_schema,
target_table,
filename
FROM (
SELECT
sti."schema" AS target_schema,
sti."table" AS target_table,
c.file_name AS filename,
ROW_NUMBER() OVER (
PARTITION BY sti."schema", sti."table"
ORDER BY si.start_time DESC
) AS rn
FROM
SYS_QUERY_DETAIL AS si
JOIN SYS_LOAD_DETAIL AS c ON si.query_id = c.query_id
JOIN SVV_TABLE_INFO sti ON sti.table_id = si.table_id
WHERE
sti.database = '{db_name}'
AND si.start_time >= '{start_time}'
AND si.start_time < '{end_time}'
) subquery
WHERE rn <= {_MAX_COPY_ENTRIES_PER_TABLE}
ORDER BY target_schema, target_table, filename
""".format(
# We need the original database name for filtering
db_name=db_name,
start_time=start_time.strftime(redshift_datetime_format),
end_time=end_time.strftime(redshift_datetime_format),
_MAX_COPY_ENTRIES_PER_TABLE=_MAX_COPY_ENTRIES_PER_TABLE,
)

@staticmethod
Expand Down

0 comments on commit 524ef8c

Please sign in to comment.