TDL-17512: Add missing tap-tester tests #134
Base branch: crest-work
Diff view: changes from 2 of the branch's 7 commits
File 1 of 5: the shared base test class (base.py).

@@ -132,7 +132,11 @@ def expected_metadata(self):

     def expected_streams(self):
         """A set of expected stream names"""
-        return set(self.expected_metadata().keys())
+        # removed "abandoned_checkouts", as per the doc:
+        # https://help.shopify.com/en/manual/orders/abandoned-checkouts?st_source=admin&st_campaign=abandoned_checkouts_footer&utm_source=admin&utm_campaign=abandoned_checkouts_footer#review-your-abandoned-checkouts
+        # abandoned checkouts are saved in the Shopify admin for three months;
+        # every Monday, abandoned checkouts older than three months are removed from your admin.
+        return set(self.expected_metadata().keys()) - {"abandoned_checkouts"}

     def child_streams(self):
         """
@@ -301,7 +305,11 @@ def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
         self.start_date = self.get_properties().get("start_date")
         self.store_1_streams = {'custom_collections', 'orders', 'products', 'customers', 'locations', 'inventory_levels', 'inventory_items', 'events'}
-        self.store_2_streams = {'abandoned_checkouts', 'collects', 'metafields', 'transactions', 'order_refunds', 'products', 'locations', 'inventory_levels', 'inventory_items', 'events'}
+        # removed 'abandoned_checkouts' from store 2 streams, as per the doc:
+        # https://help.shopify.com/en/manual/orders/abandoned-checkouts?st_source=admin&st_campaign=abandoned_checkouts_footer&utm_source=admin&utm_campaign=abandoned_checkouts_footer#review-your-abandoned-checkouts
+        # abandoned checkouts are saved in the Shopify admin for three months;
+        # every Monday, abandoned checkouts older than three months are removed from your admin.
+        self.store_2_streams = {'collects', 'metafields', 'transactions', 'order_refunds', 'products', 'locations', 'inventory_levels', 'inventory_items', 'events'}

     # modified this method to accommodate the replication key in current_state
     def calculated_states_by_stream(self, current_state):

Reviewer: Please put back, see the previous comment.
Author: Included the abandoned_checkouts stream.
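The hunk cuts off at the method definition. For context, a minimal sketch of what "accommodating the replication key in current_state" can mean for a simulated-state helper; the state shape, date format, and one-day shift are assumptions for illustration, not the repo's exact code:

from datetime import datetime, timedelta

def calculated_states_by_stream(current_state):
    """Shift each bookmark back one day, keyed by stream and replication key."""
    # Assumed state shape: {'bookmarks': {stream: {replication_key: value}}}
    calculated = {}
    for stream, bookmark in current_state.get('bookmarks', {}).items():
        calculated[stream] = {}
        for replication_key, value in bookmark.items():
            shifted = datetime.strptime(value, "%Y-%m-%dT%H:%M:%SZ") - timedelta(days=1)
            calculated[stream][replication_key] = shifted.strftime("%Y-%m-%dT%H:%M:%SZ")
    return calculated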
File 2 of 5: a new all-fields test (new file; all 100 lines added).

@@ -0,0 +1,100 @@
import os

from tap_tester import runner, menagerie
from base import BaseTapTest


class AllFieldsTest(BaseTapTest):

    @staticmethod
    def name():
        return "tap_tester_shopify_all_fields_test"
    def get_properties(self, original: bool = True):
        """Configuration properties required for the tap."""
        return_value = {
            'start_date': '2021-04-01T00:00:00Z',
            'shop': 'talenddatawearhouse',
            'date_window_size': 30
        }

        return return_value
    @staticmethod
    def get_credentials(original_credentials: bool = True):
        """Authentication information for the test account"""
        return {
            'api_key': os.getenv('TAP_SHOPIFY_API_KEY_TALENDDATAWEARHOUSE')
        }

Reviewer: This is already available in base.py; any reason for redefining it?
Author: Updated the test case to use the function from the base test file.
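A minimal sketch of the resolution the reviewer is pointing at: drop the override and inherit the helper. The base-class signature here is assumed from this diff, with a stand-in for the real base.py:

import os

class BaseTapTest:  # stand-in for the class in base.py
    @staticmethod
    def get_credentials(original_credentials: bool = True):
        """Authentication information for the test account"""
        return {'api_key': os.getenv('TAP_SHOPIFY_API_KEY_TALENDDATAWEARHOUSE')}

class AllFieldsTest(BaseTapTest):
    pass  # no override: the inherited helper is used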
    def test_run(self):
        """
        Ensure running the tap with all streams and fields selected results in the
        replication of all fields.
        - Verify no unexpected streams were replicated
        - Verify that more than just the automatic fields are replicated for each stream
        """
        expected_streams = self.expected_streams()

        # instantiate connection
        conn_id = self.create_connection()

        # run check mode
        found_catalogs = menagerie.get_catalogs(conn_id)

        # table and field selection
        test_catalogs_all_fields = [catalog for catalog in found_catalogs
                                    if catalog.get('stream_name') in expected_streams]
        self.select_all_streams_and_fields(conn_id, test_catalogs_all_fields, select_all_fields=True)

        # grab metadata after performing table-and-field selection to set expectations
        stream_to_all_catalog_fields = dict()  # used for asserting all fields are replicated
        for catalog in test_catalogs_all_fields:
            stream_id, stream_name = catalog['stream_id'], catalog['stream_name']
            catalog_entry = menagerie.get_annotated_schema(conn_id, stream_id)
            fields_from_field_level_md = [md_entry['breadcrumb'][1]
                                          for md_entry in catalog_entry['metadata']
                                          if md_entry['breadcrumb'] != []]
            stream_to_all_catalog_fields[stream_name] = set(fields_from_field_level_md)
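Why breadcrumb[1] is the field name: in a Singer annotated schema, each field-level metadata entry carries a ('properties', '<field>') breadcrumb, while the single table-level entry has an empty one. A self-contained illustration with made-up entries:

catalog_entry = {'metadata': [
    {'breadcrumb': [], 'metadata': {'selected': True}},
    {'breadcrumb': ['properties', 'id'], 'metadata': {'inclusion': 'automatic'}},
    {'breadcrumb': ['properties', 'email'], 'metadata': {'inclusion': 'available'}},
]}
fields = {md_entry['breadcrumb'][1]
          for md_entry in catalog_entry['metadata']
          if md_entry['breadcrumb'] != []}
assert fields == {'id', 'email'}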
        # run initial sync
        record_count_by_stream = self.run_sync(conn_id)
        synced_records = runner.get_records_from_target_output()

        # Verify no unexpected streams were replicated
        synced_stream_names = set(synced_records.keys())
        self.assertSetEqual(expected_streams, synced_stream_names)

        for stream in expected_streams:
            with self.subTest(stream=stream):

                # expected values
                expected_automatic_keys = (self.expected_primary_keys().get(stream, set()) |
                                           self.expected_replication_keys().get(stream, set()))
                # get all expected keys
                expected_all_keys = stream_to_all_catalog_fields[stream]

                # collect actual values
                messages = synced_records.get(stream)
                actual_all_keys = set()
                for message in messages['messages']:
                    if message['action'] == 'upsert':
                        actual_all_keys.update(message['data'].keys())

                # Verify that you get some records for each stream
                self.assertGreater(record_count_by_stream.get(stream, -1), 0)
                # verify all fields for a stream were replicated
                self.assertGreater(len(expected_all_keys), len(expected_automatic_keys))
                self.assertTrue(expected_automatic_keys.issubset(expected_all_keys),
                                msg=f'{expected_automatic_keys - expected_all_keys} is missing from expected_all_keys')

                if stream == 'abandoned_checkouts':
                    expected_all_keys.remove('billing_address')
                elif stream == 'orders':
                    # No field named 'order_adjustments' is present in the 'order' object
                    # Documentation: https://shopify.dev/api/admin-rest/2021-10/resources/order#resource_object
                    expected_all_keys.remove('order_adjustments')

                self.assertSetEqual(expected_all_keys, actual_all_keys)
File 3 of 5: the automatic-fields test.
@@ -31,6 +31,7 @@ def automatic_test(self, conn_id, testable_streams):
         """
         Verify that for each stream you can get multiple pages of data
         when no fields are selected and only the automatic fields are replicated.
+        Verify that all replicated records have unique primary key values.

         PREREQUISITE
         For EACH stream add enough data that you surpass the limit of a single
@@ -52,6 +53,7 @@ def automatic_test(self, conn_id, testable_streams):
         record_count_by_stream = self.run_sync(conn_id)

         actual_fields_by_stream = runner.examine_target_output_for_fields()
+        synced_records = runner.get_records_from_target_output()

         for stream in incremental_streams:
             with self.subTest(stream=stream):
@@ -60,6 +62,11 @@ def automatic_test(self, conn_id, testable_streams):
                 # SKIP THIS ASSERTION FOR STREAMS WHERE YOU CANNOT GET
                 # MORE THAN 1 PAGE OF DATA IN THE TEST ACCOUNT
                 stream_metadata = self.expected_metadata().get(stream, {})
+                expected_primary_keys = self.expected_primary_keys().get(stream, set())
+
+                # collect records
+                messages = synced_records.get(stream)
+
                 minimum_record_count = stream_metadata.get(
                     self.API_LIMIT,
                     self.get_properties().get('result_per_page', self.DEFAULT_RESULTS_PER_PAGE)
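The hunk cuts off mid-statement, but the lookup order is visible: a per-stream API limit from the expected metadata, then the configured page size, then the class default. A runnable illustration of that fallback chain, with hypothetical key names and values:

DEFAULT_RESULTS_PER_PAGE = 175  # hypothetical default
API_LIMIT = 'api-limit'         # hypothetical metadata key

stream_metadata = {API_LIMIT: 250}
properties = {'result_per_page': 100}

minimum_record_count = stream_metadata.get(
    API_LIMIT,
    properties.get('result_per_page', DEFAULT_RESULTS_PER_PAGE))
assert minimum_record_count == 250  # would fall back to 100, then 175, if earlier keys were absent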
@@ -72,7 +79,15 @@ def automatic_test(self, conn_id, testable_streams):
                 # verify that only the automatic fields are sent to the target
                 self.assertEqual(
                     actual_fields_by_stream.get(stream, set()),
-                    self.expected_primary_keys().get(stream, set()) |
+                    expected_primary_keys |
                     self.expected_replication_keys().get(stream, set()),
                     msg="The fields sent to the target are not the automatic fields"
                 )

+                # Verify that all replicated records have unique primary key values.
+                records_pks_set = {tuple(message.get('data').get(primary_key) for primary_key in expected_primary_keys)
+                                   for message in messages.get('messages')}
+                records_pks_list = [tuple(message.get('data').get(primary_key) for primary_key in expected_primary_keys)
+                                    for message in messages.get('messages')]
+                self.assertCountEqual(records_pks_set, records_pks_list,
+                                      msg="We have duplicate records for {}".format(stream))

Reviewer: Good addition.
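The duplicate check works because the set collapses repeated primary-key tuples while the list keeps them, so assertCountEqual fails exactly when a key repeats. A self-contained demonstration with made-up records:

import unittest

class DuplicatePKDemo(unittest.TestCase):
    def test_duplicate_detected(self):
        expected_primary_keys = ['id']
        messages = {'messages': [{'data': {'id': 1}},
                                 {'data': {'id': 2}},
                                 {'data': {'id': 2}}]}  # duplicate id
        pks_list = [tuple(m['data'].get(pk) for pk in expected_primary_keys)
                    for m in messages['messages']]
        pks_set = set(pks_list)
        # the set has 2 elements, the list has 3, so the comparison below raises
        with self.assertRaises(AssertionError):
            self.assertCountEqual(pks_set, pks_list)

if __name__ == '__main__':
    unittest.main()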
File 4 of 5: the discovery test.
@@ -33,23 +33,24 @@ def test_run(self):
         • verify that all other fields have inclusion of available (metadata and schema)
         """
         conn_id = self.create_connection()
+        expected_streams = self.expected_streams() | {"abandoned_checkouts"}

         # Verify the number of actual streams discovered matches expected
         found_catalogs = menagerie.get_catalogs(conn_id)
         self.assertGreater(len(found_catalogs), 0,
                            msg="unable to locate schemas for connection {}".format(conn_id))
         self.assertEqual(len(found_catalogs),
-                         len(self.expected_streams()),
+                         len(expected_streams),
                          msg="Expected {} streams, actual was {} for connection {},"
                              " actual {}".format(
-                                 len(self.expected_streams()),
+                                 len(expected_streams),
                                  len(found_catalogs),
                                  found_catalogs,
                                  conn_id))

         # Verify the stream names discovered were what we expect
         found_catalog_names = {c['tap_stream_id'] for c in found_catalogs}
-        self.assertEqual(set(self.expected_streams()),
+        self.assertEqual(set(expected_streams),
                          set(found_catalog_names),
                          msg="Expected streams don't match actual streams")
@@ -58,7 +59,7 @@ def test_run(self):
         self.assertTrue(all([re.fullmatch(r"[a-z_]+", name) for name in found_catalog_names]),
                         msg="One or more streams don't follow standard naming")

-        for stream in self.expected_streams():
+        for stream in expected_streams:
             with self.subTest(stream=stream):
                 catalog = next(iter([catalog for catalog in found_catalogs
                                      if catalog["stream_name"] == stream]))
@@ -74,6 +75,14 @@ def test_run(self):
                 self.assertTrue(len(stream_properties) == 1,
                                 msg="There is more than one top level breadcrumb")

+                # collect fields
+                actual_fields = []
+                for md_entry in metadata:
+                    if md_entry['breadcrumb'] != []:
+                        actual_fields.append(md_entry['breadcrumb'][1])
+
+                # Verify there are no duplicate/conflicting metadata entries.
+                self.assertEqual(len(actual_fields), len(set(actual_fields)),
+                                 msg="There are duplicate entries in the fields of '{}' stream".format(stream))

Reviewer: Add a comment about this assertion to the function-level (test_run) docstring above.
Author: Added the function comment.
Reviewer: Good addition.
                # verify replication key(s)
                self.assertEqual(
                    set(stream_properties[0].get(
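An optional refinement of the duplicate-metadata assertion in the hunk above (not what the diff does): collections.Counter can name the offending fields when the check fails. Field names here are hypothetical:

from collections import Counter

actual_fields = ['id', 'email', 'created_at', 'email']  # 'email' appears twice
duplicates = [field for field, count in Counter(actual_fields).items() if count > 1]
assert duplicates == ['email'], duplicates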
File 5 of 5: the start-date test.
@@ -56,7 +56,13 @@ def test_run(self):

         # Select all streams and all fields within streams
         found_catalogs = menagerie.get_catalogs(conn_id)
-        incremental_streams = {key for key, value in self.expected_replication_method().items()
+        # removed 'abandoned_checkouts', as per the doc:
+        # https://help.shopify.com/en/manual/orders/abandoned-checkouts?st_source=admin&st_campaign=abandoned_checkouts_footer&utm_source=admin&utm_campaign=abandoned_checkouts_footer#review-your-abandoned-checkouts
+        # abandoned checkouts are saved in the Shopify admin for three months;
+        # every Monday, abandoned checkouts older than three months are removed from your admin.
+        expected_replication_method = self.expected_replication_method()
+        expected_replication_method.pop("abandoned_checkouts")
+        incremental_streams = {key for key, value in expected_replication_method.items()
                                if value == self.INCREMENTAL}

         # IF THERE ARE STREAMS THAT SHOULD NOT BE TESTED
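One caveat with the change above: dict.pop raises KeyError when the key is absent, so if the expected metadata ever stops reporting abandoned_checkouts, the bare pop would error. A default makes the removal a no-op instead (an optional hardening, not what the diff does):

expected_replication_method = {'orders': 'INCREMENTAL', 'collects': 'INCREMENTAL'}
expected_replication_method.pop('abandoned_checkouts', None)  # no KeyError if absent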
@@ -72,6 +78,7 @@ def test_run(self):

         # Count actual rows synced
         first_sync_records = runner.get_records_from_target_output()
+        first_min_bookmarks = self.min_bookmarks_by_stream(first_sync_records)

         # set the start date for a new connection based off bookmarks largest value
         first_max_bookmarks = self.max_bookmarks_by_stream(first_sync_records)
@@ -112,27 +119,48 @@ def test_run(self):
         for stream in incremental_streams:
             with self.subTest(stream=stream):

+                # get primary key values for both syncs' records
+                expected_primary_keys = self.expected_primary_keys()[stream]
+                primary_keys_list_1 = [tuple(message.get('data').get(expected_pk) for expected_pk in expected_primary_keys)
+                                       for message in first_sync_records.get(stream).get('messages')
+                                       if message.get('action') == 'upsert']
+                primary_keys_list_2 = [tuple(message.get('data').get(expected_pk) for expected_pk in expected_primary_keys)
+                                       for message in second_sync_records.get(stream).get('messages')
+                                       if message.get('action') == 'upsert']
+                primary_keys_sync_1 = set(primary_keys_list_1)
+                primary_keys_sync_2 = set(primary_keys_list_2)
+
                 # verify that each stream has fewer records in the second sync than in the first
                 self.assertGreaterEqual(
                     first_sync_record_count.get(stream, 0),
                     second_sync_record_count.get(stream, 0),
                     msg="second had more records, start_date usage not verified")

-                # verify all data from 2nd sync >= start_date
-                target_mark = second_min_bookmarks.get(stream, {"mark": None})
-                target_value = next(iter(target_mark.values()))  # there should be only one
+                # Verify by primary key values that all records of the 2nd sync are included
+                # in the 1st sync, since the 2nd sync has a later start date.
+                self.assertTrue(primary_keys_sync_2.issubset(primary_keys_sync_1))
+
+                # verify all data from both syncs >= start_date
+                first_sync_target_mark = first_min_bookmarks.get(stream, {"mark": None})
+                second_sync_target_mark = second_min_bookmarks.get(stream, {"mark": None})
+
+                # get start dates for both syncs
+                first_sync_start_date = self.get_properties()["start_date"]
+                second_sync_start_date = self.start_date
+
+                for start_date, target_mark in zip((first_sync_start_date, second_sync_start_date), (first_sync_target_mark, second_sync_target_mark)):
+                    target_value = next(iter(target_mark.values()))  # there should be only one

-                if target_value:
+                    if target_value:

-                    # it's okay if there isn't target data for a stream
-                    try:
-                        target_value = self.local_to_utc(parse(target_value))
+                        # it's okay if there isn't target data for a stream
+                        try:
+                            target_value = self.local_to_utc(parse(target_value))

-                        # verify that the minimum bookmark sent to the target for the second sync
-                        # is greater than or equal to the start date
-                        self.assertGreaterEqual(target_value,
-                                                self.local_to_utc(parse(self.start_date)))
+                            # verify that the minimum bookmark sent to the target for each sync
+                            # is greater than or equal to that sync's start date
+                            self.assertGreaterEqual(target_value,
+                                                    self.local_to_utc(parse(start_date)))

-                    except (OverflowError, ValueError, TypeError):
-                        print("bookmarks cannot be converted to dates, "
-                              "can't test start_date for {}".format(stream))
+                        except (OverflowError, ValueError, TypeError):
+                            print("bookmarks cannot be converted to dates, "
+                                  "can't test start_date for {}".format(stream))

Reviewer: Add a comment about this assertion to the function-level (test_run) docstring above.
Author: Added the function comment.

Reviewer: Add a comment here about the addition of the for loop.
Author: Added the comment.
Reviewer: Add comments about the unavailability of the POST call, with a reference link.
Author: Added a comment in the test cases.