diff --git a/.circleci/config.yml b/.circleci/config.yml index 8d6fa14..febbaba 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -1,8 +1,11 @@ -version: 2 +version: 2.1 +orbs: + slack: circleci/slack@3.4.2 + jobs: build: docker: - - image: 218546966473.dkr.ecr.us-east-1.amazonaws.com/circle-ci:tap-tester-v4 + - image: 218546966473.dkr.ecr.us-east-1.amazonaws.com/circle-ci:stitch-tap-tester - image: amazon/dynamodb-local entrypoint: ["java", "-Xmx1G", "-jar", "DynamoDBLocal.jar"] steps: @@ -14,30 +17,40 @@ jobs: virtualenv -p python3 /usr/local/share/virtualenvs/tap-dynamodb source /usr/local/share/virtualenvs/tap-dynamodb/bin/activate pip install .[dev] + pip install pytest-cov - run: name: 'pylint' command: | source /usr/local/share/virtualenvs/tap-dynamodb/bin/activate make lint + - run: + name: 'Unit Tests' + command: | + source /usr/local/share/virtualenvs/tap-dynamodb/bin/activate + pip install nose coverage + nosetests --with-coverage --cover-erase --cover-package=tap_dynamodb --cover-html-dir=htmlcov tests/unittests + coverage html + - store_test_results: + path: test_output/report.xml + - store_artifacts: + path: htmlcov - run: name: 'Tap Tester' command: | aws s3 cp s3://com-stitchdata-dev-deployment-assets/environments/tap-tester/sandbox tap-tester.env source tap-tester.env source /usr/local/share/virtualenvs/tap-tester/bin/activate - run-test --tap=tap-dynamodb \ - --target=target-stitch \ - --orchestrator=stitch-orchestrator \ - --email=harrison+sandboxtest@stitchdata.com \ - --password=$SANDBOX_PASSWORD \ - --client-id=50 \ - tests + run-test --tap=tap-dynamodb tests + - slack/notify-on-failure: + only_for_branches: master workflows: version: 2 commit: jobs: - build: - context: circleci-user + context: + - circleci-user + - tap-tester-user build_daily: triggers: - schedule: @@ -48,4 +61,6 @@ workflows: - master jobs: - build: - context: circleci-user + context: + - circleci-user + - tap-tester-user diff --git a/tap_dynamodb/dynamodb.py 
def decode_expression(expression):
    '''
    Decode the expression attributes given as JSON text.

    Args:
        expression: JSON text mapping placeholders to attribute names,
            for example '{"#c": "Comment"}'.

    Returns:
        The decoded mapping (dict) of placeholder -> attribute name.

    Raises:
        Exception: if the text is not valid JSON, or if it is valid JSON but
            not a JSON object.
    '''
    try:
        decoded = json.loads(expression)
    except json.decoder.JSONDecodeError as err:
        # Chain the original error so the offending position stays visible in the traceback.
        raise Exception("Invalid JSON format. The expression attributes should contain a valid JSON format.") from err

    # Callers use the result as a mapping (`.keys()`, item lookup, `ExpressionAttributeNames`),
    # so reject lists, strings and numbers here with a clear message instead of failing later.
    if not isinstance(decoded, dict):
        raise Exception("Invalid JSON format. The expression attributes should be a JSON object "
                        "mapping placeholders to attribute names.")
    return decoded
def prepare_projection(projection, expression, exp_key_traverse):
    '''
    Replace expression placeholders in one projection path with the real attribute names.

    The list is modified IN PLACE and nothing is returned.

    Args:
        projection: list of path parts of a single projection, e.g. ['#n[0]', '#a'].
        expression: dict of placeholder -> attribute name, e.g. {"#n": "Name", "#a": "Age"}.
        exp_key_traverse: set of placeholders not used so far; every placeholder
            found in `projection` is discarded from it.

    Raises:
        Exception: if a part containing `#` has no entry in `expression`.

    Example:
        projection = ['#n[0]', '#a'], expression = {"#n": "Name", "#a": "Age"}
        after the call projection == ['Name[0]', 'Age']
    '''
    for index, part in enumerate(projection):
        if '#' not in part:
            # plain attribute name, nothing to substitute
            continue

        # The placeholder is everything before the first list index, e.g. '#n' in '#n[0]'.
        # `split` returns the whole part when there is no '['.
        placeholder = part.split('[', 1)[0]
        if placeholder not in expression:
            raise Exception("No expression is available for the given projection: {}.".format(placeholder))

        exp_key_traverse.discard(placeholder)
        # Substitute only the leading placeholder and keep the index suffix untouched.
        projection[index] = expression[placeholder] + part[len(placeholder):]
