From f715cc2afb2644989b1dee38dcb01f0a38d7f3df Mon Sep 17 00:00:00 2001
From: Lauri Lehtinen
Date: Thu, 15 Jul 2021 17:44:50 +0300
Subject: [PATCH] Copy existing metadata when archiving load files (#744)

---
 .../fastsync/commons/target_snowflake.py      | 18 ++++++-----
 .../commons/test_fastsync_target_snowflake.py | 32 +++++++++++++++++++
 2 files changed, 42 insertions(+), 8 deletions(-)

diff --git a/pipelinewise/fastsync/commons/target_snowflake.py b/pipelinewise/fastsync/commons/target_snowflake.py
index 9312d5be2..f5ee65d3c 100644
--- a/pipelinewise/fastsync/commons/target_snowflake.py
+++ b/pipelinewise/fastsync/commons/target_snowflake.py
@@ -140,13 +140,6 @@ def copy_to_archive(self, source_s3_key, tap_id, table):
         archive_table = table_dict.get('table_name')
         archive_schema = table_dict.get('schema_name', '')
 
-        archive_metadata = {
-            'tap': tap_id,
-            'schema': archive_schema,
-            'table': archive_table,
-            'archived-by': 'pipelinewise_fastsync_postgres_to_snowflake'
-        }
-
         # Retain same filename
         archive_file_basename = os.path.basename(source_s3_key)
 
@@ -155,6 +148,15 @@
 
         source_s3_bucket = self.connection_config.get('s3_bucket')
 
+        # Combine existing metadata with archive related headers
+        metadata = self.s3.head_object(Bucket=source_s3_bucket, Key=source_s3_key).get('Metadata', {})
+        metadata.update({
+            'tap': tap_id,
+            'schema': archive_schema,
+            'table': archive_table,
+            'archived-by': 'pipelinewise_fastsync_postgres_to_snowflake'
+        })
+
         # Get archive s3 bucket from config, defaulting to same bucket used for Snowflake imports if not specified
         archive_s3_bucket = self.connection_config.get('archive_load_files_s3_bucket', source_s3_bucket)
 
@@ -163,7 +165,7 @@
         LOGGER.info('Archiving %s to %s', copy_source, archive_key)
 
         self.s3.copy_object(CopySource=copy_source, Bucket=archive_s3_bucket, Key=archive_key,
-                            Metadata=archive_metadata, MetadataDirective='REPLACE')
+                            Metadata=metadata, MetadataDirective='REPLACE')
 
     def create_schema(self, schema):
         sql = 'CREATE SCHEMA IF NOT EXISTS {}'.format(schema)
diff --git a/tests/units/fastsync/commons/test_fastsync_target_snowflake.py b/tests/units/fastsync/commons/test_fastsync_target_snowflake.py
index 548a90fa1..cf465f375 100644
--- a/tests/units/fastsync/commons/test_fastsync_target_snowflake.py
+++ b/tests/units/fastsync/commons/test_fastsync_target_snowflake.py
@@ -21,6 +21,11 @@ def delete_object(self, Bucket, Key):
     def copy_object(self, **kwargs):
         """Mock if needed"""
 
+    # pylint: disable=no-self-use, unused-argument
+    def head_object(self, **kwargs):
+        """Mock if needed"""
+        return {}
+
 
 class FastSyncTargetSnowflakeMock(FastSyncTargetSnowflake):
     """
@@ -584,3 +589,30 @@ def test_custom_archive_destination(self):
                 'archived-by': 'pipelinewise_fastsync_postgres_to_snowflake'
             },
             MetadataDirective='REPLACE')
+
+    # pylint: disable=invalid-name
+    def test_copied_archive_metadata(self):
+        """
+        Validate parameters passed to s3 copy_object method when custom s3 bucket and folder are not defined
+        """
+        mock_head_object = MagicMock()
+        mock_head_object.return_value = {'Metadata': {'copied-old-key': 'copied-old-value'}}
+        mock_copy_object = MagicMock()
+        self.snowflake.s3.copy_object = mock_copy_object
+        self.snowflake.s3.head_object = mock_head_object
+        self.snowflake.connection_config['s3_bucket'] = 'some_bucket'
+        self.snowflake.copy_to_archive(
+            'snowflake-import/ppw_20210615115603_fastsync.csv.gz', 'some-tap', 'some_schema.some_table')
+
+        mock_copy_object.assert_called_with(
+            Bucket='some_bucket',
+            CopySource='some_bucket/snowflake-import/ppw_20210615115603_fastsync.csv.gz',
+            Key='archive/some-tap/some_table/ppw_20210615115603_fastsync.csv.gz',
+            Metadata={
+                'copied-old-key': 'copied-old-value',
+                'tap': 'some-tap',
+                'schema': 'some_schema',
+                'table': 'some_table',
+                'archived-by': 'pipelinewise_fastsync_postgres_to_snowflake'
+            },
+            MetadataDirective='REPLACE')
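
For context, a minimal standalone sketch (not part of the patch) of the behaviour this change introduces: before a load file is copied into the archive location, the existing S3 object metadata is read with `head_object` and the archive headers are merged on top, so that `MetadataDirective='REPLACE'` no longer wipes out metadata the load file already carried. The bucket, key, tap and table names below are placeholders, not values from the PR.

```python
# Sketch of merging existing S3 object metadata with archive headers (assumed names).
import boto3

s3 = boto3.client('s3')

source_bucket = 'my-import-bucket'                      # hypothetical
source_key = 'snowflake-import/file.csv.gz'             # hypothetical
archive_key = 'archive/my-tap/my_table/file.csv.gz'     # hypothetical

# Start from whatever metadata the load file already has on S3...
metadata = s3.head_object(Bucket=source_bucket, Key=source_key).get('Metadata', {})

# ...then layer the archive-related headers on top, as copy_to_archive now does.
metadata.update({
    'tap': 'my-tap',
    'schema': 'my_schema',
    'table': 'my_table',
    'archived-by': 'pipelinewise_fastsync_postgres_to_snowflake',
})

# MetadataDirective='REPLACE' makes the copied object receive the merged metadata;
# without the head_object step, the original metadata would be discarded.
s3.copy_object(
    CopySource=f'{source_bucket}/{source_key}',
    Bucket=source_bucket,
    Key=archive_key,
    Metadata=metadata,
    MetadataDirective='REPLACE',
)
```

The new unit test follows the same idea: `head_object` is mocked to return pre-existing metadata, and the assertion checks that `copy_object` receives that metadata merged with the archive headers.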