From 9dfe89284897f46a535b8f6a24fb2784d89e1188 Mon Sep 17 00:00:00 2001 From: arilton Date: Tue, 19 Nov 2024 11:38:12 -0300 Subject: [PATCH 01/18] add all properties inside brackets --- target_mssql/sinks.py | 37 +++++++++++++++++++++---------------- 1 file changed, 21 insertions(+), 16 deletions(-) diff --git a/target_mssql/sinks.py b/target_mssql/sinks.py index f6d95f5..94fb572 100644 --- a/target_mssql/sinks.py +++ b/target_mssql/sinks.py @@ -270,21 +270,26 @@ def merge_upsert_from_table( self.connection.execute("COMMIT") - def conform_name(self, name: str, object_type: Optional[str] = None) -> str: - """Conform a stream property name to one suitable for the target system. + def conform_schema(self, schema: dict) -> dict: + """Return schema dictionary with property names conformed. - Transforms names to snake case, applicable to most common DBMSs'. - Developers may override this method to apply custom transformations - to database/schema/table/column names. - """ - # strip non-alphanumeric characters, keeping - . _ and spaces - name = re.sub(r"[^a-zA-Z0-9_\-\.\s]", "", name) - - # convert to snakecase - if name.isupper(): - name = name.lower() - - name = snakecase(name) + Args: + schema: JSON schema dictionary. - # replace leading digit - return replace_leading_digit(name) + Returns: + A schema dictionary with the property names conformed. + """ + conformed_schema = copy(schema) + conformed_property_names = { + key: self.bracket_names(self.conform_name(key)) for key in conformed_schema["properties"].keys() + } + self._check_conformed_names_not_duplicated(conformed_property_names) + conformed_schema["properties"] = { + conformed_property_names[key]: value + for key, value in conformed_schema["properties"].items() + } + self.key_properties = [self.bracket_names(key) for key in self.key_properties] + return conformed_schema + + def bracket_names(self, name: str) -> str: + return f"[{name}]" \ No newline at end of file From b901f7439c82f31d9fd76b482666076fcb9e692e Mon Sep 17 00:00:00 2001 From: arilton Date: Tue, 19 Nov 2024 11:43:05 -0300 Subject: [PATCH 02/18] cannot set key_properties --- target_mssql/sinks.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/target_mssql/sinks.py b/target_mssql/sinks.py index 94fb572..30307b6 100644 --- a/target_mssql/sinks.py +++ b/target_mssql/sinks.py @@ -74,6 +74,8 @@ def check_string_key_properties(self): if self.key_properties: schema = self.conform_schema(self.schema) for prop in self.key_properties: + if self.is_bracketed(prop): + prop = self.unbracket_names(prop) isnumeric = ("string" not in schema['properties'][prop]['type']) and isnumeric return self.key_properties and isnumeric @@ -116,8 +118,6 @@ def bulk_insert_records( insert_record[column.name] = record.get(field) insert_records.append(insert_record) - - if self.check_string_key_properties(): self.connection.execute(f"SET IDENTITY_INSERT { full_table_name } ON") @@ -288,8 +288,13 @@ def conform_schema(self, schema: dict) -> dict: conformed_property_names[key]: value for key, value in conformed_schema["properties"].items() } - self.key_properties = [self.bracket_names(key) for key in self.key_properties] return conformed_schema def bracket_names(self, name: str) -> str: - return f"[{name}]" \ No newline at end of file + return f"[{name}]" + + def unbracket_names(self, name: str) -> str: + return name.replace("[", "").replace("]", "") + + def is_bracketed(self, name: str) -> bool: + return name.startswith("[") and name.endswith("]") \ No newline at end of file From b02c7e993ee1c97f37121f8b32aada8a059d4fa7 Mon Sep 17 00:00:00 2001 From: arilton Date: Tue, 19 Nov 2024 15:33:41 -0300 Subject: [PATCH 03/18] only applying brackets in reserved words in table and column names. remove them from values and parameters --- target_mssql/sinks.py | 52 +++++++++++++++++++++++++++++++++++-------- 1 file changed, 43 insertions(+), 9 deletions(-) diff --git a/target_mssql/sinks.py b/target_mssql/sinks.py index 30307b6..e3c926a 100644 --- a/target_mssql/sinks.py +++ b/target_mssql/sinks.py @@ -2,13 +2,12 @@ from __future__ import annotations -from typing import Any, Dict, Iterable, List, Optional +from typing import Any, Dict, Iterable, List, Optional, Union from copy import copy import sqlalchemy from singer_sdk.sinks import SQLSink from sqlalchemy import Column -import re -from singer_sdk.helpers._conformers import replace_leading_digit, snakecase +from textwrap import dedent from target_mssql.connector import mssqlConnector @@ -74,8 +73,7 @@ def check_string_key_properties(self): if self.key_properties: schema = self.conform_schema(self.schema) for prop in self.key_properties: - if self.is_bracketed(prop): - prop = self.unbracket_names(prop) + prop = self.conform_name(prop) isnumeric = ("string" not in schema['properties'][prop]['type']) and isnumeric return self.key_properties and isnumeric @@ -141,7 +139,7 @@ def column_representation( for property_name, property_jsonschema in conformed_properties.items(): columns.append( Column( - property_name, + self.unbracket_names(property_name), self.connector.to_sql_type(property_jsonschema), ) ) @@ -281,7 +279,7 @@ def conform_schema(self, schema: dict) -> dict: """ conformed_schema = copy(schema) conformed_property_names = { - key: self.bracket_names(self.conform_name(key)) for key in conformed_schema["properties"].keys() + key: self.conform_name(key) for key in conformed_schema["properties"].keys() } self._check_conformed_names_not_duplicated(conformed_property_names) conformed_schema["properties"] = { @@ -294,7 +292,43 @@ def bracket_names(self, name: str) -> str: return f"[{name}]" def unbracket_names(self, name: str) -> str: - return name.replace("[", "").replace("]", "") + if self.is_bracketed(name): + return name.replace("[", "").replace("]", "") + return name def is_bracketed(self, name: str) -> bool: - return name.startswith("[") and name.endswith("]") \ No newline at end of file + return name.startswith("[") and name.endswith("]") + + def is_protected_name(self, name: str) -> bool: + mssql_reserved_keywords = ["ADD","EXTERNAL","PROCEDURE","ALL","FETCH","PUBLIC","ALTER","FILE","RAISERROR","AND","FILLFACTOR","READ","ANY","FOR","READTEXT","AS","FOREIGN","RECONFIGURE","ASC","FREETEXT","REFERENCES","AUTHORIZATION","FREETEXTTABLE","REPLICATION","BACKUP","FROM","RESTORE","BEGIN","FULL","RESTRICT","BETWEEN","FUNCTION","RETURN","BREAK","GOTO","REVERT","BROWSE","GRANT","REVOKE","BULK","GROUP","RIGHT","BY","HAVING","ROLLBACK","CASCADE","HOLDLOCK","ROWCOUNT","CASE","IDENTITY","ROWGUIDCOL","CHECK","IDENTITY_INSERT","RULE","CHECKPOINT","IDENTITYCOL","SAVE","CLOSE","IF","SCHEMA","CLUSTERED","IN","SECURITYAUDIT","COALESCE","INDEX","SELECT","COLLATE","INNER","SEMANTICKEYPHRASETABLE","COLUMN","INSERT","SEMANTICSIMILARITYDETAILSTABLE","COMMIT","INTERSECT","SEMANTICSIMILARITYTABLE","COMPUTE","INTO","SESSION_USER","CONSTRAINT","IS","SET","CONTAINS","JOIN","SETUSER","CONTAINSTABLE","KEY","SHUTDOWN","CONTINUE","KILL","SOME","CONVERT","LEFT","STATISTICS","CREATE","LIKE","SYSTEM_USER","CROSS","LINENO","TABLE","CURRENT","LOAD","TABLESAMPLE","CURRENT_DATE","MERGE","TEXTSIZE","CURRENT_TIME","NATIONAL","THEN","CURRENT_TIMESTAMP","NOCHECK","TO","CURRENT_USER","NONCLUSTERED","TOP","CURSOR","NOT","TRAN","DATABASE","NULL","TRANSACTION","DBCC","NULLIF","TRIGGER","DEALLOCATE","OF","TRUNCATE","DECLARE","OFF","TRY_CONVERT","DEFAULT","OFFSETS","TSEQUAL","DELETE","ON","UNION","DENY","OPEN","UNIQUE","DESC","OPENDATASOURCE","UNPIVOT","DISK","OPENQUERY","UPDATE","DISTINCT","OPENROWSET","UPDATETEXT","DISTRIBUTED","OPENXML","USE","DOUBLE","OPTION","USER","DROP","OR","VALUES","DUMP","ORDER","VARYING","ELSE","OUTER","VIEW","END","OVER","WAITFOR","ERRLVL","PERCENT","WHEN","ESCAPE","PIVOT","WHERE","EXCEPT","PLAN","WHILE","EXEC","PRECISION","WITH","EXECUTE","PRIMARY","WITHIN GROUP","EXISTS","PRINT","WRITETEXT","EXIT","PROC"] + return name.upper() in mssql_reserved_keywords + + def conform_name(self, name: str, object_type: Optional[str] = None) -> str: + name = super().conform_name(name, object_type) + if self.is_protected_name(name): + return self.bracket_names(name) + return name + + def generate_insert_statement( + self, + full_table_name: str, + schema: dict, + ): + """Generate an insert statement for the given records. + + Args: + full_table_name: the target table name. + schema: the JSON schema for the new table. + + Returns: + An insert statement. + """ + property_names = list(self.conform_schema(schema)["properties"].keys()) + statement = dedent( + f"""\ + INSERT INTO {full_table_name} + ({", ".join(property_names)}) + VALUES ({", ".join([f":{self.unbracket_names(name)}" for name in property_names])}) + """ + ) + return statement.rstrip() \ No newline at end of file From 83799f29819ea17508a01e0c25879e400a9af54f Mon Sep 17 00:00:00 2001 From: arilton Date: Tue, 19 Nov 2024 17:59:22 -0300 Subject: [PATCH 04/18] only bracket names in insert/merge statements --- target_mssql/sinks.py | 36 +++++++++++++++++++++++++++--------- 1 file changed, 27 insertions(+), 9 deletions(-) diff --git a/target_mssql/sinks.py b/target_mssql/sinks.py index e3c926a..7006f35 100644 --- a/target_mssql/sinks.py +++ b/target_mssql/sinks.py @@ -8,6 +8,8 @@ from singer_sdk.sinks import SQLSink from sqlalchemy import Column from textwrap import dedent +import re +from singer_sdk.helpers._conformers import replace_leading_digit, snakecase from target_mssql.connector import mssqlConnector @@ -73,7 +75,7 @@ def check_string_key_properties(self): if self.key_properties: schema = self.conform_schema(self.schema) for prop in self.key_properties: - prop = self.conform_name(prop) + # prop = self.conform_name(prop) isnumeric = ("string" not in schema['properties'][prop]['type']) and isnumeric return self.key_properties and isnumeric @@ -139,7 +141,7 @@ def column_representation( for property_name, property_jsonschema in conformed_properties.items(): columns.append( Column( - self.unbracket_names(property_name), + property_name, self.connector.to_sql_type(property_jsonschema), ) ) @@ -233,12 +235,12 @@ def merge_upsert_from_table( schema = self.conform_schema(schema) join_condition = " and ".join( - [f"temp.{key} = target.{key}" for key in join_keys] + [f"temp.[{key}] = target.[{key}]" for key in join_keys] ) update_stmt = ", ".join( [ - f"target.{key} = temp.{key}" + f"target.[{key}] = temp.[{key}]" for key in schema["properties"].keys() if key not in join_keys ] @@ -252,8 +254,8 @@ def merge_upsert_from_table( UPDATE SET { update_stmt } WHEN NOT MATCHED THEN - INSERT ({", ".join(schema["properties"].keys())}) - VALUES ({", ".join([f"temp.{key}" for key in schema["properties"].keys()])}); + INSERT ({", ".join([f"[{key}]" for key in schema["properties"].keys()])}) + VALUES ({", ".join([f"temp.[{key}]" for key in schema["properties"].keys()])}); """ @@ -268,7 +270,7 @@ def merge_upsert_from_table( self.connection.execute("COMMIT") - def conform_schema(self, schema: dict) -> dict: + def conform_schema_new(self, schema: dict) -> dict: """Return schema dictionary with property names conformed. Args: @@ -279,7 +281,7 @@ def conform_schema(self, schema: dict) -> dict: """ conformed_schema = copy(schema) conformed_property_names = { - key: self.conform_name(key) for key in conformed_schema["properties"].keys() + key: self.conform_name_new(key) for key in conformed_schema["properties"].keys() } self._check_conformed_names_not_duplicated(conformed_property_names) conformed_schema["properties"] = { @@ -304,6 +306,22 @@ def is_protected_name(self, name: str) -> bool: return name.upper() in mssql_reserved_keywords def conform_name(self, name: str, object_type: Optional[str] = None) -> str: + """Conform a stream property name to one suitable for the target system. + Transforms names to snake case, applicable to most common DBMSs'. + Developers may override this method to apply custom transformations + to database/schema/table/column names. + """ + # strip non-alphanumeric characters, keeping - . _ and spaces + name = re.sub(r"[^a-zA-Z0-9_\-\.\s]", "", name) + # convert to snakecase + if name.isupper(): + name = name.lower() + + name = snakecase(name) + # replace leading digit + return replace_leading_digit(name) + + def conform_name_new(self, name: str, object_type: Optional[str] = None) -> str: name = super().conform_name(name, object_type) if self.is_protected_name(name): return self.bracket_names(name) @@ -323,7 +341,7 @@ def generate_insert_statement( Returns: An insert statement. """ - property_names = list(self.conform_schema(schema)["properties"].keys()) + property_names = list(self.conform_schema_new(schema)["properties"].keys()) statement = dedent( f"""\ INSERT INTO {full_table_name} From 10cb38825ea8881d12ae7b46e940da0289ee23f8 Mon Sep 17 00:00:00 2001 From: Hassan Syyid Date: Tue, 19 Nov 2024 16:05:40 -0500 Subject: [PATCH 05/18] force drop table for testing --- target_mssql/connector.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/target_mssql/connector.py b/target_mssql/connector.py index 5ab9bc7..6075eab 100644 --- a/target_mssql/connector.py +++ b/target_mssql/connector.py @@ -42,6 +42,13 @@ def create_table_with_records( if primary_keys is None: primary_keys = self.key_properties partition_keys = partition_keys or None + + + # NOTE: Force create the table + # TODO: remove this + self.logger.info("Force dropping the table!") + self.connection.execute(f"DROP TABLE IF EXISTS #{full_table_name};") + self.connector.prepare_table( full_table_name=full_table_name, primary_keys=primary_keys, From a41f9c4f708009660e37ab04d1fa2bc4ca3da2cb Mon Sep 17 00:00:00 2001 From: Hassan Syyid Date: Tue, 19 Nov 2024 16:13:37 -0500 Subject: [PATCH 06/18] actually force drop the table --- target_mssql/connector.py | 6 ------ target_mssql/sinks.py | 5 +++++ 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/target_mssql/connector.py b/target_mssql/connector.py index 6075eab..45f083a 100644 --- a/target_mssql/connector.py +++ b/target_mssql/connector.py @@ -43,12 +43,6 @@ def create_table_with_records( primary_keys = self.key_properties partition_keys = partition_keys or None - - # NOTE: Force create the table - # TODO: remove this - self.logger.info("Force dropping the table!") - self.connection.execute(f"DROP TABLE IF EXISTS #{full_table_name};") - self.connector.prepare_table( full_table_name=full_table_name, primary_keys=primary_keys, diff --git a/target_mssql/sinks.py b/target_mssql/sinks.py index 7006f35..23c175e 100644 --- a/target_mssql/sinks.py +++ b/target_mssql/sinks.py @@ -178,6 +178,11 @@ def process_batch(self, context: dict) -> None: if self.key_properties: self.logger.info(f"Preparing table {self.full_table_name}") + # NOTE: Force create the table + # TODO: remove this + self.logger.info("Force dropping the table!") + self.connector.connection.execute(f"DROP TABLE IF EXISTS #{self.full_table_name};") + self.connector.prepare_table( full_table_name=self.full_table_name, schema=self.conform_schema(self.schema), From 90f7103fb8f14141ed3928dbd837c606f9203bd7 Mon Sep 17 00:00:00 2001 From: arilton Date: Tue, 19 Nov 2024 18:27:40 -0300 Subject: [PATCH 07/18] clean the code and improve against sql injection --- target_mssql/sinks.py | 112 +++++++++++------------------------------- 1 file changed, 30 insertions(+), 82 deletions(-) diff --git a/target_mssql/sinks.py b/target_mssql/sinks.py index 23c175e..526baea 100644 --- a/target_mssql/sinks.py +++ b/target_mssql/sinks.py @@ -2,11 +2,10 @@ from __future__ import annotations -from typing import Any, Dict, Iterable, List, Optional, Union -from copy import copy +from typing import Any, Dict, Iterable, List, Optional import sqlalchemy from singer_sdk.sinks import SQLSink -from sqlalchemy import Column +from sqlalchemy import Column, text from textwrap import dedent import re from singer_sdk.helpers._conformers import replace_leading_digit, snakecase @@ -75,7 +74,6 @@ def check_string_key_properties(self): if self.key_properties: schema = self.conform_schema(self.schema) for prop in self.key_properties: - # prop = self.conform_name(prop) isnumeric = ("string" not in schema['properties'][prop]['type']) and isnumeric return self.key_properties and isnumeric @@ -117,14 +115,19 @@ def bulk_insert_records( for column, field in zip(columns, self.schema["properties"].keys()): insert_record[column.name] = record.get(field) insert_records.append(insert_record) - if self.check_string_key_properties(): - self.connection.execute(f"SET IDENTITY_INSERT { full_table_name } ON") + self.connection.execute( + sqlalchemy.text("SET IDENTITY_INSERT :table_name ON"), + {"table_name": full_table_name} + ) self.connection.execute(insert_sql, insert_records) if self.check_string_key_properties(): - self.connection.execute(f"SET IDENTITY_INSERT { full_table_name } OFF") + self.connection.execute( + sqlalchemy.text("SET IDENTITY_INSERT :table_name OFF"), + {"table_name": full_table_name} + ) if isinstance(records, list): return len(records) # If list, we can quickly return record count. @@ -146,26 +149,6 @@ def column_representation( ) ) return columns - - # def conform_schema(self, schema: dict) -> dict: - # """Return schema dictionary with property names conformed. - - # Args: - # schema: JSON schema dictionary. - - # Returns: - # A schema dictionary with the property names conformed. - # """ - # conformed_schema = copy(schema) - # conformed_property_names = { - # key: self.conform_name(key) for key in conformed_schema["properties"].keys() - # } - # self._check_conformed_names_not_duplicated(conformed_property_names) - # conformed_schema["properties"] = { - # conformed_property_names[key]: value - # for key, value in conformed_schema["properties"].items() - # } - # return conformed_schema def process_batch(self, context: dict) -> None: """Process a batch with the given batch context. @@ -251,64 +234,35 @@ def merge_upsert_from_table( ] ) # noqa - merge_sql = f""" - MERGE INTO {to_table_name} AS target - USING {from_table_name} AS temp - ON {join_condition} + merge_sql = text(""" + MERGE INTO :to_table AS target + USING :from_table AS temp + ON :join_condition WHEN MATCHED THEN - UPDATE SET - { update_stmt } + UPDATE SET :update_stmt WHEN NOT MATCHED THEN - INSERT ({", ".join([f"[{key}]" for key in schema["properties"].keys()])}) - VALUES ({", ".join([f"temp.[{key}]" for key in schema["properties"].keys()])}); - """ - - + INSERT (:columns) + VALUES (:values); + """) + + params = { + "to_table": to_table_name, + "from_table": from_table_name, + "join_condition": join_condition, + "update_stmt": update_stmt, + "columns": ", ".join([f"[{key}]" for key in schema["properties"].keys()]), + "values": ", ".join([f"temp.[{key}]" for key in schema["properties"].keys()]) + } if self.check_string_key_properties(): self.connection.execute(f"SET IDENTITY_INSERT { to_table_name } ON") - self.connection.execute(merge_sql) + self.connection.execute(merge_sql, params) if self.check_string_key_properties(): self.connection.execute(f"SET IDENTITY_INSERT { to_table_name } OFF") self.connection.execute("COMMIT") - - def conform_schema_new(self, schema: dict) -> dict: - """Return schema dictionary with property names conformed. - - Args: - schema: JSON schema dictionary. - - Returns: - A schema dictionary with the property names conformed. - """ - conformed_schema = copy(schema) - conformed_property_names = { - key: self.conform_name_new(key) for key in conformed_schema["properties"].keys() - } - self._check_conformed_names_not_duplicated(conformed_property_names) - conformed_schema["properties"] = { - conformed_property_names[key]: value - for key, value in conformed_schema["properties"].items() - } - return conformed_schema - - def bracket_names(self, name: str) -> str: - return f"[{name}]" - - def unbracket_names(self, name: str) -> str: - if self.is_bracketed(name): - return name.replace("[", "").replace("]", "") - return name - - def is_bracketed(self, name: str) -> bool: - return name.startswith("[") and name.endswith("]") - - def is_protected_name(self, name: str) -> bool: - mssql_reserved_keywords = ["ADD","EXTERNAL","PROCEDURE","ALL","FETCH","PUBLIC","ALTER","FILE","RAISERROR","AND","FILLFACTOR","READ","ANY","FOR","READTEXT","AS","FOREIGN","RECONFIGURE","ASC","FREETEXT","REFERENCES","AUTHORIZATION","FREETEXTTABLE","REPLICATION","BACKUP","FROM","RESTORE","BEGIN","FULL","RESTRICT","BETWEEN","FUNCTION","RETURN","BREAK","GOTO","REVERT","BROWSE","GRANT","REVOKE","BULK","GROUP","RIGHT","BY","HAVING","ROLLBACK","CASCADE","HOLDLOCK","ROWCOUNT","CASE","IDENTITY","ROWGUIDCOL","CHECK","IDENTITY_INSERT","RULE","CHECKPOINT","IDENTITYCOL","SAVE","CLOSE","IF","SCHEMA","CLUSTERED","IN","SECURITYAUDIT","COALESCE","INDEX","SELECT","COLLATE","INNER","SEMANTICKEYPHRASETABLE","COLUMN","INSERT","SEMANTICSIMILARITYDETAILSTABLE","COMMIT","INTERSECT","SEMANTICSIMILARITYTABLE","COMPUTE","INTO","SESSION_USER","CONSTRAINT","IS","SET","CONTAINS","JOIN","SETUSER","CONTAINSTABLE","KEY","SHUTDOWN","CONTINUE","KILL","SOME","CONVERT","LEFT","STATISTICS","CREATE","LIKE","SYSTEM_USER","CROSS","LINENO","TABLE","CURRENT","LOAD","TABLESAMPLE","CURRENT_DATE","MERGE","TEXTSIZE","CURRENT_TIME","NATIONAL","THEN","CURRENT_TIMESTAMP","NOCHECK","TO","CURRENT_USER","NONCLUSTERED","TOP","CURSOR","NOT","TRAN","DATABASE","NULL","TRANSACTION","DBCC","NULLIF","TRIGGER","DEALLOCATE","OF","TRUNCATE","DECLARE","OFF","TRY_CONVERT","DEFAULT","OFFSETS","TSEQUAL","DELETE","ON","UNION","DENY","OPEN","UNIQUE","DESC","OPENDATASOURCE","UNPIVOT","DISK","OPENQUERY","UPDATE","DISTINCT","OPENROWSET","UPDATETEXT","DISTRIBUTED","OPENXML","USE","DOUBLE","OPTION","USER","DROP","OR","VALUES","DUMP","ORDER","VARYING","ELSE","OUTER","VIEW","END","OVER","WAITFOR","ERRLVL","PERCENT","WHEN","ESCAPE","PIVOT","WHERE","EXCEPT","PLAN","WHILE","EXEC","PRECISION","WITH","EXECUTE","PRIMARY","WITHIN GROUP","EXISTS","PRINT","WRITETEXT","EXIT","PROC"] - return name.upper() in mssql_reserved_keywords def conform_name(self, name: str, object_type: Optional[str] = None) -> str: """Conform a stream property name to one suitable for the target system. @@ -325,12 +279,6 @@ def conform_name(self, name: str, object_type: Optional[str] = None) -> str: name = snakecase(name) # replace leading digit return replace_leading_digit(name) - - def conform_name_new(self, name: str, object_type: Optional[str] = None) -> str: - name = super().conform_name(name, object_type) - if self.is_protected_name(name): - return self.bracket_names(name) - return name def generate_insert_statement( self, @@ -346,11 +294,11 @@ def generate_insert_statement( Returns: An insert statement. """ - property_names = list(self.conform_schema_new(schema)["properties"].keys()) + property_names = list(self.conform_schema(schema)["properties"].keys()) statement = dedent( f"""\ INSERT INTO {full_table_name} - ({", ".join(property_names)}) + ({", ".join([f"[{name}]" for name in property_names])}) VALUES ({", ".join([f":{self.unbracket_names(name)}" for name in property_names])}) """ ) From 09af6188cc4f0f7424a320ff26319ef53538b015 Mon Sep 17 00:00:00 2001 From: Hassan Syyid Date: Tue, 19 Nov 2024 16:41:57 -0500 Subject: [PATCH 08/18] fix drop logic --- target_mssql/sinks.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/target_mssql/sinks.py b/target_mssql/sinks.py index 526baea..5b3dda4 100644 --- a/target_mssql/sinks.py +++ b/target_mssql/sinks.py @@ -16,6 +16,7 @@ class mssqlSink(SQLSink): """mssql target sink class.""" connector_class = mssqlConnector + dropped_tables = dict() # Copied purely to help with type hints @property @@ -161,10 +162,13 @@ def process_batch(self, context: dict) -> None: if self.key_properties: self.logger.info(f"Preparing table {self.full_table_name}") + # NOTE: Force create the table # TODO: remove this - self.logger.info("Force dropping the table!") - self.connector.connection.execute(f"DROP TABLE IF EXISTS #{self.full_table_name};") + if not self.dropped_tables.get(self.stream_name, False): + self.logger.info("Force dropping the table!") + self.connector.connection.execute(f"DROP TABLE IF EXISTS {self.full_table_name};") + self.dropped_tables[self.stream_name] = True self.connector.prepare_table( full_table_name=self.full_table_name, From 6a9b7610344013d30c53d95548552aa31757cf9b Mon Sep 17 00:00:00 2001 From: Hassan Syyid Date: Tue, 19 Nov 2024 16:43:09 -0500 Subject: [PATCH 09/18] Revert "clean the code and improve against sql injection" This reverts commit 90f7103fb8f14141ed3928dbd837c606f9203bd7. --- target_mssql/sinks.py | 112 +++++++++++++++++++++++++++++++----------- 1 file changed, 82 insertions(+), 30 deletions(-) diff --git a/target_mssql/sinks.py b/target_mssql/sinks.py index 5b3dda4..4043bd6 100644 --- a/target_mssql/sinks.py +++ b/target_mssql/sinks.py @@ -2,10 +2,11 @@ from __future__ import annotations -from typing import Any, Dict, Iterable, List, Optional +from typing import Any, Dict, Iterable, List, Optional, Union +from copy import copy import sqlalchemy from singer_sdk.sinks import SQLSink -from sqlalchemy import Column, text +from sqlalchemy import Column from textwrap import dedent import re from singer_sdk.helpers._conformers import replace_leading_digit, snakecase @@ -75,6 +76,7 @@ def check_string_key_properties(self): if self.key_properties: schema = self.conform_schema(self.schema) for prop in self.key_properties: + # prop = self.conform_name(prop) isnumeric = ("string" not in schema['properties'][prop]['type']) and isnumeric return self.key_properties and isnumeric @@ -116,19 +118,14 @@ def bulk_insert_records( for column, field in zip(columns, self.schema["properties"].keys()): insert_record[column.name] = record.get(field) insert_records.append(insert_record) + if self.check_string_key_properties(): - self.connection.execute( - sqlalchemy.text("SET IDENTITY_INSERT :table_name ON"), - {"table_name": full_table_name} - ) + self.connection.execute(f"SET IDENTITY_INSERT { full_table_name } ON") self.connection.execute(insert_sql, insert_records) if self.check_string_key_properties(): - self.connection.execute( - sqlalchemy.text("SET IDENTITY_INSERT :table_name OFF"), - {"table_name": full_table_name} - ) + self.connection.execute(f"SET IDENTITY_INSERT { full_table_name } OFF") if isinstance(records, list): return len(records) # If list, we can quickly return record count. @@ -150,6 +147,26 @@ def column_representation( ) ) return columns + + # def conform_schema(self, schema: dict) -> dict: + # """Return schema dictionary with property names conformed. + + # Args: + # schema: JSON schema dictionary. + + # Returns: + # A schema dictionary with the property names conformed. + # """ + # conformed_schema = copy(schema) + # conformed_property_names = { + # key: self.conform_name(key) for key in conformed_schema["properties"].keys() + # } + # self._check_conformed_names_not_duplicated(conformed_property_names) + # conformed_schema["properties"] = { + # conformed_property_names[key]: value + # for key, value in conformed_schema["properties"].items() + # } + # return conformed_schema def process_batch(self, context: dict) -> None: """Process a batch with the given batch context. @@ -238,35 +255,64 @@ def merge_upsert_from_table( ] ) # noqa - merge_sql = text(""" - MERGE INTO :to_table AS target - USING :from_table AS temp - ON :join_condition + merge_sql = f""" + MERGE INTO {to_table_name} AS target + USING {from_table_name} AS temp + ON {join_condition} WHEN MATCHED THEN - UPDATE SET :update_stmt + UPDATE SET + { update_stmt } WHEN NOT MATCHED THEN - INSERT (:columns) - VALUES (:values); - """) - - params = { - "to_table": to_table_name, - "from_table": from_table_name, - "join_condition": join_condition, - "update_stmt": update_stmt, - "columns": ", ".join([f"[{key}]" for key in schema["properties"].keys()]), - "values": ", ".join([f"temp.[{key}]" for key in schema["properties"].keys()]) - } + INSERT ({", ".join([f"[{key}]" for key in schema["properties"].keys()])}) + VALUES ({", ".join([f"temp.[{key}]" for key in schema["properties"].keys()])}); + """ + + if self.check_string_key_properties(): self.connection.execute(f"SET IDENTITY_INSERT { to_table_name } ON") - self.connection.execute(merge_sql, params) + self.connection.execute(merge_sql) if self.check_string_key_properties(): self.connection.execute(f"SET IDENTITY_INSERT { to_table_name } OFF") self.connection.execute("COMMIT") + + def conform_schema_new(self, schema: dict) -> dict: + """Return schema dictionary with property names conformed. + + Args: + schema: JSON schema dictionary. + + Returns: + A schema dictionary with the property names conformed. + """ + conformed_schema = copy(schema) + conformed_property_names = { + key: self.conform_name_new(key) for key in conformed_schema["properties"].keys() + } + self._check_conformed_names_not_duplicated(conformed_property_names) + conformed_schema["properties"] = { + conformed_property_names[key]: value + for key, value in conformed_schema["properties"].items() + } + return conformed_schema + + def bracket_names(self, name: str) -> str: + return f"[{name}]" + + def unbracket_names(self, name: str) -> str: + if self.is_bracketed(name): + return name.replace("[", "").replace("]", "") + return name + + def is_bracketed(self, name: str) -> bool: + return name.startswith("[") and name.endswith("]") + + def is_protected_name(self, name: str) -> bool: + mssql_reserved_keywords = ["ADD","EXTERNAL","PROCEDURE","ALL","FETCH","PUBLIC","ALTER","FILE","RAISERROR","AND","FILLFACTOR","READ","ANY","FOR","READTEXT","AS","FOREIGN","RECONFIGURE","ASC","FREETEXT","REFERENCES","AUTHORIZATION","FREETEXTTABLE","REPLICATION","BACKUP","FROM","RESTORE","BEGIN","FULL","RESTRICT","BETWEEN","FUNCTION","RETURN","BREAK","GOTO","REVERT","BROWSE","GRANT","REVOKE","BULK","GROUP","RIGHT","BY","HAVING","ROLLBACK","CASCADE","HOLDLOCK","ROWCOUNT","CASE","IDENTITY","ROWGUIDCOL","CHECK","IDENTITY_INSERT","RULE","CHECKPOINT","IDENTITYCOL","SAVE","CLOSE","IF","SCHEMA","CLUSTERED","IN","SECURITYAUDIT","COALESCE","INDEX","SELECT","COLLATE","INNER","SEMANTICKEYPHRASETABLE","COLUMN","INSERT","SEMANTICSIMILARITYDETAILSTABLE","COMMIT","INTERSECT","SEMANTICSIMILARITYTABLE","COMPUTE","INTO","SESSION_USER","CONSTRAINT","IS","SET","CONTAINS","JOIN","SETUSER","CONTAINSTABLE","KEY","SHUTDOWN","CONTINUE","KILL","SOME","CONVERT","LEFT","STATISTICS","CREATE","LIKE","SYSTEM_USER","CROSS","LINENO","TABLE","CURRENT","LOAD","TABLESAMPLE","CURRENT_DATE","MERGE","TEXTSIZE","CURRENT_TIME","NATIONAL","THEN","CURRENT_TIMESTAMP","NOCHECK","TO","CURRENT_USER","NONCLUSTERED","TOP","CURSOR","NOT","TRAN","DATABASE","NULL","TRANSACTION","DBCC","NULLIF","TRIGGER","DEALLOCATE","OF","TRUNCATE","DECLARE","OFF","TRY_CONVERT","DEFAULT","OFFSETS","TSEQUAL","DELETE","ON","UNION","DENY","OPEN","UNIQUE","DESC","OPENDATASOURCE","UNPIVOT","DISK","OPENQUERY","UPDATE","DISTINCT","OPENROWSET","UPDATETEXT","DISTRIBUTED","OPENXML","USE","DOUBLE","OPTION","USER","DROP","OR","VALUES","DUMP","ORDER","VARYING","ELSE","OUTER","VIEW","END","OVER","WAITFOR","ERRLVL","PERCENT","WHEN","ESCAPE","PIVOT","WHERE","EXCEPT","PLAN","WHILE","EXEC","PRECISION","WITH","EXECUTE","PRIMARY","WITHIN GROUP","EXISTS","PRINT","WRITETEXT","EXIT","PROC"] + return name.upper() in mssql_reserved_keywords def conform_name(self, name: str, object_type: Optional[str] = None) -> str: """Conform a stream property name to one suitable for the target system. @@ -283,6 +329,12 @@ def conform_name(self, name: str, object_type: Optional[str] = None) -> str: name = snakecase(name) # replace leading digit return replace_leading_digit(name) + + def conform_name_new(self, name: str, object_type: Optional[str] = None) -> str: + name = super().conform_name(name, object_type) + if self.is_protected_name(name): + return self.bracket_names(name) + return name def generate_insert_statement( self, @@ -298,11 +350,11 @@ def generate_insert_statement( Returns: An insert statement. """ - property_names = list(self.conform_schema(schema)["properties"].keys()) + property_names = list(self.conform_schema_new(schema)["properties"].keys()) statement = dedent( f"""\ INSERT INTO {full_table_name} - ({", ".join([f"[{name}]" for name in property_names])}) + ({", ".join(property_names)}) VALUES ({", ".join([f":{self.unbracket_names(name)}" for name in property_names])}) """ ) From 9540822081f4b3c103fd6b23272c54f404abb8ce Mon Sep 17 00:00:00 2001 From: Hassan Syyid Date: Tue, 19 Nov 2024 17:42:13 -0500 Subject: [PATCH 10/18] turn off force drop logic --- target_mssql/sinks.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/target_mssql/sinks.py b/target_mssql/sinks.py index 4043bd6..d9b438a 100644 --- a/target_mssql/sinks.py +++ b/target_mssql/sinks.py @@ -182,10 +182,10 @@ def process_batch(self, context: dict) -> None: # NOTE: Force create the table # TODO: remove this - if not self.dropped_tables.get(self.stream_name, False): - self.logger.info("Force dropping the table!") - self.connector.connection.execute(f"DROP TABLE IF EXISTS {self.full_table_name};") - self.dropped_tables[self.stream_name] = True + # if not self.dropped_tables.get(self.stream_name, False): + # self.logger.info("Force dropping the table!") + # self.connector.connection.execute(f"DROP TABLE IF EXISTS {self.full_table_name};") + # self.dropped_tables[self.stream_name] = True self.connector.prepare_table( full_table_name=self.full_table_name, From c886bc828fd4fc3964b49db9a7a780817a8cbc9d Mon Sep 17 00:00:00 2001 From: arilton Date: Wed, 20 Nov 2024 16:12:43 -0300 Subject: [PATCH 11/18] alter varchar columns to max size before insert data --- target_mssql/sinks.py | 39 ++++++++++++++------------------------- 1 file changed, 14 insertions(+), 25 deletions(-) diff --git a/target_mssql/sinks.py b/target_mssql/sinks.py index d9b438a..f0e1925 100644 --- a/target_mssql/sinks.py +++ b/target_mssql/sinks.py @@ -147,26 +147,6 @@ def column_representation( ) ) return columns - - # def conform_schema(self, schema: dict) -> dict: - # """Return schema dictionary with property names conformed. - - # Args: - # schema: JSON schema dictionary. - - # Returns: - # A schema dictionary with the property names conformed. - # """ - # conformed_schema = copy(schema) - # conformed_property_names = { - # key: self.conform_name(key) for key in conformed_schema["properties"].keys() - # } - # self._check_conformed_names_not_duplicated(conformed_property_names) - # conformed_schema["properties"] = { - # conformed_property_names[key]: value - # for key, value in conformed_schema["properties"].items() - # } - # return conformed_schema def process_batch(self, context: dict) -> None: """Process a batch with the given batch context. @@ -187,12 +167,14 @@ def process_batch(self, context: dict) -> None: # self.connector.connection.execute(f"DROP TABLE IF EXISTS {self.full_table_name};") # self.dropped_tables[self.stream_name] = True + conformed_schema = self.conform_schema(self.schema) self.connector.prepare_table( full_table_name=self.full_table_name, - schema=self.conform_schema(self.schema), + schema=conformed_schema, primary_keys=self.key_properties, as_temp_table=False, ) + self.alter_varchar_columns(self.full_table_name, conformed_schema) # Create a temp table (Creates from the table above) self.logger.info(f"Creating temp table {self.full_table_name}") self.connector.create_temp_table_from_table( @@ -202,7 +184,7 @@ def process_batch(self, context: dict) -> None: self.logger.info("Inserting into temp table") self.bulk_insert_records( full_table_name=f"#{self.full_table_name}", - schema=self.conform_schema(self.schema), + schema=conformed_schema, records=context["records"], ) # Merge data from Temp table to main table @@ -210,14 +192,14 @@ def process_batch(self, context: dict) -> None: self.merge_upsert_from_table( from_table_name=f"#{self.full_table_name}", to_table_name=f"{self.full_table_name}", - schema=self.conform_schema(self.schema), + schema=conformed_schema, join_keys=self.key_properties, ) else: self.bulk_insert_records( full_table_name=self.full_table_name, - schema=self.conform_schema(self.schema), + schema=conformed_schema, records=context["records"], ) @@ -358,4 +340,11 @@ def generate_insert_statement( VALUES ({", ".join([f":{self.unbracket_names(name)}" for name in property_names])}) """ ) - return statement.rstrip() \ No newline at end of file + return statement.rstrip() + + def alter_varchar_columns(self, full_table_name: str, schema: dict): + for key, value in schema["properties"].items(): + if key in self.key_properties: + continue + if value.get("type") == "string" or set(value.get("type")) == {"string", "null"}: + self.connection.execute(f"ALTER TABLE {full_table_name} ALTER COLUMN {key} VARCHAR(MAX);") From 30260f7e69b8c3451d1b4ceff5feda41e194e9f7 Mon Sep 17 00:00:00 2001 From: arilton Date: Wed, 20 Nov 2024 19:51:23 -0300 Subject: [PATCH 12/18] add BEGIN TRANSACTION --- target_mssql/sinks.py | 1 + 1 file changed, 1 insertion(+) diff --git a/target_mssql/sinks.py b/target_mssql/sinks.py index f0e1925..7da8957 100644 --- a/target_mssql/sinks.py +++ b/target_mssql/sinks.py @@ -250,6 +250,7 @@ def merge_upsert_from_table( """ + self.connection.execute("BEGIN TRANSACTION") if self.check_string_key_properties(): self.connection.execute(f"SET IDENTITY_INSERT { to_table_name } ON") From bf9bb8b6e90d572506b349a4ddfb85f16d47099b Mon Sep 17 00:00:00 2001 From: arilton Date: Thu, 21 Nov 2024 16:10:52 -0300 Subject: [PATCH 13/18] execute the merge inside a transaction --- target_mssql/sinks.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/target_mssql/sinks.py b/target_mssql/sinks.py index 7da8957..813e13d 100644 --- a/target_mssql/sinks.py +++ b/target_mssql/sinks.py @@ -180,6 +180,7 @@ def process_batch(self, context: dict) -> None: self.connector.create_temp_table_from_table( from_table_name=self.full_table_name ) + # Insert into temp table self.logger.info("Inserting into temp table") self.bulk_insert_records( @@ -249,18 +250,17 @@ def merge_upsert_from_table( VALUES ({", ".join([f"temp.[{key}]" for key in schema["properties"].keys()])}); """ - - self.connection.execute("BEGIN TRANSACTION") - - if self.check_string_key_properties(): - self.connection.execute(f"SET IDENTITY_INSERT { to_table_name } ON") - - self.connection.execute(merge_sql) + def do_merge(conn, merge_sql, is_check_string_key_properties): + if is_check_string_key_properties: + conn.execute(f"SET IDENTITY_INSERT { to_table_name } ON") + + conn.execute(merge_sql) - if self.check_string_key_properties(): - self.connection.execute(f"SET IDENTITY_INSERT { to_table_name } OFF") + if is_check_string_key_properties: + conn.execute(f"SET IDENTITY_INSERT { to_table_name } OFF") - self.connection.execute("COMMIT") + is_check_string_key_properties = self.check_string_key_properties() + self.connection.transaction(do_merge, merge_sql, is_check_string_key_properties) def conform_schema_new(self, schema: dict) -> dict: """Return schema dictionary with property names conformed. From 068e3cab132e0a78dda167de56724acd5842bbec Mon Sep 17 00:00:00 2001 From: arilton Date: Fri, 22 Nov 2024 12:18:18 -0300 Subject: [PATCH 14/18] check if current_type.length is None at merge_sql_types --- target_mssql/connector.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/target_mssql/connector.py b/target_mssql/connector.py index 45f083a..090cf78 100644 --- a/target_mssql/connector.py +++ b/target_mssql/connector.py @@ -177,6 +177,7 @@ def merge_sql_types( # noqa if ( (opt_len is None) or (opt_len == 0) + or (current_type.length is None) or (opt_len >= current_type.length) ): return opt @@ -188,6 +189,7 @@ def merge_sql_types( # noqa if ( (opt_len is None) or (opt_len == 0) + or (current_type.length is None) or (opt_len >= current_type.length) ): return opt From 49b01371940f94a087bd622d0088d7e3a39a6364 Mon Sep 17 00:00:00 2001 From: Hassan Syyid Date: Fri, 22 Nov 2024 15:00:14 -0500 Subject: [PATCH 15/18] Revert "turn off force drop logic" This reverts commit 9540822081f4b3c103fd6b23272c54f404abb8ce. --- target_mssql/sinks.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/target_mssql/sinks.py b/target_mssql/sinks.py index 813e13d..ade41a7 100644 --- a/target_mssql/sinks.py +++ b/target_mssql/sinks.py @@ -162,10 +162,10 @@ def process_batch(self, context: dict) -> None: # NOTE: Force create the table # TODO: remove this - # if not self.dropped_tables.get(self.stream_name, False): - # self.logger.info("Force dropping the table!") - # self.connector.connection.execute(f"DROP TABLE IF EXISTS {self.full_table_name};") - # self.dropped_tables[self.stream_name] = True + if not self.dropped_tables.get(self.stream_name, False): + self.logger.info("Force dropping the table!") + self.connector.connection.execute(f"DROP TABLE IF EXISTS {self.full_table_name};") + self.dropped_tables[self.stream_name] = True conformed_schema = self.conform_schema(self.schema) self.connector.prepare_table( From 8293ac5921ef7541c29ece1104e1e55505c96213 Mon Sep 17 00:00:00 2001 From: Hassan Syyid Date: Fri, 22 Nov 2024 15:54:06 -0500 Subject: [PATCH 16/18] change drop table --- target_mssql/connector.py | 40 ++++++++++++++++++++++++++++++++++ target_mssql/sinks.py | 46 +++++++++++++++++++++++++++++++++------ 2 files changed, 79 insertions(+), 7 deletions(-) diff --git a/target_mssql/connector.py b/target_mssql/connector.py index 090cf78..35b806d 100644 --- a/target_mssql/connector.py +++ b/target_mssql/connector.py @@ -19,6 +19,46 @@ class mssqlConnector(SQLConnector): allow_column_alter: bool = True # Whether altering column types is supported. allow_merge_upsert: bool = True # Whether MERGE UPSERT is supported. allow_temp_tables: bool = True # Whether temp tables are supported. + dropped_tables = dict() + + def prepare_table( + self, + full_table_name: str, + schema: dict, + primary_keys: list[str], + partition_keys: list[str] | None = None, + as_temp_table: bool = False, + ) -> None: + """Adapt target table to provided schema if possible. + + Args: + full_table_name: the target table name. + schema: the JSON Schema for the table. + primary_keys: list of key properties. + partition_keys: list of partition keys. + as_temp_table: True to create a temp table. + """ + # NOTE: Force create the table + # TODO: remove this + if not self.dropped_tables.get(self.stream_name, False): + self.logger.info("Force dropping the table!") + self.connection.execute(f"DROP TABLE IF EXISTS {self.full_table_name};") + self.dropped_tables[self.stream_name] = True + + if not self.table_exists(full_table_name=full_table_name): + self.create_empty_table( + full_table_name=full_table_name, + schema=schema, + primary_keys=primary_keys, + partition_keys=partition_keys, + as_temp_table=as_temp_table, + ) + return + + for property_name, property_def in schema["properties"].items(): + self.prepare_column( + full_table_name, property_name, self.to_sql_type(property_def) + ) def create_table_with_records( self, diff --git a/target_mssql/sinks.py b/target_mssql/sinks.py index ade41a7..103cbea 100644 --- a/target_mssql/sinks.py +++ b/target_mssql/sinks.py @@ -148,6 +148,45 @@ def column_representation( ) return columns + def prepare_table( + self, + full_table_name: str, + schema: dict, + primary_keys: list[str], + partition_keys: list[str] | None = None, + as_temp_table: bool = False, + ) -> None: + """Adapt target table to provided schema if possible. + + Args: + full_table_name: the target table name. + schema: the JSON Schema for the table. + primary_keys: list of key properties. + partition_keys: list of partition keys. + as_temp_table: True to create a temp table. + """ + # NOTE: Force create the table + # TODO: remove this + if not self.dropped_tables.get(self.stream_name, False): + self.logger.info("Force dropping the table!") + self.connector.connection.execute(f"DROP TABLE IF EXISTS {self.full_table_name};") + self.dropped_tables[self.stream_name] = True + + if not self.table_exists(full_table_name=full_table_name): + self.create_empty_table( + full_table_name=full_table_name, + schema=schema, + primary_keys=primary_keys, + partition_keys=partition_keys, + as_temp_table=as_temp_table, + ) + return + + for property_name, property_def in schema["properties"].items(): + self.prepare_column( + full_table_name, property_name, self.to_sql_type(property_def) + ) + def process_batch(self, context: dict) -> None: """Process a batch with the given batch context. Writes a batch to the SQL target. Developers may override this method @@ -160,13 +199,6 @@ def process_batch(self, context: dict) -> None: if self.key_properties: self.logger.info(f"Preparing table {self.full_table_name}") - # NOTE: Force create the table - # TODO: remove this - if not self.dropped_tables.get(self.stream_name, False): - self.logger.info("Force dropping the table!") - self.connector.connection.execute(f"DROP TABLE IF EXISTS {self.full_table_name};") - self.dropped_tables[self.stream_name] = True - conformed_schema = self.conform_schema(self.schema) self.connector.prepare_table( full_table_name=self.full_table_name, From 06e531826f1db71c02aa2de96da4439c18ae13a5 Mon Sep 17 00:00:00 2001 From: Hassan Syyid Date: Fri, 22 Nov 2024 16:00:26 -0500 Subject: [PATCH 17/18] fix --- target_mssql/connector.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/target_mssql/connector.py b/target_mssql/connector.py index 35b806d..311f7e9 100644 --- a/target_mssql/connector.py +++ b/target_mssql/connector.py @@ -40,10 +40,10 @@ def prepare_table( """ # NOTE: Force create the table # TODO: remove this - if not self.dropped_tables.get(self.stream_name, False): + if not self.dropped_tables.get(full_table_name, False): self.logger.info("Force dropping the table!") - self.connection.execute(f"DROP TABLE IF EXISTS {self.full_table_name};") - self.dropped_tables[self.stream_name] = True + self.connection.execute(f"DROP TABLE IF EXISTS {full_table_name};") + self.dropped_tables[full_table_name] = True if not self.table_exists(full_table_name=full_table_name): self.create_empty_table( From 91e74eb83c9b96f15ae7e739776c330855a15f57 Mon Sep 17 00:00:00 2001 From: Hassan Syyid Date: Sun, 24 Nov 2024 18:11:55 -0700 Subject: [PATCH 18/18] remove drop tables logic --- target_mssql/connector.py | 8 ++++---- target_mssql/sinks.py | 8 ++++---- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/target_mssql/connector.py b/target_mssql/connector.py index 311f7e9..0926991 100644 --- a/target_mssql/connector.py +++ b/target_mssql/connector.py @@ -40,10 +40,10 @@ def prepare_table( """ # NOTE: Force create the table # TODO: remove this - if not self.dropped_tables.get(full_table_name, False): - self.logger.info("Force dropping the table!") - self.connection.execute(f"DROP TABLE IF EXISTS {full_table_name};") - self.dropped_tables[full_table_name] = True + # if not self.dropped_tables.get(full_table_name, False): + # self.logger.info("Force dropping the table!") + # self.connection.execute(f"DROP TABLE IF EXISTS {full_table_name};") + # self.dropped_tables[full_table_name] = True if not self.table_exists(full_table_name=full_table_name): self.create_empty_table( diff --git a/target_mssql/sinks.py b/target_mssql/sinks.py index 103cbea..97c77a3 100644 --- a/target_mssql/sinks.py +++ b/target_mssql/sinks.py @@ -167,10 +167,10 @@ def prepare_table( """ # NOTE: Force create the table # TODO: remove this - if not self.dropped_tables.get(self.stream_name, False): - self.logger.info("Force dropping the table!") - self.connector.connection.execute(f"DROP TABLE IF EXISTS {self.full_table_name};") - self.dropped_tables[self.stream_name] = True + # if not self.dropped_tables.get(self.stream_name, False): + # self.logger.info("Force dropping the table!") + # self.connector.connection.execute(f"DROP TABLE IF EXISTS {self.full_table_name};") + # self.dropped_tables[self.stream_name] = True if not self.table_exists(full_table_name=full_table_name): self.create_empty_table(