Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ENH: Add table prefixes to to_sql method #60409

Open
wants to merge 22 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
03c8183
Implement prefixes parameter.
Diadochokinetic Oct 29, 2024
f09fd54
[WIP] implent special cases for temporary tables.
Diadochokinetic Oct 29, 2024
03b8642
Specify Exception in _exists_temporary.
Diadochokinetic Nov 17, 2024
ccb9eac
remove print
Diadochokinetic Nov 17, 2024
d792f27
Finalize working mysql implementation.
Diadochokinetic Nov 17, 2024
2582834
[WIP] Add rollback for postgres.
Diadochokinetic Nov 20, 2024
b138532
Add support for sqlite.
Diadochokinetic Nov 21, 2024
f696257
Add connectables with default pool and add test for if_exists=append.
Diadochokinetic Nov 24, 2024
33b69d9
Merge branch 'pandas-dev:main' into table_prefixes
Diadochokinetic Nov 24, 2024
0bc6504
Fix typo in prefixes docstring.
Diadochokinetic Nov 25, 2024
8a94c2b
Undo experimental import changes.
Diadochokinetic Nov 25, 2024
ec32a70
Merge branch 'table_prefixes' of https://github.com/Diadochokinetic/p…
Diadochokinetic Nov 25, 2024
be305ff
Add some documentation.
Diadochokinetic Nov 25, 2024
35a6394
Fix typo in NDFrame.to_sql docstring.
Diadochokinetic Nov 26, 2024
145d18c
Add prefixes parameter in to_sql suberclass.
Diadochokinetic Nov 26, 2024
eddf687
Add prefixes parameter to ADBC subclass method to_sql.
Diadochokinetic Nov 26, 2024
3190142
Disable case sensitivity check for temporary tables.
Diadochokinetic Nov 26, 2024
154c208
Merge remote-tracking branch 'upstream/main' into table_prefixes
Diadochokinetic Nov 26, 2024
98a153f
Merge branch 'main' into table_prefixes
Diadochokinetic Nov 26, 2024
8a0611d
[WIP] Add support for adbc driver.
Diadochokinetic Nov 27, 2024
eb6e9f0
Merge branch 'table_prefixes' of https://github.com/Diadochokinetic/p…
Diadochokinetic Nov 27, 2024
522f842
Fix mypy unsupported operand types error.
Diadochokinetic Nov 27, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pandas/core/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -2788,6 +2788,7 @@ def to_sql(
chunksize: int | None = None,
dtype: DtypeArg | None = None,
method: Literal["multi"] | Callable | None = None,
prefixes: Sequence[str] | None = None,
) -> int | None:
"""
Write records stored in a DataFrame to a SQL database.
Expand Down Expand Up @@ -2844,6 +2845,9 @@ def to_sql(

Details and a sample callable implementation can be found in the
section :ref:`insert method <io.sql.method>`.
prefixes : sequence, optional
A list of strings to insert after CREATE in the CREATE TABLE statement.
They will be separated by spaces.

Returns
-------
Expand Down Expand Up @@ -3017,6 +3021,7 @@ def to_sql(
chunksize=chunksize,
dtype=dtype,
method=method,
prefixes=prefixes,
)

@final
Expand Down
125 changes: 118 additions & 7 deletions pandas/io/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
Generator,
Iterator,
Mapping,
Sequence,
)

from sqlalchemy import Table
Expand Down Expand Up @@ -744,6 +745,7 @@ def to_sql(
chunksize: int | None = None,
dtype: DtypeArg | None = None,
method: Literal["multi"] | Callable | None = None,
prefixes: Sequence[str] | None = None,
engine: str = "auto",
**engine_kwargs,
) -> int | None:
Expand Down Expand Up @@ -791,6 +793,9 @@ def to_sql(

Details and a sample callable implementation can be found in the
section :ref:`insert method <io.sql.method>`.
prefixes : sequence, optional
A list of strings to insert after CREATE in the CREATE TABLE statement.
They will be separated by spaces.
engine : {'auto', 'sqlalchemy'}, default 'auto'
SQL engine library to use. If 'auto', then the option
``io.sql.engine`` is used. The default ``io.sql.engine``
Expand Down Expand Up @@ -839,6 +844,7 @@ def to_sql(
chunksize=chunksize,
dtype=dtype,
method=method,
prefixes=prefixes,
engine=engine,
**engine_kwargs,
)
Expand Down Expand Up @@ -932,6 +938,7 @@ def __init__(
schema=None,
keys=None,
dtype: DtypeArg | None = None,
prefixes: Sequence[str] | None = None,
) -> None:
self.name = name
self.pd_sql = pandas_sql_engine
Expand All @@ -942,6 +949,11 @@ def __init__(
self.if_exists = if_exists
self.keys = keys
self.dtype = dtype
self.prefixes = prefixes
# check if the table to be created is a temporary table
self.is_temporary = self.prefixes is not None and "TEMPORARY".casefold() in [
prefix.casefold() for prefix in self.prefixes
]

if frame is not None:
# We want to initialize based on a dataframe
Expand All @@ -956,8 +968,42 @@ def __init__(
if not len(self.name):
raise ValueError("Empty table name specified")

def _drop_temporary_table(self):
"""Drop a temporary table. Temporary tables are not in a database's meta data
and need to be dropped hard coded."""
if self.schema is None:
query = f"DROP TABLE {self.name}"
else:
query = f"DROP TABLE {self.schema}.{self.name}"
self.pd_sql.execute(query)

def _exists_temporary(self):
"""Check if a temporary table exists. Temporary tables are not in a database's
meta data. The existence is duck tested by a SELECT statement."""
from sqlalchemy.exc import (
OperationalError,
ProgrammingError,
)

if self.schema is None:
query = f"SELECT * FROM {self.name} LIMIT 1"
else:
query = f"SELECT * FROM {self.schema}.{self.name} LIMIT 1"
try:
_ = self.pd_sql.read_query(query)
return True
except ProgrammingError:
# Some DBMS (e.g. postgres) require a rollback after a caught exception
self.pd_sql.execute("rollback")
return False
except OperationalError:
return False

def exists(self):
return self.pd_sql.has_table(self.name, self.schema)
if self.is_temporary:
return self._exists_temporary()
else:
return self.pd_sql.has_table(self.name, self.schema)

def sql_schema(self) -> str:
from sqlalchemy.schema import CreateTable
Expand All @@ -966,7 +1012,9 @@ def sql_schema(self) -> str:

def _execute_create(self) -> None:
# Inserting table into database, add to MetaData object
self.table = self.table.to_metadata(self.pd_sql.meta)
if not self.is_temporary:
# only insert into meta data, if table is not temporary
self.table = self.table.to_metadata(self.pd_sql.meta)
with self.pd_sql.run_transaction():
self.table.create(bind=self.pd_sql.con)

Expand All @@ -975,7 +1023,10 @@ def create(self) -> None:
if self.if_exists == "fail":
raise ValueError(f"Table '{self.name}' already exists.")
if self.if_exists == "replace":
self.pd_sql.drop_table(self.name, self.schema)
if self.is_temporary:
self._drop_temporary_table()
else:
self.pd_sql.drop_table(self.name, self.schema)
self._execute_create()
elif self.if_exists == "append":
pass
Expand Down Expand Up @@ -1269,7 +1320,7 @@ def _create_table_setup(self):
# At this point, attach to new metadata, only attach to self.meta
# once table is created.
meta = MetaData()
return Table(self.name, meta, *columns, schema=schema)
return Table(self.name, meta, *columns, schema=schema, prefixes=self.prefixes)

def _harmonize_columns(
self,
Expand Down Expand Up @@ -1487,6 +1538,7 @@ def to_sql(
chunksize: int | None = None,
dtype: DtypeArg | None = None,
method: Literal["multi"] | Callable | None = None,
prefixes: Sequence[str] | None = None,
engine: str = "auto",
**engine_kwargs,
) -> int | None:
Expand Down Expand Up @@ -1871,6 +1923,7 @@ def prep_table(
index_label=None,
schema=None,
dtype: DtypeArg | None = None,
prefixes: Sequence[str] | None = None,
) -> SQLTable:
"""
Prepares table in the database for data insertion. Creates it if needed, etc.
Expand Down Expand Up @@ -1906,6 +1959,7 @@ def prep_table(
index_label=index_label,
schema=schema,
dtype=dtype,
prefixes=prefixes,
)
table.create()
return table
Expand Down Expand Up @@ -1950,6 +2004,7 @@ def to_sql(
chunksize: int | None = None,
dtype: DtypeArg | None = None,
method: Literal["multi"] | Callable | None = None,
prefixes: Sequence[str] | None = None,
engine: str = "auto",
**engine_kwargs,
) -> int | None:
Expand Down Expand Up @@ -1991,6 +2046,9 @@ def to_sql(

Details and a sample callable implementation can be found in the
section :ref:`insert method <io.sql.method>`.
prefixes : sequence, optional
A list of strings to insert after CREATE in the CREATE TABLE statement.
They will be separated by spaces.
engine : {'auto', 'sqlalchemy'}, default 'auto'
SQL engine library to use. If 'auto', then the option
``io.sql.engine`` is used. The default ``io.sql.engine``
Expand All @@ -2011,6 +2069,7 @@ def to_sql(
index_label=index_label,
schema=schema,
dtype=dtype,
prefixes=prefixes,
)

total_inserted = sql_engine.insert_records(
Expand All @@ -2025,7 +2084,9 @@ def to_sql(
**engine_kwargs,
)

self.check_case_sensitive(name=name, schema=schema)
# only check case sensitivity for non temporary tables
if not table.is_temporary:
self.check_case_sensitive(name=name, schema=schema)
return total_inserted

@property
Expand Down Expand Up @@ -2303,6 +2364,7 @@ def to_sql(
chunksize: int | None = None,
dtype: DtypeArg | None = None,
method: Literal["multi"] | Callable | None = None,
prefixes: Sequence[str] | None = None,
engine: str = "auto",
**engine_kwargs,
) -> int | None:
Expand Down Expand Up @@ -2332,6 +2394,9 @@ def to_sql(
Raises NotImplementedError
method : {None', 'multi', callable}, default None
Raises NotImplementedError
prefixes : sequence, optional
A list of strings to insert after CREATE in the CREATE TABLE statement.
They will be separated by spaces.
engine : {'auto', 'sqlalchemy'}, default 'auto'
Raises NotImplementedError if not set to 'auto'
"""
Expand All @@ -2350,6 +2415,11 @@ def to_sql(
"engine != 'auto' not implemented for ADBC drivers"
)

# check if the table to be created is a temporary table
temporary = prefixes is not None and "TEMPORARY".casefold() in [
prefix.casefold() for prefix in prefixes
]

if schema:
table_name = f"{schema}.{name}"
else:
Expand All @@ -2360,7 +2430,14 @@ def to_sql(
# as applicable modes, so the semantics get blurred across
# the libraries
mode = "create"
if self.has_table(name, schema):

# for temporary tables use duck testing for existence check
if temporary:
exists = self._has_table_temporary(name, schema)
else:
exists = self.has_table(name, schema)

if exists:
if if_exists == "fail":
raise ValueError(f"Table '{table_name}' already exists.")
elif if_exists == "replace":
Expand All @@ -2378,12 +2455,41 @@ def to_sql(

with self.con.cursor() as cur:
total_inserted = cur.adbc_ingest(
table_name=name, data=tbl, mode=mode, db_schema_name=schema
table_name=name,
data=tbl,
mode=mode,
db_schema_name=schema,
temporary=temporary,
)

self.con.commit()
return total_inserted

def _has_table_temporary(self, name: str, schema: str | None = None) -> bool:
"""Check if a temporary table exists. Temporary tables are not in a database's
meta data. The existence is duck tested by a SELECT statement."""
from adbc_driver_manager import ProgrammingError

# sqlite doesn't allow a rollback at this point
rollback = (
True if not self.con.adbc_get_info()["vendor_name"] == "SQLite" else False
)

if schema is None:
query = f"SELECT * FROM {name} LIMIT 1"
else:
query = f"SELECT * FROM {schema}.{name} LIMIT 1"
try:
with self.con.cursor() as cur:
cur.execute(query)
return True
except ProgrammingError:
if rollback:
# Some DBMS (e.g. postgres) require a rollback after a caught exception
with self.con.cursor() as cur:
cur.execute("rollback")
return False

def has_table(self, name: str, schema: str | None = None) -> bool:
meta = self.con.adbc_get_objects(
db_schema_filter=schema, table_name_filter=name
Expand Down Expand Up @@ -2758,6 +2864,7 @@ def to_sql(
chunksize: int | None = None,
dtype: DtypeArg | None = None,
method: Literal["multi"] | Callable | None = None,
prefixes: Sequence[str] | None = None,
engine: str = "auto",
**engine_kwargs,
) -> int | None:
Expand Down Expand Up @@ -2798,6 +2905,9 @@ def to_sql(

Details and a sample callable implementation can be found in the
section :ref:`insert method <io.sql.method>`.
prefixes : sequence, optional
A list of strings to insert after CREATE in the CREATE TABLE statement.
They will be separated by spaces.
"""
if dtype:
if not is_dict_like(dtype):
Expand All @@ -2823,6 +2933,7 @@ def to_sql(
if_exists=if_exists,
index_label=index_label,
dtype=dtype,
prefixes=prefixes,
)
table.create()
return table.insert(chunksize, method)
Expand Down
Loading
Loading