Skip to content

Commit

Permalink
relational.write - SQL statements are executed individually instead…
Browse files Browse the repository at this point in the history
… of in bulk for MySQL #373 (#375)

* set dialect._requires_alias_for_on_duplicate_key for pymysql's engine dialect

* update json schemas

* formatting

* docstring

---------

Co-authored-by: github-actions <[email protected]>
  • Loading branch information
spicy-sauce and github-actions authored Sep 19, 2024
1 parent 3d3cb18 commit abd38bd
Show file tree
Hide file tree
Showing 6 changed files with 2,446 additions and 1,994 deletions.
2,665 changes: 1,547 additions & 1,118 deletions core/poetry.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions core/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,12 @@ azure-eventhub = { version = "^5.11.2", optional = true }
azure-eventhub-checkpointstoreblob-aio = { version = "^1.1.4", optional = true }
aiohttp = { version = "^3.8.4", optional = true }
cassandra-driver = { version = "^3.25.0", optional = true }
fastparquet = { version = "^2023.2.0", optional = true, platform = "python>=3.8" }
fastparquet = { version = "^2023.2.0", optional = true, markers = "python_version >= '3.8'" }
ibm_db_sa = { version = "^0.4.0", optional = true }
oracledb = { version = "^1.2.2", optional = true }
psycopg2-binary = { version = "^2.9.5", optional = true }
pymssql = { version = "^2.2.7", optional = true }
PyMySQL = { version = "^1.0.2", optional = true }
PyMySQL = { version = "^1.1.1", optional = true }
redis = { version = "^4.3.5", optional = true }
SQLAlchemy = { version = "^2.0.4", optional = true }

Expand Down
28 changes: 27 additions & 1 deletion core/src/datayoga_core/blocks/relational/utils.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import logging
from enum import Enum, unique
from typing import Tuple
from typing import Callable, Tuple

import sqlalchemy as sa
from datayoga_core.connection import Connection
from datayoga_core.context import Context
from sqlalchemy.engine import Engine
from sqlalchemy.engine.interfaces import DBAPIConnection
from sqlalchemy.pool import ConnectionPoolEntry

Expand Down Expand Up @@ -77,9 +78,34 @@ def get_engine(connection_name: str, context: Context, autocommit: bool = True)
# add NLS_DATE_FORMAT for Oracle to explicitly set the date format to ISO
sa.event.listen(engine, "connect", alter_session_on_oracle_connect)

if engine.driver == "pymysql":
# add a listener to configure PyMySQL dialect settings
sa.event.listen(engine, "connect", create_pymysql_dialect_configuration(engine))
return engine, db_type


def create_pymysql_dialect_configuration(engine: Engine) -> Callable[[DBAPIConnection, ConnectionPoolEntry], None]:
"""Creates a function to configure the PyMySQL dialect.
Args:
engine (Engine): The SQLAlchemy engine to configure.
Returns:
Callable[[DBAPIConnection, ConnectionPoolEntry], None]: A function that configures PyMySQL dialect settings.
"""
def configure_pymysql_dialect(dbapi_connection: DBAPIConnection, connection_record: ConnectionPoolEntry):
"""Configures settings specific to PyMySQL on each connection.
Args:
dbapi_connection (DBAPIConnection): DB-API connection object from PyMySQL.
connection_record (ConnectionPoolEntry): Connection record managed by SQLAlchemy.
"""
# Disable PyMySQL's handling of ON DUPLICATE KEY UPDATE alias requirement
engine.dialect._requires_alias_for_on_duplicate_key = False

return configure_pymysql_dialect


def alter_session_on_oracle_connect(dbapi_connection: DBAPIConnection, connection_record: ConnectionPoolEntry):
"""SQLAlchemy event listener to alter the Oracle session settings upon connection.
Expand Down
6 changes: 0 additions & 6 deletions core/src/datayoga_core/blocks/relational/write/block.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,6 @@ def setup_engine(self):
return

self.engine, self.db_type = relational_utils.get_engine(self.properties["connection"], self.context)

# Disable the new MySQL 8.0.17+ default behavior of requiring an alias for ON DUPLICATE KEY UPDATE
# This behavior is not supported by pymysql driver
if self.engine.driver == "pymysql":
self.engine.dialect._requires_alias_for_on_duplicate_key = False

self.schema = self.properties.get("schema")
self.table = self.properties.get("table")
self.opcode_field = self.properties.get("opcode_field")
Expand Down
23 changes: 13 additions & 10 deletions core/src/datayoga_core/write_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@


def validate_records(records: List[Dict[str, Any]], keys: List[str]) -> Tuple[List[Dict[str, Any]], List[Result]]:

# validate that the specified keys exist in the records
"""Validates that the specified keys exist in the records."""
rejected_records: List[Result] = []
valid_records = []
key_set = set(keys)
Expand All @@ -26,25 +25,29 @@ def validate_records(records: List[Dict[str, Any]], keys: List[str]) -> Tuple[Li
return valid_records, rejected_records


def group_records_by_opcode(records: List[Dict[str, Any]],
opcode_field: str) -> Dict[str, List[Dict[str, Any]]]:
# group records by their opcode, reject records with unsupported opcode or with missing keys
def group_records_by_opcode(
records: List[Dict[str, Any]],
opcode_field: str
) -> Dict[str, List[Dict[str, Any]]]:
"""Groups records by their opcode, reject records with unsupported opcode or with missing keys."""
groups = defaultdict(list)
for record in records:
groups[record.get(opcode_field, "")].append(record)
return groups


def get_column_mapping(mapping: List[Union[Dict[str, str], str]]) -> List[Dict[str, str]]:
"""Returns a list of dictionaries with the column and key names."""
return [{"column": next(iter(item.keys())), "key": next(iter(item.values()))} if isinstance(item, dict)
else {"column": item, "key": item} for item in mapping] if mapping else []


def map_record(record: Dict[str, Any],
keys: List[Union[Dict[str, str], str]],
mapping: Optional[List[Union[Dict[str, str], str]]] = None):
# map the record based on the mapping definitions
# add nulls for missing mapping fields
def map_record(
record: Dict[str, Any],
keys: List[Union[Dict[str, str], str]],
mapping: Optional[List[Union[Dict[str, str], str]]] = None
):
"""Maps the record based on the mapping definitions, adds nulls for missing mapping fields."""
mapping = mapping or []
mapped_record = {}
for item in keys + mapping:
Expand Down
Loading

0 comments on commit abd38bd

Please sign in to comment.