Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TDL-17512: Add missing tap-tester tests #134

Open
wants to merge 7 commits into
base: crest-work
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions tests/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,11 @@ def expected_metadata(self):

def expected_streams(self):
"""A set of expected stream names"""
return set(self.expected_metadata().keys())
# removed "abandoned_checkouts", as per the Doc:
# https://help.shopify.com/en/manual/orders/abandoned-checkouts?st_source=admin&st_campaign=abandoned_checkouts_footer&utm_source=admin&utm_campaign=abandoned_checkouts_footer#review-your-abandoned-checkouts
# abandoned checkouts are saved in the Shopify admin for three months.
# Every Monday, abandoned checkouts that are older than three months are removed from your admin.
return set(self.expected_metadata().keys()) - {"abandoned_checkouts"}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add comments about the unavailability of the POST call with a reference link.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added comment in the test cases.


def child_streams(self):
"""
Expand Down Expand Up @@ -301,7 +305,11 @@ def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.start_date = self.get_properties().get("start_date")
self.store_1_streams = {'custom_collections', 'orders', 'products', 'customers', 'locations', 'inventory_levels', 'inventory_items', 'events'}
self.store_2_streams = {'abandoned_checkouts', 'collects', 'metafields', 'transactions', 'order_refunds', 'products', 'locations', 'inventory_levels', 'inventory_items', 'events'}
# removed 'abandoned_checkouts' from store 2 streams, as per the Doc:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please put back, see previous comment

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Included abandoned_checkouts stream

# https://help.shopify.com/en/manual/orders/abandoned-checkouts?st_source=admin&st_campaign=abandoned_checkouts_footer&utm_source=admin&utm_campaign=abandoned_checkouts_footer#review-your-abandoned-checkouts
# abandoned checkouts are saved in the Shopify admin for three months.
# Every Monday, abandoned checkouts that are older than three months are removed from your admin.
self.store_2_streams = {'collects', 'metafields', 'transactions', 'order_refunds', 'products', 'locations', 'inventory_levels', 'inventory_items', 'events'}

# modified this method to accommodate the replication key in the current_state
def calculated_states_by_stream(self, current_state):
Expand Down
100 changes: 100 additions & 0 deletions tests/test_all_fields.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
import os

from tap_tester import runner, menagerie
from base import BaseTapTest

class AllFieldsTest(BaseTapTest):

@staticmethod
def name():
"""Return the fixed name string for this test class."""
return "tap_tester_shopify_all_fields_test"

def get_properties(self, original: bool = True):
"""Configuration properties required for the tap.

