Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tdl 15459 update bookmarking for transactions #141

Open
wants to merge 21 commits into
base: crest-work
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions tap_shopify/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,9 @@ def discover():
'tap_stream_id': schema_name,
'schema': catalog_schema,
'metadata': get_discovery_metadata(stream, schema),
'key_properties': stream.key_properties,
'replication_key': stream.replication_key,
'replication_method': stream.replication_method
'key_properties': stream.key_properties
}

streams.append(catalog_entry)

return {'streams': streams}
Expand Down Expand Up @@ -143,7 +142,7 @@ def sync():
singer.write_schema(stream["tap_stream_id"],
stream["schema"],
stream["key_properties"],
bookmark_properties=stream["replication_key"])
bookmark_properties=stream.get("replication_key", None))
namrata270998 marked this conversation as resolved.
Show resolved Hide resolved
Context.counts[stream["tap_stream_id"]] = 0

# If there is a currently syncing stream bookmark, shuffle the
Expand Down
19 changes: 5 additions & 14 deletions tap_shopify/streams/transactions.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import shopify
import singer
from singer.utils import strftime, strptime_to_utc
from tap_shopify.context import Context
from tap_shopify.streams.base import (Stream,
shopify_error_handling)
Expand Down Expand Up @@ -55,7 +54,8 @@ def canonicalize(transaction_dict, field_name):

class Transactions(Stream):
name = 'transactions'
replication_key = 'created_at'
# As it is a child of orders stream and it is incremental based on its parent.
replication_key = None
replication_object = shopify.Transaction
# Added decorator over functions of shopify SDK
replication_object.find = shopify_error_handling(replication_object.find)
Expand Down Expand Up @@ -107,19 +107,10 @@ def get_objects(self):
yield transaction

def sync(self):
bookmark = self.get_bookmark()
max_bookmark = bookmark
for transaction in self.get_objects():
transaction_dict = transaction.to_dict()
replication_value = strptime_to_utc(transaction_dict[self.replication_key])
namrata270998 marked this conversation as resolved.
Show resolved Hide resolved
if replication_value >= bookmark:
for field_name in ['token', 'version', 'ack', 'timestamp', 'build']:
canonicalize(transaction_dict, field_name)
yield transaction_dict

if replication_value > max_bookmark:
max_bookmark = replication_value

self.update_bookmark(strftime(max_bookmark))
for field_name in ['token', 'version', 'ack', 'timestamp', 'build']:
canonicalize(transaction_dict, field_name)
yield transaction_dict

Context.stream_objects['transactions'] = Transactions
86 changes: 51 additions & 35 deletions tests/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ class BaseTapTest(unittest.TestCase):
START_DATE_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
BOOKMARK_COMPARISON_FORMAT = "%Y-%m-%dT00:00:00+00:00"
DEFAULT_RESULTS_PER_PAGE = 175
# skipped this stream due to change in the bookmarking logic of the stream.
SKIPPED_STREAMS = ('transactions')