+ exp_key_traverse = set(expression.keys()) + + for proj in projection: + prepare_projection(proj, expression, exp_key_traverse) + + if exp_key_traverse: + raise Exception("No projection is available for the expression keys: {}.".format(exp_key_traverse)) # Write activate version message stream_version = singer.get_bookmark(state, table_name, 'version') singer.write_version(table_name, stream_version) diff --git a/tests/base.py b/tests/base.py index a0ea07c..d820826 100644 --- a/tests/base.py +++ b/tests/base.py @@ -5,21 +5,19 @@ import boto3 from boto3.dynamodb.types import TypeSerializer, TypeDeserializer -import singer - from tap_tester import connections from tap_tester import menagerie from tap_tester import runner +from tap_tester.logger import LOGGER ALL_TABLE_NAMES_TO_CLEAR = frozenset({ 'simple_table_1', 'simple_table_2', 'simple_table_3', + 'simple_table_4', 'com-stitchdata-test-dynamodb-integration-simple_table_1', }) -LOGGER = singer.get_logger() - class TestDynamoDBBase(unittest.TestCase): _client = None diff --git a/tests/test_dynamodb_discovery.py b/tests/test_dynamodb_discovery.py index d20df97..d5da72a 100644 --- a/tests/test_dynamodb_discovery.py +++ b/tests/test_dynamodb_discovery.py @@ -1,6 +1,5 @@ from boto3.dynamodb.types import TypeSerializer -from tap_tester.scenario import (SCENARIOS) from base import TestDynamoDBBase @@ -53,6 +52,3 @@ def name(): def test_run(self): self.pre_sync_test() - - -SCENARIOS.add(DynamoDBDiscovery) diff --git a/tests/test_dynamodb_full_table_interruptible_sync.py b/tests/test_dynamodb_full_table_interruptible_sync.py index 24902a5..aee0d72 100644 --- a/tests/test_dynamodb_full_table_interruptible_sync.py +++ b/tests/test_dynamodb_full_table_interruptible_sync.py @@ -1,17 +1,13 @@ import decimal -import singer from boto3.dynamodb.types import TypeSerializer -from tap_tester.scenario import (SCENARIOS) from tap_tester import connections from tap_tester import menagerie from tap_tester import runner from base import 
class DynamoDBFullTablePrimaryAndHashKeyReservedWords(TestDynamoDBBase):
    '''Full table sync of a table whose hash key and sort key are reserved words.'''

    def expected_table_config(self):
        '''Return the configuration of the single table used by this test.'''
        simple_table_4 = {
            'TableName': 'simple_table_4',
            # `Comment` is a reserved word; it is the primary key (HashKey) so the
            # expression attributes are exercised for it
            'HashKey': 'Comment',
            'HashType': 'N',
            # `Name` is a reserved word; it is the SortKey so the expression
            # attributes are exercised for it as well
            'SortKey': 'Name',
            'SortType': 'S',
            'generator': self.generate_simple_items_4,
            'num_rows': 100,
            'ProjectionExpression': '#cmt, #name',
            'top_level_keys': {'Name', 'Comment'},
        }
        return [simple_table_4]

    @staticmethod
    def generate_simple_items_4(num_items, start_key=0):
        '''Yield `num_items` serialized records with consecutive keys starting at `start_key`.'''
        serializer = TypeSerializer()
        stop_key = start_key + num_items
        for key in range(start_key, stop_key):
            yield serializer.serialize({
                'Comment': key,
                'Name': 'Test Name' + str(key),
                'boolean_field': True,
            })

    @staticmethod
    def name():
        return "tt_dynamodb_ft_pkhk_projections"

    def test_run(self):
        '''Run a FULL_TABLE sync with a projection on the reserved-word keys and verify the output.'''
        table_configs, conn_id, _ = self.pre_sync_test()

        # Select every discovered stream with FULL_TABLE replication, the projection
        # and the expression attributes for the reserved words
        for stream_catalog in menagerie.get_catalogs(conn_id):
            matching_configs = [cfg for cfg in table_configs
                                if cfg['TableName'] == stream_catalog['tap_stream_id']]
            expected_config = matching_configs[0]
            annotated_schema = menagerie.get_annotated_schema(conn_id, stream_catalog['stream_id'])
            stream_metadata = {
                'replication-method': 'FULL_TABLE',
                # placeholders for the reserved words used in the projection
                'tap-dynamodb.expression-attributes': '{"#cmt": "Comment", "#name": "Name"}',
                'tap-mongodb.projection': expected_config['ProjectionExpression'],
            }
            connections.select_catalog_and_fields_via_metadata(
                conn_id,
                stream_catalog,
                annotated_schema,
                [{"breadcrumb": [], "metadata": stream_metadata}])

        # run the full table sync and check that it exited cleanly
        sync_job_name = runner.run_sync_mode(self, conn_id)
        exit_status = menagerie.get_exit_status(conn_id, sync_job_name)
        menagerie.verify_sync_exit_status(self, exit_status, sync_job_name)

        messages_by_stream = runner.get_records_from_target_output()

        # primary keys per table: the hash key plus the sort key when there is one
        expected_pks = {
            cfg['TableName']: {cfg['HashKey']} | ({cfg.get('SortKey')} if cfg.get('SortKey') else set())
            for cfg in table_configs
        }

        # the synced streams must be exactly the expected ones
        expected_tables = {cfg['TableName'] for cfg in table_configs}
        record_count_by_stream = runner.examine_target_output_file(self,
                                                                   conn_id,
                                                                   expected_tables,
                                                                   expected_pks)

        state = menagerie.get_state(conn_id)

        for config in table_configs:
            table_name = config['TableName']
            messages = messages_by_stream[table_name]['messages']
            bookmark = state['bookmarks'][table_name]

            # the expected number of records was replicated
            self.assertEqual(config['num_rows'], record_count_by_stream[table_name])

            # an activate_version message opens and closes the stream
            self.assertEqual('activate_version', messages[0]['action'])
            self.assertEqual('activate_version', messages[-1]['action'])

            # the state marks the full table sync as complete and carries a version
            self.assertTrue(bookmark['initial_full_table_complete'])
            self.assertIsNotNone(bookmark['version'])

            # only the projected reserved-word keys are present in each live record
            for message in messages:
                if message['action'] == 'upsert' and not message['data'].get('_sdc_deleted_at'):
                    self.assertEqual(config['top_level_keys'], set(message['data']))
