
Commit

Another attempt to fix snowflake problems by slowing down dropping/creating tables/schemas.
windiana42 committed Aug 16, 2024
1 parent fef9e6d · commit 94975a1
Showing 4 changed files with 46 additions and 68 deletions.
1 change: 0 additions & 1 deletion pipedag.yaml
@@ -172,7 +172,6 @@ instances:

   snowflake:
     instance_id: pd_snowflake
-    stage_commit_technique: READ_VIEWS
     table_store:
       table_store_connection: snowflake
     lock_manager:
85 changes: 19 additions & 66 deletions src/pydiverse/pipedag/backend/table/sql/dialects/snowflake.py
@@ -1,6 +1,8 @@
 from __future__ import annotations

+import time
 import warnings
+from typing import Literal

 from pydiverse.pipedag.backend.table.sql.hooks import (
     IbisTableHook,
@@ -25,7 +27,23 @@ class SnowflakeTableStore(SQLTableStore):
     _dialect_name = "snowflake"

     def _default_isolation_level(self) -> str | None:
-        return None  # "READ UNCOMMITTED" does not exist in DuckDB
+        return None  # "READ UNCOMMITTED" does not exist in Snowflake
+
+    def optional_pause_for_db_transactionality(
+        self,
+        prev_action: Literal[
+            "table_drop",
+            "table_create",
+            "schema_drop",
+            "schema_create",
+            "schema_rename",
+        ],
+    ):
+        _ = prev_action
+        # The snowflake backend has transactionality problems with very quick
+        # DROP/CREATE or RENAME activities for both schemas and tables
+        # which happen in testing.
+        time.sleep(2)

     def _init_database(self):
         create_database = self.engine_url.database.split("/")[0]
@@ -42,71 +60,6 @@ def _init_database(self):
 )


-# @SnowflakeTableStore.register_table(pd)
-# class PandasTableHook(PandasTableHook):
-#     @classmethod
-#     def _execute_materialize(
-#         cls,
-#         df: pd.DataFrame,
-#         store: SnowflakeTableStore,
-#         table: Table[pd.DataFrame],
-#         schema: Schema,
-#         dtypes: dict[str, DType],
-#     ):
-#         engine = store.engine
-#         dtypes = cls._get_dialect_dtypes(dtypes, table)
-#         if table.type_map:
-#             dtypes.update(table.type_map)
-#
-#         store.check_materialization_details_supported(
-#             resolve_materialization_details_label(table)
-#         )
-#
-#         # Create empty table with correct schema
-#         cls._dialect_create_empty_table(store, df, table, schema, dtypes)
-#         store.add_indexes_and_set_nullable(
-#             table, schema, on_empty_table=True, table_cols=df.columns
-#         )
-#
-#         # Copy dataframe directly to duckdb
-#         # This is SIGNIFICANTLY faster than using pandas.to_sql
-#         table_name = engine.dialect.identifier_preparer.quote(table.name)
-#         schema_name = engine.dialect.identifier_preparer.format_schema(schema.get())
-#
-#         connection_uri = store.engine_url.render_as_string(hide_password=False)
-#         connection_uri = connection_uri.replace("duckdb:///", "", 1)
-#         with duckdb.connect(connection_uri) as conn:
-#             conn.execute(f"INSERT INTO {schema_name}.{table_name} SELECT * FROM df")
-#
-#         store.add_indexes_and_set_nullable(
-#             table,
-#             schema,
-#             on_empty_table=False,
-#             table_cols=df.columns,
-#         )
-
-
-try:
-    import polars
-except ImportError as e:
-    warnings.warn(str(e), ImportWarning)
-    polars = None
-
-
-# @SnowflakeTableStore.register_table(polars, snowflake)
-# class PolarsTableHook(PolarsTableHook):
-#     @classmethod
-#     def _execute_query(cls, query: str, connection_uri: str, store: SQLTableStore):
-#         # Connectorx doesn't support duckdb.
-#         # Instead, we load it like this: Snowflake -> PyArrow -> Polars
-#         connection_uri = connection_uri.replace("snowflake:///", "", 1)
-#         with snowflake.connect(connection_uri) as conn:
-#             pl_table = conn.sql(query).arrow()
-#
-#         df = polars.from_arrow(pl_table)
-#         return df
-
-
 try:
     import ibis
 except ImportError:
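
The pattern this file introduces is easiest to see in isolation: the base store exposes a pause hook that defaults to doing nothing, every DDL call site invokes it right after a drop/create/rename, and a dialect whose metadata can lag behind fast DDL (Snowflake here) overrides it with a real delay. Below is a minimal standalone sketch of that pattern; BaseStore, SnowflakeishStore, and recreate_schema are hypothetical names for illustration, not pipedag's API:

import time
from typing import Literal

PrevAction = Literal[
    "table_drop", "table_create", "schema_drop", "schema_create", "schema_rename"
]


class BaseStore:
    def optional_pause_for_db_transactionality(self, prev_action: PrevAction) -> None:
        # Default backend: DDL takes effect transactionally, so no pause is needed.
        _ = prev_action


class SnowflakeishStore(BaseStore):
    def optional_pause_for_db_transactionality(self, prev_action: PrevAction) -> None:
        # Backend that may serve stale metadata right after rapid DDL:
        # wait a fixed interval before the next statement touches the object.
        _ = prev_action
        time.sleep(2)


def recreate_schema(store: BaseStore, schema: str) -> None:
    print(f"DROP SCHEMA IF EXISTS {schema}")  # stand-in for store.execute(...)
    store.optional_pause_for_db_transactionality("schema_drop")
    print(f"CREATE SCHEMA {schema}")  # stand-in for store.execute(...)
    store.optional_pause_for_db_transactionality("schema_create")


recreate_schema(SnowflakeishStore(), "stage_1__tmp")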
2 changes: 2 additions & 0 deletions src/pydiverse/pipedag/backend/table/sql/hooks.py
@@ -134,6 +134,7 @@ def materialize(
         ]
         store.execute(statements)
         store.add_indexes_and_set_nullable(table, schema)
+        store.optional_pause_for_db_transactionality("table_create")

     @classmethod
     def retrieve(
@@ -391,6 +392,7 @@ def _execute_materialize(
             on_empty_table=False if early else None,
             table_cols=df.columns,
         )
+        store.optional_pause_for_db_transactionality("table_create")

     @classmethod
     def retrieve(
26 changes: 25 additions & 1 deletion src/pydiverse/pipedag/backend/table/sql/sql.py
@@ -6,7 +6,7 @@
 import warnings
 from collections.abc import Iterable
 from contextlib import contextmanager
-from typing import Any
+from typing import Any, Literal

 import sqlalchemy as sa
 import sqlalchemy.exc
@@ -878,6 +878,22 @@ def init_stage(self, stage: Stage):
             return self._init_stage_read_views(stage)
         raise ValueError(f"Invalid stage commit technique: {stage_commit_technique}")

+    def optional_pause_for_db_transactionality(
+        self,
+        prev_action: Literal[
+            "table_drop",
+            "table_create",
+            "schema_drop",
+            "schema_create",
+            "schema_rename",
+        ],
+    ):
+        _ = prev_action
+        # Some backends have transactionality problems with very quick
+        # DROP/CREATE or RENAME activities for both schemas and tables
+        # which happen in testing.
+        pass

     def _init_stage_schema_swap(self, stage: Stage):
         schema = self.get_schema(stage.name)
         transaction_schema = self.get_schema(stage.transaction_name)
@@ -893,10 +909,13 @@ def _init_stage_schema_swap(self, stage: Stage):
                 self.execute(
                     DropSchemaContent(transaction_schema, self.engine), conn=conn
                 )
+                self.optional_pause_for_db_transactionality("table_drop")
             else:
                 self.execute(cs_base, conn=conn)
                 self.execute(ds_trans, conn=conn)
+                self.optional_pause_for_db_transactionality("schema_drop")
                 self.execute(cs_trans, conn=conn)
+                self.optional_pause_for_db_transactionality("schema_create")

         if not self.disable_caching:
             for table in [
@@ -974,13 +993,17 @@ def _commit_stage_schema_swap(self, stage: Stage):
                     ),
                     conn=conn,
                 )
+                self.optional_pause_for_db_transactionality("schema_drop")
                 self.execute(RenameSchema(schema, tmp_schema, self.engine), conn=conn)
+                self.optional_pause_for_db_transactionality("schema_rename")
                 self.execute(
                     RenameSchema(transaction_schema, schema, self.engine), conn=conn
                 )
+                self.optional_pause_for_db_transactionality("schema_rename")
                 self.execute(
                     RenameSchema(tmp_schema, transaction_schema, self.engine), conn=conn
                 )
+                self.optional_pause_for_db_transactionality("schema_rename")

                 self._commit_stage_update_metadata(stage, conn=conn)

@@ -991,6 +1014,7 @@ def _commit_stage_read_views(self, stage: Stage):
         with self.engine_connect() as conn:
             # Clear contents of destination schema
             self.execute(DropSchemaContent(dest_schema, self.engine), conn=conn)
+            self.optional_pause_for_db_transactionality("table_drop")

             # Create aliases for all tables in transaction schema
             inspector = sa.inspect(self.engine)
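
For orientation, the schema-swap commit that receives three of these rename pauses is a three-way rename: the live schema is moved aside, the transaction schema takes over its name, and the old live schema becomes the next transaction schema. A compressed sketch of that sequence, where rename_schema and pause are hypothetical stand-ins for pipedag's RenameSchema statement and the new hook:

import time


def rename_schema(old: str, new: str) -> None:
    # Stand-in for executing RenameSchema(old, new) through the engine.
    print(f"ALTER SCHEMA {old} RENAME TO {new}")


def pause() -> None:
    # Stand-in for optional_pause_for_db_transactionality("schema_rename"):
    # 2 seconds on Snowflake, a no-op on backends with transactional DDL.
    time.sleep(2)


def commit_stage_schema_swap(schema: str, transaction: str, tmp: str) -> None:
    rename_schema(schema, tmp)          # live schema moves aside
    pause()
    rename_schema(transaction, schema)  # freshly built tables go live
    pause()
    rename_schema(tmp, transaction)     # old live schema becomes the next staging area
    pause()


commit_stage_schema_swap("stage_1", "stage_1__transaction", "stage_1__swap")

Counting the schema-drop pause that precedes the renames, a Snowflake stage commit now absorbs four fixed 2-second pauses, roughly 8 seconds of added latency per stage commit, a deliberate trade of speed for stability in the test suite.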
