Commit

Copy existing metadata when archiving load files (#744)
Lauri Lehtinen authored Jul 15, 2021
1 parent 6606e24 commit f715cc2
Showing 2 changed files with 42 additions and 8 deletions.
18 changes: 10 additions & 8 deletions pipelinewise/fastsync/commons/target_snowflake.py
@@ -140,13 +140,6 @@ def copy_to_archive(self, source_s3_key, tap_id, table):
         archive_table = table_dict.get('table_name')
         archive_schema = table_dict.get('schema_name', '')
 
-        archive_metadata = {
-            'tap': tap_id,
-            'schema': archive_schema,
-            'table': archive_table,
-            'archived-by': 'pipelinewise_fastsync_postgres_to_snowflake'
-        }
-
         # Retain same filename
         archive_file_basename = os.path.basename(source_s3_key)
 
@@ -155,6 +148,15 @@ def copy_to_archive(self, source_s3_key, tap_id, table):
 
         source_s3_bucket = self.connection_config.get('s3_bucket')
 
+        # Combine existing metadata with archive related headers
+        metadata = self.s3.head_object(Bucket=source_s3_bucket, Key=source_s3_key).get('Metadata', {})
+        metadata.update({
+            'tap': tap_id,
+            'schema': archive_schema,
+            'table': archive_table,
+            'archived-by': 'pipelinewise_fastsync_postgres_to_snowflake'
+        })
+
         # Get archive s3 bucket from config, defaulting to same bucket used for Snowflake imports if not specified
         archive_s3_bucket = self.connection_config.get('archive_load_files_s3_bucket', source_s3_bucket)
 
@@ -163,7 +165,7 @@ def copy_to_archive(self, source_s3_key, tap_id, table):
         LOGGER.info('Archiving %s to %s', copy_source, archive_key)
 
         self.s3.copy_object(CopySource=copy_source, Bucket=archive_s3_bucket, Key=archive_key,
-                            Metadata=archive_metadata, MetadataDirective='REPLACE')
+                            Metadata=metadata, MetadataDirective='REPLACE')
 
     def create_schema(self, schema):
         sql = 'CREATE SCHEMA IF NOT EXISTS {}'.format(schema)
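
The change above reads the source object's metadata before copying because S3's CopyObject cannot merge metadata server-side: MetadataDirective='COPY' (the default) carries the source metadata over unchanged and ignores any Metadata argument, while 'REPLACE' uses only the metadata supplied in the request, so merging has to happen client-side. A minimal standalone sketch of the same boto3 pattern follows; the bucket names, keys, and extra header are hypothetical examples, not taken from the commit:

    # Sketch of the metadata-preserving copy pattern (hypothetical names).
    import boto3

    s3 = boto3.client('s3')

    source_bucket = 'example-import-bucket'            # hypothetical
    source_key = 'snowflake-import/example.csv.gz'     # hypothetical
    archive_bucket = 'example-archive-bucket'          # hypothetical
    archive_key = 'archive/example.csv.gz'             # hypothetical

    # head_object exposes existing user-defined metadata under the 'Metadata' key
    existing = s3.head_object(Bucket=source_bucket, Key=source_key).get('Metadata', {})

    # Merge the existing metadata with the new archive-related headers client-side
    merged = {**existing, 'archived-by': 'example-archiver'}

    # REPLACE makes S3 use the merged dict supplied in this request
    s3.copy_object(
        CopySource={'Bucket': source_bucket, 'Key': source_key},
        Bucket=archive_bucket,
        Key=archive_key,
        Metadata=merged,
        MetadataDirective='REPLACE',
    )

Without the head_object call, switching to MetadataDirective='REPLACE' would silently drop whatever metadata the load file already carried, which is exactly the bug this commit addresses.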
32 changes: 32 additions & 0 deletions tests/units/fastsync/commons/test_fastsync_target_snowflake.py
@@ -21,6 +21,11 @@ def delete_object(self, Bucket, Key):
     def copy_object(self, **kwargs):
         """Mock if needed"""
 
+    # pylint: disable=no-self-use, unused-argument
+    def head_object(self, **kwargs):
+        """Mock if needed"""
+        return {}
+
 
 class FastSyncTargetSnowflakeMock(FastSyncTargetSnowflake):
     """
@@ -584,3 +589,30 @@ def test_custom_archive_destination(self):
                 'archived-by': 'pipelinewise_fastsync_postgres_to_snowflake'
             },
             MetadataDirective='REPLACE')
+
+    # pylint: disable=invalid-name
+    def test_copied_archive_metadata(self):
+        """
+        Validate parameters passed to s3 copy_object method when custom s3 bucket and folder are not defined
+        """
+        mock_head_object = MagicMock()
+        mock_head_object.return_value = {'Metadata': {'copied-old-key': 'copied-old-value'}}
+        mock_copy_object = MagicMock()
+        self.snowflake.s3.copy_object = mock_copy_object
+        self.snowflake.s3.head_object = mock_head_object
+        self.snowflake.connection_config['s3_bucket'] = 'some_bucket'
+        self.snowflake.copy_to_archive(
+            'snowflake-import/ppw_20210615115603_fastsync.csv.gz', 'some-tap', 'some_schema.some_table')
+
+        mock_copy_object.assert_called_with(
+            Bucket='some_bucket',
+            CopySource='some_bucket/snowflake-import/ppw_20210615115603_fastsync.csv.gz',
+            Key='archive/some-tap/some_table/ppw_20210615115603_fastsync.csv.gz',
+            Metadata={
+                'copied-old-key': 'copied-old-value',
+                'tap': 'some-tap',
+                'schema': 'some_schema',
+                'table': 'some_table',
+                'archived-by': 'pipelinewise_fastsync_postgres_to_snowflake'
+            },
+            MetadataDirective='REPLACE')
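
As a hedged aside (not part of the commit), the same merge can be spot-checked on a real archived object after the copy: S3 returns user-defined metadata in the 'Metadata' dict with lower-cased keys and without the x-amz-meta- prefix. The bucket and key below are hypothetical:

    # Hypothetical spot-check of the archived object's merged user metadata.
    import boto3

    s3 = boto3.client('s3')
    head = s3.head_object(Bucket='example-archive-bucket', Key='archive/example.csv.gz')

    # Both the pre-existing key and the archive headers should be present
    print(head['Metadata'].get('copied-old-key'))
    print(head['Metadata'].get('archived-by'))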
