diff --git a/tests/test_all_fields.py b/tests/test_all_fields.py
new file mode 100644
index 00000000..07683fdd
--- /dev/null
+++ b/tests/test_all_fields.py
@@ -0,0 +1,81 @@
+from tap_tester import runner, menagerie
+from base import BaseTapTest
+
+class AllFieldsTest(BaseTapTest):
+
+    @staticmethod
+    def name():
+        return "tap_tester_shopify_all_fields_test"
+
+    def test_run(self):
+        """
+        Ensure running the tap with all streams and fields selected results in the
+        replication of all fields.
+        - Verify no unexpected streams were replicated
+        - Verify that more than just the automatic fields are replicated for each stream
+        """
+
+        self.start_date = "2021-04-01T00:00:00Z"
+        expected_streams = self.expected_streams()
+
+        # instantiate connection to run on "talenddatawearhouse"
+        conn_id = self.create_connection(original_properties=False, original_credentials=False)
+
+        # run check mode
+        found_catalogs = menagerie.get_catalogs(conn_id)
+
+        # table and field selection
+        test_catalogs_all_fields = [catalog for catalog in found_catalogs
+                                    if catalog.get('stream_name') in expected_streams]
+        self.select_all_streams_and_fields(conn_id, test_catalogs_all_fields, select_all_fields=True)
+
+        # grab metadata after performing table-and-field selection to set expectations
+        stream_to_all_catalog_fields = dict()  # used for asserting all fields are replicated
+        for catalog in test_catalogs_all_fields:
+            stream_id, stream_name = catalog['stream_id'], catalog['stream_name']
+            catalog_entry = menagerie.get_annotated_schema(conn_id, stream_id)
+            fields_from_field_level_md = [md_entry['breadcrumb'][1]
+                                          for md_entry in catalog_entry['metadata']
+                                          if md_entry['breadcrumb'] != []]
+            stream_to_all_catalog_fields[stream_name] = set(fields_from_field_level_md)
+
+        # run initial sync
+        record_count_by_stream = self.run_sync(conn_id)
+        synced_records = runner.get_records_from_target_output()
+
+        # Verify no unexpected streams were replicated
+        synced_stream_names = set(synced_records.keys())
+        self.assertSetEqual(expected_streams, synced_stream_names)
+
+        for stream in expected_streams:
+            with self.subTest(stream=stream):
+
+                # expected values
+                expected_automatic_keys = self.expected_primary_keys().get(stream, set()) | self.expected_replication_keys().get(stream, set())
+                # get all expected keys
+                expected_all_keys = stream_to_all_catalog_fields[stream]
+
+                # collect actual values
+                messages = synced_records.get(stream)
+
+                actual_all_keys = set()
+                # collect actual values
+                for message in messages['messages']:
+                    if message['action'] == 'upsert':
+                        actual_all_keys.update(message['data'].keys())
+
+                # Verify that you get some records for each stream
+                self.assertGreater(record_count_by_stream.get(stream, -1), 0)
+
+                # verify all fields for a stream were replicated
+                self.assertGreater(len(expected_all_keys), len(expected_automatic_keys))
+                self.assertTrue(expected_automatic_keys.issubset(expected_all_keys), msg=f'{expected_automatic_keys-expected_all_keys} is not in "expected_all_keys"')
+
+                if stream == 'abandoned_checkouts':
+                    expected_all_keys.remove('billing_address')
+                elif stream == 'orders':
+                    # No field named 'order_adjustments' present in the 'order' object
+                    # Documentation: https://shopify.dev/api/admin-rest/2021-10/resources/order#resource_object
+                    expected_all_keys.remove('order_adjustments')
+
+                self.assertSetEqual(expected_all_keys, actual_all_keys)
diff --git a/tests/test_automatic_fields.py b/tests/test_automatic_fields.py
index 76158e29..8fa63207 100644
--- a/tests/test_automatic_fields.py
+++ b/tests/test_automatic_fields.py
@@ -31,6 +31,7 @@ def automatic_test(self, conn_id, testable_streams):
         """
         Verify that for each stream you can get multiple pages of data
         when no fields are selected and only the automatic fields are replicated.
+        Verify that all replicated records have unique primary key values.
 
         PREREQUISITE
         For EACH stream add enough data that you surpass the limit of a single
@@ -52,6 +53,7 @@ def automatic_test(self, conn_id, testable_streams):
 
         record_count_by_stream = self.run_sync(conn_id)
         actual_fields_by_stream = runner.examine_target_output_for_fields()
+        synced_records = runner.get_records_from_target_output()
 
         for stream in incremental_streams:
             with self.subTest(stream=stream):
@@ -60,6 +62,11 @@ def automatic_test(self, conn_id, testable_streams):
                 # SKIP THIS ASSERTION FOR STREAMS WHERE YOU CANNOT GET
                 # MORE THAN 1 PAGE OF DATA IN THE TEST ACCOUNT
                 stream_metadata = self.expected_metadata().get(stream, {})
+                expected_primary_keys = self.expected_primary_keys().get(stream, set())
+
+                # collect records
+                messages = synced_records.get(stream)
+
                 minimum_record_count = stream_metadata.get(
                     self.API_LIMIT,
                     self.get_properties().get('result_per_page', self.DEFAULT_RESULTS_PER_PAGE)
@@ -72,7 +79,15 @@ def automatic_test(self, conn_id, testable_streams):
                 # verify that only the automatic fields are sent to the target
                 self.assertEqual(
                     actual_fields_by_stream.get(stream, set()),
-                    self.expected_primary_keys().get(stream, set()) |
+                    expected_primary_keys |
                     self.expected_replication_keys().get(stream, set()),
                     msg="The fields sent to the target are not the automatic fields"
                 )
+
+                # Verify that all replicated records have unique primary key values.
+                records_pks_set = {tuple([message.get('data').get(primary_key) for primary_key in expected_primary_keys])
+                                   for message in messages.get('messages')}
+                records_pks_list = [tuple([message.get('data').get(primary_key) for primary_key in expected_primary_keys])
+                                    for message in messages.get('messages')]
+                self.assertCountEqual(records_pks_set, records_pks_list,
+                                      msg="We have duplicate records for {}".format(stream))
diff --git a/tests/test_bookmarks_updated.py b/tests/test_bookmarks_updated.py
index c39ce1a3..87f78f34 100644
--- a/tests/test_bookmarks_updated.py
+++ b/tests/test_bookmarks_updated.py
@@ -15,6 +15,17 @@ class BookmarkTest(BaseTapTest):
     def name():
         return "tap_tester_shopify_bookmark_test"
 
+    # function for verifying the date format
+    def is_expected_date_format(self, date):
+        try:
+            # parse date
+            dt.strptime(date, "%Y-%m-%dT%H:%M:%S.%fZ")
+        except ValueError:
+            # return False if the date is not in the expected format
+            return False
+        # return True in case of no error
+        return True
+
     def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
         self.start_date = '2021-04-01T00:00:00Z'
@@ -112,7 +123,9 @@ def bookmarks_test(self, conn_id, testable_streams):
 
                # verify the syncs set a bookmark of the expected form
                self.assertIsNotNone(first_bookmark_value)
+               self.assertTrue(self.is_expected_date_format(first_bookmark_value))
                self.assertIsNotNone(second_bookmark_value)
+               self.assertTrue(self.is_expected_date_format(second_bookmark_value))
 
                # verify the 2nd bookmark is equal to 1st sync bookmark
                # NOT A BUG (this is the expected behaviour for Shopify, as they use date windowing: TDL-17096). The 2nd bookmark value is assigned from the execution time rather than the actual bookmark time, so this is an invalid assertion for Shopify.
diff --git a/tests/test_discovery.py b/tests/test_discovery.py
index 5a2377dd..52807de6 100644
--- a/tests/test_discovery.py
+++ b/tests/test_discovery.py
@@ -24,6 +24,7 @@ def test_run(self):
         • Verify stream names follow naming convention
           streams should only have lowercase alphas and underscores
         • verify there is only 1 top level breadcrumb
+        • Verify there are no duplicate/conflicting metadata entries.
         • verify replication key(s)
         • verify primary key(s)
         • verify that if there is a replication key we are doing INCREMENTAL otherwise FULL
@@ -33,23 +34,24 @@ def test_run(self):
         • verify that all other fields have inclusion of available (metadata and schema)
         """
         conn_id = self.create_connection()
