Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use CrateDB as JobStore #7

Merged
merged 2 commits into from
Dec 2, 2023
Merged

Use CrateDB as JobStore #7

merged 2 commits into from
Dec 2, 2023

Conversation

amotl
Copy link
Member

@amotl amotl commented Dec 1, 2023

About

APScheduler provides a few built-in job store implementations already. This patch builds upon the SQLAlchemyJobStore, and makes it compatible with CrateDB by adding a few polyfills. 1

Footnotes

  1. As far as I can see, most of those can be refactored into the SQLAlchemy dialect in the canonical crate-python package.

@amotl amotl force-pushed the cratedb-storage branch 2 times, most recently from 300a04a to ee50caa Compare December 2, 2023 03:46
@amotl amotl changed the base branch from master to tests December 2, 2023 03:46
@amotl amotl force-pushed the cratedb-storage branch 2 times, most recently from c30fb9c to dc6a828 Compare December 2, 2023 03:59
Comment on lines +1 to +26
"""
class CrateDBMongoDBJobStore(MongoDBJobStore):
from cratedb_toolkit.adapter.pymongo import PyMongoCrateDbAdapter
def __init__(self, dburi, *args, **kwargs):
with PyMongoCrateDbAdapter(dburi=dburi):
super().__init__(*args, **kwargs)
"""


def CrateDBMongoDBJobStore(dburi: str):
from cratedb_toolkit.adapter.pymongo import PyMongoCrateDbAdapter

with PyMongoCrateDbAdapter(dburi=dburi):
import pymongo

amalgamated_client: pymongo.MongoClient = pymongo.MongoClient(
"localhost", 27017, timeoutMS=100, connectTimeoutMS=100, socketTimeoutMS=100, serverSelectionTimeoutMS=100
)
from apscheduler.jobstores.mongodb import MongoDBJobStore

class CrateDBMongoDBJobStoreImpl(MongoDBJobStore):
def __init__(self, *args, **kwargs):
kwargs["client"] = amalgamated_client
super().__init__(*args, **kwargs)

return CrateDBMongoDBJobStoreImpl()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was just an attempt, and will be removed.

Comment on lines +7 to +15
class LargeBinary(sa.String):

"""A type for large binary byte data.

The :class:`.LargeBinary` type corresponds to a large and/or unlengthed
binary type for the target platform, such as BLOB on MySQL and BYTEA for
PostgreSQL. It also handles the necessary conversions for the DBAPI.

"""
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs to go into crate-python.

Comment on lines +67 to +112
def patchme(self):
"""
A few patches to make the CrateDB SQLAlchemy dialect work.

TODO: Upstream to crate-python.
"""
from crate.client.sqlalchemy import CrateDialect
from crate.client.sqlalchemy.compiler import CrateDDLCompiler, CrateTypeCompiler

def visit_BLOB(self, type_, **kw):
return "STRING"

def visit_FLOAT(self, type_, **kw):
"""
From `sqlalchemy.sql.sqltypes.Float`.

When a :paramref:`.Float.precision` is not provided in a
:class:`_types.Float` type some backend may compile this type as
an 8 bytes / 64 bit float datatype. To use a 4 bytes / 32 bit float
datatype a precision <= 24 can usually be provided or the
:class:`_types.REAL` type can be used.
This is known to be the case in the PostgreSQL and MSSQL dialects
that render the type as ``FLOAT`` that's in both an alias of
``DOUBLE PRECISION``. Other third party dialects may have similar
behavior.
"""
if not type_.precision:
return "FLOAT"
elif type_.precision <= 24:
return "FLOAT"
else:
return "DOUBLE"

CrateTypeCompiler.visit_BLOB = visit_BLOB
CrateTypeCompiler.visit_FLOAT = visit_FLOAT

def visit_create_index(self, create, **kw):
return "SELECT 1;"

CrateDDLCompiler.visit_create_index = visit_create_index

CrateDialect.colspecs.update(
{
sa.sql.sqltypes.LargeBinary: LargeBinary,
}
)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This also needs to go into crate-python.

Comment on lines +56 to +65
def receive_after_execute(
conn: sa.engine.Connection, clauseelement, multiparams, params, execution_options, result
):
"""
Run a `REFRESH TABLE ...` command after each DML operation (INSERT, UPDATE, DELETE).
"""
if isinstance(clauseelement, (sa.sql.Insert, sa.sql.Update, sa.sql.Delete)):
conn.execute(sa.text(f"REFRESH TABLE {clauseelement.table}"))

sa.event.listen(self.engine, "after_execute", receive_after_execute)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On the other hand, this needs to go into cratedb-toolkit.

@amotl amotl changed the base branch from tests to master December 2, 2023 14:02
@amotl amotl marked this pull request as ready for review December 2, 2023 14:03
@amotl amotl merged commit dc4d547 into master Dec 2, 2023
2 checks passed
@amotl amotl deleted the cratedb-storage branch December 2, 2023 14:14
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant