Skip to content

Commit

Permalink
✨(backends) use generic base backend classes
Browse files Browse the repository at this point in the history
We try to improve data backend typing using generic types.
  • Loading branch information
SergioSim committed Nov 13, 2023
1 parent 7c5efc1 commit 20d7114
Show file tree
Hide file tree
Showing 18 changed files with 198 additions and 105 deletions.
13 changes: 7 additions & 6 deletions src/ralph/backends/data/async_es.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import logging
from io import IOBase
from itertools import chain
from typing import Iterable, Iterator, Optional, Union
from typing import Iterable, Iterator, Optional, TypeVar, Union

from elasticsearch import ApiError, AsyncElasticsearch, TransportError
from elasticsearch.helpers import BulkIndexError, async_streaming_bulk
Expand All @@ -21,23 +21,24 @@
from ralph.utils import parse_bytes_to_dict, read_raw

logger = logging.getLogger(__name__)
Settings = TypeVar("Settings", bound=ESDataBackendSettings)


class AsyncESDataBackend(BaseAsyncDataBackend, AsyncWritable, AsyncListable):
class AsyncESDataBackend(
BaseAsyncDataBackend[Settings, ESQuery], AsyncWritable, AsyncListable
):
"""Asynchronous Elasticsearch data backend."""

name = "async_es"
query_class = ESQuery
settings_class = ESDataBackendSettings

def __init__(self, settings: Optional[ESDataBackendSettings] = None):
def __init__(self, settings: Optional[Settings] = None):
"""Instantiate the asynchronous Elasticsearch client.
Args:
settings (ESDataBackendSettings or None): The data backend settings.
If `settings` is `None`, a default settings instance is used instead.
"""
self.settings = settings if settings else self.settings_class()
super().__init__(settings)
self._client = None

@property
Expand Down
15 changes: 9 additions & 6 deletions src/ralph/backends/data/async_mongo.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import logging
from io import IOBase
from itertools import chain
from typing import Any, Dict, Iterable, Iterator, Optional, Union
from typing import Any, Dict, Iterable, Iterator, Optional, TypeVar, Union

from bson.errors import BSONError
from motor.motor_asyncio import AsyncIOMotorClient
Expand All @@ -29,22 +29,25 @@
)

logger = logging.getLogger(__name__)
Settings = TypeVar("Settings", bound=MongoDataBackendSettings)


class AsyncMongoDataBackend(BaseAsyncDataBackend, AsyncWritable, AsyncListable):
class AsyncMongoDataBackend(
BaseAsyncDataBackend[Settings, MongoQuery],
AsyncWritable,
AsyncListable,
):
"""Async MongoDB data backend."""

name = "async_mongo"
query_class = MongoQuery
settings_class = MongoDataBackendSettings

def __init__(self, settings: Optional[MongoDataBackendSettings] = None):
def __init__(self, settings: Optional[Settings] = None):
"""Instantiate the asynchronous MongoDB client.
Args:
settings (MongoDataBackendSettings or None): The data backend settings.
"""
self.settings = settings if settings else self.settings_class()
super().__init__(settings)
self.client = AsyncIOMotorClient(
self.settings.CONNECTION_URI, **self.settings.CLIENT_OPTIONS.dict()
)
Expand Down
87 changes: 66 additions & 21 deletions src/ralph/backends/data/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from abc import ABC, abstractmethod
from enum import Enum, unique
from io import IOBase
from typing import Iterable, Iterator, Optional, Union
from typing import Any, Generic, Iterable, Iterator, Optional, Type, TypeVar, Union

from pydantic import BaseModel, BaseSettings, ValidationError

Expand Down Expand Up @@ -34,7 +34,7 @@ class Config:

extra = "forbid"

query_string: Union[str, None]
query_string: Union[str, None] = None