class DynamoDBLogBasedPrimaryAndHashKeyReservedWords(TestDynamoDBBase):
    '''Log based sync of a table whose hash key and sort key are reserved words.'''

    def expected_table_config(self):
        '''Return the configuration of the single table used by this test.'''
        simple_table_4 = {
            'TableName': 'simple_table_4',
            # `Comment` is a reserved word; it is the primary key (HashKey) so the
            # expression attributes are exercised for it
            'HashKey': 'Comment',
            'HashType': 'N',
            # `Name` is a reserved word; it is the SortKey so the expression
            # attributes are exercised for it as well
            'SortKey': 'Name',
            'SortType': 'S',
            'generator': self.generate_simple_items_4,
            'num_rows': 100,
            'ProjectionExpression': '#cmt, #name',
            'top_level_keys': {'Name', 'Comment'},
        }
        return [simple_table_4]

    @staticmethod
    def generate_simple_items_4(num_items, start_key=0):
        '''Yield `num_items` serialized records with consecutive keys starting at `start_key`.'''
        serializer = TypeSerializer()
        stop_key = start_key + num_items
        for key in range(start_key, stop_key):
            yield serializer.serialize({
                'Comment': key,
                'Name': 'Test Name' + str(key),
            })

    @staticmethod
    def name():
        return "tt_dynamodb_log_pkhk_projections"

    def _assert_projected_keys(self, table_configs, messages_by_stream):
        '''Check that every live upsert record holds exactly the expected top level keys.'''
        for config in table_configs:
            for message in messages_by_stream[config['TableName']]['messages']:
                if message['action'] == 'upsert' and not message['data'].get('_sdc_deleted_at'):
                    # the reserved-word primary and sort keys are replicated
                    self.assertEqual(config['top_level_keys'], set(message['data']))

    def test_run(self):
        '''Run three LOG_BASED syncs with a projection on the reserved-word keys.'''
        table_configs, conn_id, expected_streams = self.pre_sync_test()

        # Select every discovered stream with LOG_BASED replication, the projection
        # and the expression attributes for the reserved words
        for stream_catalog in menagerie.get_catalogs(conn_id):
            annotated_schema = menagerie.get_annotated_schema(conn_id, stream_catalog['stream_id'])
            stream_metadata = {
                'replication-method': 'LOG_BASED',
                # placeholders for the reserved words used in the projection
                'tap-dynamodb.expression-attributes': '{"#cmt": "Comment", "#name": "Name"}',
                'tap-mongodb.projection': table_configs[0]['ProjectionExpression'],
            }
            connections.select_catalog_and_fields_via_metadata(
                conn_id,
                stream_catalog,
                annotated_schema,
                [{"breadcrumb": [], "metadata": stream_metadata}])

        self.first_sync_test(table_configs, conn_id, expected_streams)

        ################################
        # SECOND sync: no records are expected
        ################################
        # Disabling the streams forces the shards to close
        self.disableStreams(expected_streams)
        sync_job_name = runner.run_sync_mode(self, conn_id)
        self.enableStreams(expected_streams)

        exit_status = menagerie.get_exit_status(conn_id, sync_job_name)
        menagerie.verify_sync_exit_status(self, exit_status, sync_job_name)

        messages_by_stream = runner.get_records_from_target_output()

        # a stream without changes only emits its activate_version message
        for stream in messages_by_stream.values():
            self.assertEqual(1, len(stream['messages']))

        menagerie.get_state(conn_id)

        ################################
        # THIRD sync: check the keys of any records that come through
        ################################
        # Disabling the streams forces the shards to close
        self.disableStreams(expected_streams)
        sync_job_name = runner.run_sync_mode(self, conn_id)

        exit_status = menagerie.get_exit_status(conn_id, sync_job_name)
        menagerie.verify_sync_exit_status(self, exit_status, sync_job_name)

        messages_by_stream = runner.get_records_from_target_output()
        self._assert_projected_keys(table_configs, messages_by_stream)

        menagerie.get_state(conn_id)

    def first_sync_test(self, table_configs, conn_id, expected_streams):
        '''Run the first sync and verify record counts, versions, state and projected keys.'''
        ###################################
        # SYNC MODE FIRST RUN
        ###################################
        # Disabling the streams forces the shards to close
        self.disableStreams(expected_streams)
        sync_job_name = runner.run_sync_mode(self, conn_id)
        self.enableStreams(expected_streams)

        exit_status = menagerie.get_exit_status(conn_id, sync_job_name)
        menagerie.verify_sync_exit_status(self, exit_status, sync_job_name)

        messages_by_stream = runner.get_records_from_target_output()

        # primary keys per table: the hash key plus the sort key when there is one
        expected_pks = {
            cfg['TableName']: {cfg['HashKey']} | ({cfg.get('SortKey')} if cfg.get('SortKey') else set())
            for cfg in table_configs
        }

        # the synced streams must be exactly the expected ones
        expected_tables = {cfg['TableName'] for cfg in table_configs}
        record_count_by_stream = runner.examine_target_output_file(self,
                                                                   conn_id,
                                                                   expected_tables,
                                                                   expected_pks)

        state = menagerie.get_state(conn_id)

        for config in table_configs:
            table_name = config['TableName']
            messages = messages_by_stream[table_name]['messages']
            bookmark = state['bookmarks'][table_name]

            # the expected number of records was replicated
            self.assertEqual(config['num_rows'], record_count_by_stream[table_name])

            # an activate_version message opens and closes the stream
            self.assertEqual('activate_version', messages[0]['action'])
            self.assertEqual('activate_version', messages[-1]['action'])

            # the state marks the initial full table sync as complete and carries a version
            self.assertTrue(bookmark['initial_full_table_complete'])
            self.assertIsNotNone(bookmark['version'])

        self._assert_projected_keys(table_configs, messages_by_stream)