@staticmethod
def tap_name():
Expand Down Expand Up @@ -107,7 +109,11 @@ def expected_metadata(self):
self.API_LIMIT: 250},
"metafields": meta,
"transactions": {
self.REPLICATION_KEYS: {"created_at"},
# `transactions` is child stream of `orders` stream which is incremental.
# We are writing a separate bookmark for the child stream in which we are storing
# the bookmark based on the parent's replication key.
# But, we are not using any fields from the child record for it.
# That's why the `transactions` stream does not have replication_key but still it is incremental.
self.PRIMARY_KEYS: {"id"},
self.FOREIGN_KEYS: {"order_id"},
self.REPLICATION_METHOD: self.INCREMENTAL,
Expand Down Expand Up @@ -245,46 +251,56 @@ def max_bookmarks_by_stream(self, sync_records):
"""
max_bookmarks = {}
for stream, batch in sync_records.items():

upsert_messages = [m for m in batch.get('messages') if m['action'] == 'upsert']
stream_bookmark_key = self.expected_replication_keys().get(stream, set())
assert len(stream_bookmark_key) == 1 # There shouldn't be a compound replication key
stream_bookmark_key = stream_bookmark_key.pop()

bk_values = [message["data"].get(stream_bookmark_key) for message in upsert_messages]
max_bookmarks[stream] = {stream_bookmark_key: None}
for bk_value in bk_values:
if bk_value is None:
continue

if max_bookmarks[stream][stream_bookmark_key] is None:
max_bookmarks[stream][stream_bookmark_key] = bk_value

if bk_value > max_bookmarks[stream][stream_bookmark_key]:
max_bookmarks[stream][stream_bookmark_key] = bk_value
# `transactions` is child stream of `orders` stream which is incremental.
# We are writing a separate bookmark for the child stream in which we are storing
# the bookmark based on the parent's replication key.
# But, we are not using any fields from the child record for it.
# That's why the `transactions` stream does not have replication_key but still it is incremental.
if stream not in self.SKIPPED_STREAMS:
upsert_messages = [m for m in batch.get('messages') if m['action'] == 'upsert']
stream_bookmark_key = self.expected_replication_keys().get(stream, set())
assert len(stream_bookmark_key) == 1 # There shouldn't be a compound replication key
stream_bookmark_key = stream_bookmark_key.pop()

bk_values = [message["data"].get(stream_bookmark_key) for message in upsert_messages]
max_bookmarks[stream] = {stream_bookmark_key: None}
for bk_value in bk_values:
if bk_value is None:
continue

if max_bookmarks[stream][stream_bookmark_key] is None:
max_bookmarks[stream][stream_bookmark_key] = bk_value

if bk_value > max_bookmarks[stream][stream_bookmark_key]:
max_bookmarks[stream][stream_bookmark_key] = bk_value
return max_bookmarks

def min_bookmarks_by_stream(self, sync_records):
"""Return the minimum value for the replication key for each stream"""
min_bookmarks = {}
for stream, batch in sync_records.items():

upsert_messages = [m for m in batch.get('messages') if m['action'] == 'upsert']
stream_bookmark_key = self.expected_replication_keys().get(stream, set())
assert len(stream_bookmark_key) == 1 # There shouldn't be a compound replication key
(stream_bookmark_key, ) = stream_bookmark_key

bk_values = [message["data"].get(stream_bookmark_key) for message in upsert_messages]
min_bookmarks[stream] = {stream_bookmark_key: None}
for bk_value in bk_values:
if bk_value is None:
continue

if min_bookmarks[stream][stream_bookmark_key] is None:
min_bookmarks[stream][stream_bookmark_key] = bk_value

if bk_value < min_bookmarks[stream][stream_bookmark_key]:
min_bookmarks[stream][stream_bookmark_key] = bk_value
# `transactions` is child stream of `orders` stream which is incremental.
# We are writing a separate bookmark for the child stream in which we are storing
# the bookmark based on the parent's replication key.
# But, we are not using any fields from the child record for it.
# That's why the `transactions` stream does not have replication_key but still it is incremental.
if stream not in self.SKIPPED_STREAMS:
upsert_messages = [m for m in batch.get('messages') if m['action'] == 'upsert']
stream_bookmark_key = self.expected_replication_keys().get(stream, set())
assert len(stream_bookmark_key) == 1 # There shouldn't be a compound replication key
(stream_bookmark_key, ) = stream_bookmark_key

bk_values = [message["data"].get(stream_bookmark_key) for message in upsert_messages]
min_bookmarks[stream] = {stream_bookmark_key: None}
for bk_value in bk_values:
if bk_value is None:
continue

if min_bookmarks[stream][stream_bookmark_key] is None:
min_bookmarks[stream][stream_bookmark_key] = bk_value

if bk_value < min_bookmarks[stream][stream_bookmark_key]:
min_bookmarks[stream][stream_bookmark_key] = bk_value
return min_bookmarks

@staticmethod
Expand Down
8 changes: 7 additions & 1 deletion tests/test_bookmarks.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,13 @@ def bookmarks_test(self, conn_id, testable_streams):

# Select all streams and no fields within streams
found_catalogs = menagerie.get_catalogs(conn_id)
# `transactions` is child stream of `orders` stream which is incremental.
# We are writing a separate bookmark for the child stream in which we are storing
# the bookmark based on the parent's replication key.
# But, we are not using any fields from the child record for it.
# That's why the `transactions` stream does not have replication_key but still it is incremental.
incremental_streams = {key for key, value in self.expected_replication_method().items()
if value == self.INCREMENTAL and key in testable_streams}
if value == self.INCREMENTAL and key in testable_streams and key not in self.SKIPPED_STREAMS}

# Our test data sets for Shopify do not have any abandoned_checkouts
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The abandoned_checkouts stream should remain under test. It is incorrectly removed on crest master. See base.py init

Copy link
Contributor Author

@namrata270998 namrata270998 Apr 22, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The abandoned checkouts are added back in another PR in this commit which is not merged yet.

our_catalogs = [catalog for catalog in found_catalogs if
Expand Down Expand Up @@ -101,6 +106,7 @@ def bookmarks_test(self, conn_id, testable_streams):
stream_bookmark_key) == 1 # There shouldn't be a compound replication key
stream_bookmark_key = stream_bookmark_key.pop()


state_value = first_sync_state.get("bookmarks", {}).get(
stream, {None: None}).get(stream_bookmark_key)
target_value = first_max_bookmarks.get(
Expand Down
54 changes: 33 additions & 21 deletions tests/test_bookmarks_updated.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def bookmarks_test(self, conn_id, testable_streams):
first_sync_records = runner.get_records_from_target_output()
# BUG:TDL-17087 : State has additional values which are not streams
# Need to remove additional values from bookmark value
extra_stuff = {'transaction_orders', 'metafield_products', 'refund_orders', 'product_variants'}
extra_stuff = {'metafield_products', 'refund_orders', 'product_variants'}
for keys in list(first_sync_bookmark['bookmarks'].keys()):
if keys in extra_stuff:
first_sync_bookmark['bookmarks'].pop(keys)
Expand All @@ -88,7 +88,7 @@ def bookmarks_test(self, conn_id, testable_streams):
#simulated_states = self.calculated_states_by_stream(first_sync_bookmark)

# We are hardcoding the updated state to ensure that we get atleast 1 record in second sync. These values have been provided after reviewing the max bookmark value for each of the streams
simulated_states = {'products': {'updated_at': '2021-12-20T05:10:05.000000Z'}, 'collects': {'updated_at': '2021-09-01T09:08:28.000000Z'}, 'abandoned_checkouts': {'updated_at': '2022-02-02T16:00:00.000000Z'}, 'inventory_levels': {'updated_at': '2021-12-20T05:09:34.000000Z'}, 'locations': {'updated_at': '2021-07-20T09:00:22.000000Z'}, 'events': {'created_at': '2021-12-20T05:09:01.000000Z'}, 'inventory_items': {'updated_at': '2021-09-15T19:44:11.000000Z'}, 'transactions': {'created_at': '2021-12-20T00:08:52-05:00'}, 'metafields': {'updated_at': '2021-09-07T21:18:05.000000Z'}, 'order_refunds': {'created_at': '2021-05-01T17:41:18.000000Z'}, 'customers': {'updated_at': '2021-12-20T05:08:17.000000Z'}, 'orders': {'updated_at': '2021-12-20T05:09:01.000000Z'}, 'custom_collections': {'updated_at': '2021-12-20T17:41:18.000000Z'}}
simulated_states = {'products': {'updated_at': '2021-12-20T05:10:05.000000Z'}, 'collects': {'updated_at': '2022-01-15T00:53:24.000000Z'}, 'abandoned_checkouts': {'updated_at': '2022-02-02T16:00:00.000000Z'}, 'inventory_levels': {'updated_at': '2021-12-20T05:09:34.000000Z'}, 'locations': {'updated_at': '2021-07-20T09:00:22.000000Z'}, 'events': {'created_at': '2021-12-20T05:09:01.000000Z'}, 'inventory_items': {'updated_at': '2021-09-15T19:44:11.000000Z'}, 'transaction_orders': {'updated_at': '2021-12-12T00:08:33.000000Z'}, 'metafields': {'updated_at': '2021-09-07T21:18:05.000000Z'}, 'order_refunds': {'created_at': '2021-05-01T17:41:18.000000Z'}, 'customers': {'updated_at': '2021-12-20T05:08:17.000000Z'}, 'orders': {'updated_at': '2021-12-20T05:09:01.000000Z'}, 'custom_collections': {'updated_at': '2021-12-20T17:41:18.000000Z'}}

for stream, updated_state in simulated_states.items():
new_state['bookmarks'][stream] = updated_state
Expand All @@ -115,15 +115,22 @@ def bookmarks_test(self, conn_id, testable_streams):
if record.get('action') == 'upsert']
second_sync_messages = [record.get('data') for record in second_sync_records.get(stream, {}).get('messages', [])
if record.get('action') == 'upsert']
first_bookmark_value = first_sync_bookmark.get('bookmarks', {stream: None}).get(stream)
if stream not in self.SKIPPED_STREAMS:
first_bookmark_value = first_sync_bookmark.get('bookmarks', {stream: None}).get(stream)
second_bookmark_value = second_sync_bookmark.get('bookmarks', {stream: None}).get(stream)
else:
first_bookmark_value = first_sync_bookmark.get('bookmarks').get('transaction_orders')

second_bookmark_value = second_sync_bookmark.get('bookmarks').get('transaction_orders')
first_bookmark_value = list(first_bookmark_value.values())[0]
second_bookmark_value = second_sync_bookmark.get('bookmarks', {stream: None}).get(stream)
second_bookmark_value = list(second_bookmark_value.values())[0]

replication_key = next(iter(expected_replication_keys[stream]))
first_bookmark_value_utc = self.convert_state_to_utc(first_bookmark_value)
second_bookmark_value_utc = self.convert_state_to_utc(second_bookmark_value)
simulated_bookmark = new_state['bookmarks'][stream]
if stream not in self.SKIPPED_STREAMS:
simulated_bookmark = new_state['bookmarks'][stream]
else:
simulated_bookmark = new_state['bookmarks']['transaction_orders']
simulated_bookmark_value = list(simulated_bookmark.values())[0]

# verify the syncs sets a bookmark of the expected form
Expand All @@ -133,27 +140,32 @@ def bookmarks_test(self, conn_id, testable_streams):
self.assertTrue(self.is_expected_date_format(second_bookmark_value))

# verify the 2nd bookmark is equal to 1st sync bookmark
#NOT A BUG (IS the expected behaviour for shopify as they are using date windowing : TDL-17096 : 2nd bookmark value is getting assigned from the execution time rather than the actual bookmark time. This is an invalid assertion for shopify
#NOT A BUG (IS the expected behavior for shopify as they are using date windowing : TDL-17096 : 2nd bookmark value is getting assigned from the execution time rather than the actual bookmark time. This is an invalid assertion for shopify
#self.assertEqual(first_bookmark_value, second_bookmark_value)

for record in first_sync_messages:
replication_key_value = record.get(replication_key)
# verify 1st sync bookmark value is the max replication key value for a given stream
self.assertLessEqual(replication_key_value, first_bookmark_value_utc, msg="First sync bookmark was set incorrectly, a record with a greater replication key value was synced")

for record in second_sync_messages:
replication_key_value = record.get(replication_key)
# verify the 2nd sync replication key value is greater or equal to the 1st sync bookmarks
self.assertGreaterEqual(replication_key_value, simulated_bookmark_value, msg="Second sync records do not respect the previous bookmark")
# verify the 2nd sync bookmark value is the max replication key value for a given stream
self.assertLessEqual(replication_key_value, second_bookmark_value_utc, msg="Second sync bookmark was set incorrectly, a record with a greater replication key value was synced")
# The `transactions` stream is a child of the `orders` stream. Hence the bookmark for transactions is solely dependent on the value of bookmark in 'transaction_orders' which stores the parent record's bookmark.
# Hence it doesn't have its own replication key.
if stream not in self.SKIPPED_STREAMS:
replication_key = next(iter(expected_replication_keys[stream]))
for record in first_sync_messages:
replication_key_value = record.get(replication_key)
# verify 1st sync bookmark value is the max replication key value for a given stream
self.assertLessEqual(replication_key_value, first_bookmark_value_utc, msg="First sync bookmark was set incorrectly, a record with a greater replication key value was synced")

for record in second_sync_messages:
replication_key_value = record.get(replication_key)
# verify the 2nd sync replication key value is greater or equal to the 1st sync bookmarks
self.assertGreaterEqual(replication_key_value, simulated_bookmark_value, msg="Second sync records do not respect the previous bookmark")
# verify the 2nd sync bookmark value is the max replication key value for a given stream
self.assertLessEqual(replication_key_value, second_bookmark_value_utc, msg="Second sync bookmark was set incorrectly, a record with a greater replication key value was synced")

# verify that we get less data in the 2nd sync
# collects has all the records with the same value of replication key, so we are removing from this assertion
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the manipulated state should be altered so this does not happen.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here, for the transactions stream, we had 2 bookmarks earlier i.e. transactions and transaction_orders(which stores the bookmark of the parent i.e. orders bookmark). However, as this card suggested removing the filtering of the transactions based on the transactions bookmark, we have now removed the transaction_orders completely. Hence this assertion of checking the replication key value against the bookmark value would now actually check the transaction_orders bookmark value against the replication value of the transactions. Thus, we have skipped this assertion for the stream.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah I was not clear, I meant for the collections stream not the transactions. Can more data be generated, and the state injected for that stream be moved so that not less records are coming through on the second sync?

Copy link
Contributor Author

@namrata270998 namrata270998 May 3, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have generated more data for the collections stream and updated the simulated state. Hence we don't have to skip it anymore!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice!

if stream not in ('collects'):
# As the bookmark for `transactions` is solely dependent on the value of bookmark in 'transaction_orders' which stores the parent record's
# bookmark, hence we'd now get all the data for transactions stream without filtering on `created_at`
if stream not in self.SKIPPED_STREAMS:
self.assertLess(second_sync_count, first_sync_count,
msg="Second sync does not have less records, bookmark usage not verified")

# verify that we get atleast 1 record in the second sync
if stream not in ('collects'):
self.assertGreater(second_sync_count, 0, msg="Second sync did not yield any records")
self.assertGreater(second_sync_count, 0, msg="Second sync did not yield any records")
Loading