-
Notifications
You must be signed in to change notification settings - Fork 1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Use CrateDB as JobStore #7
Conversation
300a04a
to
ee50caa
Compare
c30fb9c
to
dc6a828
Compare
""" | ||
class CrateDBMongoDBJobStore(MongoDBJobStore): | ||
from cratedb_toolkit.adapter.pymongo import PyMongoCrateDbAdapter | ||
def __init__(self, dburi, *args, **kwargs): | ||
with PyMongoCrateDbAdapter(dburi=dburi): | ||
super().__init__(*args, **kwargs) | ||
""" | ||
|
||
|
||
def CrateDBMongoDBJobStore(dburi: str): | ||
from cratedb_toolkit.adapter.pymongo import PyMongoCrateDbAdapter | ||
|
||
with PyMongoCrateDbAdapter(dburi=dburi): | ||
import pymongo | ||
|
||
amalgamated_client: pymongo.MongoClient = pymongo.MongoClient( | ||
"localhost", 27017, timeoutMS=100, connectTimeoutMS=100, socketTimeoutMS=100, serverSelectionTimeoutMS=100 | ||
) | ||
from apscheduler.jobstores.mongodb import MongoDBJobStore | ||
|
||
class CrateDBMongoDBJobStoreImpl(MongoDBJobStore): | ||
def __init__(self, *args, **kwargs): | ||
kwargs["client"] = amalgamated_client | ||
super().__init__(*args, **kwargs) | ||
|
||
return CrateDBMongoDBJobStoreImpl() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It was just an attempt, and will be removed.
class LargeBinary(sa.String): | ||
|
||
"""A type for large binary byte data. | ||
|
||
The :class:`.LargeBinary` type corresponds to a large and/or unlengthed | ||
binary type for the target platform, such as BLOB on MySQL and BYTEA for | ||
PostgreSQL. It also handles the necessary conversions for the DBAPI. | ||
|
||
""" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This needs to go into crate-python.
def patchme(self): | ||
""" | ||
A few patches to make the CrateDB SQLAlchemy dialect work. | ||
|
||
TODO: Upstream to crate-python. | ||
""" | ||
from crate.client.sqlalchemy import CrateDialect | ||
from crate.client.sqlalchemy.compiler import CrateDDLCompiler, CrateTypeCompiler | ||
|
||
def visit_BLOB(self, type_, **kw): | ||
return "STRING" | ||
|
||
def visit_FLOAT(self, type_, **kw): | ||
""" | ||
From `sqlalchemy.sql.sqltypes.Float`. | ||
|
||
When a :paramref:`.Float.precision` is not provided in a | ||
:class:`_types.Float` type some backend may compile this type as | ||
an 8 bytes / 64 bit float datatype. To use a 4 bytes / 32 bit float | ||
datatype a precision <= 24 can usually be provided or the | ||
:class:`_types.REAL` type can be used. | ||
This is known to be the case in the PostgreSQL and MSSQL dialects | ||
that render the type as ``FLOAT`` that's in both an alias of | ||
``DOUBLE PRECISION``. Other third party dialects may have similar | ||
behavior. | ||
""" | ||
if not type_.precision: | ||
return "FLOAT" | ||
elif type_.precision <= 24: | ||
return "FLOAT" | ||
else: | ||
return "DOUBLE" | ||
|
||
CrateTypeCompiler.visit_BLOB = visit_BLOB | ||
CrateTypeCompiler.visit_FLOAT = visit_FLOAT | ||
|
||
def visit_create_index(self, create, **kw): | ||
return "SELECT 1;" | ||
|
||
CrateDDLCompiler.visit_create_index = visit_create_index | ||
|
||
CrateDialect.colspecs.update( | ||
{ | ||
sa.sql.sqltypes.LargeBinary: LargeBinary, | ||
} | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This also needs to go into crate-python.
def receive_after_execute( | ||
conn: sa.engine.Connection, clauseelement, multiparams, params, execution_options, result | ||
): | ||
""" | ||
Run a `REFRESH TABLE ...` command after each DML operation (INSERT, UPDATE, DELETE). | ||
""" | ||
if isinstance(clauseelement, (sa.sql.Insert, sa.sql.Update, sa.sql.Delete)): | ||
conn.execute(sa.text(f"REFRESH TABLE {clauseelement.table}")) | ||
|
||
sa.event.listen(self.engine, "after_execute", receive_after_execute) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
On the other hand, this needs to go into cratedb-toolkit.
dc6a828
to
d141f83
Compare
About
APScheduler provides a few built-in job store implementations already. This patch builds upon the
SQLAlchemyJobStore
, and makes it compatible with CrateDB by adding a few polyfills. 1Footnotes
As far as I can see, most of those can be refactored into the SQLAlchemy dialect in the canonical
crate-python
package. ↩