diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/query.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/query.py index b18b526ef30fce..71a20890d35e88 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/query.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/query.py @@ -9,6 +9,8 @@ # We use 290 instead instead of the standard 320, because escape characters can add to the length. _QUERY_SEQUENCE_LIMIT = 290 +_MAX_COPY_ENTRIES_PER_TABLE = 20 + class RedshiftCommonQuery: CREATE_TEMP_TABLE_CLAUSE = "create temp table" @@ -293,28 +295,37 @@ def alter_table_rename_query( def list_copy_commands_sql( db_name: str, start_time: datetime, end_time: datetime ) -> str: - return """ - select - distinct - "schema" as target_schema, - "table" as target_table, - c.file_name as filename - from - SYS_QUERY_DETAIL as si - join SYS_LOAD_DETAIL as c on - si.query_id = c.query_id - join SVV_TABLE_INFO sti on - sti.table_id = si.table_id - where - database = '{db_name}' - and si.start_time >= '{start_time}' - and si.start_time < '{end_time}' - order by target_schema, target_table, si.start_time asc - """.format( + return """\ +SELECT DISTINCT + target_schema, + target_table, + filename +FROM ( + SELECT + sti."schema" AS target_schema, + sti."table" AS target_table, + c.file_name AS filename, + ROW_NUMBER() OVER ( + PARTITION BY sti."schema", sti."table" + ORDER BY si.start_time DESC + ) AS rn + FROM + SYS_QUERY_DETAIL AS si + JOIN SYS_LOAD_DETAIL AS c ON si.query_id = c.query_id + JOIN SVV_TABLE_INFO sti ON sti.table_id = si.table_id + WHERE + sti.database = '{db_name}' + AND si.start_time >= '{start_time}' + AND si.start_time < '{end_time}' +) subquery +WHERE rn <= {_MAX_COPY_ENTRIES_PER_TABLE} +ORDER BY target_schema, target_table, filename +""".format( # We need the original database name for filtering db_name=db_name, start_time=start_time.strftime(redshift_datetime_format), end_time=end_time.strftime(redshift_datetime_format), + _MAX_COPY_ENTRIES_PER_TABLE=_MAX_COPY_ENTRIES_PER_TABLE, ) @staticmethod