Returns a dict with fixed 'start_date', 'shop' and 'date_window_size' values.
The `original` argument is accepted but is not used in this method.
"""
return_value = {
'start_date': '2021-04-01T00:00:00Z',
'shop': 'talenddatawearhouse',
'date_window_size': 30
}

return return_value

@staticmethod
def get_credentials(original_credentials: bool = True):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is already available in base.py. Is there any reason for redefining it?

Copy link
Contributor Author

@hpatel41 hpatel41 Mar 9, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated the test case to use the function from base test file.

"""Authentication information for the test account"""

return {
'api_key': os.getenv('TAP_SHOPIFY_API_KEY_TALENDDATAWEARHOUSE')
}

def test_run(self):
"""
Ensure running the tap with all streams and fields selected results in the
replication of all fields.
- Verify no unexpected streams were replicated
- Verify that more than just the automatic fields are replicated for each stream
"""

expected_streams = self.expected_streams()

# instantiate connection
conn_id = self.create_connection()

# run check mode
found_catalogs = menagerie.get_catalogs(conn_id)

# table and field selection
test_catalogs_all_fields = [catalog for catalog in found_catalogs
if catalog.get('stream_name') in expected_streams]
self.select_all_streams_and_fields(conn_id, test_catalogs_all_fields, select_all_fields=True)

# grab metadata after performing table-and-field selection to set expectations
stream_to_all_catalog_fields = dict() # used for asserting all fields are replicated
for catalog in test_catalogs_all_fields:
stream_id, stream_name = catalog['stream_id'], catalog['stream_name']
catalog_entry = menagerie.get_annotated_schema(conn_id, stream_id)
fields_from_field_level_md = [md_entry['breadcrumb'][1]
for md_entry in catalog_entry['metadata']
if md_entry['breadcrumb'] != []]
stream_to_all_catalog_fields[stream_name] = set(fields_from_field_level_md)

# run initial sync
record_count_by_stream = self.run_sync(conn_id)
synced_records = runner.get_records_from_target_output()

# Verify no unexpected streams were replicated
synced_stream_names = set(synced_records.keys())
self.assertSetEqual(expected_streams, synced_stream_names)

for stream in expected_streams:
with self.subTest(stream=stream):

# expected values
expected_automatic_keys = self.expected_primary_keys().get(stream, set()) | self.expected_replication_keys().get(stream, set())
# get all expected keys
expected_all_keys = stream_to_all_catalog_fields[stream]

# collect actual values
messages = synced_records.get(stream)

actual_all_keys = set()
# collect actual values
for message in messages['messages']:
if message['action'] == 'upsert':
actual_all_keys.update(message['data'].keys())

# Verify that you get some records for each stream
self.assertGreater(record_count_by_stream.get(stream, -1), 0)
dbshah1212 marked this conversation as resolved.
Show resolved Hide resolved

# verify all fields for a stream were replicated
self.assertGreater(len(expected_all_keys), len(expected_automatic_keys))
self.assertTrue(expected_automatic_keys.issubset(expected_all_keys), msg=f'{expected_automatic_keys-expected_all_keys} is not in "expected_all_keys"')

if stream == 'abandoned_checkouts':
expected_all_keys.remove('billing_address')
elif stream == 'orders':
# No field named 'order_adjustments' present in the 'order' object
# Documentation: https://shopify.dev/api/admin-rest/2021-10/resources/order#resource_object
expected_all_keys.remove('order_adjustments')

self.assertSetEqual(expected_all_keys, actual_all_keys)
17 changes: 16 additions & 1 deletion tests/test_automatic_fields.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ def automatic_test(self, conn_id, testable_streams):
"""
Verify that for each stream you can get multiple pages of data
when no fields are selected and only the automatic fields are replicated.
Verify that all replicated records have unique primary key values.

PREREQUISITE
For EACH stream add enough data that you surpass the limit of a single
Expand All @@ -52,6 +53,7 @@ def automatic_test(self, conn_id, testable_streams):
record_count_by_stream = self.run_sync(conn_id)

actual_fields_by_stream = runner.examine_target_output_for_fields()
synced_records = runner.get_records_from_target_output()

for stream in incremental_streams:
with self.subTest(stream=stream):
Expand All @@ -60,6 +62,11 @@ def automatic_test(self, conn_id, testable_streams):
# SKIP THIS ASSERTION FOR STREAMS WHERE YOU CANNOT GET
# MORE THAN 1 PAGE OF DATA IN THE TEST ACCOUNT
stream_metadata = self.expected_metadata().get(stream, {})
expected_primary_keys = self.expected_primary_keys().get(stream, set())

# collect records
messages = synced_records.get(stream)