+        expected_streams = self.expected_streams()
 
        # Verify number of actual streams discovered match expected
        found_catalogs = menagerie.get_catalogs(conn_id)
        self.assertGreater(len(found_catalogs), 0,
                           msg="unable to locate schemas for connection {}".format(conn_id))
 
        self.assertEqual(len(found_catalogs),
-                        len(self.expected_streams()),
+                        len(expected_streams),
                         msg="Expected {} streams, actual was {} for connection {},"
                             " actual {}".format(
-                                len(self.expected_streams()),
+                                len(expected_streams),
                                 len(found_catalogs),
                                 found_catalogs,
                                 conn_id))
 
        # Verify the stream names discovered were what we expect
        found_catalog_names = {c['tap_stream_id'] for c in found_catalogs}
-       self.assertEqual(set(self.expected_streams()),
+       self.assertEqual(set(expected_streams),
                         set(found_catalog_names),
                         msg="Expected streams don't match actual streams")
@@ -58,7 +60,7 @@ def test_run(self):
        self.assertTrue(all([re.fullmatch(r"[a-z_]+", name) for name in found_catalog_names]),
                        msg="One or more streams don't follow standard naming")
 
-       for stream in self.expected_streams():
+       for stream in expected_streams:
            with self.subTest(stream=stream):
                catalog = next(iter([catalog for catalog in found_catalogs
                                     if catalog["stream_name"] == stream]))
@@ -74,6 +76,14 @@ def test_run(self):
 
                self.assertTrue(len(stream_properties) == 1,
                                msg="There is more than one top level breadcrumb")
+               # collect fields
+               actual_fields = []
+               for md_entry in metadata:
+                   if md_entry['breadcrumb'] != []:
+                       actual_fields.append(md_entry['breadcrumb'][1])
+               # Verify there are no duplicate/conflicting metadata entries.
+               self.assertEqual(len(actual_fields), len(set(actual_fields)), msg="There are duplicate entries in the fields of '{}' stream".format(stream))
+
                # verify replication key(s)
                self.assertEqual(
                    set(stream_properties[0].get(
diff --git a/tests/test_start_date.py b/tests/test_start_date.py
index 52635141..6a2791eb 100644
--- a/tests/test_start_date.py
+++ b/tests/test_start_date.py
@@ -20,6 +20,8 @@ class StartDateTest(BaseTapTest):
     • verify that a sync with a later start date has at least one record synced
       and less records than the 1st sync with a previous start date
     • verify that each stream has less records than the earlier start date sync
+    • Verify by primary key values, that all records of the 2nd sync are included
+      in the 1st sync since the 2nd sync has a later start date.
     • verify all data from the later start date has bookmark values >= start_date
     • verify that the minimum bookmark sent to the target for the later start_date sync
       is greater than or equal to the start date
@@ -59,6 +61,9 @@ def test_run(self):
        incremental_streams = {key for key, value in
                               self.expected_replication_method().items()
                               if value == self.INCREMENTAL}
+       # get expected replication keys
+       expected_replication_keys = self.expected_replication_keys()
+
        # IF THERE ARE STREAMS THAT SHOULD NOT BE TESTED
        # REPLACE THE EMPTY SET BELOW WITH THOSE STREAMS
 
@@ -103,7 +108,6 @@ def test_run(self):
        second_sync_record_count = self.run_sync(conn_id)
        second_total_records = reduce(lambda a, b: a + b, second_sync_record_count.values(), 0)
        second_sync_records = runner.get_records_from_target_output()
-       second_min_bookmarks = self.min_bookmarks_by_stream(second_sync_records)
 
        # verify that at least one record synced and less records synced than the 1st connection
        self.assertGreater(second_total_records, 0)
@@ -112,27 +116,45 @@ def test_run(self):
 
        for stream in incremental_streams:
            with self.subTest(stream=stream):
+               # get primary key values for both syncs' records
+               expected_primary_keys = self.expected_primary_keys()[stream]
+               primary_keys_list_1 = [tuple(message.get('data').get(expected_pk) for expected_pk in expected_primary_keys)
+                                      for message in first_sync_records.get(stream).get('messages')
+                                      if message.get('action') == 'upsert']
+               primary_keys_list_2 = [tuple(message.get('data').get(expected_pk) for expected_pk in expected_primary_keys)
+                                      for message in second_sync_records.get(stream).get('messages')
+                                      if message.get('action') == 'upsert']
+               primary_keys_sync_1 = set(primary_keys_list_1)
+               primary_keys_sync_2 = set(primary_keys_list_2)
+
+               # get replication key-values for all records for both syncs
+               replication_key_sync_1 = [message.get('data').get(expected_rk) for expected_rk in expected_replication_keys.get(stream)
+                                         for message in first_sync_records.get(stream).get('messages')
+                                         if message.get('action') == 'upsert']
+               replication_key_sync_2 = [message.get('data').get(expected_rk) for expected_rk in expected_replication_keys.get(stream)
+                                         for message in second_sync_records.get(stream).get('messages')
+                                         if message.get('action') == 'upsert']
+
               # verify that each stream has less records than the first connection sync
               self.assertGreaterEqual(
                   first_sync_record_count.get(stream, 0),
                   second_sync_record_count.get(stream, 0),
                   msg="second had more records, start_date usage not verified")
 
-              # verify all data from 2nd sync >= start_date
-              target_mark = second_min_bookmarks.get(stream, {"mark": None})
-              target_value = next(iter(target_mark.values()))  # there should be only one
-
-              if target_value:
+              # Verify by primary key values that all records of the 2nd sync are included in the 1st sync, since the 2nd sync has a later start date.
+              self.assertTrue(primary_keys_sync_2.issubset(primary_keys_sync_1))
 
-                  # it's okay if there isn't target data for a stream
-                  try:
-                      target_value = self.local_to_utc(parse(target_value))
+              # get start dates for both syncs
+              first_sync_start_date = self.get_properties()["start_date"]
+              second_sync_start_date = self.start_date
 
-                      # verify that the minimum bookmark sent to the target for the second sync
-                      # is greater than or equal to the start date
-                      self.assertGreaterEqual(target_value,
-                                              self.local_to_utc(parse(self.start_date)))
+              # loop over the start date/state file date and the replication key records for each sync
+              # to verify the records we synced are greater than the start date/state file date
+              for start_date, record_replication_keys in zip(
+                      (first_sync_start_date, second_sync_start_date),
+                      (replication_key_sync_1, replication_key_sync_2)):
 
-                  except (OverflowError, ValueError, TypeError):
-                      print("bookmarks cannot be converted to dates, "
-                            "can't test start_date for {}".format(stream))
+                  # loop over every replication key record and verify we have
+                  # synced records greater than the start date/state file date
+                  for record_replication_key in record_replication_keys:
+                      self.assertGreaterEqual(record_replication_key, start_date)