From a4c579f2354fb189432a2415da0971c4584fc4a6 Mon Sep 17 00:00:00 2001 From: bhtowles Date: Mon, 16 Oct 2023 18:07:08 -0500 Subject: [PATCH] TDL-24162 Log based inclusivity updates (#90) * Uncomment all record count assertions, fix with pk count where needed, new method * First round review comments (set comprehension, get() fallback) and some line length cleanup * Delete commented out test_sync_full.py * Review comments 2, make pk count method generic and update to use tuples to fix dupes * Update log based int test to add new table pk to expected metadata * Review comments 3, fail test if upsert format or value is incorrect * Fix typo / bug for pk tuple iteration * update comment --- tests/base.py | 23 + .../test_log_based_interruped_replication.py | 90 +-- tests/test_sync_full.py | 722 ------------------ tests/test_sync_logical_datetime.py | 9 +- tests/test_sync_logical_float.py | 9 +- tests/test_sync_logical_multiple_dbs.py | 9 +- tests/test_sync_logical_names.py | 9 +- tests/test_sync_logical_others.py | 9 +- tests/test_sync_logical_pks.py | 209 +++-- 9 files changed, 214 insertions(+), 875 deletions(-) delete mode 100644 tests/test_sync_full.py diff --git a/tests/base.py b/tests/base.py index 181ff21..fdbdc17 100644 --- a/tests/base.py +++ b/tests/base.py @@ -210,6 +210,29 @@ def select_all_streams_and_fields(conn_id, catalogs, select_all_fields: bool = T connections.select_catalog_and_fields_via_metadata( conn_id, catalog, schema, additional_md, non_selected_properties) + def unique_pk_count_by_stream(self, recs_by_stream): + """ + Switch from upsert record count verification to unique pk count verification due to + tap-mssql inconsistency with log based inclusivity TDL-24162 (that will not be fixed) + """ + pk_count_by_stream = {} + for strm, recs in recs_by_stream.items(): + primary_keys = self.expected_primary_keys_by_stream_id()[strm] + + # use tuple generator to handle arbitrary number of pks during set comprehension + stream_pks = {tuple(m.get('data', {}).get(pk) for pk in primary_keys) + for m in recs['messages'] + if m['action'] == 'upsert'} + + # fail the test if any upserts fail to return 'data' or a pk value + for pk in stream_pks: + for i in range(len(pk)): + self.assertIsNotNone(pk[i]) + + pk_count_by_stream[strm] = len(stream_pks) + + return pk_count_by_stream + def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.start_date = self.get_properties().get("start_date") diff --git a/tests/test_log_based_interruped_replication.py b/tests/test_log_based_interruped_replication.py index 78f61da..6a24cf1 100644 --- a/tests/test_log_based_interruped_replication.py +++ b/tests/test_log_based_interruped_replication.py @@ -216,11 +216,8 @@ def test_run(self): # verify records match on the first sync records_by_stream = runner.get_records_from_target_output() - record_count_by_stream = runner.examine_target_output_file( - self, conn_id, self.expected_sync_streams(), self.expected_primary_keys_by_sync_stream_id()) - - # BUG : TDL-19583 log_based_interruptible_dbo_int_and_bool_data is replicating the last row twice - #self.assertEqual(record_count_by_stream, self.expected_count()) + pk_count_by_stream = self.unique_pk_count_by_stream(records_by_stream) + self.assertEqual(pk_count_by_stream, self.expected_count()) table_version = dict() initial_log_version = dict() @@ -229,45 +226,34 @@ def test_run(self): stream_expected_data = self.expected_metadata()[stream] table_version[stream] = records_by_stream[stream]['table_version'] - # BUG: TDL-19583 - 3 
activate_version messages - # verify on the first sync you get - # activate version message before and after all data for the full table - # and before the logical replication part - - # self.assertEqual( - # records_by_stream[stream]['messages'][0]['action'], - # 'activate_version') - # self.assertEqual( - # records_by_stream[stream]['messages'][-1]['action'], - # 'activate_version') - # self.assertEqual( - # records_by_stream[stream]['messages'][-2]['action'], - # 'activate_version') - - self.assertEqual( - len([m for m in records_by_stream[stream]['messages'][1:] if m["action"] == "activate_version"]), - 2, - msg="Expect 2 more activate version messages for end of full table and beginning of log based") + # gather all actions then verify 3 activate versions, 1 at start, 2 in the last 3 + actions = [rec['action'] for rec in records_by_stream[stream]['messages']] + self.assertEqual(actions[0], 'activate_version') + self.assertEqual(len([a for a in actions[-3:] if a == "activate_version"]), 2, + msg=("Expected 2 of the last 3 messages to be activate version messages. 1 for " + "end of full table and 1 for beginning of log based. Position can vary " + "due to TDL-24162") + ) # verify state and bookmarks initial_state = menagerie.get_state(conn_id) bookmark = initial_state['bookmarks'][stream] - self.assertIsNone(initial_state.get('currently_syncing'), msg="expected state's currently_syncing to be None") - self.assertIsNotNone( - bookmark.get('current_log_version'), - msg="expected bookmark to have current_log_version because we are using log replication") - self.assertTrue(bookmark['initial_full_table_complete'], msg="expected full table to be complete") + self.assertIsNone(initial_state.get('currently_syncing'), + msg="expected state's currently_syncing to be None") + self.assertIsNotNone(bookmark.get('current_log_version'), + msg="expected bookmark to have current_log_version due to log replication") + self.assertTrue(bookmark['initial_full_table_complete'], + msg="expected full table to be complete") inital_log_version = bookmark['current_log_version'] self.assertEqual(bookmark['version'], table_version[stream], msg="expected bookmark for stream to match version") expected_schemas = self.expected_metadata()[stream]['schema'] - self.assertEqual(records_by_stream[stream]['schema'], - expected_schemas, - msg="expected: {} != actual: {}".format(expected_schemas, - records_by_stream[stream]['schema'])) + self.assertEqual(records_by_stream[stream]['schema'], expected_schemas, + msg="expected: {} != actual: {}".format( + expected_schemas, records_by_stream[stream]['schema'])) initial_log_version[stream] = bookmark['current_log_version'] initial_log_version_value = set(initial_log_version.values()).pop() @@ -281,14 +267,18 @@ def test_run(self): # --> A table which is interrupted del interrupted_state['bookmarks']['log_based_interruptible_dbo_int_data']['version'] - interrupted_state['bookmarks']['log_based_interruptible_dbo_int_data']['initial_full_table_complete'] = False + interrupted_state['bookmarks']['log_based_interruptible_dbo_int_data'][ + 'initial_full_table_complete'] = False max_pk_values = {'max_pk_values': {'pk': 12}} last_pk_fetched = {'last_pk_fetched': {'pk': 10}} - interrupted_state['bookmarks']['log_based_interruptible_dbo_int_and_bool_data'].update(max_pk_values) - interrupted_state['bookmarks']['log_based_interruptible_dbo_int_and_bool_data'].update(last_pk_fetched) - 
interrupted_state['bookmarks']['log_based_interruptible_dbo_int_and_bool_data']['initial_full_table_complete'] = False + interrupted_state['bookmarks']['log_based_interruptible_dbo_int_and_bool_data'].update( + max_pk_values) + interrupted_state['bookmarks']['log_based_interruptible_dbo_int_and_bool_data'].update( + last_pk_fetched) + interrupted_state['bookmarks']['log_based_interruptible_dbo_int_and_bool_data'][ + 'initial_full_table_complete'] = False menagerie.set_state(conn_id, interrupted_state) @@ -310,20 +300,24 @@ def test_run(self): query_list.extend(insert(database_name, schema_name, table_name, int_after_values)) - mssql_cursor_context_manager(*query_list) + # add new table's pk to expected_metadata + self.EXPECTED_METADATA['log_based_interruptible_dbo_int_data_after'] = { + self.PRIMARY_KEYS: {'pk'}} + # invoke the sync job AGAIN following various manipulations to the data # add the newly created stream in the expectations expected_sync_streams = self.expected_sync_streams() expected_sync_streams.add('log_based_interruptible_dbo_int_data_after') expected_primary_keys_by_sync_stream_id = self.expected_primary_keys_by_sync_stream_id() - expected_primary_keys_by_sync_stream_id['log_based_interruptible_dbo_int_data_after'] = {'pk'} + expected_primary_keys_by_sync_stream_id[ + 'log_based_interruptible_dbo_int_data_after'] = {'pk'} expected_count = self.expected_count() expected_count['log_based_interruptible_dbo_int_data_after'] = 6 - expected_count['log_based_interruptible_dbo_int_and_bool_data'] = 3 - expected_count['log_based_interruptible_dbo_int_data'] = 0 + expected_count['log_based_interruptible_dbo_int_and_bool_data'] = 2 + expected_count['log_based_interruptible_dbo_int_data'] = 14 # run in check mode check_job_name = runner.run_check_mode(self, conn_id) @@ -345,15 +339,14 @@ def test_run(self): records_by_stream = runner.get_records_from_target_output() - record_count_by_stream = runner.examine_target_output_file( - self, conn_id, expected_sync_streams, expected_primary_keys_by_sync_stream_id) + pk_count_by_stream = self.unique_pk_count_by_stream(records_by_stream) second_state = menagerie.get_state(conn_id) bookmark_2 = second_state['bookmarks'] - # validate the record count for all the streams after interruption recovery - # BUG: TDL-19583 Duplicate record within a sync with 3 activate_version messages - #self.assertEqual(record_count_by_stream, expected_count) + # validate the record count for all the streams after interruption recovery, use unique + # pks instead of all upserts to de-dupe and avoid inconsistency from TDL-24162 + self.assertEqual(pk_count_by_stream, expected_count) second_log_version = dict() for stream in expected_sync_streams: @@ -409,14 +402,13 @@ def test_run(self): records_by_stream = runner.get_records_from_target_output() - record_count_by_stream = runner.examine_target_output_file( - self, conn_id, expected_sync_streams, expected_primary_keys_by_sync_stream_id) + pk_count_by_stream = self.unique_pk_count_by_stream(records_by_stream) expected_count['log_based_interruptible_dbo_int_data_after'] = 3 expected_count['log_based_interruptible_dbo_int_and_bool_data'] = 0 expected_count['log_based_interruptible_dbo_int_data'] = 0 - self.assertEqual(record_count_by_stream, expected_count) + self.assertEqual(pk_count_by_stream, expected_count) final_state = menagerie.get_state(conn_id) bookmark_3 = final_state['bookmarks'] diff --git a/tests/test_sync_full.py b/tests/test_sync_full.py deleted file mode 100644 index 326c8c2..0000000 --- 
a/tests/test_sync_full.py +++ /dev/null @@ -1,722 +0,0 @@ -# """ -# Test tap discovery -# """ -# import random -# from datetime import date, datetime, timezone, time -# from decimal import getcontext, Decimal -# from json import dumps -# from random import randint, sample -# -# import dateutil.tz -# import sys -# -# from numpy import float32 -# -# from tap_tester import menagerie, runner, connections -# -# from tap_tester.suites.mssql.database import drop_all_user_databases, create_database, \ -# create_table, mssql_cursor_context_manager, insert -# -# from base import BaseTapTest -# -# -# class SyncTesFull(BaseTapTest): -# """ Test the tap discovery """ -# -# EXPECTED_METADATA = dict() -# -# def name(self): -# return "{}_full_sync_test".format(super().name()) -# -# @classmethod -# def discovery_expected_metadata(cls): -# """The expected streams and metadata about the streams""" -# -# return cls.EXPECTED_METADATA -# -# @classmethod -# def setUpClass(cls) -> None: -# """Create the expected schema in the test database""" -# drop_all_user_databases() -# database_name = "data_types_database" -# schema_name = "dbo" -# -# query_list = list(create_database(database_name, "Latin1_General_CS_AS")) -# # query_list.extend(create_schema(database_name, schema_name)) -# -# table_name = "integers" -# column_name = ["pk", "MyBigIntColumn", "MyIntColumn", "MySmallIntColumn"] -# column_type = ["int", "bigint", "int", "smallint"] -# # TODO - BUG https://stitchdata.atlassian.net/browse/SRCE-1072 -# primary_key = {"pk"} -# column_def = [" ".join(x) for x in list(zip(column_name, column_type))] -# query_list.extend(create_table(database_name, schema_name, table_name, column_def, -# primary_key=primary_key)) -# -# # create data -# num_bytes = [8, 4, 2] # bytes in a bigint, int, smallint and tinyint -# values = [ -# (0, ) + tuple(-(2 ** (8 * size - 1)) for size in num_bytes), # min -# (1, ) + tuple(0 for _ in num_bytes), # 0 -# (2, ) + tuple(2 ** (8 * size - 1) - 1 for size in num_bytes), # max -# (3, None, None, None) # null -# ] -# values.extend([(pk, ) + tuple(randint(-(2 ** (8 * size - 1)), 2 ** (8 * size - 1) - 1) -# for size in num_bytes) -# for pk in range(4, 14)]) # random sample values -# query_list.extend(insert(database_name, schema_name, table_name, values)) -# cls.add_expected_metadata(cls, database_name, schema_name, table_name, column_name, -# column_type, primary_key, values) -# -# table_name = "tiny_integers_and_bools" -# column_name = ["pk", "MyTinyIntColumn", "my_boolean"] -# column_type = ["int", "tinyint", "bit"] -# primary_key = {"pk"} -# column_def = [" ".join(x) for x in list(zip(column_name, column_type))] -# query_list.extend(create_table(database_name, schema_name, table_name, column_def, -# primary_key=primary_key)) -# -# # create data -# values = [ -# (0, 0, False), # min -# (1, 255, True), # max -# (2, None, None) # null -# ] -# values.extend([(pk, randint(0, 255), bool(randint(0, 1))) for pk in range(3, 13)]) # random sample values -# query_list.extend(insert(database_name, schema_name, table_name, values)) -# cls.add_expected_metadata(cls, database_name, schema_name, table_name, column_name, -# column_type, primary_key, values) -# -# # TODO - BUG https://stitchdata.atlassian.net/browse/SRCE-1075 -# table_name = "numeric_precisions" -# precision_scale = [(precision, randint(0, precision)) for precision in (9, 15)] # , 19, 28, 38)] -# column_type = [ -# "numeric({},{})".format(precision, scale) -# for precision, scale in precision_scale -# ] -# column_name = ["pk"] + [x.replace("(", 
"_").replace(",", "_").replace(")", "") for x in column_type] -# column_type = ["int"] + column_type -# primary_key = {"pk"} -# column_def = [" ".join(x) for x in list(zip(column_name, column_type))] -# query_list.extend(create_table(database_name, schema_name, table_name, column_def, -# primary_key=primary_key)) -# -# # generate values for one precision at a time and then zip them together -# columns = [] -# column = 0 -# for precision, scale in precision_scale: -# getcontext().prec = precision -# columns.append([ -# Decimal(-10 ** precision + 1) / Decimal(10 ** scale), # min -# 0, -# None, -# Decimal(10 ** precision - 1) / Decimal(10 ** scale) # max -# ]) -# columns[column].extend([Decimal(random.randint(-10 ** precision + 1, 10 ** precision - 1)) / -# Decimal(10 ** scale) -# for _ in range(10)]) -# column = column + 1 -# -# values = list(zip(range(14), *columns)) -# query_list.extend(insert(database_name, schema_name, table_name, values)) -# cls.add_expected_metadata(cls, database_name, schema_name, table_name, column_name, -# column_type, primary_key, values) -# -# table_name = "decimal_precisions" -# precision_scale = [(precision, randint(0, precision)) for precision in (9, 15)] # 19, 28, 38)] -# column_type = [ -# "decimal({},{})".format(precision, scale) -# for precision, scale in precision_scale -# ] -# column_name = ["pk"] + [x.replace("(", "_").replace(",", "_").replace(")", "") for x in column_type] -# column_type = ["int"] + column_type -# primary_key = {"pk"} -# column_def = [" ".join(x) for x in list(zip(column_name, column_type))] -# query_list.extend(create_table(database_name, schema_name, table_name, column_def, -# primary_key=primary_key)) -# -# # generate values for one precision at a time and then zip them together -# columns = [] -# column = 0 -# for precision, scale in precision_scale: -# getcontext().prec = precision -# columns.append([ -# Decimal(-10 ** precision + 1) / Decimal(10 ** scale), # min -# 0, -# None, -# Decimal(10 ** precision - 1) / Decimal(10 ** scale) # max -# ]) -# columns[column].extend([Decimal(random.randint(-10 ** precision + 1, 10 ** precision - 1)) / -# Decimal(10 ** scale) -# for _ in range(10)]) -# column = column + 1 -# -# values = list(zip(range(14), *columns)) -# query_list.extend(insert(database_name, schema_name, table_name, values)) -# cls.add_expected_metadata(cls, database_name, schema_name, table_name, column_name, -# column_type, primary_key, values) -# -# # TODO - BUG https://stitchdata.atlassian.net/browse/SRCE-1078 -# table_name = "float_precisions" -# column_name = ["pk", "float_24", "float_53", "real_24_bits"] -# column_type = ["int", "float(24)", "float(53)", "real"] -# primary_key = {"pk"} -# column_def = [" ".join(x) for x in list(zip(column_name, column_type))] -# query_list.extend(create_table(database_name, schema_name, table_name, column_def, -# primary_key=primary_key)) -# -# # create data -# values = [ -# (0, -# float(str(float32(1.175494351e-38))), -# 2.2250738585072014e-308, -# float(str(float32(1.175494351e-38)))), # min positive -# (1, float(str(float32(3.402823466e+38))), 1.7976931348623158e+308, float(str(float32(3.402823466e+38)))), # max positive -# (2, float(str(float32(-1.175494351e-38))), -2.2250738585072014e-308, float(str(float32(-1.175494351e-38)))), # smallest negative -# (3, float(str(float32(-3.402823466e+38))), -1.7976931348623158e+308, float(str(float32(-3.402823466e+38)))), # largest negative -# (4, 0.0, 0.0, 0.0), # 0 -# (5, None, None, None), # Null -# # (float("Inf"), -float("Inf"), 
float('NaN')) -# ] -# -# # # random small positive values -# # values.extend([( -# # pk, -# # random.uniform(1.175494351e-38, 10 ** random.randint(-37, 0)), -# # random.uniform(2.2250738585072014e-308, 10 ** random.randint(-307, 0)), -# # random.uniform(1.175494351e-38, 10 ** random.randint(-37, 0)) -# # ) for pk in range(6, 11) -# # ]) -# # -# # # random large positive values -# # values.extend([( -# # pk, -# # random.uniform(1, 10 ** random.randint(0, 38)), -# # random.uniform(1, 10 ** random.randint(0, 308)), -# # random.uniform(1, 10 ** random.randint(0, 38)) -# # ) for pk in range(11, 16) -# # ]) -# # -# # # random small negative values -# # values.extend([( -# # pk, -# # random.uniform(-1.175494351e-38, -1 * 10 ** random.randint(-37, 0)), -# # random.uniform(-2.2250738585072014e-308, -1 * 10 ** random.randint(-307, 0)), -# # random.uniform(-1.175494351e-38, -1 * 10 ** random.randint(-37, 0)) -# # ) for pk in range(16, 21) -# # ]) -# # -# # # random large negative values -# # values.extend([( -# # pk, -# # random.uniform(-1, -1 *10 ** random.randint(0, 38)), -# # random.uniform(-1, -1 *10 ** random.randint(0, 308)), -# # random.uniform(-1, -1 *10 ** random.randint(0, 38)) -# # ) for pk in range(21, 26) -# # ]) -# query_list.extend(insert(database_name, schema_name, table_name, values)) -# cls.add_expected_metadata(cls, database_name, schema_name, table_name, column_name, -# column_type, primary_key, values) -# -# # # TODO - BUG https://stitchdata.atlassian.net/browse/SRCE-1080 -# # table_name = "dates_and_times" -# # column_name = ["just_a_date", "date_and_time", "bigger_range_and_precision_datetime", -# # "datetime_with_timezones", "datetime_no_seconds", "its_time"] -# # column_type = ["date", "datetime", "datetime2", "datetimeoffset", "smalldatetime", "time"] -# # -# # # TODO - remove this once more datetime types are supported. Also, things shouldn't blow up -# # # when they aren't supported -# # column_name = ["pk", "just_a_date", "date_and_time", "its_time"] -# # column_type = ["int", "date", "datetime", "time"] -# # primary_key = set() -# # column_def = [" ".join(x) for x in list(zip(column_name, column_type))] -# # query_list.extend(create_table(database_name, schema_name, table_name, column_def, -# # primary_key=primary_key)) -# # -# # # create data -# # values = [ -# # ( -# # 0, -# # date(1, 1, 1), -# # datetime(1753, 1, 1, 0, 0, 0, 0, tzinfo=timezone.utc), -# # # datetime(1, 1, 1, 0, 0, 0, 0, tzinfo=timezone.utc), -# # # datetime.fromtimestamp( -# # # datetime(1, 1, 1, 14, 0, 0, 0, tzinfo=timezone.utc).timestamp(), -# # # tz=dateutil.tz.tzoffset(None, -14*60)), -# # # datetime(1900, 1, 1, 0, 0, tzinfo=timezone.utc), -# # time(0, 0, 0, 0, tzinfo=timezone.utc), -# # ), # min -# # ( -# # 1, -# # date(9999, 12, 31), -# # datetime(9999, 12, 31, 23, 59, 59, 999000, tzinfo=timezone.utc), -# # # datetime(9999, 12, 31, 23, 59, 59, 999999, tzinfo=timezone.utc), -# # # datetime.fromtimestamp( -# # # datetime(9999, 12, 31, 9, 59, 59, 999999, tzinfo=timezone.utc).timestamp(), -# # # tz=dateutil.tz.tzoffset(None, 14*60)), -# # # datetime(2079, 6, 6, 23, 59, tzinfo=timezone.utc), -# # time(23, 59, 59, 999999, tzinfo=timezone.utc), -# # ), # max -# # (2, None, None, None) # , None, None, None), # Null -# # ] -# # -# # # TODO - make some random samples... 
-# # -# # query_list.extend(insert(database_name, schema_name, table_name, values)) -# # cls.add_expected_metadata(cls, database_name, schema_name, table_name, column_name, -# # column_type, primary_key, values) -# -# table_name = "char_data" -# column_name = ["pk", "char_2"] # , "char_8000"] -# column_type = ["int", "char(2)"] # , "char(8000)"] -# primary_key = {"pk"} -# column_def = [" ".join(x) for x in list(zip(column_name, column_type))] -# query_list.extend(create_table(database_name, schema_name, table_name, column_def, -# primary_key=primary_key)) -# -# # use all valid unicode characters -# chars = list(range(0, 55296)) -# chars.extend(range(57344, sys.maxunicode)) -# chars.reverse() -# -# values = [(pk, "".join([chr(chars.pop()) for _ in range(2)])) for pk in range(16)] -# cls.add_expected_metadata(cls, database_name, schema_name, table_name, column_name, -# column_type, primary_key, values) -# query_list.extend(insert(database_name, schema_name, table_name, values)) -# -# table_name = "varchar_data" -# column_name = ["pk", "varchar_5", "varchar_8000", "varchar_max"] -# column_type = ["int", "varchar(5)", "varchar(8000)", "varchar(max)"] -# primary_key = {"pk"} -# column_def = [" ".join(x) for x in list(zip(column_name, column_type))] -# query_list.extend(create_table(database_name, schema_name, table_name, column_def, -# primary_key=primary_key)) -# -# values = [ -# (pk, -# chr(chars.pop()), -# "".join([chr(chars.pop()) for _ in range(15)]), -# "".join([chr(chars.pop()) for _ in range(randint(1, 16))]) -# ) for pk in range(3) -# ] -# values.extend([(50, None, None, None), ]) -# query_list.extend(insert(database_name, schema_name, table_name, values)) -# cls.add_expected_metadata(cls, database_name, schema_name, table_name, column_name, -# column_type, primary_key, values) -# -# # table_name = "nchar_data" -# # column_name = ["pk", "nchar_8"] # , "nchar_4000"] -# # column_type = ["int", "nchar(8)"] # , "nchar(4000)"] -# # primary_key = {"pk"} -# # column_def = [" ".join(x) for x in list(zip(column_name, column_type))] -# # query_list.extend(create_table(database_name, schema_name, table_name, column_def, -# # primary_key=primary_key)) -# # values = [ -# # (pk, -# # "".join([chr(chars.pop()) for _ in range(4)])) #chr(chars.pop())) # , "".join([chr(chars.pop()) for _ in range(1500)])) -# # for pk in range(1) -# # ] -# # # values.extend([(50, None, None), ]) -# # query_list.extend(insert(database_name, schema_name, table_name, values)) -# # cls.add_expected_metadata(cls, database_name, schema_name, table_name, column_name, -# # column_type, primary_key, values) -# # -# table_name = "nvarchar_data" -# column_name = ["pk", "nvarchar_5", "nvarchar_4000", "nvarchar_max"] -# column_type = ["int", "nvarchar(5)", "nvarchar(4000)", "nvarchar(max)"] -# primary_key = {"pk"} -# column_def = [" ".join(x) for x in list(zip(column_name, column_type))] -# query_list.extend(create_table(database_name, schema_name, table_name, column_def, -# primary_key=primary_key)) -# chars.reverse() -# values = [ -# (pk, -# chr(chars.pop()), -# "".join([chr(chars.pop()) for _ in range(8)]), -# "".join([chr(chars.pop()) for _ in range(randint(1, 8))]) -# ) for pk in range(1) -# ] -# values.extend([(50, None, None, None), ]) -# -# # pk = 51 -# # while len(chars): -# # # Use the rest of the characters -# # values.extend([( -# # pk, -# # chr(chars.pop()), -# # "".join([chr(chars.pop()) for _ in range(min(len(chars), 800))]) if len(chars) else "", -# # "".join([chr(chars.pop()) for _ in range(min(len(chars), randint(1, 
800)))]) if len(chars) else "", -# # "".join([chr(chars.pop()) for _ in range(min(len(chars), randint(1, random_types[0] // 10)))]), -# # "".join([chr(chars.pop()) for _ in range(min(len(chars), randint(1, random_types[1] // 10)))]), -# # "".join([chr(chars.pop()) for _ in range(min(len(chars), randint(1, random_types[2] // 10)))]) -# # )]) -# # pk += 1 -# -# query_list.extend(insert(database_name, schema_name, table_name, values)) -# cls.add_expected_metadata(cls, database_name, schema_name, table_name, column_name, -# column_type, primary_key, values) -# -# # query_list.extend(['-- there are {} characters left to test'.format(len(chars))]) -# # -# # -# table_name = "money_money_money" -# column_name = ["pk", "cash_money", "change"] -# column_type = ["int", "money", "smallmoney"] -# primary_key = {"pk"} -# column_def = [" ".join(x) for x in list(zip(column_name, column_type))] -# query_list.extend(create_table(database_name, schema_name, table_name, column_def, -# primary_key=primary_key)) -# values = [ -# (0, 123.45, 0.99), -# (1, 123213.99, 0.00) -# ] -# query_list.extend(insert(database_name, schema_name, table_name, values)) -# column_name = ["pk"] -# column_type = ["int"] -# values = [ -# (0, ), -# (1, ) -# ] -# cls.add_expected_metadata(cls, database_name, schema_name, table_name, column_name, -# column_type, primary_key, values) -# -# # table_name = "binary_data" -# # column_name = ["binary_1", "binary_8000"] -# # column_type = ["binary(1)", "binary(8000)"] -# # primary_key = set() -# # column_def = [" ".join(x) for x in list(zip(column_name, column_type))] -# # query_list.extend(create_table(database_name, schema_name, table_name, column_def, -# # primary_key=primary_key)) -# # cls.add_expected_metadata(cls, database_name, schema_name, table_name, column_name, -# # column_type, primary_key) -# # -# # table_name = "varbinary_data" -# # column_name = ["varbinary_1", "varbinary_8000", "varbinary_max"] -# # column_type = ["varbinary(1)", "varbinary(8000)", "varbinary(max)"] -# # random_types = [x for x in sample(range(1, 8000), 3)] -# # column_name.extend(["varbinary_{0}".format(x) for x in random_types]) -# # column_type.extend(["varbinary({0})".format(x) for x in random_types]) -# # primary_key = set() -# # column_def = [" ".join(x) for x in list(zip(column_name, column_type))] -# # query_list.extend(create_table(database_name, schema_name, table_name, column_def, -# # primary_key=primary_key)) -# # cls.add_expected_metadata(cls, database_name, schema_name, table_name, column_name, -# # column_type, primary_key) -# # -# # table_name = "text_and_image_deprecated_soon" -# # column_name = ["nvarchar_text", "varchar_text", "varbinary_data", -# # "rowversion_synonym_timestamp"] -# # column_type = ["ntext", "text", "image", "timestamp"] -# # primary_key = set() -# # column_def = [" ".join(x) for x in list(zip(column_name, column_type))] -# # query_list.extend(create_table(database_name, schema_name, table_name, column_def, -# # primary_key=primary_key)) -# # cls.add_expected_metadata(cls, database_name, schema_name, table_name, column_name, -# # column_type, primary_key) -# # -# # table_name = "weirdos" -# # column_name = [ -# # "geospacial", "geospacial_map", "markup", "guid", "version", "tree", -# # "variant", "SpecialPurposeColumns" -# # ] -# # column_type = [ -# # "geometry", "geography", "xml", "uniqueidentifier", "rowversion", "hierarchyid", -# # "sql_variant", "xml COLUMN_SET FOR ALL_SPARSE_COLUMNS" -# # ] -# # primary_key = set() -# # column_def = [" ".join(x) for x in 
list(zip(column_name, column_type))] -# # query_list.extend(create_table(database_name, schema_name, table_name, column_def, -# # primary_key=primary_key)) -# # column_type[7] = "xml" # this is the underlying type -# # cls.add_expected_metadata(cls, database_name, schema_name, table_name, column_name, -# # column_type, primary_key) -# # -# # table_name = "computed_columns" -# # column_name = ["started_at", "ended_at", "durations_days"] -# # column_type = ["datetimeoffset", "datetimeoffset", "AS DATEDIFF(day, started_at, ended_at)"] -# # primary_key = set() -# # column_def = [" ".join(x) for x in list(zip(column_name, column_type))] -# # query_list.extend(create_table(database_name, schema_name, table_name, column_def, -# # primary_key=primary_key)) -# # column_type[2] = "int" # this is the underlying type of a datediff -# # cls.add_expected_metadata(cls, database_name, schema_name, table_name, column_name, -# # column_type, primary_key) -# -# mssql_cursor_context_manager(*query_list) -# -# cls.expected_metadata = cls.discovery_expected_metadata -# -# def test_run(self): -# """ -# Verify that a full sync can send capture all data and send it in the correct format -# """ -# -# # run in check mode -# check_job_name = runner.run_check_mode(self, conn_id) -# -# # verify check exit codes -# exit_status = menagerie.get_exit_status(conn_id, check_job_name) -# menagerie.verify_check_exit_status(self, exit_status, check_job_name) -# -# # get the catalog information of discovery -# found_catalogs = menagerie.get_catalogs(conn_id) -# found_catalog_names = {c['tap_stream_id'] for c in found_catalogs} -# -# # verify that persisted streams have the correct properties -# test_catalog = found_catalogs[0] -# -# additional_md = [{"breadcrumb": [], "metadata": {'replication-method': 'FULL_TABLE'}}] -# BaseTapTest.select_all_streams_and_fields( -# conn_id, found_catalogs, additional_md=additional_md, -# non_selected_properties=["cash_money", "change"]) -# -# # clear state -# menagerie.set_state(conn_id, {}) -# sync_job_name = runner.run_sync_mode(self, conn_id) -# -# # verify tap and target exit codes -# exit_status = menagerie.get_exit_status(conn_id, sync_job_name) -# menagerie.verify_sync_exit_status(self, exit_status, sync_job_name) -# -# # verify record counts of streams -# record_count_by_stream = runner.examine_target_output_file( -# self, conn_id, self.expected_streams(), self.expected_primary_keys_by_stream_id()) -# expected_count = {k: len(v['values']) for k, v in self.expected_metadata().items()} -# self.assertEqual(record_count_by_stream, expected_count) -# -# # verify records match on the first sync -# records_by_stream = runner.get_records_from_target_output() -# -# for stream in self.expected_streams(): -# with self.subTest(stream=stream): -# stream_expected_data = self.expected_metadata()[stream] -# # TODO - test schema matches expectations based on data type, nullable, not nullable, datetimes as string +, etc -# # This needs to be consistent based on replication method so you can change replication methods -# table_version = records_by_stream[stream]['table_version'] -# -# # verify on the first sync you get activate version message before and after all data -# self.assertEqual( -# records_by_stream[stream]['messages'][0]['action'], -# 'activate_version') -# self.assertEqual( -# records_by_stream[stream]['messages'][-1]['action'], -# 'activate_version') -# column_names = [ -# list(field_data.keys())[0] for field_data in stream_expected_data[self.FIELDS] -# ] -# -# expected_messages = [ -# { -# 
"action": "upsert", "data": -# { -# column: value for column, value -# in list(zip(column_names, stream_expected_data[self.VALUES][row])) -# } -# } for row in range(len(stream_expected_data[self.VALUES])) -# ] -# -# # remove sequences from actual values for comparison -# [message.pop("sequence") for message -# in records_by_stream[stream]['messages'][1:-1]] -# -# # Verify all data is correct -# for expected_row, actual_row in list( -# zip(expected_messages, records_by_stream[stream]['messages'][1:-1])): -# with self.subTest(expected_row=expected_row): -# self.assertEqual(actual_row["action"], "upsert") -# self.assertEqual(len(expected_row["data"].keys()), len(actual_row["data"].keys()), -# msg="there are not the same number of columns") -# -# for column_name, expected_value in expected_row["data"].items(): -# column_index = [list(key.keys())[0] for key in -# self.expected_metadata()[stream][self.FIELDS]].index(column_name) -# -# if isinstance(expected_value, Decimal): -# self.assertEqual(type(actual_row["data"][column_name]), float, -# msg="decimal value is not represented as a number") -# self.assertEqual(expected_value, Decimal(str(actual_row["data"][column_name])), -# msg="expected: {} != actual {}".format( -# expected_row, actual_row)) -# elif self.expected_metadata()[stream][self.FIELDS][column_index -# ][column_name][self.DATATYPE] == "real": -# if actual_row["data"][column_name] is None: -# self.assertEqual(expected_value, actual_row["data"][column_name], -# msg="expected: {} != actual {}".format( -# expected_row, actual_row)) -# else: -# self.assertEqual(type(actual_row["data"][column_name]), float, -# msg="float value is not represented as a number") -# self.assertEqual(float(str(float32(expected_value))), -# float(str(float32(actual_row["data"][column_name]))), -# msg="single value of {} doesn't match actual {}".format( -# float(str(float32(expected_value))), -# float(str(float32(actual_row["data"][column_name])))) -# ) -# else: -# self.assertEqual(expected_value, actual_row["data"][column_name], -# msg="expected: {} != actual {}".format( -# expected_row, actual_row)) -# print("records are correct for stream {}".format(stream)) -# -# # verify state and bookmarks -# state = menagerie.get_state(conn_id) -# -# bookmark = state['bookmarks'][stream] -# -# self.assertIsNone(state.get('currently_syncing'), msg="expected state's currently_syncing to be None") -# # TODO - change this to something for mssql once binlog (cdc) is finalized and we know what it is -# self.assertIsNone( -# bookmark.get('lsn'), -# msg="expected bookmark for stream to have NO lsn because we are using full-table replication") -# -# self.assertEqual(bookmark['version'], table_version, -# msg="expected bookmark for stream to match version") -# -# expected_schemas = { -# "selected": True, -# "type": "object", -# "properties": { -# k: dict( -# **self.DATATYPE_SCHEMAS[v["sql-datatype"]], -# selected=True, -# inclusion=v["inclusion"] -# ) -# for fd in stream_expected_data[self.FIELDS] for k, v in fd.items() -# } -# } -# -# # I made everything nullable except pks. 
Based on this the DATATYPE_SCHEMAS reflects nullable types -# # we need to update the pk to be not nullable -# expected_schemas["properties"]["pk"]["type"] = ["integer"] -# self.assertEqual(records_by_stream[stream]['schema'], -# expected_schemas, -# msg="expected: {} != actual: {}".format(expected_schemas, -# records_by_stream[stream]['schema'])) -# -# # ---------------------------------------------------------------------- -# # invoke the sync job AGAIN and get the same 3 records -# # ---------------------------------------------------------------------- -# # TODO - update the table to add a column and ensure that discovery adds the new column -# sync_job_name = runner.run_sync_mode(self, conn_id) -# -# # verify tap and target exit codes -# exit_status = menagerie.get_exit_status(conn_id, sync_job_name) -# menagerie.verify_sync_exit_status(self, exit_status, sync_job_name) -# record_count_by_stream = runner.examine_target_output_file( -# self, conn_id, self.expected_streams(), self.expected_primary_keys_by_stream_id()) -# expected_count = {k: len(v['values']) for k, v in self.expected_metadata().items()} -# self.assertEqual(record_count_by_stream, expected_count) -# records_by_stream = runner.get_records_from_target_output() -# -# for stream in self.expected_streams(): -# with self.subTest(stream=stream): -# stream_expected_data = self.expected_metadata()[stream] -# # TODO - test schema matches expectations based on data type, nullable, not nullable, datetimes as string +, etc -# # This needs to be consistent based on replication method so you can change replication methods -# # {'action': 'upsert', 'sequence': 1560362044666000001, 'data': {'MySmallIntColumn': 0, 'pk': 1, 'MyIntColumn': 0, 'MyBigIntColumn': 0}} -# -# new_table_version = records_by_stream[stream]['table_version'] -# -# # verify on a subsequent sync you get activate version message only after all data -# self.assertEqual( -# records_by_stream[stream]['messages'][0]['action'], -# 'upsert') -# self.assertEqual( -# records_by_stream[stream]['messages'][-1]['action'], -# 'activate_version') -# column_names = [ -# list(field_data.keys())[0] for field_data in stream_expected_data[self.FIELDS] -# ] -# -# expected_messages = [ -# { -# "action": "upsert", "data": -# { -# column: value for column, value -# in list(zip(column_names, stream_expected_data[self.VALUES][row])) -# } -# } for row in range(len(stream_expected_data[self.VALUES])) -# ] -# -# # remove sequences from actual values for comparison -# [message.pop("sequence") for message -# in records_by_stream[stream]['messages'][0:-1]] -# -# # Verify all data is correct -# for expected_row, actual_row in list( -# zip(expected_messages, records_by_stream[stream]['messages'][0:-1])): -# with self.subTest(expected_row=expected_row): -# self.assertEqual(actual_row["action"], "upsert") -# self.assertEqual(len(expected_row["data"].keys()), len(actual_row["data"].keys()), -# msg="there are not the same number of columns") -# -# for column_name, expected_value in expected_row["data"].items(): -# column_index = [list(key.keys())[0] for key in -# self.expected_metadata()[stream][self.FIELDS]].index(column_name) -# -# if isinstance(expected_value, Decimal): -# self.assertEqual(type(actual_row["data"][column_name]), float, -# msg="decimal value is not represented as a number") -# self.assertEqual(expected_value, Decimal(str(actual_row["data"][column_name])), -# msg="expected: {} != actual {}".format( -# expected_row, actual_row)) -# elif 
self.expected_metadata()[stream][self.FIELDS][column_index -# ][column_name][self.DATATYPE] == "real": -# if actual_row["data"][column_name] is None: -# self.assertEqual(expected_value, actual_row["data"][column_name], -# msg="expected: {} != actual {}".format( -# expected_row, actual_row)) -# else: -# self.assertEqual(type(actual_row["data"][column_name]), float, -# msg="float value is not represented as a number") -# self.assertEqual(float(str(float32(expected_value))), -# float(str(float32(actual_row["data"][column_name]))), -# msg="single value of {} doesn't match actual {}".format( -# float(str(float32(expected_value))), -# float(str(float32(actual_row["data"][column_name])))) -# ) -# else: -# self.assertEqual(expected_value, actual_row["data"][column_name], -# msg="expected: {} != actual {}".format( -# expected_row, actual_row)) -# print("records are correct for stream {}".format(stream)) -# -# # verify state and bookmarks -# state = menagerie.get_state(conn_id) -# -# bookmark = state['bookmarks'][stream] -# self.assertIsNone(state.get('currently_syncing'), msg="expected state's currently_syncing to be None") -# -# self.assertIsNone( -# bookmark.get('lsn'), -# msg="expected bookmark for stream to have NO lsn because we are using full-table replication") -# self.assertGreater(new_table_version, table_version, -# msg="table version {} didn't increate from {} on the second run".format( -# new_table_version, -# table_version)) -# self.assertEqual(bookmark['version'], new_table_version, -# msg="expected bookmark for stream to match version") -# -# expected_schemas = { -# "selected": True, -# "type": "object", -# "properties": { -# k: dict( -# **self.DATATYPE_SCHEMAS[v["sql-datatype"]], -# selected=True, -# inclusion=v["inclusion"] -# ) -# for fd in stream_expected_data[self.FIELDS] for k, v in fd.items() -# } -# } -# -# # I made everything nullable except pks. 
Based on this the DATATYPE_SCHEMAS reflects nullable types -# # we need to update the pk to be not nullable -# expected_schemas["properties"]["pk"]["type"] = ["integer"] -# self.assertEqual(records_by_stream[stream]['schema'], -# expected_schemas, -# msg="expected: {} != actual: {}".format(expected_schemas, -# records_by_stream[stream]['schema'])) -# -# -# # SCENARIOS.add(SyncTesFull) THIS TEST WAS RE-WRITTEN IN SMALLER PARTS diff --git a/tests/test_sync_logical_datetime.py b/tests/test_sync_logical_datetime.py index cbb7cb9..b727e48 100644 --- a/tests/test_sync_logical_datetime.py +++ b/tests/test_sync_logical_datetime.py @@ -215,13 +215,14 @@ def test_run(self): # run a sync and verify exit codes record_count_by_stream = self.run_sync(conn_id, clear_state=True) - # verify record counts of streams - expected_count = {k: len(v['values']) for k, v in self.expected_metadata().items()} - # self.assertEqual(record_count_by_stream, expected_count) - # verify records match on the first sync records_by_stream = runner.get_records_from_target_output() + # verify record counts of streams + expected_count = {k: len(v['values']) for k, v in self.expected_metadata().items()} + pk_count_by_stream = self.unique_pk_count_by_stream(records_by_stream) + self.assertEqual(pk_count_by_stream, expected_count) + table_version = dict() for stream in self.expected_streams(): with self.subTest(stream=stream): diff --git a/tests/test_sync_logical_float.py b/tests/test_sync_logical_float.py index 9ec81fd..31f4304 100644 --- a/tests/test_sync_logical_float.py +++ b/tests/test_sync_logical_float.py @@ -139,13 +139,14 @@ def test_run(self): # run a sync and verify exit codes record_count_by_stream = self.run_sync(conn_id, clear_state=True) - # verify record counts of streams - expected_count = {k: len(v['values']) for k, v in self.expected_metadata().items()} - # self.assertEqual(record_count_by_stream, expected_count) - # verify records match on the first sync records_by_stream = runner.get_records_from_target_output() + # verify record counts of streams + expected_count = {k: len(v['values']) for k, v in self.expected_metadata().items()} + pk_count_by_stream = self.unique_pk_count_by_stream(records_by_stream) + self.assertEqual(pk_count_by_stream, expected_count) + table_version = dict() for stream in self.expected_streams(): with self.subTest(stream=stream): diff --git a/tests/test_sync_logical_multiple_dbs.py b/tests/test_sync_logical_multiple_dbs.py index a9f4042..608a572 100644 --- a/tests/test_sync_logical_multiple_dbs.py +++ b/tests/test_sync_logical_multiple_dbs.py @@ -318,13 +318,14 @@ def test_run(self): # run a sync and verify exit codes record_count_by_stream = self.run_sync(conn_id, clear_state=True) - # verify record counts of streams - expected_count = {k: len(v['values']) for k, v in self.expected_metadata().items()} - # self.assertEqual(record_count_by_stream, expected_count) - # verify records match on the first sync records_by_stream = runner.get_records_from_target_output() + # verify record counts of streams + expected_count = {k: len(v['values']) for k, v in self.expected_metadata().items()} + pk_count_by_stream = self.unique_pk_count_by_stream(records_by_stream) + self.assertEqual(pk_count_by_stream, expected_count) + table_version = dict() for stream in self.expected_streams(): with self.subTest(stream=stream): diff --git a/tests/test_sync_logical_names.py b/tests/test_sync_logical_names.py index b13f0ef..ddfb14a 100644 --- a/tests/test_sync_logical_names.py +++ 
b/tests/test_sync_logical_names.py @@ -303,13 +303,14 @@ def test_run(self): # run a sync and verify exit codes record_count_by_stream = self.run_sync(conn_id, clear_state=True) - # verify record counts of streams - expected_count = {k: len(v['values']) for k, v in self.expected_metadata().items()} - # self.assertEqual(record_count_by_stream, expected_count) - # verify records match on the first sync records_by_stream = runner.get_records_from_target_output() + # verify record counts of streams + expected_count = {k: len(v['values']) for k, v in self.expected_metadata().items()} + pk_count_by_stream = self.unique_pk_count_by_stream(records_by_stream) + self.assertEqual(pk_count_by_stream, expected_count) + table_version = dict() for stream in self.expected_streams(): with self.subTest(stream=stream): diff --git a/tests/test_sync_logical_others.py b/tests/test_sync_logical_others.py index 1ffd670..d024c1e 100644 --- a/tests/test_sync_logical_others.py +++ b/tests/test_sync_logical_others.py @@ -275,13 +275,14 @@ def test_run(self): # run a sync and verify exit codes record_count_by_stream = self.run_sync(conn_id, clear_state=True) - # verify record counts of streams - expected_count = {k: len(v['values']) for k, v in self.expected_metadata().items()} - # self.assertEqual(record_count_by_stream, expected_count) - # verify records match on the first sync records_by_stream = runner.get_records_from_target_output() + # verify record counts of streams + expected_count = {k: len(v['values']) for k, v in self.expected_metadata().items()} + pk_count_by_stream = self.unique_pk_count_by_stream(records_by_stream) + self.assertEqual(pk_count_by_stream, expected_count) + table_version = dict() for stream in self.expected_streams(): with self.subTest(stream=stream): diff --git a/tests/test_sync_logical_pks.py b/tests/test_sync_logical_pks.py index bfcd1a8..94beced 100644 --- a/tests/test_sync_logical_pks.py +++ b/tests/test_sync_logical_pks.py @@ -5,9 +5,9 @@ from tap_tester import menagerie, runner, LOGGER -from database import drop_all_user_databases, create_database, \ - create_table, mssql_cursor_context_manager, insert, enable_database_tracking, update_by_pk, delete_by_pk, \ - create_view +from database import create_database, create_table, create_view, delete_by_pk, \ + drop_all_user_databases, enable_database_tracking, insert, mssql_cursor_context_manager, \ + update_by_pk from base import BaseTapTest @@ -91,9 +91,16 @@ def setUpClass(cls) -> None: 'database-name': database_name, 'stream_name': table_name, 'fields': [ - {'first_name': {'sql-datatype': 'varchar', 'selected-by-default': True, 'inclusion': 'automatic'}}, - {'last_name': {'sql-datatype': 'varchar', 'selected-by-default': True, 'inclusion': 'automatic'}}, - {'info': {'sql-datatype': 'int', 'selected-by-default': True, 'inclusion': 'available'}}], + {'first_name': { + 'sql-datatype': 'varchar', + 'selected-by-default': True, + 'inclusion': 'automatic'}}, + {'last_name': { + 'sql-datatype': 'varchar', + 'selected-by-default': True, + 'inclusion': 'automatic'}}, + {'info': { + 'sql-datatype': 'int', 'selected-by-default': True, 'inclusion': 'available'}}], 'schema': { 'type': 'object', 'selected': True, @@ -118,7 +125,7 @@ def setUpClass(cls) -> None: column_type = ["varchar(256)", "varchar(256)", "int"] column_def = [" ".join(x) for x in list(zip(column_name, column_type))] query_list.extend(create_table(database_name, schema_name, table_name, column_def, - primary_key=primary_key, tracking=True) ) + primary_key=primary_key, 
tracking=True)) query_list.extend(insert(database_name, schema_name, table_name, cls.EXPECTED_METADATA['{}_{}_{}'.format( database_name, schema_name, table_name)]["values"])) @@ -138,8 +145,10 @@ def setUpClass(cls) -> None: 'database-name': database_name, 'stream_name': table_name, 'fields': [ - {'pk': {'sql-datatype': 'int', 'selected-by-default': True, 'inclusion': 'automatic'}}, - {'data': {'sql-datatype': 'int', 'selected-by-default': True, 'inclusion': 'available'}}], + {'pk': { + 'sql-datatype': 'int', 'selected-by-default': True, 'inclusion': 'automatic'}}, + {'data': { + 'sql-datatype': 'int', 'selected-by-default': True, 'inclusion': 'available'}}], 'schema': { 'type': 'object', 'selected': True, @@ -162,7 +171,7 @@ def setUpClass(cls) -> None: column_type = ["int", "int"] column_def = [" ".join(x) for x in list(zip(column_name, column_type))] query_list.extend(create_table(database_name, schema_name, table_name, column_def, - primary_key=primary_key, tracking=True) ) + primary_key=primary_key, tracking=True)) query_list.extend(insert(database_name, schema_name, table_name, cls.EXPECTED_METADATA['{}_{}_{}'.format( database_name, schema_name, table_name)]["values"])) @@ -182,8 +191,10 @@ def setUpClass(cls) -> None: 'database-name': database_name, 'stream_name': table_name, 'fields': [ - {'pk': {'sql-datatype': 'int', 'selected-by-default': True, 'inclusion': 'automatic'}}, - {'data': {'sql-datatype': 'int', 'selected-by-default': True, 'inclusion': 'available'}}], + {'pk': { + 'sql-datatype': 'int', 'selected-by-default': True, 'inclusion': 'automatic'}}, + {'data': { + 'sql-datatype': 'int', 'selected-by-default': True, 'inclusion': 'available'}}], 'schema': { 'type': 'object', 'selected': True, @@ -206,7 +217,7 @@ def setUpClass(cls) -> None: column_type = ["int", "int NOT NULL UNIQUE"] column_def = [" ".join(x) for x in list(zip(column_name, column_type))] query_list.extend(create_table(database_name, schema_name, table_name, column_def, - primary_key=primary_key, tracking=True) ) + primary_key=primary_key, tracking=True)) query_list.extend(insert(database_name, schema_name, table_name, cls.EXPECTED_METADATA['{}_{}_{}'.format( database_name, schema_name, table_name)]["values"])) @@ -228,8 +239,10 @@ def setUpClass(cls) -> None: 'database-name': database_name, 'stream_name': table_name, 'fields': [ - {'pk': {'sql-datatype': 'int', 'selected-by-default': True, 'inclusion': 'automatic'}}, - {'fk': {'sql-datatype': 'int', 'selected-by-default': True, 'inclusion': 'available'}}], + {'pk': { + 'sql-datatype': 'int', 'selected-by-default': True, 'inclusion': 'automatic'}}, + {'fk': { + 'sql-datatype': 'int', 'selected-by-default': True, 'inclusion': 'available'}}], 'schema': { 'type': 'object', 'selected': True, @@ -371,8 +384,10 @@ def setUpClass(cls) -> None: 'database-name': database_name, 'stream_name': table_name, 'fields': [ - {'pk': {'sql-datatype': 'int', 'selected-by-default': True, 'inclusion': 'automatic'}}, - {'default_column': {'sql-datatype': 'int', 'selected-by-default': True, 'inclusion': 'available'}}], + {'pk': { + 'sql-datatype': 'int', 'selected-by-default': True, 'inclusion': 'automatic'}}, + {'default_column': { + 'sql-datatype': 'int', 'selected-by-default': True, 'inclusion': 'available'}}], 'schema': { 'type': 'object', 'selected': True, @@ -394,13 +409,14 @@ def setUpClass(cls) -> None: column_name = ["pk", "default_column"] column_type = ["int", "int DEFAULT -1"] column_def = [" ".join(x) for x in list(zip(column_name, column_type))] - 
query_list.extend(create_table(
-            database_name, schema_name, table_name, column_def, primary_key=primary_key, tracking=True) )
+        query_list.extend(create_table(database_name, schema_name, table_name,
+                                       column_def, primary_key=primary_key, tracking=True))
         query_list.extend(insert(database_name, schema_name, table_name,
                                  cls.EXPECTED_METADATA['{}_{}_{}'.format(
                                      database_name, schema_name, table_name)]["values"],
                                  column_names=["pk"]))
-        cls.EXPECTED_METADATA['{}_{}_{}'.format(database_name, schema_name, table_name)]["values"] = [
+        cls.EXPECTED_METADATA['{}_{}_{}'.format(
+            database_name, schema_name, table_name)]["values"] = [
             (0, -1),
             (1, -1)]

@@ -418,8 +434,10 @@ def setUpClass(cls) -> None:
             'database-name': database_name,
             'stream_name': table_name,
             'fields': [
-                {'pk': {'sql-datatype': 'int', 'selected-by-default': True, 'inclusion': 'automatic'}},
-                {'age': {'sql-datatype': 'int', 'selected-by-default': True, 'inclusion': 'available'}}],
+                {'pk': {
+                    'sql-datatype': 'int', 'selected-by-default': True, 'inclusion': 'automatic'}},
+                {'age': {
+                    'sql-datatype': 'int', 'selected-by-default': True, 'inclusion': 'available'}}],
             'schema': {
                 'type': 'object',
                 'selected': True,
@@ -441,8 +459,8 @@ def setUpClass(cls) -> None:
         column_name = ["pk", "age"]
         column_type = ["int", "int CHECK (age <= 120)"]
         column_def = [" ".join(x) for x in list(zip(column_name, column_type))]
-        query_list.extend(create_table(
-            database_name, schema_name, table_name, column_def, primary_key=primary_key, tracking=True) )
+        query_list.extend(create_table(database_name, schema_name, table_name,
+                                       column_def, primary_key=primary_key, tracking=True))
         query_list.extend(insert(database_name, schema_name, table_name,
                                  cls.EXPECTED_METADATA['{}_{}_{}'.format(
                                      database_name, schema_name, table_name)]["values"]))
@@ -461,8 +479,10 @@ def setUpClass(cls) -> None:
             'database-name': database_name,
             'stream_name': table_name,
             'fields': [
-                {'pk': {'sql-datatype': 'int', 'selected-by-default': True, 'inclusion': 'automatic'}},
-                {'even_id': {'sql-datatype': 'int', 'selected-by-default': True, 'inclusion': 'available'}}],
+                {'pk': {
+                    'sql-datatype': 'int', 'selected-by-default': True, 'inclusion': 'automatic'}},
+                {'even_id': {
+                    'sql-datatype': 'int', 'selected-by-default': True, 'inclusion': 'available'}}],
             'schema': {
                 'type': 'object',
                 'selected': True,
@@ -484,15 +504,16 @@ def setUpClass(cls) -> None:
         column_name = ["pk", "even_id"]
         column_type = ["int", "int IDENTITY(2,2)"]
         column_def = [" ".join(x) for x in list(zip(column_name, column_type))]
-        query_list.extend(create_table(
-            database_name, schema_name, table_name, column_def, primary_key=primary_key, tracking=True) )
+        query_list.extend(create_table(database_name, schema_name, table_name,
+                                       column_def, primary_key=primary_key, tracking=True))
         query_list.extend(insert(database_name, schema_name, table_name,
                                  cls.EXPECTED_METADATA['{}_{}_{}'.format(
                                      database_name, schema_name, table_name)]["values"],
                                  column_names=["pk"]))
-        cls.EXPECTED_METADATA['{}_{}_{}'.format(database_name, schema_name, table_name)]["values"] = [
-            (1, 2),
-            (2, 4)]
+        cls.EXPECTED_METADATA['{}_{}_{}'.format(
+            database_name, schema_name, table_name)]["values"] = [
+                (1, 2),
+                (2, 4)]

         mssql_cursor_context_manager(*query_list)
         cls.expected_metadata = cls.discovery_expected_metadata
@@ -524,13 +545,14 @@ def test_run(self):
         # run a sync and verify exit codes
         record_count_by_stream = self.run_sync(conn_id, clear_state=True)

-        # verify record counts of streams
-        expected_count = {k: len(v['values']) for k, v in self.expected_metadata().items()}
-        # self.assertEqual(record_count_by_stream, expected_count)
-
         # verify records match on the first sync
         records_by_stream = runner.get_records_from_target_output()

+        # verify record counts of streams
+        expected_count = {k: len(v['values']) for k, v in self.expected_metadata().items()}
+        pk_count_by_stream = self.unique_pk_count_by_stream(records_by_stream)
+        self.assertEqual(pk_count_by_stream, expected_count)
+
         table_version = dict()
         for stream in self.expected_streams():
             with self.subTest(stream=stream):
@@ -559,10 +581,10 @@ def test_run(self):
                 self.assertEqual(
                     records_by_stream[stream]['messages'][-1]['action'],
                     'activate_version')
-                self.assertEqual(
-                    len([m for m in records_by_stream[stream]['messages'][1:] if m["action"] == "activate_version"]),
-                    2,
-                    msg="Expect 2 more activate version messages for end of full table and beginning of log based")
+                self.assertEqual(2, len([m for m in records_by_stream[stream]['messages'][1:]
+                                         if m["action"] == "activate_version"]),
+                                 msg=("Expect 2 more activate version messages for end of full "
+                                      "table and beginning of log based"))

                 column_names = [
                     list(field_data.keys())[0] for field_data in stream_expected_data[self.FIELDS]
@@ -589,7 +611,8 @@ def test_run(self):
                     with self.subTest(expected_row=expected_row):
                         self.assertEqual(actual_row["action"], "upsert")

-                        self.assertEqual(len(expected_row["data"].keys()), len(actual_row["data"].keys()),
+                        self.assertEqual(len(expected_row["data"].keys()),
+                                         len(actual_row["data"].keys()),
                                          msg="there are not the same number of columns")
                         for column_name, expected_value in expected_row["data"].items():
                             self.assertEqual(expected_value, actual_row["data"][column_name],
@@ -599,10 +622,10 @@ def test_run(self):

                 # Verify all data is correct for the log replication part if sent
                 if records_by_stream[stream]['messages'][-1].get("data"):
                     for column_name, expected_value in expected_messages[-1]["data"].items():
-                        self.assertEqual(expected_value,
-                                         records_by_stream[stream]['messages'][-1]["data"][column_name],
-                                         msg="expected: {} != actual {}".format(
-                                             expected_row, actual_row))
+                        self.assertEqual(
+                            expected_value,
+                            records_by_stream[stream]['messages'][-1]["data"][column_name],
+                            msg="expected: {} != actual {}".format(expected_row, actual_row))

                 LOGGER.info("records are correct for stream %s", stream)
@@ -610,21 +633,21 @@ def test_run(self):
                 state = menagerie.get_state(conn_id)
                 bookmark = state['bookmarks'][stream]

-                self.assertIsNone(state.get('currently_syncing'), msg="expected state's currently_syncing to be None")
-                self.assertIsNotNone(
-                    bookmark.get('current_log_version'),
-                    msg="expected bookmark to have current_log_version because we are using log replication")
-                self.assertTrue(bookmark['initial_full_table_complete'], msg="expected full table to be complete")
+                self.assertIsNone(state.get('currently_syncing'),
+                                  msg="expected state's currently_syncing to be None")
+                self.assertIsNotNone(bookmark.get('current_log_version'),
+                                     msg="expected bookmark to have current_log_version due to log replication")
+                self.assertTrue(bookmark['initial_full_table_complete'],
+                                msg="expected full table to be complete")
                 inital_log_version = bookmark['current_log_version']

                 self.assertEqual(bookmark['version'], table_version[stream],
                                  msg="expected bookmark for stream to match version")

                 expected_schemas = self.expected_metadata()[stream]['schema']
-                self.assertEqual(records_by_stream[stream]['schema'],
-                                 expected_schemas,
-                                 msg="expected: {} != actual: {}".format(expected_schemas,
-                                                                         records_by_stream[stream]['schema']))
+                self.assertEqual(records_by_stream[stream]['schema'], expected_schemas,
+                                 msg=("expected: {} != actual: {}".format(
+                                     expected_schemas, records_by_stream[stream]['schema'])))

         # ----------------------------------------------------------------------
         # invoke the sync job AGAIN and after insert, update, delete or rows
@@ -638,8 +661,10 @@ def test_run(self):
         update_value = [("Tim", "Berners-Lee", 65)]
         delete_value = [("Larry", "Page")]
         query_list = (insert(database_name, schema_name, table_name, insert_value))
-        query_list.extend(delete_by_pk(database_name, schema_name, table_name, delete_value, column_name[:2]))
-        query_list.extend(update_by_pk(database_name, schema_name, table_name, update_value, column_name))
+        query_list.extend(delete_by_pk(
+            database_name, schema_name, table_name, delete_value, column_name[:2]))
+        query_list.extend(update_by_pk(
+            database_name, schema_name, table_name, update_value, column_name))
         mssql_cursor_context_manager(*query_list)
         insert_value = [insert_value[0] + (None,)]
         update_value = [update_value[0] + (None,)]
@@ -657,8 +682,10 @@ def test_run(self):
         update_value = [(1, 65)]
         delete_value = [(0, )]
         query_list = (insert(database_name, schema_name, table_name, insert_value))
-        query_list.extend(delete_by_pk(database_name, schema_name, table_name, delete_value, column_name[:1]))
-        query_list.extend(update_by_pk(database_name, schema_name, table_name, update_value, column_name))
+        query_list.extend(delete_by_pk(
+            database_name, schema_name, table_name, delete_value, column_name[:1]))
+        query_list.extend(update_by_pk(
+            database_name, schema_name, table_name, update_value, column_name))
         mssql_cursor_context_manager(*query_list)
         insert_value = [insert_value[0] + (None,)]
         update_value = [update_value[0] + (None,)]
@@ -676,8 +703,10 @@ def test_run(self):
         update_value = [(0, 2)]
         delete_value = [(1, ), (2, )]
         query_list = (insert(database_name, schema_name, table_name, insert_value))
-        query_list.extend(delete_by_pk(database_name, schema_name, table_name, delete_value, column_name[:1]))
-        query_list.extend(update_by_pk(database_name, schema_name, table_name, update_value, column_name))
+        query_list.extend(delete_by_pk(
+            database_name, schema_name, table_name, delete_value, column_name[:1]))
+        query_list.extend(update_by_pk(
+            database_name, schema_name, table_name, update_value, column_name))
         mssql_cursor_context_manager(*query_list)
         insert_value = [insert_value[0] + (None,)]
         update_value = [update_value[0] + (None,)]
@@ -697,8 +726,10 @@ def test_run(self):
         update_value = [(1, 65)]
         delete_value = [(0, )]
         query_list = (insert(database_name, schema_name, table_name, insert_value))
-        query_list.extend(delete_by_pk(database_name, schema_name, table_name, delete_value, column_name[:1]))
-        query_list.extend(update_by_pk(database_name, schema_name, table_name, update_value, column_name))
+        query_list.extend(delete_by_pk(
+            database_name, schema_name, table_name, delete_value, column_name[:1]))
+        query_list.extend(update_by_pk(
+            database_name, schema_name, table_name, update_value, column_name))
         mssql_cursor_context_manager(*query_list)
         insert_value = [insert_value[0] + (None,)]
         update_value = [update_value[0] + (None,)]
@@ -716,8 +747,10 @@ def test_run(self):
         update_value = [(1, 65)]
         delete_value = [(0,)]
         query_list = (insert(database_name, schema_name, table_name, insert_value))
-        query_list.extend(delete_by_pk(database_name, schema_name, table_name, delete_value, column_name[:1]))
-        query_list.extend(update_by_pk(database_name, schema_name, table_name, update_value, column_name))
+        query_list.extend(delete_by_pk(
+            database_name, schema_name, table_name, delete_value, column_name[:1]))
+        query_list.extend(update_by_pk(
+            database_name, schema_name, table_name, update_value, column_name))
         mssql_cursor_context_manager(*query_list)
         insert_value = [insert_value[0] + (None,)]
         update_value = [update_value[0] + (None,)]
@@ -735,8 +768,10 @@ def test_run(self):
         update_value = [(1, 65)]
         delete_value = [(0,)]
        query_list = (insert(database_name, schema_name, table_name, insert_value))
-        query_list.extend(delete_by_pk(database_name, schema_name, table_name, delete_value, column_name[:1]))
-        query_list.extend(update_by_pk(database_name, schema_name, table_name, update_value, column_name))
+        query_list.extend(delete_by_pk(
+            database_name, schema_name, table_name, delete_value, column_name[:1]))
+        query_list.extend(update_by_pk(
+            database_name, schema_name, table_name, update_value, column_name))
         mssql_cursor_context_manager(*query_list)
         insert_value = [insert_value[0] + (None,)]
         update_value = [update_value[0] + (None,)]
@@ -754,9 +789,12 @@ def test_run(self):
         insert_value = [(3,)]
         update_value = [(1, )]
         delete_value = [(2,)]
-        query_list = (insert(database_name, schema_name, table_name, insert_value, column_names=column_name[:1]))
-        query_list.extend(delete_by_pk(database_name, schema_name, table_name, delete_value, column_name[:1]))
-        query_list.extend(update_by_pk(database_name, schema_name, table_name, update_value, column_name))
+        query_list = (insert(
+            database_name, schema_name, table_name, insert_value, column_names=column_name[:1]))
+        query_list.extend(delete_by_pk(
+            database_name, schema_name, table_name, delete_value, column_name[:1]))
+        query_list.extend(update_by_pk(
+            database_name, schema_name, table_name, update_value, column_name))
         mssql_cursor_context_manager(*query_list)
         insert_value = [insert_value[0] + (6, None)]
         update_value = [update_value[0] + (2, None)]
@@ -783,8 +821,8 @@ def test_run(self):
                 self.assertEqual(
                     records_by_stream[stream]['messages'][0]['action'],
                     'activate_version')
-                self.assertTrue(all(
-                    [message["action"] == "upsert" for message in records_by_stream[stream]['messages'][1:]]
+                self.assertTrue(all([message["action"] == "upsert"
+                                     for message in records_by_stream[stream]['messages'][1:]]
                 ))
                 column_names = [
                     list(field_data.keys())[0] for field_data in stream_expected_data[self.FIELDS]
@@ -812,8 +850,9 @@ def test_run(self):
                         self.assertEqual(actual_row["action"], "upsert")

                        # we only send the _sdc_deleted_at column for deleted rows
-                        self.assertGreaterEqual(len(expected_row["data"].keys()), len(actual_row["data"].keys()),
-                                                msg="there are not the same number of columns")
+                        self.assertGreaterEqual(len(expected_row["data"].keys()),
+                                                len(actual_row["data"].keys()),
+                                                msg="there are not the same number of columns")

                         for column_name, expected_value in expected_row["data"].items():
                             if column_name != "_sdc_deleted_at":
@@ -828,10 +867,12 @@ def test_run(self):
                                 except ValueError:
                                     actual_value = datetime.strptime(actual_row["data"][column_name],
                                                                      "%Y-%m-%dT%H:%M:%SZ")
-                                self.assertGreaterEqual(actual_value, expected_value - timedelta(seconds=15))
-                                self.assertLessEqual(actual_value, expected_value + timedelta(seconds=15))
+                                self.assertGreaterEqual(actual_value, expected_value - timedelta(
+                                    seconds=15))
+                                self.assertLessEqual(actual_value, expected_value + timedelta(
+                                    seconds=15))
                             else:
-                                # the row wasn't deleted so we can either not pass the column or it can be None
+                                # row wasn't deleted so don't pass the column or let it be None
                                 self.assertIsNone(actual_row["data"].get(column_name))

                 LOGGER.info("records are correct for stream %s", stream)
@@ -840,11 +881,12 @@ def test_run(self):
                 state = menagerie.get_state(conn_id)
                 bookmark = state['bookmarks'][stream]

-                self.assertIsNone(state.get('currently_syncing'), msg="expected state's currently_syncing to be None")
-                self.assertIsNotNone(
-                    bookmark.get('current_log_version'),
-                    msg="expected bookmark to have current_log_version because we are using log replication")
-                self.assertTrue(bookmark['initial_full_table_complete'], msg="expected full table to be complete")
+                self.assertIsNone(state.get('currently_syncing'),
+                                  msg="expected state's currently_syncing to be None")
+                self.assertIsNotNone(bookmark.get('current_log_version'),
+                                     msg="expected bookmark to have current_log_version due to log replication")
+                self.assertTrue(bookmark['initial_full_table_complete'],
+                                msg="expected full table to be complete")
                 new_log_version = bookmark['current_log_version']
                 self.assertGreater(new_log_version, inital_log_version,
                                    msg='expected log version to increase')
@@ -855,7 +897,6 @@ def test_run(self):
                                  msg="expected bookmark for stream to match version")

                 expected_schemas = self.expected_metadata()[stream]['schema']
-                self.assertEqual(records_by_stream[stream]['schema'],
-                                 expected_schemas,
-                                 msg="expected: {} != actual: {}".format(expected_schemas,
-                                                                         records_by_stream[stream]['schema']))
+                self.assertEqual(records_by_stream[stream]['schema'], expected_schemas,
+                                 msg="expected: {} != actual: {}".format(
+                                     expected_schemas, records_by_stream[stream]['schema']))
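
Note (not part of the patch above): the _sdc_deleted_at assertions retained in this file bound the emitted deletion timestamp to +/- 15 seconds of the expected delete time, trying both timestamp formats the target may produce. A minimal standalone sketch of that pattern, with a hypothetical helper name, assuming ISO-8601 strings with or without fractional seconds:

    from datetime import datetime, timedelta

    def assert_deleted_at_within(actual_str, expected_dt, tolerance=timedelta(seconds=15)):
        # the target may or may not include fractional seconds in _sdc_deleted_at
        try:
            actual_dt = datetime.strptime(actual_str, "%Y-%m-%dT%H:%M:%S.%fZ")
        except ValueError:
            actual_dt = datetime.strptime(actual_str, "%Y-%m-%dT%H:%M:%SZ")
        # accept any value within the +/- tolerance window around the expected delete time
        assert expected_dt - tolerance <= actual_dt <= expected_dt + tolerance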