Skip to content

Commit

Permalink
fixed camelcase on merge
Browse files Browse the repository at this point in the history
  • Loading branch information
radbrt committed Dec 7, 2022
1 parent a508a99 commit ab91762
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 8 deletions.
27 changes: 20 additions & 7 deletions target_oracle/sinks.py
Original file line number Diff line number Diff line change
Expand Up @@ -382,11 +382,21 @@ def process_batch(self, context: dict) -> None:
"""
# First we need to be sure the main table is already created

conformed_records = (
[self.conform_record(record) for record in context["records"]]
if isinstance(context["records"], list)
else (self.conform_record(record) for record in context["records"])
)

join_keys = [self.conform_name(key, "column") for key in self.key_properties]
schema = self.conform_schema(self.schema)


if self.key_properties:
self.logger.info(f"Preparing table {self.full_table_name}")
self.connector.prepare_table(
full_table_name=self.full_table_name,
schema=self.schema,
schema=schema,
primary_keys=self.key_properties,
as_temp_table=False,
)
Expand All @@ -404,23 +414,23 @@ def process_batch(self, context: dict) -> None:
# Insert into temp table
self.bulk_insert_records(
full_table_name=tmp_table_name,
schema=self.schema,
records=context["records"],
schema=schema,
records=conformed_records,
)
# Merge data from Temp table to main table
self.logger.info(f"Merging data from temp table to {self.full_table_name}")
self.merge_upsert_from_table(
from_table_name=tmp_table_name,
to_table_name=self.full_table_name,
schema=self.schema,
join_keys=self.key_properties,
schema=schema,
join_keys=join_keys,
)

else:
self.bulk_insert_records(
full_table_name=self.full_table_name,
schema=self.schema,
records=context["records"],
schema=schema,
records=conformed_records,
)

def merge_upsert_from_table(
Expand All @@ -443,6 +453,9 @@ def merge_upsert_from_table(
# TODO think about sql injeciton,
# issue here https://github.com/MeltanoLabs/target-postgres/issues/22

join_keys = [self.conform_name(key, "column") for key in join_keys]
schema = self.conform_schema(schema)

join_condition = " and ".join(
[f"temp.{key} = target.{key}" for key in join_keys]
)
Expand Down
2 changes: 1 addition & 1 deletion target_oracle/tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ def test_record_missing_required_property(oracle_target):

# TODO test that data is correctly set
# see target-sqllit/tests/test_target_sqllite.py
@pytest.mark.skip(reason="Waiting for SDK to handle this")
# @pytest.mark.skip(reason="Waiting for SDK to handle this")
def test_column_camel_case(oracle_target):
file_name = "camelcase.singer"
singer_file_to_target(file_name, oracle_target)
Expand Down

0 comments on commit ab91762

Please sign in to comment.