♻️(backends) use common utilities among backends
During the data backend unification work, some shared utilities were
developed to factor out duplication:
`parse_dict_to_bytes`, `read_raw` and `iter_by_batch`.
However, some backends still used their own implementations of
these utilities, leading to minor behavioral differences
among backends.
Thus, for consistency, we update the backends to use the common utilities.
SergioSim committed Nov 13, 2023
1 parent 1d6be41 commit 89b640a
Showing 23 changed files with 388 additions and 388 deletions.
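
As background for the diffs below, here is a minimal sketch of what a batching helper such as `iter_by_batch` could look like, inferred only from the call sites in this commit (`iter_by_batch(data, chunk_size)` yielding lists of items); the actual implementation in `ralph.utils` may differ.

# Hypothetical sketch of a batching helper, inferred from the call sites in
# this commit; the real `ralph.utils.iter_by_batch` may differ.
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def iter_by_batch(data: Iterable[T], batch_size: int) -> Iterator[List[T]]:
    """Yield successive lists of at most `batch_size` items from `data`."""
    batch = []
    for item in data:
        batch.append(item)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:
        # Yield the trailing partial batch (fewer than `batch_size` items).
        yield batch
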
1 change: 0 additions & 1 deletion CHANGELOG.md
@@ -34,7 +34,6 @@ methods under the unified `lrs` backend interface [BC]
have an authority field matching that of the user
- CLI: change `push` to `write` and `fetch` to `read` [BC]
- Upgrade `fastapi` to `0.104.1`
- Upgrade `more-itertools` to `10.1.0`
- Upgrade `sentry_sdk` to `1.34.0`
- Upgrade `uvicorn` to `0.24.0`
- API: Invalid parameters now return 400 status code
1 change: 0 additions & 1 deletion pyproject.toml
@@ -56,7 +56,6 @@ backend-ldp = [
]
backend-lrs = [
"httpx<0.25.0", # pin as Python 3.7 is no longer supported from release 0.25.0
"more-itertools==10.1.0",
]
backend-mongo = [
"motor[srv]>=3.3.0",
23 changes: 17 additions & 6 deletions src/ralph/backends/data/async_es.py
@@ -18,7 +18,7 @@
)
from ralph.backends.data.es import ESDataBackend, ESDataBackendSettings, ESQuery
from ralph.exceptions import BackendException, BackendParameterException
from ralph.utils import parse_bytes_to_dict, read_raw
from ralph.utils import async_parse_dict_to_bytes, parse_to_dict

logger = logging.getLogger(__name__)
Settings = TypeVar("Settings", bound=ESDataBackendSettings)
@@ -140,6 +140,21 @@ async def read( # noqa: PLR0912, PLR0913
Raise:
BackendException: If a failure occurs during Elasticsearch connection.
"""
if raw_output:
documents = self.read(
query=query,
target=target,
chunk_size=chunk_size,
raw_output=False,
ignore_errors=ignore_errors,
)
async for document in async_parse_dict_to_bytes(
documents, self.settings.LOCALE_ENCODING, ignore_errors, logger
):
yield document

return

target = target if target else self.settings.DEFAULT_INDEX
chunk_size = chunk_size if chunk_size else self.settings.DEFAULT_CHUNK_SIZE
if ignore_errors:
@@ -183,10 +198,6 @@ async def read( # noqa: PLR0912, PLR0913
if count:
query.search_after = [str(part) for part in documents[-1]["sort"]]
kwargs["search_after"] = query.search_after
if raw_output:
documents = read_raw(
documents, self.settings.LOCALE_ENCODING, ignore_errors, logger
)
for document in documents:
yield document

@@ -240,7 +251,7 @@ async def write( # noqa: PLR0913

data = chain((first_record,), data)
if isinstance(first_record, bytes):
data = parse_bytes_to_dict(data, ignore_errors, logger)
data = parse_to_dict(data, ignore_errors, logger)

logger.debug(
"Start writing to the %s index (chunk size: %d)", target, chunk_size
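
In the async Elasticsearch backend above, the raw-output branch of `read` now delegates to the dictionary-yielding read and serializes the result through `async_parse_dict_to_bytes`. A minimal sketch of such a helper, inferred from the call site; the real `ralph.utils` helper may handle errors or newlines differently.

# Hypothetical sketch inferred from the call site above; the real
# `ralph.utils.async_parse_dict_to_bytes` may differ, e.g. it may raise a
# BackendException or append a newline after each document.
import json
import logging
from typing import Any, AsyncIterator, Dict


async def async_parse_dict_to_bytes(
    documents: AsyncIterator[Dict[str, Any]],
    encoding: str,
    ignore_errors: bool,
    logger: logging.Logger,
) -> AsyncIterator[bytes]:
    """Serialize `documents` to JSON bytes, logging or raising on failure."""
    async for document in documents:
        try:
            yield json.dumps(document).encode(encoding)
        except (TypeError, ValueError) as error:
            msg = "Failed to encode document %s: %s"
            if ignore_errors:
                logger.warning(msg, document, error)
                continue
            logger.error(msg, document, error)
            raise
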
51 changes: 24 additions & 27 deletions src/ralph/backends/data/async_mongo.py
@@ -1,10 +1,9 @@
"""Async MongoDB data backend for Ralph."""

import json
import logging
from io import IOBase
from itertools import chain
from typing import Any, Dict, Iterable, Iterator, Optional, TypeVar, Union
from typing import Iterable, Iterator, Optional, TypeVar, Union

from bson.errors import BSONError
from motor.motor_asyncio import AsyncIOMotorClient
@@ -18,7 +17,7 @@
MongoQuery,
)
from ralph.exceptions import BackendException, BackendParameterException
from ralph.utils import parse_bytes_to_dict
from ralph.utils import async_parse_dict_to_bytes, iter_by_batch, parse_to_dict

from ..data.base import (
AsyncListable,
@@ -150,6 +149,21 @@ async def read( # noqa: PLR0913
BackendException: If a failure occurs during MongoDB connection.
BackendParameterException: If a failure occurs with MongoDB collection.
"""
if raw_output:
documents = self.read(
query=query,
target=target,
chunk_size=chunk_size,
raw_output=False,
ignore_errors=ignore_errors,
)
async for document in async_parse_dict_to_bytes(
documents, self.settings.LOCALE_ENCODING, ignore_errors, logger
):
yield document

return

if not chunk_size:
chunk_size = self.settings.DEFAULT_CHUNK_SIZE

@@ -163,20 +177,10 @@ async def read( # noqa: PLR0913
logger.error(msg, target, error)
raise BackendParameterException(msg % (target, error)) from error

reader = self._read_raw if raw_output else lambda _: _
try:
async for document in collection.find(batch_size=chunk_size, **query):
document.update({"_id": str(document.get("_id"))})
try:
yield reader(document)
except (TypeError, ValueError) as error:
msg = "Failed to encode MongoDB document with ID %s: %s"
document_id = document.get("_id")
logger.error(msg, document_id, error)
if ignore_errors:
logger.warning(msg, document_id, error)
continue
raise BackendException(msg % (document_id, error)) from error
yield document
except (PyMongoError, IndexError, TypeError, ValueError) as error:
msg = "Failed to execute MongoDB query: %s"
logger.error(msg, error)
@@ -235,29 +239,26 @@ async def write( # noqa: PLR0913
try:
first_record = next(data)
except StopIteration:
logger.warning("Data Iterator is empty; skipping write to target.")
logger.info("Data Iterator is empty; skipping write to target.")
return count
data = chain([first_record], data)
if isinstance(first_record, bytes):
data = parse_bytes_to_dict(data, ignore_errors, logger)
data = parse_to_dict(data, ignore_errors, logger)

if operation_type == BaseOperationType.UPDATE:
for batch in MongoDataBackend.iter_by_batch(
MongoDataBackend.to_replace_one(data), chunk_size
):
data = MongoDataBackend.to_replace_one(data)
for batch in iter_by_batch(data, chunk_size):
count += await self._bulk_update(batch, ignore_errors, collection)
logger.info("Updated %d documents with success", count)
elif operation_type == BaseOperationType.DELETE:
for batch in MongoDataBackend.iter_by_batch(
MongoDataBackend.to_ids(data), chunk_size
):
for batch in iter_by_batch(MongoDataBackend.to_ids(data), chunk_size):
count += await self._bulk_delete(batch, ignore_errors, collection)
logger.info("Deleted %d documents with success", count)
else:
data = MongoDataBackend.to_documents(
data, ignore_errors, operation_type, logger
)
for batch in MongoDataBackend.iter_by_batch(data, chunk_size):
for batch in iter_by_batch(data, chunk_size):
count += await self._bulk_import(batch, ignore_errors, collection)
logger.info("Inserted %d documents with success", count)

@@ -326,7 +327,3 @@ async def _bulk_update(batch: list, ignore_errors: bool, collection: Collection)
modified_count = updated_documents.modified_count
logger.debug("Updated %d documents chunk with success", modified_count)
return modified_count

def _read_raw(self, document: Dict[str, Any]) -> bytes:
"""Read the `document` dictionary and return bytes."""
return json.dumps(document).encode(self.settings.LOCALE_ENCODING)
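
The backend-specific `_read_raw` and `parse_bytes_to_dict` helpers removed above are superseded by shared utilities; on the write path, raw records are now decoded with `parse_to_dict`. A plausible sketch, inferred from the call sites in this commit (the ClickHouse read path passes an extra per-item parser, which suggests an optional `parser` argument); the actual `ralph.utils` implementation may differ.

# Hypothetical sketch inferred from the call sites in this commit; the real
# `ralph.utils.parse_to_dict` may differ, e.g. it may raise a BackendException
# instead of re-raising the original error.
import json
import logging
from typing import Any, Callable, Iterable, Iterator


def parse_to_dict(
    raw_documents: Iterable[Any],
    ignore_errors: bool,
    logger: logging.Logger,
    parser: Callable[[Any], dict] = json.loads,
) -> Iterator[dict]:
    """Yield dictionaries obtained by applying `parser` to each raw document."""
    for raw_document in raw_documents:
        try:
            yield parser(raw_document)
        except (TypeError, ValueError) as error:
            # json.JSONDecodeError is a subclass of ValueError, so decoding
            # failures are covered by this clause.
            msg = "Failed to decode JSON: %s, for document: %s"
            if ignore_errors:
                logger.warning(msg, error, raw_document)
                continue
            logger.error(msg, error, raw_document)
            raise
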
83 changes: 19 additions & 64 deletions src/ralph/backends/data/clickhouse.py
@@ -35,6 +35,7 @@
)
from ralph.conf import BaseSettingsConfig, ClientOptions
from ralph.exceptions import BackendException, BackendParameterException
from ralph.utils import iter_by_batch, parse_dict_to_bytes, parse_to_dict

logger = logging.getLogger(__name__)

@@ -206,7 +207,7 @@ def list(
yield str(table.get("name"))

@enforce_query_checks
def read( # noqa: PLR0912, PLR0913
def read( # noqa: PLR0913
self,
*,
query: Optional[Union[str, ClickHouseQuery]] = None,
@@ -235,6 +236,18 @@ def read( # noqa: PLR0912, PLR0913
Raise:
BackendException: If a failure occurs during ClickHouse connection.
"""
if raw_output:
documents = self.read(
query=query,
target=target,
chunk_size=chunk_size,
raw_output=False,
ignore_errors=ignore_errors,
)
locale = self.settings.LOCALE_ENCODING
yield from parse_dict_to_bytes(documents, locale, ignore_errors, logger)
return

if target is None:
target = self.event_table_name

@@ -269,8 +282,6 @@ def read( # noqa: PLR0912, PLR0913
if query.limit:
sql += f"\nLIMIT {query.limit}"

reader = self._read_raw if raw_output else self._read_json

logger.debug(
"Start reading the %s table of the %s database (chunk size: %d)",
target,
@@ -284,16 +295,7 @@ def read( # noqa: PLR0912, PLR0913
settings={"buffer_size": chunk_size},
column_oriented=query.column_oriented,
).named_results()
for statement in result:
try:
yield reader(statement)
except (TypeError, ValueError) as error:
msg = "Failed to encode document %s: %s"
if ignore_errors:
logger.warning(msg, statement, error)
continue
logger.error(msg, statement, error)
raise BackendException(msg % (statement, error)) from error
yield from parse_to_dict(result, ignore_errors, logger, self._read_json)
except (ClickHouseError, IndexError, TypeError, ValueError) as error:
msg = "Failed to read documents: %s"
logger.error(msg, error)
@@ -352,7 +354,7 @@ def write( # noqa: PLR0913

data = chain([first_record], data)
if isinstance(first_record, bytes):
data = self._parse_bytes_to_dict(data, ignore_errors)
data = parse_to_dict(data, ignore_errors, logger)

if operation_type not in [BaseOperationType.CREATE, BaseOperationType.INDEX]:
msg = "%s operation_type is not allowed."
@@ -361,30 +363,9 @@ def write( # noqa: PLR0913

# operation_type is either CREATE or INDEX
count = 0
batch = []

for insert_tuple in self._to_insert_tuples(
data,
ignore_errors=ignore_errors,
):
batch.append(insert_tuple)
if len(batch) < chunk_size:
continue

count += self._bulk_import(
batch,
ignore_errors=ignore_errors,
event_table_name=target,
)
batch = []

# Edge case: if the total number of documents is lower than the chunk size
if len(batch) > 0:
count += self._bulk_import(
batch,
ignore_errors=ignore_errors,
event_table_name=target,
)
insert_tuples = self._to_insert_tuples(data, ignore_errors)
for batch in iter_by_batch(insert_tuples, chunk_size):
count += self._bulk_import(batch, ignore_errors, target)

logger.info("Inserted a total of %d documents with success", count)

@@ -472,36 +453,10 @@ def _bulk_import(

return inserted_count

@staticmethod
def _parse_bytes_to_dict(
raw_documents: Iterable[bytes], ignore_errors: bool
) -> Iterator[dict]:
"""Read the `raw_documents` Iterable and yield dictionaries."""
for raw_document in raw_documents:
try:
yield json.loads(raw_document)
except (TypeError, json.JSONDecodeError) as error:
if ignore_errors:
logger.warning(
"Raised error: %s, for document %s", error, raw_document
)
continue
logger.error("Raised error: %s, for document %s", error, raw_document)
raise error

@staticmethod
def _read_json(document: Dict[str, Any]) -> Dict[str, Any]:
"""Read the `documents` row and yield for the event JSON."""
if "event" in document:
document["event"] = json.loads(document["event"])

return document

def _read_raw(self, document: Dict[str, Any]) -> bytes:
"""Read the `documents` Iterable and yield bytes."""
# We want to return a JSON structure of the whole row, so if the event string
# is in there we first need to serialize it so that we can deserialize the
# whole thing.
document = self._read_json(document)

return json.dumps(document).encode(self.locale_encoding)
28 changes: 18 additions & 10 deletions src/ralph/backends/data/es.py
@@ -22,7 +22,7 @@
)
from ralph.conf import BaseSettingsConfig, ClientOptions, CommaSeparatedTuple
from ralph.exceptions import BackendException, BackendParameterException
from ralph.utils import parse_bytes_to_dict, read_raw
from ralph.utils import parse_dict_to_bytes, parse_to_dict

logger = logging.getLogger(__name__)

@@ -199,7 +199,7 @@ def list(
yield index

@enforce_query_checks
def read( # noqa: PLR0912, PLR0913
def read( # noqa: PLR0913
self,
*,
query: Optional[Union[str, ESQuery]] = None,
@@ -219,7 +219,9 @@ def read( # noqa: PLR0912, PLR0913
chunk_size (int or None): The chunk size when reading documents by batches.
If chunk_size is `None` it defaults to `DEFAULT_CHUNK_SIZE`.
raw_output (bool): Controls whether to yield dictionaries or bytes.
ignore_errors (bool): Ignored.
ignore_errors (bool): If `True`, errors during the encoding operation
will be ignored and logged. If `False` (default), a `BackendException`
will be raised if an error occurs.
Yield:
bytes: The next raw document if `raw_output` is True.
@@ -228,6 +230,17 @@
Raise:
BackendException: If a failure occurs during Elasticsearch connection.
"""
if raw_output:
documents = self.read(
query=query,
target=target,
chunk_size=chunk_size,
raw_output=False,
ignore_errors=ignore_errors,
)
locale = self.settings.LOCALE_ENCODING
yield from parse_dict_to_bytes(documents, locale, ignore_errors, logger)
return
target = target if target else self.settings.DEFAULT_INDEX
chunk_size = chunk_size if chunk_size else self.settings.DEFAULT_CHUNK_SIZE
if ignore_errors:
Expand Down Expand Up @@ -269,12 +282,7 @@ def read( # noqa: PLR0912, PLR0913
if count:
query.search_after = [str(part) for part in documents[-1]["sort"]]
kwargs["search_after"] = query.search_after
if raw_output:
documents = read_raw(
documents, self.settings.LOCALE_ENCODING, ignore_errors, logger
)
for document in documents:
yield document
yield from documents

def write( # noqa: PLR0913
self,
@@ -326,7 +334,7 @@ def write( # noqa: PLR0913

data = chain((first_record,), data)
if isinstance(first_record, bytes):
data = parse_bytes_to_dict(data, ignore_errors, logger)
data = parse_to_dict(data, ignore_errors, logger)

logger.debug(
"Start writing to the %s index (chunk size: %d)", target, chunk_size
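
With raw output now delegating to the dictionary-yielding read in the Elasticsearch backend as well, raw and non-raw reads share a single code path across backends. An illustrative usage sketch follows; instantiating the backend with default settings is an assumption and not taken from this commit.

# Illustrative usage sketch; the constructor call and default settings are
# assumptions, not part of this commit.
from ralph.backends.data.es import ESDataBackend, ESDataBackendSettings

backend = ESDataBackend(ESDataBackendSettings())

# Both calls now share one code path: the raw variant simply re-encodes the
# dictionaries yielded by the non-raw variant.
for document in backend.read(chunk_size=500):
    print(document)
for raw_document in backend.read(chunk_size=500, raw_output=True):
    print(raw_document.decode("utf-8"))
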