+ 'ProjectionExpression': '#name[1].#cmt, #name[2].#testfield.#cmt, #abs, #cmt, #tstobj.#nestf, #tobj_nested, int_id, string_field, decimal_field, int_list_field[1], map_field.map_entry_1, string_list[2], map_field.list_entry[2], list_map[1].a', + 'top_level_keys': {'Name', 'Absolute', 'Comment', 'int_id', 'string_field', 'test_object', 'test_object.nested_field', 'decimal_field', 'int_list_field', 'map_field', 'string_list', 'list_map'}, + 'top_level_list_keys': {'int_list_field', 'string_list', 'list_map', 'Name'}, 'nested_map_keys': {'map_field': {'map_entry_1', 'list_entry'}}, } ] @@ -33,6 +31,11 @@ def generate_items(self, num_items, start_key=0): serializer = TypeSerializer() for i in range(start_key, start_key + num_items): record = { + 'Comment': 'Talend stitch', + 'Name': ['name1', {'Comment': "Test_comment"}, {"TestField": {"Comment": "For test"}}], + 'test_object': {"nested_field": "nested test value"}, + 'test_object.nested_field': "test value with special character", # added this field to verify the `.` in the field names works properly. + 'Absolute': 'true', 'int_id': i, 'decimal_field': decimal.Decimal(str(i) + '.00000000001'), 'string_field': self.random_string_generator(), @@ -59,7 +62,7 @@ def generate_items(self, num_items, start_key=0): @staticmethod def name(): - return "tap_tester_dynamodb_log_based_projections" + return "tt_dynamodb_log_projections" def test_run(self): (table_configs, conn_id, expected_streams) = self.pre_sync_test() @@ -73,6 +76,7 @@ def test_run(self): "breadcrumb": [], "metadata": { 'replication-method': 'LOG_BASED', + 'tap-dynamodb.expression-attributes': "{\"#cmt\": \"Comment\", \"#testfield\": \"TestField\", \"#name\": \"Name\", \"#abs\": \"Absolute\", \"#tstobj\": \"test_object\", \"#nestf\": \"nested_field\", \"#tobj_nested\": \"test_object.nested_field\"}", # `expression` field for reserve word. 
'tap-mongodb.projection': table_configs[0]['ProjectionExpression'] } } @@ -205,6 +209,3 @@ def first_sync_test(self, table_configs, conn_id, expected_streams): for list_key in config['top_level_list_keys']: self.assertTrue(isinstance(message['data'][list_key], list)) self.assertEqual(config['nested_map_keys']['map_field'], {*message['data']['map_field'].keys()}) - - -SCENARIOS.add(DynamoDBLogBasedProjections) diff --git a/tests/test_dynamodb_projections.py b/tests/test_dynamodb_projections.py index 0f8c416..6b22ab5 100644 --- a/tests/test_dynamodb_projections.py +++ b/tests/test_dynamodb_projections.py @@ -1,16 +1,12 @@ import decimal -import singer from boto3.dynamodb.types import TypeSerializer -from tap_tester.scenario import (SCENARIOS) from tap_tester import connections from tap_tester import menagerie from tap_tester import runner from base import TestDynamoDBBase -LOGGER = singer.get_logger() - class DynamoDBProjections(TestDynamoDBBase): def expected_table_config(self): @@ -22,9 +18,11 @@ def expected_table_config(self): 'SortType': 'S', 'generator': self.generate_items, 'num_rows': 100, - 'ProjectionExpression': 'int_id, string_field, decimal_field, int_list_field[1], map_field.map_entry_1, string_list[2], map_field.list_entry[2], list_map[1].a', - 'top_level_keys': {'int_id', 'string_field', 'decimal_field', 'int_list_field', 'map_field', 'string_list', 'list_map'}, - 'top_level_list_keys': {'int_list_field', 'string_list', 'list_map'}, + # Added extra reserve words to verify the sync to retrieve `Comment` and `Name[1].Comment` + # and test_object.nested_field as a field and test_object.nested_field as a nested field. 
+ 'ProjectionExpression': '#name[1].#cmt, #name[2].#testfield.#cmt, #cmt, #tstobj.#nestf, #tobj_nested, int_id, string_field, decimal_field, int_list_field[1], map_field.map_entry_1, string_list[2], map_field.list_entry[2], list_map[1].a', + 'top_level_keys': {'Name', 'Comment', 'test_object', 'test_object.nested_field', 'int_id', 'string_field', 'decimal_field', 'int_list_field', 'map_field', 'string_list', 'list_map'}, + 'top_level_list_keys': {'int_list_field', 'string_list', 'list_map', 'Name'}, 'nested_map_keys': {'map_field': {'map_entry_1', 'list_entry'}}, 'map_projection': {'map_field': {'map_entry_1': 'map_value_1'}} }, @@ -34,6 +32,10 @@ def generate_items(self, num_items): serializer = TypeSerializer() for i in range(num_items): record = { + 'Comment': 'Talend stitch', + 'Name': ['name1', {'Comment': "Test_comment"}, {"TestField": {"Comment": "For test"}}], + 'test_object': {"nested_field": "nested test value"}, + 'test_object.nested_field': "test value with special character", # added this field to verify the `.` in the field names works properly. 'int_id': i, 'decimal_field': decimal.Decimal(str(i) + '.00000000001'), 'string_field': self.random_string_generator(), @@ -72,6 +74,7 @@ def test_run(self): annotated_schema = menagerie.get_annotated_schema(conn_id, stream_catalog['stream_id']) additional_md = [{"breadcrumb" : [], "metadata" : { 'replication-method' : 'FULL_TABLE', + 'tap-dynamodb.expression-attributes': "{\"#cmt\": \"Comment\", \"#testfield\": \"TestField\", \"#name\": \"Name\", \"#tstobj\": \"test_object\", \"#nestf\": \"nested_field\", \"#tobj_nested\": \"test_object.nested_field\"}", # `expression` field for reserve word. 
class MockDeserializer():
    '''Empty mock object that holds no state.'''

    def __init__(self):
        # `__init__` must return None: returning a dict here made every
        # `MockDeserializer()` call raise TypeError.
        pass
= {}) +@patch('singer.get_bookmark', return_value = {}) +@patch('tap_dynamodb.dynamodb.get_client', return_value = client) +@patch('tap_dynamodb.dynamodb.get_stream_client', return_value = client) +class TestExpressionAttributesInLogBasedSync(unittest.TestCase): + """Test expression attributes for reserved word in log_based sync. Mocked some method of singer package""" + + @patch('singer.metadata.get', side_effect = ["#c, Sheet", "{\"#c\": \"Comment\"}"]) + @patch('tap_dynamodb.sync_strategies.log_based.sync_shard', return_value = 1) + @patch('tap_dynamodb.deserialize.Deserializer', return_value = {}) + def test_sync_with_single_expression(self, mock_deserializer, mock_sync_shard, mock_stream_client, mock_client, mock_metadata_get, mock_get_bookmark, mock_write_bookmark, mock_write_state, mock_to_map): + """Test expression attribute for single reserve word passed in `expression` field.""" + res = sync(CONFIG, STATE, STREAM) + + mock_sync_shard.assert_called_with({'SequenceNumberRange': {'EndingSequenceNumber': 'dummy_no'}, 'ShardId': 'dummy_id'}, {}, client, 'dummy_arn', [['Comment'], ['Sheet']], {}, 'GoogleDocs', {}, {}) + + @patch('singer.metadata.get', side_effect = ["#tst[4], #n, Test", "{\"#tst\": \"test1\", \"#n\": \"Name\"}"]) + @patch('tap_dynamodb.sync_strategies.log_based.sync_shard', return_value = 1) + @patch('tap_dynamodb.deserialize.Deserializer', return_value = {}) + def test_sync_with_multiple_expression(self, mock_deserializer, mock_sync_shard, mock_stream_client, mock_client, mock_metadata_get, mock_get_bookmark, mock_write_bookmark, mock_write_state, mock_to_map): + """Test expression attribute for multiple reserve words passed in `expression` field.""" + res = sync(CONFIG, STATE, STREAM) + + mock_sync_shard.assert_called_with({'SequenceNumberRange': {'EndingSequenceNumber': 'dummy_no'}, 'ShardId': 'dummy_id'}, {}, client, 'dummy_arn', [['test1[4]'], ['Name'], ['Test']], {}, 'GoogleDocs', {}, {}) + + @patch('singer.metadata.get', side_effect 
=["Comment, Sheet", ""]) + @patch('tap_dynamodb.sync_strategies.log_based.sync_shard', return_value = 1) + @patch('tap_dynamodb.deserialize.Deserializer', return_value = {}) + def test_sync_without_expression(self, mock_deserializer, mock_sync_shard, mock_stream_client, mock_client, mock_metadata_get, mock_get_bookmark, mock_write_bookmark, mock_write_state, mock_to_map): + """Test expression attribute with empty string passed in `expression` field.""" + res = sync(CONFIG, STATE, STREAM) + + mock_sync_shard.assert_called_with({'SequenceNumberRange': {'EndingSequenceNumber': 'dummy_no'}, 'ShardId': 'dummy_id'}, {}, client, 'dummy_arn', [['Comment'], ['Sheet']], {}, 'GoogleDocs', {}, {}) + + @patch('singer.metadata.get', side_effect =["", ""]) + @patch('tap_dynamodb.sync_strategies.log_based.sync_shard', return_value = 1) + @patch('tap_dynamodb.deserialize.Deserializer', return_value = {}) + def test_sync_without_projection(self, mock_deserializer, mock_sync_shard, mock_stream_client, mock_client, mock_metadata_get, mock_get_bookmark, mock_write_bookmark, mock_write_state, mock_to_map): + """Test expression attribute with empty string passed in `projection` field.""" + res = sync(CONFIG, STATE, STREAM) + + mock_sync_shard.assert_called_with({'SequenceNumberRange': {'EndingSequenceNumber': 'dummy_no'}, 'ShardId': 'dummy_id'}, {}, client, 'dummy_arn', [['']], {}, 'GoogleDocs', {}, {}) + + @patch('singer.metadata.get', side_effect =["#tst[4].#n, #tst[4].#a, Test", "{\"#tst\": \"test1\", \"#n\": \"Name\", \"#a\": \"Age\"}"]) + @patch('tap_dynamodb.sync_strategies.log_based.sync_shard', return_value = 1) + @patch('tap_dynamodb.deserialize.Deserializer', return_value = {}) + def test_sync_with_nested_expr_with_dict_and_list(self, mock_deserializer, mock_sync_shard, mock_stream_client, mock_client, mock_metadata_get, mock_get_bookmark, mock_write_bookmark, mock_write_state, mock_to_map): + """Test expression attribute for nested reserved words with dictionary and list 
passed in `expression` field.""" + res = sync(CONFIG, STATE, STREAM) + + mock_sync_shard.assert_called_with({'SequenceNumberRange': {'EndingSequenceNumber': 'dummy_no'}, 'ShardId': 'dummy_id'}, {}, client, 'dummy_arn', [['test1[4]', 'Name'], ['test1[4]', 'Age'], ['Test']], {}, 'GoogleDocs', {}, {}) + + @patch('singer.metadata.get', side_effect =["#tst[4], Test", "{\"#tst\": \"test1\"}"]) + @patch('tap_dynamodb.sync_strategies.log_based.sync_shard', return_value = 1) + @patch('tap_dynamodb.deserialize.Deserializer', return_value = {}) + def test_sync_with_nested_expr_with_list(self, mock_deserializer, mock_sync_shard, mock_stream_client, mock_client, mock_metadata_get, mock_get_bookmark, mock_write_bookmark, mock_write_state, mock_to_map): + """Test expression attribute for reserve words with list passed in `expression` field.""" + res = sync(CONFIG, STATE, STREAM) + + mock_sync_shard.assert_called_with({'SequenceNumberRange': {'EndingSequenceNumber': 'dummy_no'}, 'ShardId': 'dummy_id'}, {}, client, 'dummy_arn', [['test1[4]'], ['Test']], {}, 'GoogleDocs', {}, {}) + + @patch('singer.metadata.get', side_effect =["#tst.#n.#a", "{\"#tst\": \"test1\", \"#n\": \"Name\", \"#a\": \"Age\"}"]) + @patch('tap_dynamodb.sync_strategies.log_based.sync_shard', return_value = 1) + @patch('tap_dynamodb.deserialize.Deserializer', return_value = {}) + def test_sync_with_nested_expr_with_nested_dict(self, mock_deserializer, mock_sync_shard, mock_stream_client, mock_client, mock_metadata_get, mock_get_bookmark, mock_write_bookmark, mock_write_state, mock_to_map): + """Test expression attribute for nested reserved words with nested dictionary passed in `expression` field.""" + res = sync(CONFIG, STATE, STREAM) + + mock_sync_shard.assert_called_with({'SequenceNumberRange': {'EndingSequenceNumber': 'dummy_no'}, 'ShardId': 'dummy_id'}, {}, client, 'dummy_arn', [['test1', 'Name', 'Age']], {}, 'GoogleDocs', {}, {}) + + @patch('singer.metadata.get', side_effect =["#tst.#f, #tf", "{\"#tst\": 
\"test1\", \"#f\": \"field\", \"#tf\": \"test1.field\"}"]) + @patch('tap_dynamodb.sync_strategies.log_based.sync_shard', return_value = 1) + @patch('tap_dynamodb.deserialize.Deserializer', return_value = {}) + def test_sync_with_special_character_in_field_name(self, mock_deserializer, mock_sync_shard, mock_stream_client, mock_client, mock_metadata_get, mock_get_bookmark, mock_write_bookmark, mock_write_state, mock_to_map): + """Test expression attribute for `.` in projection field passed in `expression` field.""" + res = sync(CONFIG, STATE, STREAM) + + mock_sync_shard.assert_called_with({'SequenceNumberRange': {'EndingSequenceNumber': 'dummy_no'}, 'ShardId': 'dummy_id'}, {}, client, 'dummy_arn', [['test1', 'field'], ['test1.field']], {}, 'GoogleDocs', {}, {}) + + @patch('singer.metadata.get', side_effect =["#test, #t[1].#n", "{\"#t\": \"test1\", \"#n\": \"Name\", \"#test\": \"test\"}"]) + @patch('tap_dynamodb.sync_strategies.log_based.sync_shard', return_value = 1) + @patch('tap_dynamodb.deserialize.Deserializer', return_value = {}) + def test_sync_for_different_order_in_projections(self, mock_deserializer, mock_sync_shard, mock_stream_client, mock_client, mock_metadata_get, mock_get_bookmark, mock_write_bookmark, mock_write_state, mock_to_map): + """Test expression attribute to check different order in projection and expression field.""" + res = sync(CONFIG, STATE, STREAM) + + mock_sync_shard.assert_called_with({'SequenceNumberRange': {'EndingSequenceNumber': 'dummy_no'}, 'ShardId': 'dummy_id'}, {}, client, 'dummy_arn', [['test'], ['test1[1]', 'Name']], {}, 'GoogleDocs', {}, {}) + + @patch('singer.metadata.get', side_effect =["Test", None]) + @patch('tap_dynamodb.sync_strategies.log_based.sync_shard', return_value = 1) + @patch('tap_dynamodb.deserialize.Deserializer', return_value = {}) + def test_sync_for_valid_proj_and_no_expr(self, mock_deserializer, mock_sync_shard, mock_stream_client, mock_client, mock_metadata_get, mock_get_bookmark, mock_write_bookmark, 
mock_write_state, mock_to_map): + """Test sync should work when valid projection passed with no expressions.""" + res = sync(CONFIG, STATE, STREAM) + + mock_sync_shard.assert_called_with({'SequenceNumberRange': {'EndingSequenceNumber': 'dummy_no'}, 'ShardId': 'dummy_id'}, {}, client, 'dummy_arn', [['Test']], {}, 'GoogleDocs', {}, {}) + + @patch('singer.metadata.get', side_effect =["", "{\"#cmt\": \"Comment\"}"]) + @patch('tap_dynamodb.sync_strategies.log_based.sync_shard', return_value = 1) + @patch('tap_dynamodb.deserialize.Deserializer', return_value = {}) + def test_sync_for_expr_not_in_proj(self, mock_deserializer, mock_sync_shard, mock_stream_client, mock_client, mock_metadata_get, mock_get_bookmark, mock_write_bookmark, mock_write_state, mock_to_map): + """Test sync for expression keys not in projection field.""" + try: + res = sync(CONFIG, STATE, STREAM) + except Exception as e: + expected_error_message = "No projection is available for the expression keys: {'#cmt'}." + self.assertEqual(str(e), expected_error_message) + + @patch('singer.metadata.get', side_effect =["#c", "{\"cmt\": \"Comment\"}"]) + @patch('tap_dynamodb.sync_strategies.log_based.sync_shard', return_value = 1) + @patch('tap_dynamodb.deserialize.Deserializer', return_value = {}) + def test_sync_for_expr_key_without_hash(self, mock_deserializer, mock_sync_shard, mock_stream_client, mock_client, mock_metadata_get, mock_get_bookmark, mock_write_bookmark, mock_write_state, mock_to_map): + """Test sync for expression key defined without `#` field.""" + try: + res = sync(CONFIG, STATE, STREAM) + except Exception as e: + expected_error_message = "Expression key 'cmt' must start with '#'." 
+ self.assertEqual(str(e), expected_error_message) + + @patch('singer.metadata.get', side_effect =["#c", "{\"#cmt\": \"Comment\"}"]) + @patch('tap_dynamodb.sync_strategies.log_based.sync_shard', return_value = 1) + @patch('tap_dynamodb.deserialize.Deserializer', return_value = {}) + def test_sync_for_proj_not_in_expr(self, mock_deserializer, mock_sync_shard, mock_stream_client, mock_client, mock_metadata_get, mock_get_bookmark, mock_write_bookmark, mock_write_state, mock_to_map): + """Test sync for projection key defined with # but not in expression field.""" + try: + res = sync(CONFIG, STATE, STREAM) + except Exception as e: + expected_error_message = "No expression is available for the given projection: #c." + self.assertEqual(str(e), expected_error_message) + + @patch('singer.metadata.get', side_effect =["#cmt", ""]) + @patch('tap_dynamodb.sync_strategies.log_based.prepare_projection', return_value = 1) + def test_prepare_projections_not_called_when_null_expressions(self, mock_prepare_projection, mock_metadata_get, mock_stream_client, mock_client, mock_get_bookmark, mock_write_bookmark, mock_write_state, mock_to_map): + """Test that the prepare_projection() is not called when expression attributes are not passed in the catalog.""" + res = sync(CONFIG, STATE, STREAM) + self.assertEqual(mock_prepare_projection.call_count, 0) + + @patch('singer.metadata.get', side_effect = ["#c", "{\"#c\":, \"Comment\"}"]) + @patch('tap_dynamodb.sync_strategies.log_based.sync_shard', return_value = 1) + @patch('tap_dynamodb.deserialize.Deserializer', return_value = {}) + def test_sync_with_invalid_json(self, mock_deserializer, mock_sync_shard, mock_stream_client, mock_client, mock_metadata_get, mock_get_bookmark, mock_write_bookmark, mock_write_state, mock_to_map): + """Test sync raises exception with proper error message with invalid json in expression.""" + try: + res = sync(CONFIG, STATE, STREAM) + except Exception as e: + expected_error_message = "Invalid JSON format. 
The expression attributes should contain a valid JSON format." + self.assertEqual(str(e), expected_error_message) + + @patch('singer.metadata.get', side_effect = ["Test", ""]) + @patch('tap_dynamodb.sync_strategies.log_based.sync_shard', return_value = 1) + @patch('tap_dynamodb.sync.clear_state_on_replication_change') + @patch('singer.set_currently_syncing') + @patch('tap_dynamodb.deserialize.Deserializer', return_value = {}) + @patch('json.loads') + def test_sync_stream_with_empty_expression(self, mock_loads, mock_deserializer, mock_currently_sync, mock_clear_state, mock_sync_shard, mock_stream_client, mock_client, mock_metadata_get, mock_get_bookmark, mock_write_bookmark, mock_write_state, mock_to_map): + """Test for empty value in expression does not call json.loads().""" + + res = sync(CONFIG, STATE, STREAM) + self.assertEqual(mock_loads.call_count, 0) + +class TestPrepareProjection(unittest.TestCase): + def test_prepare_projection_output(self): + """Test that the prepare projection returns correct output""" + projection = ["#tst", "#cmt"] + expression = {"#tst": "test", "#cmt": "Comment"} + exp_keys = {"#tst", "#cmt"} + prepare_projection(projection, expression, exp_keys) + expected_projection = ["test", "Comment"] + self.assertEqual(projection, expected_projection) + + def test_prepare_projection_for_proj_not_in_expr(self): + """Test that the prepare_projection() throws an error for projection value not in expression.""" + proj = ["#tst", "#cmt"] + expr = {"#tst": "Test"} + expr_keys = {"#tst", "#cmt"} + try: + prepare_projection(proj, expr, expr_keys) + except Exception as e: + expected_err_msg = "No expression is available for the given projection: #cmt." + self.assertEqual(str(e), expected_err_msg)