@unique
Expand Down Expand Up @@ -144,25 +144,65 @@ def list(
"""


class BaseDataBackend(ABC):
def get_backend_generic_argument(backend_class: Type, position: int) -> Optional[Type]:
"""Return the generic argument of `backend_class` at specified `position`."""
if not hasattr(backend_class, "__orig_bases__"):
return None

bases = backend_class.__orig_bases__[0]
if not hasattr(bases, "__args__") or len(bases.__args__) < abs(position) + 1:
return None

argument = bases.__args__[position]
if argument is Any:
return None

if isinstance(argument, TypeVar):
return argument.__bound__

return argument


def set_backend_settings_class(backend_class: Type):
"""Set `settings_class` attribute with `Config.env_prefix` for `backend_class`."""
settings_class = get_backend_generic_argument(backend_class, 0)
if settings_class:
backend_class.settings_class = settings_class


def set_backend_query_class(backend_class: Type):
"""Set `query_class` attribute for `backend_class`."""
query_class = get_backend_generic_argument(backend_class, 1)
if query_class:
backend_class.query_class = query_class


Settings = TypeVar("Settings", bound=BaseDataBackendSettings)
Query = TypeVar("Query", bound=BaseQuery)


class BaseDataBackend(Generic[Settings, Query], ABC):
"""Base data backend interface."""

name = "base"
query_class = BaseQuery
settings_class = BaseDataBackendSettings
query_class: Type[Query]
settings_class: Type[Settings]

@abstractmethod
def __init__(self, settings: Optional[BaseDataBackendSettings] = None):
def __init_subclass__(cls, **kwargs): # noqa: D105
super().__init_subclass__(**kwargs)
set_backend_settings_class(cls)
set_backend_query_class(cls)

def __init__(self, settings: Optional[Settings] = None):
"""Instantiate the data backend.
Args:
settings (BaseDataBackendSettings or None): The data backend settings.
settings (Settings or None): The data backend settings.
If `settings` is `None`, a default settings instance is used instead.
"""
self.settings: Settings = settings if settings else self.settings_class()

def validate_query(
self, query: Union[str, dict, BaseQuery, None] = None
) -> BaseQuery:
def validate_query(self, query: Union[str, dict, Query, None] = None) -> Query:
"""Validate and transform the query."""
if query is None:
query = self.query_class()
Expand Down Expand Up @@ -203,7 +243,7 @@ def status(self) -> DataBackendStatus:
def read( # noqa: PLR0913
self,
*,
query: Optional[Union[str, BaseQuery]] = None,
query: Optional[Union[str, Query]] = None,
target: Optional[str] = None,
chunk_size: Optional[int] = None,
raw_output: bool = False,
Expand All @@ -212,7 +252,7 @@ def read( # noqa: PLR0913
"""Read records matching the `query` in the `target` container and yield them.
Args:
query: (str or BaseQuery): The query to select records to read.
query: (str or Query): The query to select records to read.
target (str or None): The target container name.
If `target` is `None`, a default value is used instead.
chunk_size (int or None): The number of records or bytes to read in one
Expand Down Expand Up @@ -324,21 +364,26 @@ async def list(
"""


class BaseAsyncDataBackend(ABC):
class BaseAsyncDataBackend(Generic[Settings, Query], ABC):
"""Base async data backend interface."""

name = "base"
query_class = BaseQuery
settings_class = BaseDataBackendSettings
query_class: Type[Query]
settings_class: Type[Settings]

@abstractmethod
def __init__(self, settings: Optional[BaseDataBackendSettings] = None):
def __init_subclass__(cls, **kwargs): # noqa: D105
super().__init_subclass__(**kwargs)
set_backend_settings_class(cls)
set_backend_query_class(cls)

def __init__(self, settings: Optional[Settings] = None):
"""Instantiate the data backend.
Args:
settings (BaseDataBackendSettings or None): The backend settings.
settings (Settings or None): The backend settings.
If `settings` is `None`, a default settings instance is used instead.
"""
self.settings: Settings = settings if settings else self.settings_class()

def validate_query(
self, query: Union[str, dict, BaseQuery, None] = None
Expand Down Expand Up @@ -383,7 +428,7 @@ async def status(self) -> DataBackendStatus:
async def read( # noqa: PLR0913
self,
*,
query: Optional[Union[str, BaseQuery]] = None,
query: Optional[Union[str, Query]] = None,
target: Optional[str] = None,
chunk_size: Optional[int] = None,
raw_output: bool = False,
Expand All @@ -392,7 +437,7 @@ async def read( # noqa: PLR0913
"""Read records matching the `query` in the `target` container and yield them.
Args:
query: (str or BaseQuery): The query to select records to read.
query: (str or Query): The query to select records to read.
target (str or None): The target container name.
If `target` is `None`, a default value is used instead.
chunk_size (int or None): The number of records or bytes to read in one
Expand Down
16 changes: 11 additions & 5 deletions src/ralph/backends/data/clickhouse.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
List,
NamedTuple,
Optional,
TypeVar,
Union,
)
from uuid import UUID, uuid4
Expand Down Expand Up @@ -108,22 +109,27 @@ class ClickHouseQuery(BaseClickHouseQuery):
query_string: Union[Json[BaseClickHouseQuery], None]


class ClickHouseDataBackend(BaseDataBackend, Writable, Listable):
Settings = TypeVar("Settings", bound=ClickHouseDataBackendSettings)


class ClickHouseDataBackend(
BaseDataBackend[Settings, ClickHouseQuery],
Writable,
Listable,
):
"""ClickHouse database backend."""

name = "clickhouse"
query_class = ClickHouseQuery
default_operation_type = BaseOperationType.CREATE
settings_class = ClickHouseDataBackendSettings

def __init__(self, settings: Optional[ClickHouseDataBackendSettings] = None):
def __init__(self, settings: Optional[Settings] = None):
"""Instantiate the ClickHouse configuration.
Args:
settings (ClickHouseDataBackendSettings or None): The ClickHouse
data backend settings.
"""
self.settings = settings if settings else self.settings_class()
super().__init__(settings)
self.database = self.settings.DATABASE
self.event_table_name = self.settings.EVENT_TABLE_NAME
self.default_chunk_size = self.settings.DEFAULT_CHUNK_SIZE
Expand Down
15 changes: 8 additions & 7 deletions src/ralph/backends/data/es.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from io import IOBase
from itertools import chain
from pathlib import Path
from typing import Iterable, Iterator, List, Literal, Optional, Union
from typing import Iterable, Iterator, List, Literal, Optional, TypeVar, Union

from elasticsearch import ApiError, Elasticsearch, TransportError
from elasticsearch.helpers import BulkIndexError, streaming_bulk
Expand Down Expand Up @@ -111,21 +111,22 @@ class ESQuery(BaseQuery):
track_total_hits: Literal[False] = False


class ESDataBackend(BaseDataBackend, Writable, Listable):
Settings = TypeVar("Settings", bound=ESDataBackendSettings)


class ESDataBackend(BaseDataBackend[Settings, ESQuery], Writable, Listable):
"""Elasticsearch data backend."""

name = "es"
query_class = ESQuery
settings_class = ESDataBackendSettings

def __init__(self, settings: Optional[ESDataBackendSettings] = None):
def __init__(self, settings: Optional[Settings] = None):
"""Instantiate the Elasticsearch data backend.
Args:
settings (ESDataBackendSettings or None): The data backend settings.
settings (Settings or None): The data backend settings.
If `settings` is `None`, a default settings instance is used instead.
"""
self.settings = settings if settings else self.settings_class()
super().__init__(settings)
self._client = None

@property
Expand Down
16 changes: 12 additions & 4 deletions src/ralph/backends/data/fs.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from io import IOBase
from itertools import chain
from pathlib import Path
from typing import IO, Iterable, Iterator, Optional, Union
from typing import IO, Iterable, Iterator, Optional, TypeVar, Union
from uuid import uuid4

from ralph.backends.data.base import (
Expand Down Expand Up @@ -51,20 +51,28 @@ class Config(BaseSettingsConfig):
LOCALE_ENCODING: str = "utf8"


class FSDataBackend(HistoryMixin, BaseDataBackend, Writable, Listable):
Settings = TypeVar("Settings", bound=FSDataBackendSettings)


class FSDataBackend(
BaseDataBackend[Settings, BaseQuery],
Writable,
Listable,
HistoryMixin,
):
"""FileSystem data backend."""

name = "fs"
default_operation_type = BaseOperationType.CREATE
settings_class = FSDataBackendSettings

def __init__(self, settings: Optional[FSDataBackendSettings] = None):
def __init__(self, settings: Optional[Settings] = None):
"""Create the default target directory if it does not exist.
Args:
settings (FSDataBackendSettings or None): The data backend settings.
If `settings` is `None`, a default settings instance is used instead.
"""
super().__init__(settings)
self.settings = settings if settings else self.settings_class()
self.default_chunk_size = self.settings.DEFAULT_CHUNK_SIZE
self.default_directory = self.settings.DEFAULT_DIRECTORY_PATH
Expand Down
9 changes: 6 additions & 3 deletions src/ralph/backends/data/ldp.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,14 @@ class Config(BaseSettingsConfig):
SERVICE_NAME: Optional[str] = None


class LDPDataBackend(HistoryMixin, BaseDataBackend, Listable):
class LDPDataBackend(
BaseDataBackend[LDPDataBackendSettings, BaseQuery],
Listable,
HistoryMixin,
):
"""OVH LDP (Log Data Platform) data backend."""

name = "ldp"
settings_class = LDPDataBackendSettings

def __init__(self, settings: Optional[LDPDataBackendSettings] = None):
"""Instantiate the OVH LDP client.
Expand All @@ -70,7 +73,7 @@ def __init__(self, settings: Optional[LDPDataBackendSettings] = None):
settings (LDPDataBackendSettings or None): The data backend settings.
If `settings` is `None`, a default settings instance is used instead.
"""
self.settings = settings if settings else self.settings_class()
super().__init__(settings)
self.service_name = self.settings.SERVICE_NAME
self.stream_id = self.settings.DEFAULT_STREAM_ID
self.timeout = self.settings.REQUEST_TIMEOUT
Expand Down
Loading

0 comments on commit 20d7114

Please sign in to comment.