minimum_record_count = stream_metadata.get(
self.API_LIMIT,
self.get_properties().get('result_per_page', self.DEFAULT_RESULTS_PER_PAGE)
Expand All @@ -72,7 +79,15 @@ def automatic_test(self, conn_id, testable_streams):
# verify that only the automatic fields are sent to the target
self.assertEqual(
actual_fields_by_stream.get(stream, set()),
self.expected_primary_keys().get(stream, set()) |
expected_primary_keys |
self.expected_replication_keys().get(stream, set()),
msg="The fields sent to the target are not the automatic fields"
)

# Verify that all replicated records have unique primary key values.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good addition

records_pks_set = {tuple([message.get('data').get(primary_key) for primary_key in expected_primary_keys])
for message in messages.get('messages')}
records_pks_list = [tuple([message.get('data').get(primary_key) for primary_key in expected_primary_keys])
for message in messages.get('messages')]
self.assertCountEqual(records_pks_set, records_pks_list,
msg="We have duplicate records for {}".format(stream))
19 changes: 18 additions & 1 deletion tests/test_bookmarks_updated.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,17 @@ class BookmarkTest(BaseTapTest):
def name():
return "tap_tester_shopify_bookmark_test"

# function for verifying the date format
def is_expected_date_format(self, date):
    """Return True if `date` is in the "%Y-%m-%dT%H:%M:%S.%fZ" format.

    The value is parsed with `dt.strptime`; `dt` comes from an import that
    is not visible in this hunk (presumably `datetime.datetime` - confirm).

    Args:
        date: the value to validate, normally a bookmark string.

    Returns:
        True when the value parses with the expected format, False when it
        does not match the format or is not a string at all.
    """
    try:
        # parse date; only success or failure matters, the result is unused
        dt.strptime(date, "%Y-%m-%dT%H:%M:%S.%fZ")
    except (ValueError, TypeError):
        # ValueError: the string is not in the expected format.
        # TypeError: the value is not a string (e.g. None), which is also
        # "not in the expected format" rather than a reason to crash.
        return False
    # return True in case of no error
    return True

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.start_date = '2021-04-01T00:00:00Z'
Expand All @@ -26,7 +37,11 @@ def __init__(self, *args, **kwargs):

# creating this global variable for store 2, which is required only for this test; all the other tests reference the one from base
global store_2_streams
store_2_streams = {'abandoned_checkouts', 'collects', 'metafields', 'transactions', 'order_refunds', 'products', 'locations', 'inventory_items', 'events', 'customers', 'custom_collections', 'orders'}
# removed 'abandoned_checkouts' from store 2 streams, as per the Doc:
# https://help.shopify.com/en/manual/orders/abandoned-checkouts?st_source=admin&st_campaign=abandoned_checkouts_footer&utm_source=admin&utm_campaign=abandoned_checkouts_footer#review-your-abandoned-checkouts
# abandoned checkouts are saved in the Shopify admin for three months.
# Every Monday, abandoned checkouts that are older than three months are removed from your admin.
store_2_streams = {'collects', 'metafields', 'transactions', 'order_refunds', 'products', 'locations', 'inventory_items', 'events', 'customers', 'custom_collections', 'orders'}

def test_run_store_2(self):
with self.subTest(store="store_2"):
Expand Down Expand Up @@ -112,7 +127,9 @@ def bookmarks_test(self, conn_id, testable_streams):

# verify the syncs sets a bookmark of the expected form
self.assertIsNotNone(first_bookmark_value)
self.assertTrue(self.is_expected_date_format(first_bookmark_value))
self.assertIsNotNone(second_bookmark_value)
self.assertTrue(self.is_expected_date_format(second_bookmark_value))

# verify the 2nd bookmark is equal to 1st sync bookmark
# NOT A BUG (this is the expected behaviour for Shopify, as it uses date windowing): TDL-17096: the 2nd bookmark value is assigned from the execution time rather than the actual bookmark time, so this is an invalid assertion for Shopify.
Expand Down
17 changes: 13 additions & 4 deletions tests/test_discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,23 +33,24 @@ def test_run(self):
• verify that all other fields have inclusion of available (metadata and schema)
"""
conn_id = self.create_connection()
expected_streams = self.expected_streams() | {"abandoned_checkouts"}

# Verify number of actual streams discovered match expected
found_catalogs = menagerie.get_catalogs(conn_id)
self.assertGreater(len(found_catalogs), 0,
msg="unable to locate schemas for connection {}".format(conn_id))
self.assertEqual(len(found_catalogs),
len(self.expected_streams()),
len(expected_streams),
msg="Expected {} streams, actual was {} for connection {},"
" actual {}".format(
len(self.expected_streams()),
len(expected_streams),
len(found_catalogs),
found_catalogs,
conn_id))

# Verify the stream names discovered were what we expect
found_catalog_names = {c['tap_stream_id'] for c in found_catalogs}
self.assertEqual(set(self.expected_streams()),
self.assertEqual(set(expected_streams),
set(found_catalog_names),
msg="Expected streams don't match actual streams")

Expand All @@ -58,7 +59,7 @@ def test_run(self):
self.assertTrue(all([re.fullmatch(r"[a-z_]+", name) for name in found_catalog_names]),
msg="One or more streams don't follow standard naming")

for stream in self.expected_streams():
for stream in expected_streams:
with self.subTest(stream=stream):
catalog = next(iter([catalog for catalog in found_catalogs
if catalog["stream_name"] == stream]))
Expand All @@ -74,6 +75,14 @@ def test_run(self):
self.assertTrue(len(stream_properties) == 1,
msg="There is more than one top level breadcrumb")

# collect fields
actual_fields = []
for md_entry in metadata:
if md_entry['breadcrumb'] != []:
actual_fields.append(md_entry['breadcrumb'][1])
# Verify there are no duplicate/conflicting metadata entries.
self.assertEqual(len(actual_fields), len(set(actual_fields)), msg="There are duplicate entries in the fields of '{}' stream".format(stream))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a comment about this assertion at the function(test_run()) level comment above.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added function comment.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good addition


# verify replication key(s)
self.assertEqual(
set(stream_properties[0].get(
Expand Down
58 changes: 43 additions & 15 deletions tests/test_start_date.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,13 @@ def test_run(self):

# Select all streams and all fields within streams
found_catalogs = menagerie.get_catalogs(conn_id)
incremental_streams = {key for key, value in self.expected_replication_method().items()
# removed 'abandoned_checkouts', as per the Doc:
# https://help.shopify.com/en/manual/orders/abandoned-checkouts?st_source=admin&st_campaign=abandoned_checkouts_footer&utm_source=admin&utm_campaign=abandoned_checkouts_footer#review-your-abandoned-checkouts
# abandoned checkouts are saved in the Shopify admin for three months.
# Every Monday, abandoned checkouts that are older than three months are removed from your admin.
expected_replication_method = self.expected_replication_method()
dbshah1212 marked this conversation as resolved.
Show resolved Hide resolved
expected_replication_method.pop("abandoned_checkouts")
incremental_streams = {key for key, value in expected_replication_method.items()
if value == self.INCREMENTAL}

# IF THERE ARE STREAMS THAT SHOULD NOT BE TESTED
Expand All @@ -72,6 +78,7 @@ def test_run(self):

# Count actual rows synced
first_sync_records = runner.get_records_from_target_output()
first_min_bookmarks = self.min_bookmarks_by_stream(first_sync_records)

# set the start date for a new connection based off bookmarks largest value
first_max_bookmarks = self.max_bookmarks_by_stream(first_sync_records)
Expand Down Expand Up @@ -112,27 +119,48 @@ def test_run(self):
for stream in incremental_streams:
with self.subTest(stream=stream):

# get primary key values for both sync records
expected_primary_keys = self.expected_primary_keys()[stream]
primary_keys_list_1 = [tuple(message.get('data').get(expected_pk) for expected_pk in expected_primary_keys)
for message in first_sync_records.get(stream).get('messages')
if message.get('action') == 'upsert']
primary_keys_list_2 = [tuple(message.get('data').get(expected_pk) for expected_pk in expected_primary_keys)
for message in second_sync_records.get(stream).get('messages')
if message.get('action') == 'upsert']
primary_keys_sync_1 = set(primary_keys_list_1)
primary_keys_sync_2 = set(primary_keys_list_2)

# verify that, for each stream, the second sync has no more records than the first connection sync
self.assertGreaterEqual(
first_sync_record_count.get(stream, 0),
second_sync_record_count.get(stream, 0),
msg="second had more records, start_date usage not verified")

# verify all data from 2nd sync >= start_date
target_mark = second_min_bookmarks.get(stream, {"mark": None})
target_value = next(iter(target_mark.values())) # there should be only one
# Verify by primary key values, that all records of the 2nd sync are included in the 1st sync since 2nd sync has a later start date.
self.assertTrue(primary_keys_sync_2.issubset(primary_keys_sync_1))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a comment about this assertion at the function(test_run()) level comment above.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added function comment.


# verify all data from both syncs >= start_date
first_sync_target_mark = first_min_bookmarks.get(stream, {"mark": None})
second_sync_target_mark = second_min_bookmarks.get(stream, {"mark": None})

# get start dates for both syncs
first_sync_start_date = self.get_properties()["start_date"]
second_sync_start_date = self.start_date

for start_date, target_mark in zip((first_sync_start_date, second_sync_start_date), (first_sync_target_mark, second_sync_target_mark)):
target_value = next(iter(target_mark.values())) # there should be only one
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a comment here about the addition of for loop.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added comment.


if target_value:
if target_value:

# it's okay if there isn't target data for a stream
try:
target_value = self.local_to_utc(parse(target_value))
# it's okay if there isn't target data for a stream
try:
target_value = self.local_to_utc(parse(target_value))

# verify that the minimum bookmark sent to the target for the second sync
# is greater than or equal to the start date
self.assertGreaterEqual(target_value,
self.local_to_utc(parse(self.start_date)))
# verify that the minimum bookmark sent to the target for the second sync
# is greater than or equal to the start date
self.assertGreaterEqual(target_value,
self.local_to_utc(parse(start_date)))

except (OverflowError, ValueError, TypeError):
print("bookmarks cannot be converted to dates, "
"can't test start_date for {}".format(stream))
except (OverflowError, ValueError, TypeError):
print("bookmarks cannot be converted to dates, "
"can't test start_date for {}".format(stream))