Qa/rip singer from tests #44

Open
wants to merge 49 commits into
base: master
49 commits
d62530c
Initial commit for expression attributes
prijendev Nov 8, 2021
9a18c82
Updated circleci and full_table.py
prijendev Nov 8, 2021
f5027fe
resolved pylint error
prijendev Nov 8, 2021
6b9bae6
Updated circleci
prijendev Nov 8, 2021
1376687
Updated unittest case path in circleci
prijendev Nov 8, 2021
a1fd3af
Updated expression attribute
prijendev Nov 15, 2021
ea524af
resolved pylint error
prijendev Nov 15, 2021
d4f6c77
Updated prepare exprssion logic
prijendev Nov 15, 2021
242403f
Handled empty projection
prijendev Dec 1, 2021
9d488f3
added logic for nested expressions
namrata270998 Dec 7, 2021
68c74d6
added in log_based test and comments
namrata270998 Dec 7, 2021
6e9832a
added code comments
namrata270998 Dec 8, 2021
144098d
Updated comments
prijendev Jan 4, 2022
a643e03
Resolved review comments
prijendev Jan 5, 2022
fc58a17
added comment
namrata270998 Jan 5, 2022
efacad2
resolved comments
namrata270998 Jan 7, 2022
c7c7078
resolved pylint errors
namrata270998 Jan 7, 2022
675adb4
updated config.yml and a typo
namrata270998 Jan 13, 2022
e5f0089
resolved slack-on-notify failure
namrata270998 Jan 13, 2022
1e198db
removed scenarios
namrata270998 Jan 13, 2022
d923847
Changed the expressionattribute logic as suggested
namrata270998 Jan 19, 2022
71ebc06
resolved pylint error
namrata270998 Jan 19, 2022
5a97ba7
fixed typo
namrata270998 Jan 19, 2022
2b5199b
added test case for '.' in fields
namrata270998 Jan 20, 2022
b319ac1
added testcase for prepare_projection()
namrata270998 Jan 20, 2022
b83f6ee
resolved comments
namrata270998 Jan 20, 2022
9bfd082
handled corner case
namrata270998 Jan 21, 2022
d9a4cc1
added check for empty expression
namrata270998 Jan 20, 2022
3a0016c
resolved comments and updated unittests
namrata270998 Jan 21, 2022
aa9dc49
removed unused import
namrata270998 Jan 21, 2022
251e400
resolved comments
namrata270998 Jan 21, 2022
0981142
fixed pylint failure
namrata270998 Jan 21, 2022
28a64b3
updated comment
namrata270998 Jan 21, 2022
8e8d73e
changed the name of catalog's expression to expression-attributes
namrata270998 Jan 27, 2022
a094a9f
added tap-tester-user to the config, updated to nosetests
namrata270998 Jan 27, 2022
99fd7ee
updated to nosetests
namrata270998 Jan 27, 2022
0f08f4b
Changed expression-attributes to expression-attribute
Jan 27, 2022
9daad7b
Changed expression-attributes to expression-attribute
Jan 27, 2022
ec1acdf
added tests for primary and replication key reserved words
namrata270998 Jan 28, 2022
00e119f
renamed the file
namrata270998 Jan 28, 2022
82d8395
added funciton comments
namrata270998 Jan 28, 2022
b3c148f
fixed pylint
namrata270998 Jan 28, 2022
d0e6f42
expression-attribute -> expression-attributes in full table
Feb 1, 2022
d3f7b3e
test fixes for pk and hash key as expression-attributes
Feb 1, 2022
1f726db
changed the parameter name to attribute
namrata270998 Feb 1, 2022
dc8c579
expression-attribute -> expression-attributes in one more test
Feb 1, 2022
5d2ff51
fix log sync ref and log projection test name
Feb 1, 2022
86e791b
Changed from attribute to attributes
Feb 8, 2022
e3be082
remove singer logger, replace with tt logger
May 18, 2022
37 changes: 26 additions & 11 deletions .circleci/config.yml
@@ -1,8 +1,11 @@
-version: 2
+version: 2.1
+orbs:
+  slack: circleci/[email protected]
+
 jobs:
   build:
     docker:
-      - image: 218546966473.dkr.ecr.us-east-1.amazonaws.com/circle-ci:tap-tester-v4
+      - image: 218546966473.dkr.ecr.us-east-1.amazonaws.com/circle-ci:stitch-tap-tester
       - image: amazon/dynamodb-local
         entrypoint: ["java", "-Xmx1G", "-jar", "DynamoDBLocal.jar"]
     steps:
@@ -14,30 +17,40 @@ jobs:
             virtualenv -p python3 /usr/local/share/virtualenvs/tap-dynamodb
             source /usr/local/share/virtualenvs/tap-dynamodb/bin/activate
             pip install .[dev]
+            pip install pytest-cov
       - run:
           name: 'pylint'
           command: |
             source /usr/local/share/virtualenvs/tap-dynamodb/bin/activate
             make lint
+      - run:
+          name: 'Unit Tests'
+          command: |
+            source /usr/local/share/virtualenvs/tap-dynamodb/bin/activate
+            pip install nose coverage
+            nosetests --with-coverage --cover-erase --cover-package=tap_dynamodb --cover-html-dir=htmlcov tests/unittests
+            coverage html
+      - store_test_results:
+          path: test_output/report.xml
+      - store_artifacts:
+          path: htmlcov
       - run:
           name: 'Tap Tester'
           command: |
             aws s3 cp s3://com-stitchdata-dev-deployment-assets/environments/tap-tester/sandbox tap-tester.env
             source tap-tester.env
             source /usr/local/share/virtualenvs/tap-tester/bin/activate
-            run-test --tap=tap-dynamodb \
-                     --target=target-stitch \
-                     --orchestrator=stitch-orchestrator \
-                     [email protected] \
-                     --password=$SANDBOX_PASSWORD \
-                     --client-id=50 \
-                     tests
+            run-test --tap=tap-dynamodb tests
+      - slack/notify-on-failure:
+          only_for_branches: master
 workflows:
   version: 2
   commit:
     jobs:
       - build:
-          context: circleci-user
+          context:
+            - circleci-user
+            - tap-tester-user
   build_daily:
     triggers:
       - schedule:
@@ -48,4 +61,6 @@ workflows:
                 - master
     jobs:
       - build:
-          context: circleci-user
+          context:
+            - circleci-user
+            - tap-tester-user
8 changes: 8 additions & 0 deletions tap_dynamodb/dynamodb.py
@@ -1,3 +1,4 @@
+import json
 import backoff
 import boto3
 import singer
@@ -79,3 +80,10 @@ def get_stream_client(config):
                             region_name=config['region_name'])
     return boto3.client('dynamodbstreams',
                         region_name=config['region_name'])
+
+def decode_expression(expression):
+    '''Convert the JSON string into a Python object and raise an exception if the JSON is invalid'''
+    try:
+        return json.loads(expression)
+    except json.decoder.JSONDecodeError as err:
+        raise Exception("Invalid JSON format. The expression attributes should contain a valid JSON format.") from err
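For reference, here is a minimal sketch of what the new `decode_expression` helper is meant to accept and reject. The catalog value is the one used in the test added by this PR. `decode_expression_sketch` is a hypothetical stand-in, and raising `ValueError` instead of a bare `Exception` is an assumption made for illustration, not what the diff does.

```python
import json


def decode_expression_sketch(expression):
    """Parse the expression-attributes string from the catalog into a dict.

    Hypothetical stand-in for `decode_expression`; it raises ValueError
    (an assumption) so that callers can catch the failure specifically.
    """
    try:
        return json.loads(expression)
    except json.JSONDecodeError as err:
        raise ValueError(
            "The expression attributes must contain valid JSON."
        ) from err


# The catalog metadata stores the mapping as a JSON string.
names = decode_expression_sketch('{"#cmt": "Comment", "#name": "Name"}')
print(names)
```

Chaining with `from err` keeps the original parser error in the traceback, which makes it easier to see where in the string the JSON broke.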
1 change: 1 addition & 0 deletions tap_dynamodb/sync.py
@@ -25,6 +25,7 @@ def sync_stream(config, state, stream):
     table_name = stream['tap_stream_id']
 
     md_map = metadata.to_map(stream['metadata'])
+
     replication_method = metadata.get(md_map, (), 'replication-method')
     key_properties = metadata.get(md_map, (), 'table-key-properties')
 
14 changes: 10 additions & 4 deletions tap_dynamodb/sync_strategies/full_table.py
@@ -1,5 +1,4 @@
 import time
-
 import singer
 from singer import metadata
 from tap_dynamodb import dynamodb
@@ -8,14 +7,16 @@
 LOGGER = singer.get_logger()
 
 
-def scan_table(table_name, projection, last_evaluated_key, config):
+def scan_table(table_name, projection, expression, last_evaluated_key, config):
     scan_params = {
         'TableName': table_name,
         'Limit': 1000
     }
-
     if projection is not None and projection != '':
         scan_params['ProjectionExpression'] = projection
+    if expression:
+        # Add the `ExpressionAttributeNames` parameter so reserved words can be projected.
+        scan_params['ExpressionAttributeNames'] = dynamodb.decode_expression(expression)
     if last_evaluated_key is not None:
         scan_params['ExclusiveStartKey'] = last_evaluated_key
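The parameter assembly in `scan_table` can be sketched as a standalone function, which makes the effect of the new `expression` argument easy to inspect without a DynamoDB connection. `build_scan_params` is a hypothetical helper written for illustration; it is not part of the tap.

```python
import json


def build_scan_params(table_name, projection=None, expression=None,
                      last_evaluated_key=None, limit=1000):
    """Assemble keyword arguments for a DynamoDB Scan call (hypothetical helper)."""
    params = {'TableName': table_name, 'Limit': limit}
    if projection:
        params['ProjectionExpression'] = projection
    if expression:
        # Placeholders such as "#cmt" stand in for reserved words like "Comment".
        params['ExpressionAttributeNames'] = json.loads(expression)
    if last_evaluated_key is not None:
        params['ExclusiveStartKey'] = last_evaluated_key
    return params


params = build_scan_params(
    'simple_table_4',
    projection='#cmt, #name',
    expression='{"#cmt": "Comment", "#name": "Name"}',
)
print(params)
```

A dict built this way could be passed to a boto3 DynamoDB client as `client.scan(**params)`.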

@@ -71,10 +72,15 @@ def sync(config, state, stream):
     md_map = metadata.to_map(stream['metadata'])
     projection = metadata.get(md_map, (), 'tap-mongodb.projection')
 
+    # An expression attribute name is a placeholder used in an Amazon DynamoDB expression as an alternative to an actual attribute name.
+    # It is needed when an expression contains an attribute name that conflicts with a DynamoDB reserved word.
+    # For example, table `A` contains the field `Comment`, but `Comment` is a reserved word, so the fetch fails without a placeholder.
+    expression = metadata.get(md_map, (), 'tap-dynamodb.expression-attributes')
+
     rows_saved = 0
 
     deserializer = Deserializer()
-    for result in scan_table(table_name, projection, last_evaluated_key, config):
+    for result in scan_table(table_name, projection, expression, last_evaluated_key, config):
         for item in result.get('Items', []):
             rows_saved += 1
             # TODO: Do we actually have to put the item we retrieve from
40 changes: 39 additions & 1 deletion tap_dynamodb/sync_strategies/log_based.py
@@ -115,6 +115,27 @@ def sync_shard(shard, seq_number_bookmarks, streams_client, stream_arn, projecti
     singer.write_state(state)
     return rows_synced
 
+def prepare_projection(projection, expression, exp_key_traverse):
+    '''
+    Replace, in place, the placeholders in a projection path with the attribute names from the expression attributes
+    For example:
+        projection = ['#c'], expression = {"#c": "Comment"}
+        projection after the call = ['Comment']
+    Example of a nested expression:
+        In the catalog - projection = "#n[0].#a", expression = {"#n": "Name", "#a": "Age"}
+        projection passed to the function = ['#n[0]', '#a']
+        projection after the call = ['Name[0]', 'Age']
+    '''
+    for index, part in enumerate(projection):
+        if '#' in part:
+            replaceable_part = part.split('[', 1)[0] if '[' in part else part
+            # check if the projection placeholder is defined in the expression else raise an exception
+            if replaceable_part in expression:
+                exp_key_traverse.discard(replaceable_part)
+                # replace the placeholder in the projection with its value from the expression
+                projection[index] = part.replace(replaceable_part, expression[replaceable_part])
+            else:
+                raise Exception("No expression is available for the given projection: {}.".format(replaceable_part))
 
 def sync(config, state, stream):
     table_name = stream['tap_stream_id']
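The placeholder substitution done by `prepare_projection` can be illustrated with a small non-mutating variant. `resolve_path` is a hypothetical name. Unlike the diff, this sketch assumes a placeholder always starts with `#`, and it raises `KeyError` rather than a bare `Exception`.

```python
def resolve_path(path, names):
    """Return a copy of `path` with each '#placeholder' replaced by its real name.

    `path` is one dotted projection split into parts, e.g. ['#n[0]', '#a'].
    Hypothetical, non-mutating variant of `prepare_projection`.
    """
    resolved = []
    for part in path:
        if part.startswith('#'):
            # Keep any list index such as "[0]" attached to the resolved name.
            placeholder, bracket, index = part.partition('[')
            if placeholder not in names:
                raise KeyError(
                    "No expression is available for the given projection: "
                    "{}.".format(placeholder))
            part = names[placeholder] + bracket + index
        resolved.append(part)
    return resolved


names = {'#n': 'Name', '#a': 'Age'}
print(resolve_path(['#n[0]', '#a'], names))
```

Returning a new list instead of editing the argument is a design choice for the sketch only; the function in the diff updates the list it is given.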
@@ -124,9 +145,26 @@ def sync(config, state, stream):
 
     md_map = metadata.to_map(stream['metadata'])
     projection = metadata.get(md_map, (), 'tap-mongodb.projection')
+    expression = metadata.get(md_map, (), 'tap-dynamodb.expression-attributes')
     if projection is not None:
         projection = [x.strip().split('.') for x in projection.split(',')]
-
+        if expression:
+            # decode the expression attributes from a JSON string into a dict.
+            expression = dynamodb.decode_expression(expression)
+            # loop over the expression keys
+            for expr in expression.keys():
+                # check if the expression key starts with `#` and if not raise an exception
+                if not expr.startswith("#"):
+                    raise Exception("Expression key '{}' must start with '#'.".format(expr))
+
+            # A set used to check whether every expression key is used in the projections.
+            exp_key_traverse = set(expression.keys())
+
+            for proj in projection:
+                prepare_projection(proj, expression, exp_key_traverse)
+
+            if exp_key_traverse:
+                raise Exception("No projection is available for the expression keys: {}.".format(exp_key_traverse))
     # Write activate version message
     stream_version = singer.get_bookmark(state, table_name, 'version')
     singer.write_version(table_name, stream_version)
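The check that every expression key is referenced by some projection (the role of `exp_key_traverse` in the hunk above) amounts to a set difference. A minimal sketch follows; `unused_placeholders` is a hypothetical helper, not a function in the tap.

```python
def unused_placeholders(projection, names):
    """Return the placeholders in `names` that no projection path refers to.

    `projection` is the comma-separated string from the catalog, e.g.
    "#n[0].#a, id". Hypothetical helper mirroring the `exp_key_traverse` check.
    """
    used = set()
    for path in projection.split(','):
        for part in path.strip().split('.'):
            # Drop a trailing list index such as "[0]" before comparing.
            used.add(part.split('[', 1)[0])
    return set(names) - used


names = {'#n': 'Name', '#a': 'Age', '#c': 'Comment'}
print(unused_placeholders('#n[0].#a, id', names))
```

A non-empty result corresponds to the case where the diff raises "No projection is available for the expression keys".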
6 changes: 2 additions & 4 deletions tests/base.py
@@ -5,21 +5,19 @@
 import boto3
 from boto3.dynamodb.types import TypeSerializer, TypeDeserializer
 
-import singer
-
 from tap_tester import connections
 from tap_tester import menagerie
 from tap_tester import runner
+from tap_tester.logger import LOGGER
 
 ALL_TABLE_NAMES_TO_CLEAR = frozenset({
     'simple_table_1',
     'simple_table_2',
     'simple_table_3',
+    'simple_table_4',
     'com-stitchdata-test-dynamodb-integration-simple_table_1',
 })
 
-LOGGER = singer.get_logger()
-
 
 class TestDynamoDBBase(unittest.TestCase):
     _client = None
4 changes: 0 additions & 4 deletions tests/test_dynamodb_discovery.py
@@ -1,6 +1,5 @@
 from boto3.dynamodb.types import TypeSerializer
 
-from tap_tester.scenario import (SCENARIOS)
 from base import TestDynamoDBBase
 
 
@@ -53,6 +52,3 @@ def name():
 
     def test_run(self):
         self.pre_sync_test()
-
-
-SCENARIOS.add(DynamoDBDiscovery)
7 changes: 0 additions & 7 deletions tests/test_dynamodb_full_table_interruptible_sync.py
@@ -1,17 +1,13 @@
 import decimal
-import singer
 
 from boto3.dynamodb.types import TypeSerializer
 
-from tap_tester.scenario import (SCENARIOS)
 from tap_tester import connections
 from tap_tester import menagerie
 from tap_tester import runner
 
 from base import TestDynamoDBBase
 
-LOGGER = singer.get_logger()
-
 
 class DynamoDBFullTableInterruptible(TestDynamoDBBase):
     def expected_table_config(self):
@@ -129,6 +125,3 @@ def test_run(self):
         self.assertIsNone(state['bookmarks'][table_name].get('last_evaluated_key'))
 
         self.assertTrue(state['bookmarks'][table_name].get('initial_full_table_complete', False))
-
-
-SCENARIOS.add(DynamoDBFullTableInterruptible)
113 changes: 113 additions & 0 deletions tests/test_dynamodb_full_table_primary_and_hash_key_projections.py
@@ -0,0 +1,113 @@
+from boto3.dynamodb.types import TypeSerializer
+
+from tap_tester import connections
+from tap_tester import menagerie
+from tap_tester import runner
+from base import TestDynamoDBBase
+
+
+class DynamoDBFullTablePrimaryAndHashKeyReservedWords(TestDynamoDBBase):
+    def expected_table_config(self):
+        return [
+            {
+                'TableName': 'simple_table_4',
+                # Use `Comment`, which is a reserved word, as the primary key (HashKey) to verify that expression attributes work for it
+                'HashKey': 'Comment',
+                'HashType': 'N',
+                # Use `Name`, which is a reserved word, as the replication key (SortKey) to verify that expression attributes work for it
+                'SortKey': 'Name',
+                'SortType': 'S',
+                'generator': self.generate_simple_items_4,
+                'num_rows': 100,
+                'ProjectionExpression': '#cmt, #name',
+                'top_level_keys': {'Name', 'Comment'}
+            }
+        ]
+
+    @staticmethod
+    def generate_simple_items_4(num_items, start_key=0):
+        '''Generate unique records for the table.'''
+        serializer = TypeSerializer()
+        for i in range(start_key, start_key + num_items):
+            record = {
+                'Comment': i,
+                'Name': 'Test Name' + str(i),
+                'boolean_field': True,
+            }
+            yield serializer.serialize(record)
+
+    @staticmethod
+    def name():
+        return "tt_dynamodb_ft_pkhk_projections"
+
+    def test_run(self):
+        (table_configs, conn_id, _) = self.pre_sync_test()
+
+        # Select the simple_table_4 stream and add replication method, projection and expression attribute metadata
+        found_catalogs = menagerie.get_catalogs(conn_id)
+        for stream_catalog in found_catalogs:
+            expected_config = [x for x in table_configs if x['TableName'] == stream_catalog['tap_stream_id']][0]
+            annotated_schema = menagerie.get_annotated_schema(conn_id, stream_catalog['stream_id'])
+            additional_md = [{"breadcrumb" : [], "metadata" : {
+                'replication-method' : 'FULL_TABLE',
+                'tap-dynamodb.expression-attributes': "{\"#cmt\": \"Comment\", \"#name\": \"Name\"}", # expression attributes for the reserved words.
+                'tap-mongodb.projection': expected_config['ProjectionExpression']
+            }}]
+            connections.select_catalog_and_fields_via_metadata(conn_id,
+                                                               stream_catalog,
+                                                               annotated_schema,
+                                                               additional_md)
+
+        # run full table sync
+        sync_job_name = runner.run_sync_mode(self, conn_id)
+
+        exit_status = menagerie.get_exit_status(conn_id, sync_job_name)
+        menagerie.verify_sync_exit_status(self, exit_status, sync_job_name)
+
+        # get the messages written to the target, grouped by stream
+        messages_by_stream = runner.get_records_from_target_output()
+
+        expected_pks = {}
+
+        for config in table_configs:
+            key = {config['HashKey']}
+            if config.get('SortKey'):
+                key |= {config.get('SortKey')}
+            expected_pks[config['TableName']] = key
+
+        # assert that each of the streams that we synced are the ones that we expect to see
+        record_count_by_stream = runner.examine_target_output_file(self,
+                                                                   conn_id,
+                                                                   {x['TableName'] for x in table_configs},
+                                                                   expected_pks)
+
+        state = menagerie.get_state(conn_id)
+
+        first_versions = {}
+
+        # assert that we get the correct number of records for each stream
+        for config in table_configs:
+            table_name = config['TableName']
+
+            self.assertEqual(config['num_rows'],
+                             record_count_by_stream[table_name])
+
+            # assert that an activate_version_message is first and last message sent for each stream
+            self.assertEqual('activate_version',
+                             messages_by_stream[table_name]['messages'][0]['action'])
+            self.assertEqual('activate_version',
+                             messages_by_stream[table_name]['messages'][-1]['action'])
+
+            # assert that the state has an initial_full_table_complete == True
+            self.assertTrue(state['bookmarks'][table_name]['initial_full_table_complete'])
+            # assert that there is a version bookmark in state
+            first_versions[table_name] = state['bookmarks'][table_name]['version']
+            self.assertIsNotNone(first_versions[table_name])
+
+            # assert that the projection causes the correct fields to be returned
+            for message in messages_by_stream[table_name]['messages']:
+                if message['action'] == 'upsert':
+                    if not message['data'].get('_sdc_deleted_at'):
+                        top_level_keys = {*message['data'].keys()}
+                        # Verify that the reserved words used as primary keys and replication keys are replicated.
+                        self.assertEqual(config['top_level_keys'], top_level_keys)
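The hand-escaped JSON string in the test metadata could also be produced with `json.dumps`, which avoids the backslashes. This is only a sketch of an alternative, not a change proposed in the PR; the variable names are illustrative.

```python
import json

# Placeholder -> attribute name mapping, as used in the test above.
expression_attributes = {"#cmt": "Comment", "#name": "Name"}

additional_md = [{
    "breadcrumb": [],
    "metadata": {
        "replication-method": "FULL_TABLE",
        # json.dumps yields the same JSON text without manual escaping.
        "tap-dynamodb.expression-attributes": json.dumps(expression_attributes),
        "tap-mongodb.projection": "#cmt, #name",
    },
}]
print(additional_md[0]["metadata"]["tap-dynamodb.expression-attributes"])
```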
7 changes: 0 additions & 7 deletions tests/test_dynamodb_full_table_sync.py
@@ -1,18 +1,13 @@
 import decimal
 
-import singer
-
 from boto3.dynamodb.types import TypeSerializer
 
-from tap_tester.scenario import (SCENARIOS)
 from tap_tester import connections
 from tap_tester import menagerie
 from tap_tester import runner
 
 from base import TestDynamoDBBase
 
-LOGGER = singer.get_logger()
-
 
 class DynamoDBFullTable(TestDynamoDBBase):
     def expected_table_config(self):
@@ -108,5 +103,3 @@ def test_run(self):
             # assert that there is a version bookmark in state
             first_versions[table_name] = state['bookmarks'][table_name]['version']
             self.assertIsNotNone(first_versions[table_name])
-
-SCENARIOS.add(DynamoDBFullTable)