From e0c55a4b5868edfc73858fc0cb53e1d7a1979843 Mon Sep 17 00:00:00 2001 From: namrata270998 Date: Wed, 16 Mar 2022 05:05:37 +0000 Subject: [PATCH 01/20] updated bookmark strategy for transactions --- .../test_bookmark_for_transactions.py | 33 +++++++++++++++++++ 1 file changed, 33 insertions(+) create mode 100644 tests/unittests/test_bookmark_for_transactions.py diff --git a/tests/unittests/test_bookmark_for_transactions.py b/tests/unittests/test_bookmark_for_transactions.py new file mode 100644 index 00000000..1784a43b --- /dev/null +++ b/tests/unittests/test_bookmark_for_transactions.py @@ -0,0 +1,33 @@ +import unittest +from unittest import mock +from singer.utils import strptime_to_utc +from tap_shopify.context import Context +from tap_shopify.streams import transactions + +TRANSACTIONS_OBJECT = Context.stream_objects['transactions']() + +class Transaction(): + '''The Transaction object to return.''' + def __init__(self, id, created_at): + self.id = id + self.created_at = created_at + + def to_dict(self): + return {"id": self.id, "created_at": self.created_at} + +tx1 = Transaction("i11", "2021-08-11T01:57:05-04:00") +tx2 = Transaction("i12", "2021-08-12T01:57:05-04:00") +tx3 = Transaction("i21", "2021-08-13T01:57:05-04:00") +tx4 = Transaction("i22", "2021-08-14T01:57:05-04:00") + +class TestTransactionsBookmark(unittest.TestCase): + + @mock.patch("tap_shopify.streams.base.Stream.get_bookmark") + @mock.patch('tap_shopify.streams.transactions.Transactions.get_objects') + def test_sync(self, mock_get_transactions, mock_get_bookmark): + '''Verify that the sync returns all the rcords for transactions without filtering through bookmark.''' + mock_get_transactions.return_value = [tx1, tx2, tx3, tx4] + mock_get_bookmark.return_value = strptime_to_utc("2021-08-13T01:05:05-04:00") + expected_sync = [tx1.to_dict(), tx2.to_dict(), tx3.to_dict(), tx4.to_dict()] + actual_sync = list(TRANSACTIONS_OBJECT.sync()) + self.assertEqual(expected_sync, actual_sync) From df05032a8260c9854d937ab805c8cca59beadc9b Mon Sep 17 00:00:00 2001 From: namrata270998 Date: Wed, 16 Mar 2022 05:08:11 +0000 Subject: [PATCH 02/20] updated bookmakr for transactions --- tap_shopify/streams/transactions.py | 14 +++----------- 1 file changed, 3 insertions(+), 11 deletions(-) diff --git a/tap_shopify/streams/transactions.py b/tap_shopify/streams/transactions.py index fc9b2a55..23c90697 100644 --- a/tap_shopify/streams/transactions.py +++ b/tap_shopify/streams/transactions.py @@ -107,19 +107,11 @@ def get_objects(self): yield transaction def sync(self): - bookmark = self.get_bookmark() - max_bookmark = bookmark for transaction in self.get_objects(): transaction_dict = transaction.to_dict() replication_value = strptime_to_utc(transaction_dict[self.replication_key]) - if replication_value >= bookmark: - for field_name in ['token', 'version', 'ack', 'timestamp', 'build']: - canonicalize(transaction_dict, field_name) - yield transaction_dict - - if replication_value > max_bookmark: - max_bookmark = replication_value - - self.update_bookmark(strftime(max_bookmark)) + for field_name in ['token', 'version', 'ack', 'timestamp', 'build']: + canonicalize(transaction_dict, field_name) + yield transaction_dict Context.stream_objects['transactions'] = Transactions From 432068ef923f9889018ed76053ef510f61c14ca4 Mon Sep 17 00:00:00 2001 From: namrata270998 Date: Wed, 16 Mar 2022 05:08:56 +0000 Subject: [PATCH 03/20] removed unused var --- tap_shopify/streams/transactions.py | 1 - 1 file changed, 1 deletion(-) diff --git 
a/tap_shopify/streams/transactions.py b/tap_shopify/streams/transactions.py index 23c90697..3a4273c7 100644 --- a/tap_shopify/streams/transactions.py +++ b/tap_shopify/streams/transactions.py @@ -109,7 +109,6 @@ def get_objects(self): def sync(self): for transaction in self.get_objects(): transaction_dict = transaction.to_dict() - replication_value = strptime_to_utc(transaction_dict[self.replication_key]) for field_name in ['token', 'version', 'ack', 'timestamp', 'build']: canonicalize(transaction_dict, field_name) yield transaction_dict From 5a9d583a954c845f8196485eff62f16b3b9b5fe0 Mon Sep 17 00:00:00 2001 From: namrata270998 Date: Wed, 16 Mar 2022 05:35:06 +0000 Subject: [PATCH 04/20] removed unused import --- tap_shopify/streams/transactions.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tap_shopify/streams/transactions.py b/tap_shopify/streams/transactions.py index 3a4273c7..8a235381 100644 --- a/tap_shopify/streams/transactions.py +++ b/tap_shopify/streams/transactions.py @@ -1,6 +1,5 @@ import shopify import singer -from singer.utils import strftime, strptime_to_utc from tap_shopify.context import Context from tap_shopify.streams.base import (Stream, shopify_error_handling) From ff0a963e3dc5bdb93fa3e0d6675cd8d7b5dae416 Mon Sep 17 00:00:00 2001 From: namrata270998 Date: Wed, 16 Mar 2022 15:23:21 +0000 Subject: [PATCH 05/20] updated integration tests --- tests/test_bookmarks.py | 8 ++++-- tests/test_bookmarks_updated.py | 51 +++++++++++++++++++++------------ tests/test_start_date.py | 2 +- 3 files changed, 40 insertions(+), 21 deletions(-) diff --git a/tests/test_bookmarks.py b/tests/test_bookmarks.py index a6a1f135..90304694 100644 --- a/tests/test_bookmarks.py +++ b/tests/test_bookmarks.py @@ -101,8 +101,12 @@ def bookmarks_test(self, conn_id, testable_streams): stream_bookmark_key) == 1 # There shouldn't be a compound replication key stream_bookmark_key = stream_bookmark_key.pop() - state_value = first_sync_state.get("bookmarks", {}).get( - stream, {None: None}).get(stream_bookmark_key) + if stream not in ('transactions'): + state_value = first_sync_state.get("bookmarks", {}).get( + stream, {None: None}).get(stream_bookmark_key) + else: + state_value = first_sync_state.get("bookmarks", {}).get( + 'transaction_orders').get('updated_at') target_value = first_max_bookmarks.get( stream, {None: None}).get(stream_bookmark_key) target_min_value = first_min_bookmarks.get( diff --git a/tests/test_bookmarks_updated.py b/tests/test_bookmarks_updated.py index abb9dee1..20c14923 100644 --- a/tests/test_bookmarks_updated.py +++ b/tests/test_bookmarks_updated.py @@ -59,7 +59,7 @@ def bookmarks_test(self, conn_id, testable_streams): first_sync_records = runner.get_records_from_target_output() # BUG:TDL-17087 : State has additional values which are not streams # Need to remove additional values from bookmark value - extra_stuff = {'transaction_orders', 'metafield_products', 'refund_orders', 'product_variants'} + extra_stuff = {'metafield_products', 'refund_orders', 'product_variants'} for keys in list(first_sync_bookmark['bookmarks'].keys()): if keys in extra_stuff: first_sync_bookmark['bookmarks'].pop(keys) @@ -72,7 +72,7 @@ def bookmarks_test(self, conn_id, testable_streams): #simulated_states = self.calculated_states_by_stream(first_sync_bookmark) # We are hardcoding the updated state to ensure that we get atleast 1 record in second sync. 
These values have been provided after reviewing the max bookmark value for each of the streams - simulated_states = {'products': {'updated_at': '2021-12-20T05:10:05.000000Z'}, 'collects': {'updated_at': '2021-09-01T09:08:28.000000Z'}, 'abandoned_checkouts': {'updated_at': '2022-02-02T16:00:00.000000Z'}, 'inventory_levels': {'updated_at': '2021-12-20T05:09:34.000000Z'}, 'locations': {'updated_at': '2021-07-20T09:00:22.000000Z'}, 'events': {'created_at': '2021-12-20T05:09:01.000000Z'}, 'inventory_items': {'updated_at': '2021-09-15T19:44:11.000000Z'}, 'transactions': {'created_at': '2021-12-20T00:08:52-05:00'}, 'metafields': {'updated_at': '2021-09-07T21:18:05.000000Z'}, 'order_refunds': {'created_at': '2021-05-01T17:41:18.000000Z'}, 'customers': {'updated_at': '2021-12-20T05:08:17.000000Z'}, 'orders': {'updated_at': '2021-12-20T05:09:01.000000Z'}, 'custom_collections': {'updated_at': '2021-12-20T17:41:18.000000Z'}} + simulated_states = {'products': {'updated_at': '2021-12-20T05:10:05.000000Z'}, 'collects': {'updated_at': '2021-09-01T09:08:28.000000Z'}, 'abandoned_checkouts': {'updated_at': '2022-02-02T16:00:00.000000Z'}, 'inventory_levels': {'updated_at': '2021-12-20T05:09:34.000000Z'}, 'locations': {'updated_at': '2021-07-20T09:00:22.000000Z'}, 'events': {'created_at': '2021-12-20T05:09:01.000000Z'}, 'inventory_items': {'updated_at': '2021-09-15T19:44:11.000000Z'}, 'transaction_orders': {'updated_at': '2021-12-12T00:08:33.000000Z'}, 'metafields': {'updated_at': '2021-09-07T21:18:05.000000Z'}, 'order_refunds': {'created_at': '2021-05-01T17:41:18.000000Z'}, 'customers': {'updated_at': '2021-12-20T05:08:17.000000Z'}, 'orders': {'updated_at': '2021-12-20T05:09:01.000000Z'}, 'custom_collections': {'updated_at': '2021-12-20T17:41:18.000000Z'}} for stream, updated_state in simulated_states.items(): new_state['bookmarks'][stream] = updated_state @@ -99,15 +99,25 @@ def bookmarks_test(self, conn_id, testable_streams): if record.get('action') == 'upsert'] second_sync_messages = [record.get('data') for record in second_sync_records.get(stream, {}).get('messages', []) if record.get('action') == 'upsert'] - first_bookmark_value = first_sync_bookmark.get('bookmarks', {stream: None}).get(stream) + if stream not in ('transactions'): + first_bookmark_value = first_sync_bookmark.get('bookmarks', {stream: None}).get(stream) + else: + first_bookmark_value = first_sync_bookmark.get('bookmarks').get('transaction_orders') first_bookmark_value = list(first_bookmark_value.values())[0] - second_bookmark_value = second_sync_bookmark.get('bookmarks', {stream: None}).get(stream) + + if stream not in ('transactions'): + second_bookmark_value = second_sync_bookmark.get('bookmarks', {stream: None}).get(stream) + else: + second_bookmark_value = second_sync_bookmark.get('bookmarks').get('transaction_orders') second_bookmark_value = list(second_bookmark_value.values())[0] replication_key = next(iter(expected_replication_keys[stream])) first_bookmark_value_utc = self.convert_state_to_utc(first_bookmark_value) second_bookmark_value_utc = self.convert_state_to_utc(second_bookmark_value) - simulated_bookmark = new_state['bookmarks'][stream] + if stream not in ('transactions'): + simulated_bookmark = new_state['bookmarks'][stream] + else: + simulated_bookmark = new_state['bookmarks']['transaction_orders'] simulated_bookmark_value = list(simulated_bookmark.values())[0] # verify the syncs sets a bookmark of the expected form @@ -118,24 +128,29 @@ def bookmarks_test(self, conn_id, testable_streams): #NOT A BUG (IS the expected 
behaviour for shopify as they are using date windowing : TDL-17096 : 2nd bookmark value is getting assigned from the execution time rather than the actual bookmark time. This is an invalid assertion for shopify #self.assertEqual(first_bookmark_value, second_bookmark_value) - for record in first_sync_messages: - replication_key_value = record.get(replication_key) - # verify 1st sync bookmark value is the max replication key value for a given stream - self.assertLessEqual(replication_key_value, first_bookmark_value_utc, msg="First sync bookmark was set incorrectly, a record with a greater replication key value was synced") - - for record in second_sync_messages: - replication_key_value = record.get(replication_key) - # verify the 2nd sync replication key value is greater or equal to the 1st sync bookmarks - self.assertGreaterEqual(replication_key_value, simulated_bookmark_value, msg="Second sync records do not respect the previous bookmark") - # verify the 2nd sync bookmark value is the max replication key value for a given stream - self.assertLessEqual(replication_key_value, second_bookmark_value_utc, msg="Second sync bookmark was set incorrectly, a record with a greater replication key value was synced") + # As the bookmark for transactions is solely dependent on the value of bookmark in 'transaction_orders' which stores the parent record's + # bookmark, hence we'd now get all the data for transactions stream without filtering on `created_at` + if stream not in ('transactions'): + for record in first_sync_messages: + replication_key_value = record.get(replication_key) + # verify 1st sync bookmark value is the max replication key value for a given stream + self.assertLessEqual(replication_key_value, first_bookmark_value_utc, msg="First sync bookmark was set incorrectly, a record with a greater replication key value was synced") + + for record in second_sync_messages: + replication_key_value = record.get(replication_key) + # verify the 2nd sync replication key value is greater or equal to the 1st sync bookmarks + self.assertGreaterEqual(replication_key_value, simulated_bookmark_value, msg="Second sync records do not respect the previous bookmark") + # verify the 2nd sync bookmark value is the max replication key value for a given stream + self.assertLessEqual(replication_key_value, second_bookmark_value_utc, msg="Second sync bookmark was set incorrectly, a record with a greater replication key value was synced") # verify that we get less data in the 2nd sync # collects has all the records with the same value of replication key, so we are removing from this assertion - if stream not in ('collects'): + # As the bookmark for `transactions` is solely dependent on the value of bookmark in 'transaction_orders' which stores the parent record's + # bookmark, hence we'd now get all the data for transactions stream without filtering on `created_at` + if stream not in ('collects', 'transactions'): self.assertLess(second_sync_count, first_sync_count, msg="Second sync does not have less records, bookmark usage not verified") # verify that we get atleast 1 record in the second sync - if stream not in ('collects'): + if stream not in ('collects', 'transactions'): self.assertGreater(second_sync_count, 0, msg="Second sync did not yield any records") diff --git a/tests/test_start_date.py b/tests/test_start_date.py index 52635141..6fa4115e 100644 --- a/tests/test_start_date.py +++ b/tests/test_start_date.py @@ -122,7 +122,7 @@ def test_run(self): target_mark = second_min_bookmarks.get(stream, {"mark": None}) 
target_value = next(iter(target_mark.values())) # there should be only one - if target_value: + if target_value and stream not in ('transactions'): # it's okay if there isn't target data for a stream try: From 012c578e8fcae0ff28c850cb3eae30ad3d7c8492 Mon Sep 17 00:00:00 2001 From: namrata270998 Date: Tue, 22 Mar 2022 10:04:14 +0000 Subject: [PATCH 06/20] rempoved the replication key for transactions --- tap_shopify/__init__.py | 2 +- tap_shopify/streams/transactions.py | 2 +- tests/base.py | 84 +++++++++++++++++------------ tests/test_bookmarks.py | 16 +++--- tests/test_bookmarks_updated.py | 14 +++-- tests/test_discovery.py | 7 ++- 6 files changed, 72 insertions(+), 53 deletions(-) diff --git a/tap_shopify/__init__.py b/tap_shopify/__init__.py index e0299bcb..f82ded42 100644 --- a/tap_shopify/__init__.py +++ b/tap_shopify/__init__.py @@ -143,7 +143,7 @@ def sync(): singer.write_schema(stream["tap_stream_id"], stream["schema"], stream["key_properties"], - bookmark_properties=stream["replication_key"]) + bookmark_properties=stream.get("replication_key", None)) Context.counts[stream["tap_stream_id"]] = 0 # If there is a currently syncing stream bookmark, shuffle the diff --git a/tap_shopify/streams/transactions.py b/tap_shopify/streams/transactions.py index 8a235381..89f9889d 100644 --- a/tap_shopify/streams/transactions.py +++ b/tap_shopify/streams/transactions.py @@ -54,7 +54,7 @@ def canonicalize(transaction_dict, field_name): class Transactions(Stream): name = 'transactions' - replication_key = 'created_at' + replication_key = None replication_object = shopify.Transaction # Added decorator over functions of shopify SDK replication_object.find = shopify_error_handling(replication_object.find) diff --git a/tests/base.py b/tests/base.py index 2d45ba7b..e5f5d1da 100644 --- a/tests/base.py +++ b/tests/base.py @@ -107,7 +107,11 @@ def expected_metadata(self): self.API_LIMIT: 250}, "metafields": meta, "transactions": { - self.REPLICATION_KEYS: {"created_at"}, + # `transactions` is child stream of `orders` stream which is incremental. + # We are writing a separate bookmark for the child stream in which we are storing + # the bookmark based on the parent's replication key. + # But, we are not using any fields from the child record for it. + # That's why the `transactions` stream does not have replication_key but still it is incremental. self.PRIMARY_KEYS: {"id"}, self.FOREIGN_KEYS: {"order_id"}, self.REPLICATION_METHOD: self.INCREMENTAL, @@ -240,46 +244,56 @@ def max_bookmarks_by_stream(self, sync_records): """ max_bookmarks = {} for stream, batch in sync_records.items(): - - upsert_messages = [m for m in batch.get('messages') if m['action'] == 'upsert'] - stream_bookmark_key = self.expected_replication_keys().get(stream, set()) - assert len(stream_bookmark_key) == 1 # There shouldn't be a compound replication key - stream_bookmark_key = stream_bookmark_key.pop() - - bk_values = [message["data"].get(stream_bookmark_key) for message in upsert_messages] - max_bookmarks[stream] = {stream_bookmark_key: None} - for bk_value in bk_values: - if bk_value is None: - continue - - if max_bookmarks[stream][stream_bookmark_key] is None: - max_bookmarks[stream][stream_bookmark_key] = bk_value - - if bk_value > max_bookmarks[stream][stream_bookmark_key]: - max_bookmarks[stream][stream_bookmark_key] = bk_value + # `transactions` is child stream of `orders` stream which is incremental. 
+ # We are writing a separate bookmark for the child stream in which we are storing + # the bookmark based on the parent's replication key. + # But, we are not using any fields from the child record for it. + # That's why the `transactions` stream does not have replication_key but still it is incremental. + if stream not in ('transactions'): + upsert_messages = [m for m in batch.get('messages') if m['action'] == 'upsert'] + stream_bookmark_key = self.expected_replication_keys().get(stream, set()) + assert len(stream_bookmark_key) == 1 # There shouldn't be a compound replication key + stream_bookmark_key = stream_bookmark_key.pop() + + bk_values = [message["data"].get(stream_bookmark_key) for message in upsert_messages] + max_bookmarks[stream] = {stream_bookmark_key: None} + for bk_value in bk_values: + if bk_value is None: + continue + + if max_bookmarks[stream][stream_bookmark_key] is None: + max_bookmarks[stream][stream_bookmark_key] = bk_value + + if bk_value > max_bookmarks[stream][stream_bookmark_key]: + max_bookmarks[stream][stream_bookmark_key] = bk_value return max_bookmarks def min_bookmarks_by_stream(self, sync_records): """Return the minimum value for the replication key for each stream""" min_bookmarks = {} for stream, batch in sync_records.items(): - - upsert_messages = [m for m in batch.get('messages') if m['action'] == 'upsert'] - stream_bookmark_key = self.expected_replication_keys().get(stream, set()) - assert len(stream_bookmark_key) == 1 # There shouldn't be a compound replication key - (stream_bookmark_key, ) = stream_bookmark_key - - bk_values = [message["data"].get(stream_bookmark_key) for message in upsert_messages] - min_bookmarks[stream] = {stream_bookmark_key: None} - for bk_value in bk_values: - if bk_value is None: - continue - - if min_bookmarks[stream][stream_bookmark_key] is None: - min_bookmarks[stream][stream_bookmark_key] = bk_value - - if bk_value < min_bookmarks[stream][stream_bookmark_key]: - min_bookmarks[stream][stream_bookmark_key] = bk_value + # `transactions` is child stream of `orders` stream which is incremental. + # We are writing a separate bookmark for the child stream in which we are storing + # the bookmark based on the parent's replication key. + # But, we are not using any fields from the child record for it. + # That's why the `transactions` stream does not have replication_key but still it is incremental. 
+ if stream not in ('transactions'): + upsert_messages = [m for m in batch.get('messages') if m['action'] == 'upsert'] + stream_bookmark_key = self.expected_replication_keys().get(stream, set()) + assert len(stream_bookmark_key) == 1 # There shouldn't be a compound replication key + (stream_bookmark_key, ) = stream_bookmark_key + + bk_values = [message["data"].get(stream_bookmark_key) for message in upsert_messages] + min_bookmarks[stream] = {stream_bookmark_key: None} + for bk_value in bk_values: + if bk_value is None: + continue + + if min_bookmarks[stream][stream_bookmark_key] is None: + min_bookmarks[stream][stream_bookmark_key] = bk_value + + if bk_value < min_bookmarks[stream][stream_bookmark_key]: + min_bookmarks[stream][stream_bookmark_key] = bk_value return min_bookmarks @staticmethod diff --git a/tests/test_bookmarks.py b/tests/test_bookmarks.py index 90304694..14254086 100644 --- a/tests/test_bookmarks.py +++ b/tests/test_bookmarks.py @@ -48,8 +48,13 @@ def bookmarks_test(self, conn_id, testable_streams): # Select all streams and no fields within streams found_catalogs = menagerie.get_catalogs(conn_id) + # `transactions` is child stream of `orders` stream which is incremental. + # We are writing a separate bookmark for the child stream in which we are storing + # the bookmark based on the parent's replication key. + # But, we are not using any fields from the child record for it. + # That's why the `transactions` stream does not have replication_key but still it is incremental. incremental_streams = {key for key, value in self.expected_replication_method().items() - if value == self.INCREMENTAL and key in testable_streams} + if value == self.INCREMENTAL and key in testable_streams and key not in ('transactions')} # Our test data sets for Shopify do not have any abandoned_checkouts our_catalogs = [catalog for catalog in found_catalogs if @@ -101,12 +106,9 @@ def bookmarks_test(self, conn_id, testable_streams): stream_bookmark_key) == 1 # There shouldn't be a compound replication key stream_bookmark_key = stream_bookmark_key.pop() - if stream not in ('transactions'): - state_value = first_sync_state.get("bookmarks", {}).get( - stream, {None: None}).get(stream_bookmark_key) - else: - state_value = first_sync_state.get("bookmarks", {}).get( - 'transaction_orders').get('updated_at') + + state_value = first_sync_state.get("bookmarks", {}).get( + stream, {None: None}).get(stream_bookmark_key) target_value = first_max_bookmarks.get( stream, {None: None}).get(stream_bookmark_key) target_min_value = first_min_bookmarks.get( diff --git a/tests/test_bookmarks_updated.py b/tests/test_bookmarks_updated.py index 20c14923..5b963dc4 100644 --- a/tests/test_bookmarks_updated.py +++ b/tests/test_bookmarks_updated.py @@ -101,17 +101,14 @@ def bookmarks_test(self, conn_id, testable_streams): if record.get('action') == 'upsert'] if stream not in ('transactions'): first_bookmark_value = first_sync_bookmark.get('bookmarks', {stream: None}).get(stream) + second_bookmark_value = second_sync_bookmark.get('bookmarks', {stream: None}).get(stream) else: first_bookmark_value = first_sync_bookmark.get('bookmarks').get('transaction_orders') - first_bookmark_value = list(first_bookmark_value.values())[0] - if stream not in ('transactions'): - second_bookmark_value = second_sync_bookmark.get('bookmarks', {stream: None}).get(stream) - else: second_bookmark_value = second_sync_bookmark.get('bookmarks').get('transaction_orders') + first_bookmark_value = list(first_bookmark_value.values())[0] second_bookmark_value = 
list(second_bookmark_value.values())[0] - replication_key = next(iter(expected_replication_keys[stream])) first_bookmark_value_utc = self.convert_state_to_utc(first_bookmark_value) second_bookmark_value_utc = self.convert_state_to_utc(second_bookmark_value) if stream not in ('transactions'): @@ -125,12 +122,13 @@ def bookmarks_test(self, conn_id, testable_streams): self.assertIsNotNone(second_bookmark_value) # verify the 2nd bookmark is equal to 1st sync bookmark - #NOT A BUG (IS the expected behaviour for shopify as they are using date windowing : TDL-17096 : 2nd bookmark value is getting assigned from the execution time rather than the actual bookmark time. This is an invalid assertion for shopify + #NOT A BUG (IS the expected behavior for shopify as they are using date windowing : TDL-17096 : 2nd bookmark value is getting assigned from the execution time rather than the actual bookmark time. This is an invalid assertion for shopify #self.assertEqual(first_bookmark_value, second_bookmark_value) - # As the bookmark for transactions is solely dependent on the value of bookmark in 'transaction_orders' which stores the parent record's - # bookmark, hence we'd now get all the data for transactions stream without filtering on `created_at` + # The `transactions` stream is a child of th `orders` stream. Hence the bookmark for transactions is solely dependent on the value of bookmark in 'transaction_orders' which stores the parent record's bookmark. + # Hence it doesn't have its own replication key. if stream not in ('transactions'): + replication_key = next(iter(expected_replication_keys[stream])) for record in first_sync_messages: replication_key_value = record.get(replication_key) # verify 1st sync bookmark value is the max replication key value for a given stream diff --git a/tests/test_discovery.py b/tests/test_discovery.py index 5a2377dd..a2c4ee42 100644 --- a/tests/test_discovery.py +++ b/tests/test_discovery.py @@ -98,8 +98,13 @@ def test_run(self): # verify that if there is a replication key we are doing INCREMENTAL otherwise FULL actual_replication_method = stream_properties[0].get( "metadata", {self.REPLICATION_METHOD: None}).get(self.REPLICATION_METHOD) + # `transactions` is child stream of `orders` stream which is incremental. + # We are writing a separate bookmark for the child stream in which we are storing + # the bookmark based on the parent's replication key. + # But, we are not using any fields from the child record for it. + # That's why the `transactions` stream does not have replication_key but still it is incremental. 
if stream_properties[0].get( - "metadata", {self.REPLICATION_KEYS: []}).get(self.REPLICATION_KEYS, []): + "metadata", {self.REPLICATION_KEYS: []}).get(self.REPLICATION_KEYS, []) or stream in ('transactions'): self.assertTrue(actual_replication_method == self.INCREMENTAL, msg="Expected INCREMENTAL replication " From c925f2f0d158267387c88ba5675d6484da94270f Mon Sep 17 00:00:00 2001 From: namrata270998 Date: Tue, 22 Mar 2022 10:24:41 +0000 Subject: [PATCH 07/20] changed rep key from None to [] --- tap_shopify/streams/transactions.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tap_shopify/streams/transactions.py b/tap_shopify/streams/transactions.py index 89f9889d..31d40c13 100644 --- a/tap_shopify/streams/transactions.py +++ b/tap_shopify/streams/transactions.py @@ -54,7 +54,8 @@ def canonicalize(transaction_dict, field_name): class Transactions(Stream): name = 'transactions' - replication_key = None + # As it is a child of orders stream and it is incremental based on its parent. + replication_key = [] replication_object = shopify.Transaction # Added decorator over functions of shopify SDK replication_object.find = shopify_error_handling(replication_object.find) From 3c97292e94e3554c371636dbc810c9c01f6de07f Mon Sep 17 00:00:00 2001 From: namrata270998 Date: Tue, 22 Mar 2022 12:57:44 +0000 Subject: [PATCH 08/20] removed the rep key for transactions in catalog --- tap_shopify/__init__.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tap_shopify/__init__.py b/tap_shopify/__init__.py index f82ded42..b37a3999 100644 --- a/tap_shopify/__init__.py +++ b/tap_shopify/__init__.py @@ -115,6 +115,8 @@ def discover(): 'replication_key': stream.replication_key, 'replication_method': stream.replication_method } + if stream.name == 'transactions': + catalog_entry.pop('replication_key') streams.append(catalog_entry) return {'streams': streams} From 8af075b1c84f60f13e9ff0bb01358e412f8687d9 Mon Sep 17 00:00:00 2001 From: namrata270998 Date: Tue, 22 Mar 2022 13:07:53 +0000 Subject: [PATCH 09/20] changed back rep key to None --- tap_shopify/streams/transactions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tap_shopify/streams/transactions.py b/tap_shopify/streams/transactions.py index 31d40c13..0fd72a71 100644 --- a/tap_shopify/streams/transactions.py +++ b/tap_shopify/streams/transactions.py @@ -55,7 +55,7 @@ def canonicalize(transaction_dict, field_name): class Transactions(Stream): name = 'transactions' # As it is a child of orders stream and it is incremental based on its parent. 
- replication_key = [] + replication_key = None replication_object = shopify.Transaction # Added decorator over functions of shopify SDK replication_object.find = shopify_error_handling(replication_object.find) From 1814b1c2c97c22900ff25e66f5bfd886f7194832 Mon Sep 17 00:00:00 2001 From: namrata270998 Date: Tue, 22 Mar 2022 13:36:17 +0000 Subject: [PATCH 10/20] test cci run --- tap_shopify/__init__.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tap_shopify/__init__.py b/tap_shopify/__init__.py index b37a3999..6707a470 100644 --- a/tap_shopify/__init__.py +++ b/tap_shopify/__init__.py @@ -112,11 +112,11 @@ def discover(): 'schema': catalog_schema, 'metadata': get_discovery_metadata(stream, schema), 'key_properties': stream.key_properties, - 'replication_key': stream.replication_key, - 'replication_method': stream.replication_method + # 'replication_key': stream.replication_key, + # 'replication_method': stream.replication_method } - if stream.name == 'transactions': - catalog_entry.pop('replication_key') + # if stream.name == 'transactions': + # catalog_entry.pop('replication_key') streams.append(catalog_entry) return {'streams': streams} From 0675c47e7b7d7a93c54791366ceb6818b716e247 Mon Sep 17 00:00:00 2001 From: namrata270998 Date: Tue, 22 Mar 2022 13:44:43 +0000 Subject: [PATCH 11/20] removed the replication key and method which was written outside the metadata --- tap_shopify/__init__.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/tap_shopify/__init__.py b/tap_shopify/__init__.py index 6707a470..f0ce5eb0 100644 --- a/tap_shopify/__init__.py +++ b/tap_shopify/__init__.py @@ -111,12 +111,9 @@ def discover(): 'tap_stream_id': schema_name, 'schema': catalog_schema, 'metadata': get_discovery_metadata(stream, schema), - 'key_properties': stream.key_properties, - # 'replication_key': stream.replication_key, - # 'replication_method': stream.replication_method + 'key_properties': stream.key_properties } - # if stream.name == 'transactions': - # catalog_entry.pop('replication_key') + streams.append(catalog_entry) return {'streams': streams} From eafd58eb81a8b6dc355323fdcc08415e7fa8db25 Mon Sep 17 00:00:00 2001 From: namrata270998 Date: Thu, 24 Mar 2022 13:53:53 +0000 Subject: [PATCH 12/20] resolved comments --- tap_shopify/__init__.py | 2 +- tests/base.py | 16 ++++++++-------- tests/test_bookmarks_updated.py | 2 +- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/tap_shopify/__init__.py b/tap_shopify/__init__.py index f0ce5eb0..0759f43a 100644 --- a/tap_shopify/__init__.py +++ b/tap_shopify/__init__.py @@ -142,7 +142,7 @@ def sync(): singer.write_schema(stream["tap_stream_id"], stream["schema"], stream["key_properties"], - bookmark_properties=stream.get("replication_key", None)) + bookmark_properties=stream["replication_key"]) Context.counts[stream["tap_stream_id"]] = 0 # If there is a currently syncing stream bookmark, shuffle the diff --git a/tests/base.py b/tests/base.py index e5f5d1da..05364b00 100644 --- a/tests/base.py +++ b/tests/base.py @@ -245,10 +245,10 @@ def max_bookmarks_by_stream(self, sync_records): max_bookmarks = {} for stream, batch in sync_records.items(): # `transactions` is child stream of `orders` stream which is incremental. - # We are writing a separate bookmark for the child stream in which we are storing - # the bookmark based on the parent's replication key. - # But, we are not using any fields from the child record for it. 
- # That's why the `transactions` stream does not have replication_key but still it is incremental. + # We are writing a separate bookmark for the child stream in which we are storing + # the bookmark based on the parent's replication key. + # But, we are not using any fields from the child record for it. + # That's why the `transactions` stream does not have replication_key but still it is incremental. if stream not in ('transactions'): upsert_messages = [m for m in batch.get('messages') if m['action'] == 'upsert'] stream_bookmark_key = self.expected_replication_keys().get(stream, set()) @@ -273,10 +273,10 @@ def min_bookmarks_by_stream(self, sync_records): min_bookmarks = {} for stream, batch in sync_records.items(): # `transactions` is child stream of `orders` stream which is incremental. - # We are writing a separate bookmark for the child stream in which we are storing - # the bookmark based on the parent's replication key. - # But, we are not using any fields from the child record for it. - # That's why the `transactions` stream does not have replication_key but still it is incremental. + # We are writing a separate bookmark for the child stream in which we are storing + # the bookmark based on the parent's replication key. + # But, we are not using any fields from the child record for it. + # That's why the `transactions` stream does not have replication_key but still it is incremental. if stream not in ('transactions'): upsert_messages = [m for m in batch.get('messages') if m['action'] == 'upsert'] stream_bookmark_key = self.expected_replication_keys().get(stream, set()) diff --git a/tests/test_bookmarks_updated.py b/tests/test_bookmarks_updated.py index 5b963dc4..cc57a4a7 100644 --- a/tests/test_bookmarks_updated.py +++ b/tests/test_bookmarks_updated.py @@ -125,7 +125,7 @@ def bookmarks_test(self, conn_id, testable_streams): #NOT A BUG (IS the expected behavior for shopify as they are using date windowing : TDL-17096 : 2nd bookmark value is getting assigned from the execution time rather than the actual bookmark time. This is an invalid assertion for shopify #self.assertEqual(first_bookmark_value, second_bookmark_value) - # The `transactions` stream is a child of th `orders` stream. Hence the bookmark for transactions is solely dependent on the value of bookmark in 'transaction_orders' which stores the parent record's bookmark. + # The `transactions` stream is a child of the `orders` stream. Hence the bookmark for transactions is solely dependent on the value of bookmark in 'transaction_orders' which stores the parent record's bookmark. # Hence it doesn't have its own replication key. if stream not in ('transactions'): replication_key = next(iter(expected_replication_keys[stream])) From 5546bfce96461b395a87bc1d755954ad50dd204a Mon Sep 17 00:00:00 2001 From: namrata270998 Date: Thu, 24 Mar 2022 13:55:18 +0000 Subject: [PATCH 13/20] resolved comments --- tests/test_bookmarks.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/test_bookmarks.py b/tests/test_bookmarks.py index 14254086..047e2c78 100644 --- a/tests/test_bookmarks.py +++ b/tests/test_bookmarks.py @@ -49,10 +49,10 @@ def bookmarks_test(self, conn_id, testable_streams): # Select all streams and no fields within streams found_catalogs = menagerie.get_catalogs(conn_id) # `transactions` is child stream of `orders` stream which is incremental. - # We are writing a separate bookmark for the child stream in which we are storing - # the bookmark based on the parent's replication key. 
- # But, we are not using any fields from the child record for it. - # That's why the `transactions` stream does not have replication_key but still it is incremental. + # We are writing a separate bookmark for the child stream in which we are storing + # the bookmark based on the parent's replication key. + # But, we are not using any fields from the child record for it. + # That's why the `transactions` stream does not have replication_key but still it is incremental. incremental_streams = {key for key, value in self.expected_replication_method().items() if value == self.INCREMENTAL and key in testable_streams and key not in ('transactions')} From 43ee4b1921403dcd7cbc95c9baa922f718a5ef01 Mon Sep 17 00:00:00 2001 From: namrata270998 Date: Mon, 28 Mar 2022 09:16:47 +0000 Subject: [PATCH 14/20] fixed cci failure --- tap_shopify/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tap_shopify/__init__.py b/tap_shopify/__init__.py index 0759f43a..f0ce5eb0 100644 --- a/tap_shopify/__init__.py +++ b/tap_shopify/__init__.py @@ -142,7 +142,7 @@ def sync(): singer.write_schema(stream["tap_stream_id"], stream["schema"], stream["key_properties"], - bookmark_properties=stream["replication_key"]) + bookmark_properties=stream.get("replication_key", None)) Context.counts[stream["tap_stream_id"]] = 0 # If there is a currently syncing stream bookmark, shuffle the From 910567f68f70851a4b1ee717246ca0bbd9d23b59 Mon Sep 17 00:00:00 2001 From: namrata270998 Date: Wed, 20 Apr 2022 13:32:59 +0000 Subject: [PATCH 15/20] resolved comments --- tests/base.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/base.py b/tests/base.py index 9a85b683..188645ba 100644 --- a/tests/base.py +++ b/tests/base.py @@ -28,6 +28,8 @@ class BaseTapTest(unittest.TestCase): START_DATE_FORMAT = "%Y-%m-%dT%H:%M:%SZ" BOOKMARK_COMPARISON_FORMAT = "%Y-%m-%dT00:00:00+00:00" DEFAULT_RESULTS_PER_PAGE = 175 + # skipped this stream due to change in the bookmarking logic of the stream. + SKIPPED_STREAMS = ('transactions') @staticmethod def tap_name(): @@ -254,7 +256,7 @@ def max_bookmarks_by_stream(self, sync_records): # the bookmark based on the parent's replication key. # But, we are not using any fields from the child record for it. # That's why the `transactions` stream does not have replication_key but still it is incremental. - if stream not in ('transactions'): + if stream not in self.SKIPPED_STREAMS: upsert_messages = [m for m in batch.get('messages') if m['action'] == 'upsert'] stream_bookmark_key = self.expected_replication_keys().get(stream, set()) assert len(stream_bookmark_key) == 1 # There shouldn't be a compound replication key @@ -282,7 +284,7 @@ def min_bookmarks_by_stream(self, sync_records): # the bookmark based on the parent's replication key. # But, we are not using any fields from the child record for it. # That's why the `transactions` stream does not have replication_key but still it is incremental. 
- if stream not in ('transactions'): + if stream not in self.SKIPPED_STREAMS: upsert_messages = [m for m in batch.get('messages') if m['action'] == 'upsert'] stream_bookmark_key = self.expected_replication_keys().get(stream, set()) assert len(stream_bookmark_key) == 1 # There shouldn't be a compound replication key From c989cfb7b3d764b1dccad9f7c061ed653d7f0dd2 Mon Sep 17 00:00:00 2001 From: namrata270998 Date: Thu, 21 Apr 2022 10:15:17 +0000 Subject: [PATCH 16/20] used SKIPPED_STREAMS for transactions in tap-tester --- tests/test_bookmarks.py | 2 +- tests/test_bookmarks_updated.py | 6 +++--- tests/test_discovery.py | 2 +- tests/test_start_date.py | 2 +- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/tests/test_bookmarks.py b/tests/test_bookmarks.py index 047e2c78..bab9158b 100644 --- a/tests/test_bookmarks.py +++ b/tests/test_bookmarks.py @@ -54,7 +54,7 @@ def bookmarks_test(self, conn_id, testable_streams): # But, we are not using any fields from the child record for it. # That's why the `transactions` stream does not have replication_key but still it is incremental. incremental_streams = {key for key, value in self.expected_replication_method().items() - if value == self.INCREMENTAL and key in testable_streams and key not in ('transactions')} + if value == self.INCREMENTAL and key in testable_streams and key not in self.SKIPPED_STREAMS} # Our test data sets for Shopify do not have any abandoned_checkouts our_catalogs = [catalog for catalog in found_catalogs if diff --git a/tests/test_bookmarks_updated.py b/tests/test_bookmarks_updated.py index 0634c5df..78c46ac9 100644 --- a/tests/test_bookmarks_updated.py +++ b/tests/test_bookmarks_updated.py @@ -115,7 +115,7 @@ def bookmarks_test(self, conn_id, testable_streams): if record.get('action') == 'upsert'] second_sync_messages = [record.get('data') for record in second_sync_records.get(stream, {}).get('messages', []) if record.get('action') == 'upsert'] - if stream not in ('transactions'): + if stream not in self.SKIPPED_STREAMS: first_bookmark_value = first_sync_bookmark.get('bookmarks', {stream: None}).get(stream) second_bookmark_value = second_sync_bookmark.get('bookmarks', {stream: None}).get(stream) else: @@ -127,7 +127,7 @@ def bookmarks_test(self, conn_id, testable_streams): first_bookmark_value_utc = self.convert_state_to_utc(first_bookmark_value) second_bookmark_value_utc = self.convert_state_to_utc(second_bookmark_value) - if stream not in ('transactions'): + if stream not in self.SKIPPED_STREAMS: simulated_bookmark = new_state['bookmarks'][stream] else: simulated_bookmark = new_state['bookmarks']['transaction_orders'] @@ -145,7 +145,7 @@ def bookmarks_test(self, conn_id, testable_streams): # The `transactions` stream is a child of the `orders` stream. Hence the bookmark for transactions is solely dependent on the value of bookmark in 'transaction_orders' which stores the parent record's bookmark. # Hence it doesn't have its own replication key. - if stream not in ('transactions'): + if stream not in self.SKIPPED_STREAMS: replication_key = next(iter(expected_replication_keys[stream])) for record in first_sync_messages: replication_key_value = record.get(replication_key) diff --git a/tests/test_discovery.py b/tests/test_discovery.py index 288d850b..abd9cc67 100644 --- a/tests/test_discovery.py +++ b/tests/test_discovery.py @@ -113,7 +113,7 @@ def test_run(self): # But, we are not using any fields from the child record for it. 
# That's why the `transactions` stream does not have replication_key but still it is incremental. if stream_properties[0].get( - "metadata", {self.REPLICATION_KEYS: []}).get(self.REPLICATION_KEYS, []) or stream in ('transactions'): + "metadata", {self.REPLICATION_KEYS: []}).get(self.REPLICATION_KEYS, []) or stream in self.SKIPPED_STREAMS: self.assertTrue(actual_replication_method == self.INCREMENTAL, msg="Expected INCREMENTAL replication " diff --git a/tests/test_start_date.py b/tests/test_start_date.py index dc4caeb7..1249b832 100644 --- a/tests/test_start_date.py +++ b/tests/test_start_date.py @@ -151,7 +151,7 @@ def test_run(self): for start_date, target_mark in zip((first_sync_start_date, second_sync_start_date), (first_sync_target_mark, second_sync_target_mark)): target_value = next(iter(target_mark.values())) # there should be only one - if target_value and stream not in ('transactions'): + if target_value and stream not in self.SKIPPED_STREAMS: # it's okay if there isn't target data for a stream try: From 792562ed02c69ec926bdc120a6739b764d73d621 Mon Sep 17 00:00:00 2001 From: namrata270998 Date: Thu, 21 Apr 2022 15:46:32 +0000 Subject: [PATCH 17/20] fixed indentation --- tests/test_start_date.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_start_date.py b/tests/test_start_date.py index 1249b832..8c387094 100644 --- a/tests/test_start_date.py +++ b/tests/test_start_date.py @@ -151,7 +151,7 @@ def test_run(self): for start_date, target_mark in zip((first_sync_start_date, second_sync_start_date), (first_sync_target_mark, second_sync_target_mark)): target_value = next(iter(target_mark.values())) # there should be only one - if target_value and stream not in self.SKIPPED_STREAMS: + if target_value and stream not in self.SKIPPED_STREAMS: # it's okay if there isn't target data for a stream try: From 3f829036952fb2d194aec1823abcbd9d61fd5c47 Mon Sep 17 00:00:00 2001 From: namrata270998 Date: Fri, 22 Apr 2022 12:32:03 +0000 Subject: [PATCH 18/20] added an assertion for transactions --- tests/test_bookmarks_updated.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_bookmarks_updated.py b/tests/test_bookmarks_updated.py index 78c46ac9..e0d10153 100644 --- a/tests/test_bookmarks_updated.py +++ b/tests/test_bookmarks_updated.py @@ -168,5 +168,5 @@ def bookmarks_test(self, conn_id, testable_streams): msg="Second sync does not have less records, bookmark usage not verified") # verify that we get atleast 1 record in the second sync - if stream not in ('collects', 'transactions'): + if stream not in ('collects'): self.assertGreater(second_sync_count, 0, msg="Second sync did not yield any records") From 037ee46254ade476490be4cff71589e0d8bc67d7 Mon Sep 17 00:00:00 2001 From: namrata270998 Date: Fri, 29 Apr 2022 12:32:20 +0000 Subject: [PATCH 19/20] included the collects stream for tests --- tests/test_bookmarks_updated.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/tests/test_bookmarks_updated.py b/tests/test_bookmarks_updated.py index e0d10153..a86f21af 100644 --- a/tests/test_bookmarks_updated.py +++ b/tests/test_bookmarks_updated.py @@ -163,10 +163,9 @@ def bookmarks_test(self, conn_id, testable_streams): # collects has all the records with the same value of replication key, so we are removing from this assertion # As the bookmark for `transactions` is solely dependent on the value of bookmark in 'transaction_orders' which stores the parent record's # bookmark, hence we'd now get all the data for transactions 
stream without filtering on `created_at` - if stream not in ('collects', 'transactions'): + if stream not in self.SKIPPED_STREAMS: self.assertLess(second_sync_count, first_sync_count, msg="Second sync does not have less records, bookmark usage not verified") # verify that we get atleast 1 record in the second sync - if stream not in ('collects'): - self.assertGreater(second_sync_count, 0, msg="Second sync did not yield any records") + self.assertGreater(second_sync_count, 0, msg="Second sync did not yield any records") From 515640b5ed4363fc70e8836e73c2636b5aa263ba Mon Sep 17 00:00:00 2001 From: namrata270998 Date: Tue, 3 May 2022 06:50:57 +0000 Subject: [PATCH 20/20] updated simulated state for collects stream --- tests/test_bookmarks_updated.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_bookmarks_updated.py b/tests/test_bookmarks_updated.py index a86f21af..7780e01c 100644 --- a/tests/test_bookmarks_updated.py +++ b/tests/test_bookmarks_updated.py @@ -88,7 +88,7 @@ def bookmarks_test(self, conn_id, testable_streams): #simulated_states = self.calculated_states_by_stream(first_sync_bookmark) # We are hardcoding the updated state to ensure that we get atleast 1 record in second sync. These values have been provided after reviewing the max bookmark value for each of the streams - simulated_states = {'products': {'updated_at': '2021-12-20T05:10:05.000000Z'}, 'collects': {'updated_at': '2021-09-01T09:08:28.000000Z'}, 'abandoned_checkouts': {'updated_at': '2022-02-02T16:00:00.000000Z'}, 'inventory_levels': {'updated_at': '2021-12-20T05:09:34.000000Z'}, 'locations': {'updated_at': '2021-07-20T09:00:22.000000Z'}, 'events': {'created_at': '2021-12-20T05:09:01.000000Z'}, 'inventory_items': {'updated_at': '2021-09-15T19:44:11.000000Z'}, 'transaction_orders': {'updated_at': '2021-12-12T00:08:33.000000Z'}, 'metafields': {'updated_at': '2021-09-07T21:18:05.000000Z'}, 'order_refunds': {'created_at': '2021-05-01T17:41:18.000000Z'}, 'customers': {'updated_at': '2021-12-20T05:08:17.000000Z'}, 'orders': {'updated_at': '2021-12-20T05:09:01.000000Z'}, 'custom_collections': {'updated_at': '2021-12-20T17:41:18.000000Z'}} + simulated_states = {'products': {'updated_at': '2021-12-20T05:10:05.000000Z'}, 'collects': {'updated_at': '2022-01-15T00:53:24.000000Z'}, 'abandoned_checkouts': {'updated_at': '2022-02-02T16:00:00.000000Z'}, 'inventory_levels': {'updated_at': '2021-12-20T05:09:34.000000Z'}, 'locations': {'updated_at': '2021-07-20T09:00:22.000000Z'}, 'events': {'created_at': '2021-12-20T05:09:01.000000Z'}, 'inventory_items': {'updated_at': '2021-09-15T19:44:11.000000Z'}, 'transaction_orders': {'updated_at': '2021-12-12T00:08:33.000000Z'}, 'metafields': {'updated_at': '2021-09-07T21:18:05.000000Z'}, 'order_refunds': {'created_at': '2021-05-01T17:41:18.000000Z'}, 'customers': {'updated_at': '2021-12-20T05:08:17.000000Z'}, 'orders': {'updated_at': '2021-12-20T05:09:01.000000Z'}, 'custom_collections': {'updated_at': '2021-12-20T17:41:18.000000Z'}} for stream, updated_state in simulated_states.items(): new_state['bookmarks'][stream] = updated_state
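
Taken together, the series stops keying the `transactions` bookmark on the child record's `created_at` and instead leans entirely on the `transaction_orders` entry, which tracks the parent `orders` stream's `updated_at`. As a rough illustration only (the timestamps below are lifted from the hardcoded test states in `test_bookmarks_updated.py`, and the "after" value is an assumption about how far the parent bookmark would advance, not output from the tap), the state moves from something like the first dict to the second:

```python
# Illustrative only: approximate shape of the singer state before and after
# this change. Values are taken from / modeled on the simulated test states.

# Before: transactions kept its own bookmark on the child record's created_at.
state_before = {
    "bookmarks": {
        "orders": {"updated_at": "2021-12-20T05:09:01.000000Z"},
        "transactions": {"created_at": "2021-12-20T00:08:52-05:00"},
        "transaction_orders": {"updated_at": "2021-12-12T00:08:33.000000Z"},
    }
}

# After: transactions no longer has a replication key of its own, so no
# 'transactions' bookmark is written; only the parent-derived
# 'transaction_orders' bookmark advances.
state_after = {
    "bookmarks": {
        "orders": {"updated_at": "2021-12-20T05:09:01.000000Z"},
        "transaction_orders": {"updated_at": "2021-12-20T05:09:01.000000Z"},
    }
}
```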
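
The new `Transactions.sync()` in patches 02–04 simply canonicalizes and yields whatever `get_objects()` returns; any date windowing happens when the parent orders are selected. The sketch below is not the tap's code — `fetch_orders` and `fetch_transactions` are hypothetical stand-ins for the Shopify SDK calls, and the state handling is stubbed — but it shows the parent-bookmark pattern the comments added to `tests/base.py` describe: query orders updated since the `transaction_orders` bookmark, emit every transaction of those orders unfiltered, and advance that bookmark from the parent's `updated_at`.

```python
from datetime import datetime

# Minimal, self-contained sketch of a parent-driven bookmark for a child
# stream. fetch_orders / fetch_transactions are hypothetical callables, not
# tap-shopify APIs.

PARENT_BOOKMARK_KEY = "transaction_orders"  # bookmark keyed on the parent orders stream

def parse(ts):
    # Accepts ISO-8601 timestamps with an offset or a trailing 'Z'.
    return datetime.fromisoformat(ts.replace("Z", "+00:00"))

def sync_transactions(state, fetch_orders, fetch_transactions):
    """Yield every transaction of each order updated since the parent bookmark."""
    bookmark = parse(state["bookmarks"][PARENT_BOOKMARK_KEY]["updated_at"])
    max_parent = bookmark
    for order in fetch_orders(updated_at_min=bookmark):
        # No created_at filter on the child records: every transaction of a
        # selected order is emitted.
        for transaction in fetch_transactions(order_id=order["id"]):
            yield transaction
        max_parent = max(max_parent, parse(order["updated_at"]))
    # The value that advances is the parent's replication key, not anything
    # taken from the child records.
    state["bookmarks"][PARENT_BOOKMARK_KEY]["updated_at"] = max_parent.isoformat()
```

A caller would supply SDK-backed fetchers and fully consume the generator before persisting state; the point of the sketch is only that the child stream's bookmark never looks at `transaction['created_at']`, which is why the integration tests above skip `transactions` in their replication-key assertions.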