From 89b640a0d1d6163750d4c3f7949dafea00213a3e Mon Sep 17 00:00:00 2001
From: SergioSim
Date: Mon, 13 Nov 2023 09:07:27 +0100
Subject: [PATCH] =?UTF-8?q?=E2=99=BB=EF=B8=8F(backends)=20use=20common=20u?=
 =?UTF-8?q?tilities=20among=20backends?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

During the data backend unification work, some shared utilities were
developed to factor out duplication: `parse_dict_to_bytes`, `read_raw`
and `iter_by_batch`. However, some backends still used their own
implementations of these utilities, leading to minor behavioral
differences among backends. Thus, for consistency, we update all
backends to use the common utilities.
---
 CHANGELOG.md                            |   1 -
 pyproject.toml                          |   1 -
 src/ralph/backends/data/async_es.py     |  23 +++--
 src/ralph/backends/data/async_mongo.py  |  51 ++++++------
 src/ralph/backends/data/clickhouse.py   |  83 +++++--------------
 src/ralph/backends/data/es.py           |  28 ++++---
 src/ralph/backends/data/fs.py           | 106 ++++++++++--------------
 src/ralph/backends/data/mongo.py        |  41 ++++-----
 src/ralph/backends/data/s3.py           |  58 +++----------
 src/ralph/backends/data/swift.py        |  70 +++++++---------
 src/ralph/backends/http/async_lrs.py    |   8 +-
 src/ralph/cli.py                        |   2 +-
 src/ralph/utils.py                      |  90 ++++++++++++++----
 tests/backends/data/test_async_es.py    |   4 +-
 tests/backends/data/test_async_mongo.py |  41 +++++----
 tests/backends/data/test_clickhouse.py  |  36 ++++----
 tests/backends/data/test_es.py          |   2 +-
 tests/backends/data/test_fs.py          |  32 +++++--
 tests/backends/data/test_ldp.py         |   2 +-
 tests/backends/data/test_mongo.py       |  39 +++++----
 tests/backends/data/test_s3.py          |  36 ++++----
 tests/backends/data/test_swift.py       |  15 +++-
 tests/backends/http/test_async_lrs.py   |   7 +-
 23 files changed, 388 insertions(+), 388 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 894aba3f7..737c9998a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -34,7 +34,6 @@ methods under the unified `lrs` backend interface [BC]
   have an authority field matching that of the user
 - CLI: change `push` to `write` and `fetch` to `read` [BC]
 - Upgrade `fastapi` to `0.104.1`
-- Upgrade `more-itertools` to `10.1.0`
 - Upgrade `sentry_sdk` to `1.34.0`
 - Upgrade `uvicorn` to `0.24.0`
 - API: Invalid parameters now return 400 status code
diff --git a/pyproject.toml b/pyproject.toml
index 42a417f58..17f218ca9 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -56,7 +56,6 @@ backend-ldp = [
 ]
 backend-lrs = [
     "httpx<0.25.0", # pin as Python 3.7 is no longer supported from release 0.25.0
-    "more-itertools==10.1.0",
 ]
 backend-mongo = [
     "motor[srv]>=3.3.0",
diff --git a/src/ralph/backends/data/async_es.py b/src/ralph/backends/data/async_es.py
index d156aa4e3..43b436f80 100644
--- a/src/ralph/backends/data/async_es.py
+++ b/src/ralph/backends/data/async_es.py
@@ -18,7 +18,7 @@
 )
 from ralph.backends.data.es import ESDataBackend, ESDataBackendSettings, ESQuery
 from ralph.exceptions import BackendException, BackendParameterException
-from ralph.utils import parse_bytes_to_dict, read_raw
+from ralph.utils import async_parse_dict_to_bytes, parse_to_dict
 
 logger = logging.getLogger(__name__)
 Settings = TypeVar("Settings", bound=ESDataBackendSettings)
@@ -140,6 +140,21 @@ async def read(  # noqa: PLR0912, PLR0913
         Raise:
             BackendException: If a failure occurs during Elasticsearch connection.
""" + if raw_output: + documents = self.read( + query=query, + target=target, + chunk_size=chunk_size, + raw_output=False, + ignore_errors=ignore_errors, + ) + async for document in async_parse_dict_to_bytes( + documents, self.settings.LOCALE_ENCODING, ignore_errors, logger + ): + yield document + + return + target = target if target else self.settings.DEFAULT_INDEX chunk_size = chunk_size if chunk_size else self.settings.DEFAULT_CHUNK_SIZE if ignore_errors: @@ -183,10 +198,6 @@ async def read( # noqa: PLR0912, PLR0913 if count: query.search_after = [str(part) for part in documents[-1]["sort"]] kwargs["search_after"] = query.search_after - if raw_output: - documents = read_raw( - documents, self.settings.LOCALE_ENCODING, ignore_errors, logger - ) for document in documents: yield document @@ -240,7 +251,7 @@ async def write( # noqa: PLR0913 data = chain((first_record,), data) if isinstance(first_record, bytes): - data = parse_bytes_to_dict(data, ignore_errors, logger) + data = parse_to_dict(data, ignore_errors, logger) logger.debug( "Start writing to the %s index (chunk size: %d)", target, chunk_size diff --git a/src/ralph/backends/data/async_mongo.py b/src/ralph/backends/data/async_mongo.py index 02272b336..e7fe38b2d 100644 --- a/src/ralph/backends/data/async_mongo.py +++ b/src/ralph/backends/data/async_mongo.py @@ -1,10 +1,9 @@ """Async MongoDB data backend for Ralph.""" -import json import logging from io import IOBase from itertools import chain -from typing import Any, Dict, Iterable, Iterator, Optional, TypeVar, Union +from typing import Iterable, Iterator, Optional, TypeVar, Union from bson.errors import BSONError from motor.motor_asyncio import AsyncIOMotorClient @@ -18,7 +17,7 @@ MongoQuery, ) from ralph.exceptions import BackendException, BackendParameterException -from ralph.utils import parse_bytes_to_dict +from ralph.utils import async_parse_dict_to_bytes, iter_by_batch, parse_to_dict from ..data.base import ( AsyncListable, @@ -150,6 +149,21 @@ async def read( # noqa: PLR0913 BackendException: If a failure occurs during MongoDB connection. BackendParameterException: If a failure occurs with MongoDB collection. 
""" + if raw_output: + documents = self.read( + query=query, + target=target, + chunk_size=chunk_size, + raw_output=False, + ignore_errors=ignore_errors, + ) + async for document in async_parse_dict_to_bytes( + documents, self.settings.LOCALE_ENCODING, ignore_errors, logger + ): + yield document + + return + if not chunk_size: chunk_size = self.settings.DEFAULT_CHUNK_SIZE @@ -163,20 +177,10 @@ async def read( # noqa: PLR0913 logger.error(msg, target, error) raise BackendParameterException(msg % (target, error)) from error - reader = self._read_raw if raw_output else lambda _: _ try: async for document in collection.find(batch_size=chunk_size, **query): document.update({"_id": str(document.get("_id"))}) - try: - yield reader(document) - except (TypeError, ValueError) as error: - msg = "Failed to encode MongoDB document with ID %s: %s" - document_id = document.get("_id") - logger.error(msg, document_id, error) - if ignore_errors: - logger.warning(msg, document_id, error) - continue - raise BackendException(msg % (document_id, error)) from error + yield document except (PyMongoError, IndexError, TypeError, ValueError) as error: msg = "Failed to execute MongoDB query: %s" logger.error(msg, error) @@ -235,29 +239,26 @@ async def write( # noqa: PLR0913 try: first_record = next(data) except StopIteration: - logger.warning("Data Iterator is empty; skipping write to target.") + logger.info("Data Iterator is empty; skipping write to target.") return count data = chain([first_record], data) if isinstance(first_record, bytes): - data = parse_bytes_to_dict(data, ignore_errors, logger) + data = parse_to_dict(data, ignore_errors, logger) if operation_type == BaseOperationType.UPDATE: - for batch in MongoDataBackend.iter_by_batch( - MongoDataBackend.to_replace_one(data), chunk_size - ): + data = MongoDataBackend.to_replace_one(data) + for batch in iter_by_batch(data, chunk_size): count += await self._bulk_update(batch, ignore_errors, collection) logger.info("Updated %d documents with success", count) elif operation_type == BaseOperationType.DELETE: - for batch in MongoDataBackend.iter_by_batch( - MongoDataBackend.to_ids(data), chunk_size - ): + for batch in iter_by_batch(MongoDataBackend.to_ids(data), chunk_size): count += await self._bulk_delete(batch, ignore_errors, collection) logger.info("Deleted %d documents with success", count) else: data = MongoDataBackend.to_documents( data, ignore_errors, operation_type, logger ) - for batch in MongoDataBackend.iter_by_batch(data, chunk_size): + for batch in iter_by_batch(data, chunk_size): count += await self._bulk_import(batch, ignore_errors, collection) logger.info("Inserted %d documents with success", count) @@ -326,7 +327,3 @@ async def _bulk_update(batch: list, ignore_errors: bool, collection: Collection) modified_count = updated_documents.modified_count logger.debug("Updated %d documents chunk with success", modified_count) return modified_count - - def _read_raw(self, document: Dict[str, Any]) -> bytes: - """Read the `document` dictionary and return bytes.""" - return json.dumps(document).encode(self.settings.LOCALE_ENCODING) diff --git a/src/ralph/backends/data/clickhouse.py b/src/ralph/backends/data/clickhouse.py index 5373da0db..25d8ecdb9 100755 --- a/src/ralph/backends/data/clickhouse.py +++ b/src/ralph/backends/data/clickhouse.py @@ -35,6 +35,7 @@ ) from ralph.conf import BaseSettingsConfig, ClientOptions from ralph.exceptions import BackendException, BackendParameterException +from ralph.utils import iter_by_batch, parse_dict_to_bytes, 
parse_to_dict logger = logging.getLogger(__name__) @@ -206,7 +207,7 @@ def list( yield str(table.get("name")) @enforce_query_checks - def read( # noqa: PLR0912, PLR0913 + def read( # noqa: PLR0913 self, *, query: Optional[Union[str, ClickHouseQuery]] = None, @@ -235,6 +236,18 @@ def read( # noqa: PLR0912, PLR0913 Raise: BackendException: If a failure occurs during ClickHouse connection. """ + if raw_output: + documents = self.read( + query=query, + target=target, + chunk_size=chunk_size, + raw_output=False, + ignore_errors=ignore_errors, + ) + locale = self.settings.LOCALE_ENCODING + yield from parse_dict_to_bytes(documents, locale, ignore_errors, logger) + return + if target is None: target = self.event_table_name @@ -269,8 +282,6 @@ def read( # noqa: PLR0912, PLR0913 if query.limit: sql += f"\nLIMIT {query.limit}" - reader = self._read_raw if raw_output else self._read_json - logger.debug( "Start reading the %s table of the %s database (chunk size: %d)", target, @@ -284,16 +295,7 @@ def read( # noqa: PLR0912, PLR0913 settings={"buffer_size": chunk_size}, column_oriented=query.column_oriented, ).named_results() - for statement in result: - try: - yield reader(statement) - except (TypeError, ValueError) as error: - msg = "Failed to encode document %s: %s" - if ignore_errors: - logger.warning(msg, statement, error) - continue - logger.error(msg, statement, error) - raise BackendException(msg % (statement, error)) from error + yield from parse_to_dict(result, ignore_errors, logger, self._read_json) except (ClickHouseError, IndexError, TypeError, ValueError) as error: msg = "Failed to read documents: %s" logger.error(msg, error) @@ -352,7 +354,7 @@ def write( # noqa: PLR0913 data = chain([first_record], data) if isinstance(first_record, bytes): - data = self._parse_bytes_to_dict(data, ignore_errors) + data = parse_to_dict(data, ignore_errors, logger) if operation_type not in [BaseOperationType.CREATE, BaseOperationType.INDEX]: msg = "%s operation_type is not allowed." 
@@ -361,30 +363,9 @@ def write( # noqa: PLR0913 # operation_type is either CREATE or INDEX count = 0 - batch = [] - - for insert_tuple in self._to_insert_tuples( - data, - ignore_errors=ignore_errors, - ): - batch.append(insert_tuple) - if len(batch) < chunk_size: - continue - - count += self._bulk_import( - batch, - ignore_errors=ignore_errors, - event_table_name=target, - ) - batch = [] - - # Edge case: if the total number of documents is lower than the chunk size - if len(batch) > 0: - count += self._bulk_import( - batch, - ignore_errors=ignore_errors, - event_table_name=target, - ) + insert_tuples = self._to_insert_tuples(data, ignore_errors) + for batch in iter_by_batch(insert_tuples, chunk_size): + count += self._bulk_import(batch, ignore_errors, target) logger.info("Inserted a total of %d documents with success", count) @@ -472,23 +453,6 @@ def _bulk_import( return inserted_count - @staticmethod - def _parse_bytes_to_dict( - raw_documents: Iterable[bytes], ignore_errors: bool - ) -> Iterator[dict]: - """Read the `raw_documents` Iterable and yield dictionaries.""" - for raw_document in raw_documents: - try: - yield json.loads(raw_document) - except (TypeError, json.JSONDecodeError) as error: - if ignore_errors: - logger.warning( - "Raised error: %s, for document %s", error, raw_document - ) - continue - logger.error("Raised error: %s, for document %s", error, raw_document) - raise error - @staticmethod def _read_json(document: Dict[str, Any]) -> Dict[str, Any]: """Read the `documents` row and yield for the event JSON.""" @@ -496,12 +460,3 @@ def _read_json(document: Dict[str, Any]) -> Dict[str, Any]: document["event"] = json.loads(document["event"]) return document - - def _read_raw(self, document: Dict[str, Any]) -> bytes: - """Read the `documents` Iterable and yield bytes.""" - # We want to return a JSON structure of the whole row, so if the event string - # is in there we first need to serialize it so that we can deserialize the - # whole thing. - document = self._read_json(document) - - return json.dumps(document).encode(self.locale_encoding) diff --git a/src/ralph/backends/data/es.py b/src/ralph/backends/data/es.py index f3e572c41..48250c321 100644 --- a/src/ralph/backends/data/es.py +++ b/src/ralph/backends/data/es.py @@ -22,7 +22,7 @@ ) from ralph.conf import BaseSettingsConfig, ClientOptions, CommaSeparatedTuple from ralph.exceptions import BackendException, BackendParameterException -from ralph.utils import parse_bytes_to_dict, read_raw +from ralph.utils import parse_dict_to_bytes, parse_to_dict logger = logging.getLogger(__name__) @@ -199,7 +199,7 @@ def list( yield index @enforce_query_checks - def read( # noqa: PLR0912, PLR0913 + def read( # noqa: PLR0913 self, *, query: Optional[Union[str, ESQuery]] = None, @@ -219,7 +219,9 @@ def read( # noqa: PLR0912, PLR0913 chunk_size (int or None): The chunk size when reading documents by batches. If chunk_size is `None` it defaults to `DEFAULT_CHUNK_SIZE`. raw_output (bool): Controls whether to yield dictionaries or bytes. - ignore_errors (bool): Ignored. + ignore_errors (bool): If `True`, errors during the encoding operation + will be ignored and logged. If `False` (default), a `BackendException` + will be raised if an error occurs. Yield: bytes: The next raw document if `raw_output` is True. @@ -228,6 +230,17 @@ def read( # noqa: PLR0912, PLR0913 Raise: BackendException: If a failure occurs during Elasticsearch connection. 
""" + if raw_output: + documents = self.read( + query=query, + target=target, + chunk_size=chunk_size, + raw_output=False, + ignore_errors=ignore_errors, + ) + locale = self.settings.LOCALE_ENCODING + yield from parse_dict_to_bytes(documents, locale, ignore_errors, logger) + return target = target if target else self.settings.DEFAULT_INDEX chunk_size = chunk_size if chunk_size else self.settings.DEFAULT_CHUNK_SIZE if ignore_errors: @@ -269,12 +282,7 @@ def read( # noqa: PLR0912, PLR0913 if count: query.search_after = [str(part) for part in documents[-1]["sort"]] kwargs["search_after"] = query.search_after - if raw_output: - documents = read_raw( - documents, self.settings.LOCALE_ENCODING, ignore_errors, logger - ) - for document in documents: - yield document + yield from documents def write( # noqa: PLR0913 self, @@ -326,7 +334,7 @@ def write( # noqa: PLR0913 data = chain((first_record,), data) if isinstance(first_record, bytes): - data = parse_bytes_to_dict(data, ignore_errors, logger) + data = parse_to_dict(data, ignore_errors, logger) logger.debug( "Start writing to the %s index (chunk size: %d)", target, chunk_size diff --git a/src/ralph/backends/data/fs.py b/src/ralph/backends/data/fs.py index e3d160ee9..5434c5361 100644 --- a/src/ralph/backends/data/fs.py +++ b/src/ralph/backends/data/fs.py @@ -1,13 +1,12 @@ """FileSystem data backend for Ralph.""" -import json import logging import os from datetime import datetime, timezone -from io import IOBase +from io import BufferedReader, IOBase from itertools import chain from pathlib import Path -from typing import IO, Iterable, Iterator, Optional, TypeVar, Union +from typing import Iterable, Iterator, Optional, Tuple, TypeVar, Union from uuid import uuid4 from ralph.backends.data.base import ( @@ -23,7 +22,7 @@ from ralph.backends.mixins import HistoryMixin from ralph.conf import BaseSettingsConfig from ralph.exceptions import BackendException, BackendParameterException -from ralph.utils import now +from ralph.utils import now, parse_dict_to_bytes, parse_to_dict logger = logging.getLogger(__name__) @@ -184,30 +183,15 @@ def read( # noqa: PLR0913 BackendException: If a failure during the read operation occurs and `ignore_errors` is set to `False`. """ - if not query.query_string: - query.query_string = self.default_query_string - if not chunk_size: chunk_size = self.default_chunk_size - target = Path(target) if target else self.default_directory - if not target.is_absolute() and target != self.default_directory: - target = self.default_directory / target - paths = list( - filter(lambda path: path.is_file(), target.glob(query.query_string)) - ) - - if not paths: - logger.info("No file found for query: %s", target / query.query_string) - return - - logger.debug("Reading matching files: %s", paths) - - for path in paths: - with path.open("rb") as file: - reader = self._read_raw if raw_output else self._read_dict - for chunk in reader(file, chunk_size, ignore_errors): + for file, path in self._iter_files_matching_query(target, query): + if raw_output: + while chunk := file.read(chunk_size): yield chunk + else: + yield from parse_to_dict(file, ignore_errors, logger) # The file has been read, add a new entry to the history. 
self.append_to_history( @@ -224,12 +208,34 @@ def read( # noqa: PLR0913 } ) + def _iter_files_matching_query( + self, target: Union[str, None], query: BaseQuery + ) -> Iterator[Tuple[BufferedReader, Path]]: + """Return file/path tuples for files matching the query.""" + if not query.query_string: + query.query_string = self.default_query_string + + path = Path(target) if target else self.default_directory + if not path.is_absolute() and path != self.default_directory: + path = self.default_directory / path + + paths = list(filter(lambda x: x.is_file(), path.glob(query.query_string))) + if not paths: + msg = "No file found for query: %s" + logger.info(msg, path / Path(str(query.query_string))) + return + + logger.debug("Reading matching files: %s", paths) + for path in paths: + with path.open("rb") as file: + yield file, path + def write( # noqa: PLR0913 self, data: Union[IOBase, Iterable[bytes], Iterable[dict]], target: Optional[str] = None, chunk_size: Optional[int] = None, # noqa: ARG002 - ignore_errors: bool = False, # noqa: ARG002 + ignore_errors: bool = False, operation_type: Optional[BaseOperationType] = None, ) -> int: """Write data records to the target file and return their count. @@ -242,7 +248,9 @@ def write( # noqa: PLR0913 If target is `None`, a random (uuid4) file is created in the `default_directory_path` and used as the target instead. chunk_size (int or None): Ignored. - ignore_errors (bool): Ignored. + ignore_errors (bool): If `True`, errors during the encoding operation + will be ignored and logged. If `False` (default), a `BackendException` + will be raised if an error occurs. operation_type (BaseOperationType or None): The mode of the write operation. If operation_type is `CREATE` or `INDEX`, the target file is expected to be absent. If the target file exists a `FileExistsError` is raised. @@ -265,6 +273,13 @@ def write( # noqa: PLR0913 except StopIteration: logger.info("Data Iterator is empty; skipping write to target.") return 0 + + data = chain((first_record,), data) + if isinstance(first_record, dict): + data = parse_dict_to_bytes( + data, self.locale_encoding, ignore_errors, logger + ) + if not operation_type: operation_type = self.default_operation_type @@ -297,10 +312,8 @@ def write( # noqa: PLR0913 logger.debug("Appending to file: %s", path) with path.open(mode) as file: - is_dict = isinstance(first_record, dict) - writer = self._write_dict if is_dict else self._write_raw - for chunk in chain((first_record,), data): - writer(file, chunk) + for chunk in data: + file.write(chunk) # The file has been created, add a new entry to the history. 
self.append_to_history( @@ -319,34 +332,5 @@ def write( # noqa: PLR0913 return 1 def close(self) -> None: - """FS backend has nothing to close, this method is not implemented.""" - msg = "FS data backend does not support `close` method" - logger.error(msg) - raise NotImplementedError(msg) - - @staticmethod - def _read_raw(file: IO, chunk_size: int, _ignore_errors: bool) -> Iterator[bytes]: - """Read the `file` in chunks of size `chunk_size` and yield them.""" - while chunk := file.read(chunk_size): - yield chunk - - @staticmethod - def _read_dict(file: IO, _chunk_size: int, ignore_errors: bool) -> Iterator[dict]: - """Read the `file` by line and yield JSON parsed dictionaries.""" - for i, line in enumerate(file): - try: - yield json.loads(line) - except (TypeError, json.JSONDecodeError) as err: - msg = "Raised error: %s, in file %s at line %s" - logger.error(msg, err, file, i) - if not ignore_errors: - raise BackendException(msg % (err, file, i)) from err - - @staticmethod - def _write_raw(file: IO, chunk: bytes) -> None: - """Write the `chunk` bytes to the file.""" - file.write(chunk) - - def _write_dict(self, file: IO, chunk: dict) -> None: - """Write the `chunk` dictionary to the file.""" - file.write(bytes(f"{json.dumps(chunk)}\n", encoding=self.locale_encoding)) + """FS backend has no open connections to close. No action.""" + logger.info("No open connections to close; skipping") diff --git a/src/ralph/backends/data/mongo.py b/src/ralph/backends/data/mongo.py index 8f38aa64b..7556b9dec 100644 --- a/src/ralph/backends/data/mongo.py +++ b/src/ralph/backends/data/mongo.py @@ -36,7 +36,7 @@ ) from ralph.conf import BaseSettingsConfig, ClientOptions from ralph.exceptions import BackendException, BackendParameterException -from ralph.utils import parse_bytes_to_dict, read_raw +from ralph.utils import iter_by_batch, parse_dict_to_bytes, parse_to_dict logger = logging.getLogger(__name__) @@ -204,6 +204,18 @@ def read( # noqa: PLR0913 BackendException: If a failure during the read operation occurs. BackendParameterException: If the `target` is not a valid collection name. 
""" + if raw_output: + documents = self.read( + query=query, + target=target, + chunk_size=chunk_size, + raw_output=False, + ignore_errors=ignore_errors, + ) + locale = self.settings.LOCALE_ENCODING + yield from parse_dict_to_bytes(documents, locale, ignore_errors, logger) + return + if not chunk_size: chunk_size = self.settings.DEFAULT_CHUNK_SIZE @@ -221,12 +233,7 @@ def read( # noqa: PLR0913 try: documents = collection.find(batch_size=chunk_size, **query) documents = (d.update({"_id": str(d.get("_id"))}) or d for d in documents) - if raw_output: - documents = read_raw( - documents, self.settings.LOCALE_ENCODING, ignore_errors, logger - ) - for document in documents: - yield document + yield from documents except (PyMongoError, IndexError, TypeError, ValueError) as error: msg = "Failed to execute MongoDB query: %s" logger.error(msg, error) @@ -289,19 +296,19 @@ def write( # noqa: PLR0913 return count data = chain([first_record], data) if isinstance(first_record, bytes): - data = parse_bytes_to_dict(data, ignore_errors, logger) + data = parse_to_dict(data, ignore_errors, logger) if operation_type == BaseOperationType.UPDATE: - for batch in self.iter_by_batch(self.to_replace_one(data), chunk_size): + for batch in iter_by_batch(self.to_replace_one(data), chunk_size): count += self._bulk_update(batch, ignore_errors, collection) logger.info("Updated %d documents with success", count) elif operation_type == BaseOperationType.DELETE: - for batch in self.iter_by_batch(self.to_ids(data), chunk_size): + for batch in iter_by_batch(self.to_ids(data), chunk_size): count += self._bulk_delete(batch, ignore_errors, collection) logger.info("Deleted %d documents with success", count) else: data = self.to_documents(data, ignore_errors, operation_type, logger) - for batch in self.iter_by_batch(data, chunk_size): + for batch in iter_by_batch(data, chunk_size): count += self._bulk_import(batch, ignore_errors, collection) logger.info("Inserted %d documents with success", count) @@ -320,18 +327,6 @@ def close(self) -> None: logger.error(msg, error) raise BackendException(msg % error) from error - @staticmethod - def iter_by_batch(data: Iterable[dict], chunk_size: int): - """Iterate over `data` Iterable and yield batches of size `chunk_size`.""" - batch = [] - for document in data: - batch.append(document) - if len(batch) >= chunk_size: - yield batch - batch = [] - if batch: - yield batch - @staticmethod def to_ids(data: Iterable[dict]) -> Iterable[str]: """Convert `data` statements to ids.""" diff --git a/src/ralph/backends/data/s3.py b/src/ralph/backends/data/s3.py index c04ca5b73..d2629fdbe 100644 --- a/src/ralph/backends/data/s3.py +++ b/src/ralph/backends/data/s3.py @@ -1,6 +1,5 @@ """S3 data backend for Ralph.""" -import json import logging from io import IOBase from itertools import chain @@ -16,7 +15,6 @@ ReadTimeoutError, ResponseStreamingError, ) -from botocore.response import StreamingBody from requests_toolbelt import StreamingIterator from ralph.backends.data.base import ( @@ -32,7 +30,7 @@ from ralph.backends.mixins import HistoryMixin from ralph.conf import BaseSettingsConfig from ralph.exceptions import BackendException, BackendParameterException -from ralph.utils import now +from ralph.utils import now, parse_dict_to_bytes, parse_to_dict logger = logging.getLogger(__name__) @@ -207,10 +205,12 @@ def read( # noqa: PLR0913 if not ignore_errors: raise BackendException(msg % (query.query_string, error_msg)) from err - reader = self._read_raw if raw_output else self._read_dict try: - for chunk in 
reader(response["Body"], chunk_size, ignore_errors): - yield chunk + if raw_output: + yield from response["Body"].iter_chunks(chunk_size) + else: + lines = response["Body"].iter_lines(chunk_size) + yield from parse_to_dict(lines, ignore_errors, logger) except (ReadTimeoutError, ResponseStreamingError) as err: msg = "Failed to read chunk from object %s" logger.error(msg, query.query_string) @@ -307,7 +307,9 @@ def write( # noqa: PLR0913 data = chain((first_record,), data) if isinstance(first_record, dict): - data = self._parse_dict_to_bytes(data, ignore_errors) + data = parse_dict_to_bytes( + data, self.settings.LOCALE_ENCODING, ignore_errors, logger + ) counter = {"count": 0} data = self._count(data, counter) @@ -361,47 +363,7 @@ def close(self) -> None: raise BackendException(msg % error) from error @staticmethod - def _read_raw( - obj: StreamingBody, chunk_size: int, _ignore_errors: bool - ) -> Iterator[bytes]: - """Read the `object` in chunks of size `chunk_size` and yield them.""" - for chunk in obj.iter_chunks(chunk_size): - yield chunk - - @staticmethod - def _read_dict( - obj: StreamingBody, chunk_size: int, ignore_errors: bool - ) -> Iterator[dict]: - """Read the `object` by line and yield JSON parsed dictionaries.""" - for line in obj.iter_lines(chunk_size): - try: - yield json.loads(line) - except (TypeError, json.JSONDecodeError) as err: - msg = "Raised error: %s" - logger.error(msg, err) - if not ignore_errors: - raise BackendException(msg % err) from err - - @staticmethod - def _parse_dict_to_bytes( - statements: Iterable[dict], ignore_errors: bool - ) -> Iterator[bytes]: - """Read the `statements` Iterable and yield bytes.""" - for statement in statements: - try: - yield bytes(f"{json.dumps(statement)}\n", encoding="utf-8") - except TypeError as error: - msg = "Failed to encode JSON: %s, for document %s" - logger.error(msg, error, statement) - if ignore_errors: - continue - raise BackendException(msg % (error, statement)) from error - - @staticmethod - def _count( - statements: Union[Iterable[bytes], Iterable[dict]], - counter: dict, - ) -> Iterator: + def _count(statements: Iterable, counter: dict) -> Iterator: """Count the elements in the `statements` Iterable and yield element.""" for statement in statements: counter["count"] += 1 diff --git a/src/ralph/backends/data/swift.py b/src/ralph/backends/data/swift.py index 77cc52817..471e08fd1 100644 --- a/src/ralph/backends/data/swift.py +++ b/src/ralph/backends/data/swift.py @@ -1,9 +1,9 @@ """Base data backend for Ralph.""" -import json import logging from functools import cached_property from io import IOBase +from itertools import chain from typing import Iterable, Iterator, Optional, Union from uuid import uuid4 @@ -22,7 +22,7 @@ from ralph.backends.mixins import HistoryMixin from ralph.conf import BaseSettingsConfig from ralph.exceptions import BackendException, BackendParameterException -from ralph.utils import now +from ralph.utils import now, parse_dict_to_bytes, parse_to_dict logger = logging.getLogger(__name__) @@ -221,13 +221,13 @@ def read( # noqa: PLR0913 msg = "Failed to read %s: %s" error = err.msg logger.error(msg, query.query_string, error) - if not ignore_errors: - raise BackendException(msg % (query.query_string, error)) from err - - reader = self._read_raw if raw_output else self._read_dict + raise BackendException(msg % (query.query_string, error)) from err - for chunk in reader(content, chunk_size, ignore_errors): - yield chunk + if raw_output: + while chunk := content.read(chunk_size): + yield chunk + 
else: + yield from parse_to_dict(content, ignore_errors, logger) # Archive read, add a new entry to the history self.append_to_history( @@ -240,7 +240,7 @@ def read( # noqa: PLR0913 } ) - def write( # noqa: PLR0912, PLR0913 + def write( # noqa: PLR0913 self, data: Union[IOBase, Iterable[bytes], Iterable[dict]], target: Optional[str] = None, @@ -270,11 +270,22 @@ def write( # noqa: PLR0912, PLR0913 `ignore_errors` is set to `False`. BackendParameterException: If a backend argument value is not valid. """ + data = iter(data) try: - first_record = next(iter(data)) + first_record = next(data) except StopIteration: logger.info("Data Iterator is empty; skipping write to target.") return 0 + + data = chain((first_record,), data) + if isinstance(first_record, dict): + data = parse_dict_to_bytes( + data, self.settings.LOCALE_ENCODING, ignore_errors, logger + ) + + counter = {"count": 0} + data = self._count(data, counter) + if not operation_type: operation_type = self.default_operation_type @@ -310,14 +321,7 @@ def write( # noqa: PLR0912, PLR0913 if target_object in list(self.list(target=target_container)): msg = "%s already exists and overwrite is not allowed for operation %s" logger.error(msg, target_object, operation_type) - if not ignore_errors: - raise BackendException(msg % (target_object, operation_type)) - - if isinstance(first_record, dict): - data = [ - json.dumps(statement).encode(self.locale_encoding) - for statement in data - ] + raise BackendException(msg % (target_object, operation_type)) try: self.connection.put_object( @@ -330,10 +334,9 @@ def write( # noqa: PLR0912, PLR0913 msg = "Failed to write to object %s: %s" error = err.msg logger.error(msg, target_object, error) - if not ignore_errors: - raise BackendException(msg % (target_object, error)) from err + raise BackendException(msg % (target_object, error)) from err - count = sum(1 for _ in data) + count = counter["count"] logging.info("Successfully written %s statements to %s", count, target) # Archive written, add a new entry to the history @@ -382,23 +385,8 @@ def _details(self, container: str, name: str): } @staticmethod - def _read_dict( - obj: Iterable, _chunk_size: int, ignore_errors: bool - ) -> Iterator[dict]: - """Read the `object` by line and yield JSON parsed dictionaries.""" - for i, line in enumerate(obj): - try: - yield json.loads(line) - except (TypeError, json.JSONDecodeError) as err: - msg = "Raised error: %s, at line %s" - logger.error(msg, err, i) - if not ignore_errors: - raise BackendException(msg % (err, i)) from err - - @staticmethod - def _read_raw( - obj: Iterable, chunk_size: int, _ignore_errors: bool - ) -> Iterator[bytes]: - """Read the `object` by line and yield bytes.""" - while chunk := obj.read(chunk_size): - yield chunk + def _count(statements: Iterable, counter: dict) -> Iterator: + """Count the elements in the `statements` Iterable and yield element.""" + for statement in statements: + counter["count"] += 1 + yield statement diff --git a/src/ralph/backends/http/async_lrs.py b/src/ralph/backends/http/async_lrs.py index b97a2462a..a92d40fc1 100644 --- a/src/ralph/backends/http/async_lrs.py +++ b/src/ralph/backends/http/async_lrs.py @@ -8,14 +8,13 @@ from urllib.parse import ParseResult, parse_qs, urljoin, urlparse from httpx import AsyncClient, HTTPError, HTTPStatusError, RequestError -from more_itertools import chunked from pydantic import AnyHttpUrl, BaseModel, Field, parse_obj_as from pydantic.types import PositiveInt from ralph.backends.lrs.base import LRSStatementsQuery from ralph.conf 
import BaseSettingsConfig, HeadersParameters from ralph.exceptions import BackendException, BackendParameterException -from ralph.utils import gather_with_limited_concurrency +from ralph.utils import gather_with_limited_concurrency, iter_by_batch from .base import ( BaseHTTPBackend, @@ -257,6 +256,9 @@ async def write( # noqa: PLR0913 if not target: target = self.settings.STATEMENTS_ENDPOINT + if not chunk_size: + chunk_size = 500 + target = ParseResult( scheme=urlparse(self.base_url).scheme, netloc=urlparse(self.base_url).netloc, @@ -288,7 +290,7 @@ async def write( # noqa: PLR0913 # Create tasks tasks = set() - for chunk in chunked(data, chunk_size): + for chunk in iter_by_batch(data, chunk_size): tasks.add(self._post_and_raise_for_status(target, chunk, ignore_errors)) # Run POST tasks diff --git a/src/ralph/cli.py b/src/ralph/cli.py index de483e90e..eae21557b 100644 --- a/src/ralph/cli.py +++ b/src/ralph/cli.py @@ -675,7 +675,7 @@ def read( # noqa: PLR0913 iter_over_async(statements) if isasyncgen(statements) else statements ) for statement in statements: - click.echo(statement) + click.echo(statement, nl=False) elif isinstance(backend, BaseStreamBackend): backend.stream(sys.stdout.buffer) elif isinstance(backend, BaseHTTPBackend): diff --git a/src/ralph/utils.py b/src/ralph/utils.py index e1d08b834..0bb73c974 100644 --- a/src/ralph/utils.py +++ b/src/ralph/utils.py @@ -9,7 +9,22 @@ from importlib import import_module from inspect import getmembers, isclass, iscoroutine from logging import Logger, getLogger -from typing import Any, Dict, Iterable, Iterator, List, Optional, Sequence, Type, Union +from typing import ( + Any, + AsyncIterable, + AsyncIterator, + Callable, + Dict, + Iterable, + Iterator, + List, + Optional, + Sequence, + Tuple, + Type, + TypeVar, + Union, +) from ralph.exceptions import BackendException, UnsupportedBackendException @@ -171,39 +186,80 @@ def statements_are_equivalent(statement_1: dict, statement_2: dict) -> bool: return True -def parse_bytes_to_dict( - raw_documents: Iterable[bytes], ignore_errors: bool, logger_class: logging.Logger +T = TypeVar("T") + + +def parse_to_dict( + raw_documents: Iterable[T], + ignore_errors: bool, + logger_class: logging.Logger, + parser: Callable[[T], Dict[str, Any]] = json.loads, + exceptions: Tuple[Type[Exception], ...] 
= (TypeError, json.JSONDecodeError), ) -> Iterator[dict]: """Read the `raw_documents` Iterable and yield dictionaries.""" - for raw_document in raw_documents: + for i, raw_document in enumerate(raw_documents): try: - yield json.loads(raw_document) - except (TypeError, json.JSONDecodeError) as error: - msg = "Failed to decode JSON: %s, for document: %s" + yield parser(raw_document) + except exceptions as error: + msg = "Failed to decode JSON: %s, for document: %s, at line %s" if ignore_errors: - logger_class.warning(msg, error, raw_document) + logger_class.warning(msg, error, raw_document, i) continue - logger_class.error(msg, error, raw_document) - raise BackendException(msg % (error, raw_document)) from error + logger_class.error(msg, error, raw_document, i) + raise BackendException(msg % (error, raw_document, i)) from error -def read_raw( +def parse_dict_to_bytes( documents: Iterable[Dict[str, Any]], encoding: str, ignore_errors: bool, logger_class: logging.Logger, ) -> Iterator[bytes]: """Read the `documents` Iterable with the `encoding` and yield bytes.""" - for document in documents: + for i, document in enumerate(documents): + try: + yield f"{json.dumps(document)}\n".encode(encoding) + except (TypeError, ValueError) as error: + msg = "Failed to encode JSON: %s, for document: %s, at line %s" + if ignore_errors: + logger_class.warning(msg, error, document, i) + continue + logger_class.error(msg, error, document, i) + raise BackendException(msg % (error, document, i)) from error + + +async def async_parse_dict_to_bytes( + documents: AsyncIterable[Dict[str, Any]], + encoding: str, + ignore_errors: bool, + logger_class: logging.Logger, +) -> AsyncIterator[bytes]: + """Read the `documents` Iterable with the `encoding` and yield bytes.""" + i = 0 + async for document in documents: try: - yield json.dumps(document).encode(encoding) + yield f"{json.dumps(document)}\n".encode(encoding) except (TypeError, ValueError) as error: - msg = "Failed to convert document to bytes: %s" + msg = "Failed to encode JSON: %s, for document: %s, at line %s" if ignore_errors: - logger_class.warning(msg, error) + logger_class.warning(msg, error, document, i) continue - logger_class.error(msg, error) - raise BackendException(msg % error) from error + logger_class.error(msg, error, document, i) + raise BackendException(msg % (error, document, i)) from error + + i += 1 + + +def iter_by_batch(data: Iterable[Any], batch_size: int): + """Iterate over `data` Iterable and yield batches of size `batch_size`.""" + batch = [] + for document in data: + batch.append(document) + if len(batch) >= batch_size: + yield batch + batch = [] + if batch: + yield batch def iter_over_async(agenerator) -> Iterable: diff --git a/tests/backends/data/test_async_es.py b/tests/backends/data/test_async_es.py index 7a7fd7cc5..0c42d1a17 100644 --- a/tests/backends/data/test_async_es.py +++ b/tests/backends/data/test_async_es.py @@ -732,7 +732,7 @@ async def test_backends_data_async_es_data_backend_write_method_without_ignore_e # By default, we should raise an error and stop the importation. 
msg = ( r"Failed to decode JSON: Expecting value: line 1 column 1 \(char 0\), " - r"for document: b'This is invalid JSON'" + r"for document: b'This is invalid JSON', at line 1" ) with pytest.raises(BackendException, match=msg): with caplog.at_level(logging.ERROR): @@ -761,7 +761,7 @@ async def test_backends_data_async_es_data_backend_write_method_with_ignore_erro msg = ( "Failed to decode JSON: Expecting value: line 1 column 1 (char 0), " - "for document: b'This is invalid JSON'" + "for document: b'This is invalid JSON', at line 1" ) records = [{"id": idx, "count": random.randint(0, 100)} for idx in range(10)] # Patch a record with a non-expected type for the count field (should be diff --git a/tests/backends/data/test_async_mongo.py b/tests/backends/data/test_async_mongo.py index 6544b8afe..79f86e6af 100644 --- a/tests/backends/data/test_async_mongo.py +++ b/tests/backends/data/test_async_mongo.py @@ -2,6 +2,7 @@ import json import logging +import re import pytest from bson.objectid import ObjectId @@ -325,9 +326,9 @@ async def test_backends_data_async_mongo_data_backend_read_method_with_raw_outpu {"_id": ObjectId("64945e530468d817b1f756db"), "id": "baz"}, ] expected = [ - b'{"_id": "64945e53a4ee2699573e0d6f", "id": "foo"}', - b'{"_id": "64945e530468d817b1f756da", "id": "bar"}', - b'{"_id": "64945e530468d817b1f756db", "id": "baz"}', + b'{"_id": "64945e53a4ee2699573e0d6f", "id": "foo"}\n', + b'{"_id": "64945e530468d817b1f756da", "id": "bar"}\n', + b'{"_id": "64945e530468d817b1f756db", "id": "baz"}\n', ] await backend.collection.insert_many(documents) await backend.database.foobar.insert_many(documents[:2]) @@ -453,14 +454,15 @@ async def test_backends_data_async_mongo_data_backend_read_method_with_ignore_er """ backend = async_mongo_backend() + unparsable_value = ObjectId() documents = [ {"_id": ObjectId("64945e53a4ee2699573e0d6f"), "id": "foo"}, - {"_id": ObjectId("64945e530468d817b1f756da"), "id": ObjectId()}, + {"_id": ObjectId("64945e530468d817b1f756da"), "id": unparsable_value}, {"_id": ObjectId("64945e530468d817b1f756db"), "id": "baz"}, ] expected = [ - b'{"_id": "64945e53a4ee2699573e0d6f", "id": "foo"}', - b'{"_id": "64945e530468d817b1f756db", "id": "baz"}', + b'{"_id": "64945e53a4ee2699573e0d6f", "id": "foo"}\n', + b'{"_id": "64945e530468d817b1f756db", "id": "baz"}\n', ] await backend.collection.insert_many(documents) await backend.database.foobar.insert_many(documents[:2]) @@ -480,8 +482,9 @@ async def test_backends_data_async_mongo_data_backend_read_method_with_ignore_er assert ( "ralph.backends.data.async_mongo", logging.WARNING, - "Failed to encode MongoDB document with ID 64945e530468d817b1f756da: " - "Object of type ObjectId is not JSON serializable", + "Failed to encode JSON: Object of type ObjectId is not " + "JSON serializable, for document: {'_id': '64945e530468d817b1f756da', " + f"'id': ObjectId('{unparsable_value}')}}, at line 1", ) in caplog.record_tuples @@ -495,9 +498,10 @@ async def test_backends_data_async_mongo_data_backend_read_method_without_ignore """ backend = async_mongo_backend() + unparsable_value = ObjectId() documents = [ {"_id": ObjectId("64945e53a4ee2699573e0d6f"), "id": "foo"}, - {"_id": ObjectId("64945e530468d817b1f756da"), "id": ObjectId()}, + {"_id": ObjectId("64945e530468d817b1f756da"), "id": unparsable_value}, {"_id": ObjectId("64945e530468d817b1f756db"), "id": "baz"}, ] expected = b'{"_id": "64945e53a4ee2699573e0d6f", "id": "foo"}' @@ -505,27 +509,28 @@ async def test_backends_data_async_mongo_data_backend_read_method_without_ignore await 
backend.database.foobar.insert_many(documents[:2]) kwargs = {"raw_output": True, "ignore_errors": False} msg = ( - "Failed to encode MongoDB document with ID 64945e530468d817b1f756da: " - "Object of type ObjectId is not JSON serializable" + "Failed to encode JSON: Object of type ObjectId is not JSON serializable, " + "for document: {'_id': '64945e530468d817b1f756da', " + f"'id': ObjectId('{unparsable_value}')}}, at line 1" ) with caplog.at_level(logging.ERROR): - with pytest.raises(BackendException, match=msg): + with pytest.raises(BackendException, match=re.escape(msg)): result = [statement async for statement in backend.read(**kwargs)] assert next(result) == expected next(result) - with pytest.raises(BackendException, match=msg): + with pytest.raises(BackendException, match=re.escape(msg)): result = [ statement async for statement in backend.read(**kwargs, target="foobar") ] assert next(result) == expected next(result) - with pytest.raises(BackendException, match=msg): + with pytest.raises(BackendException, match=re.escape(msg)): result = [ statement async for statement in backend.read(**kwargs, chunk_size=2) ] assert next(result) == expected next(result) - with pytest.raises(BackendException, match=msg): + with pytest.raises(BackendException, match=re.escape(msg)): result = [ statement async for statement in backend.read(**kwargs, chunk_size=1000) ] @@ -938,7 +943,7 @@ async def test_backends_data_async_mongo_data_backend_write_method_with_unparsab backend = async_mongo_backend() msg = ( "Failed to decode JSON: Expecting value: line 1 column 1 (char 0), " - "for document: b'not valid JSON!'" + "for document: b'not valid JSON!', at line 0" ) msg_regex = msg.replace("(", r"\(").replace(")", r"\)") with pytest.raises(BackendException, match=msg_regex): @@ -964,13 +969,13 @@ async def test_backends_data_async_mongo_data_backend_write_method_with_no_data( 0. """ backend = async_mongo_backend() - with caplog.at_level(logging.WARNING): + with caplog.at_level(logging.INFO): assert await backend.write(data=[]) == 0 msg = "Data Iterator is empty; skipping write to target." 
assert ( "ralph.backends.data.async_mongo", - logging.WARNING, + logging.INFO, msg, ) in caplog.record_tuples diff --git a/tests/backends/data/test_clickhouse.py b/tests/backends/data/test_clickhouse.py index 9d1184e3f..52db896e4 100644 --- a/tests/backends/data/test_clickhouse.py +++ b/tests/backends/data/test_clickhouse.py @@ -2,7 +2,9 @@ import json import logging +import re import uuid +from collections import namedtuple from datetime import datetime, timedelta import pytest @@ -226,26 +228,23 @@ def test_backends_data_clickhouse_data_backend_read_method_with_failures( ): """Test the `ClickHouseDataBackend.read` method with failures.""" backend = clickhouse_backend() - - statement = {"id": str(uuid.uuid4()), "timestamp": str(datetime.utcnow())} - document = {"event": json.dumps(statement)} - backend.write([statement]) + document = {"event": "Invalid JSON!"} # JSON encoding error - def mock_read_json(*args, **kwargs): - """Mock the `ClickHouseDataBackend._read_json` method.""" - raise TypeError("Error") + def mock_clickhouse_client_query(*args, **kwargs): + """Mock the `clickhouse.Client.query` returning an unparsable document.""" + return namedtuple("_", "named_results")(lambda: [document]) - monkeypatch.setattr(backend, "_read_json", mock_read_json) + monkeypatch.setattr(backend.client, "query", mock_clickhouse_client_query) - msg = f"Failed to encode document {document}: Error" + msg = ( + "Failed to decode JSON: Expecting value: line 1 column 1 (char 0), " + "for document: {'event': 'Invalid JSON!'}, at line 0" + ) # Not ignoring errors with caplog.at_level(logging.ERROR): - with pytest.raises( - BackendException, - match=msg, - ): + with pytest.raises(BackendException, match=re.escape(msg)): list(backend.read(raw_output=False, ignore_errors=False)) assert ( @@ -281,10 +280,7 @@ def mock_query(*_, **__): msg = "Failed to read documents: Something is wrong" with caplog.at_level(logging.ERROR): - with pytest.raises( - BackendException, - match=msg, - ): + with pytest.raises(BackendException, match=re.escape(msg)): list(backend.read(ignore_errors=True)) assert ( @@ -565,7 +561,11 @@ def test_backends_data_clickhouse_data_backend_write_method_bytes_failed( byte_data.append(json_str.encode("utf-8")) count = 0 - with pytest.raises(json.JSONDecodeError): + msg = ( + r"Failed to decode JSON: Expecting value: line 1 column 1 \(char 0\), " + r"for document: b'failed_json_str', at line 0" + ) + with pytest.raises(BackendException, match=msg): count = backend.write(byte_data) assert count == 0 diff --git a/tests/backends/data/test_es.py b/tests/backends/data/test_es.py index 53c57339b..ba657ca49 100644 --- a/tests/backends/data/test_es.py +++ b/tests/backends/data/test_es.py @@ -656,7 +656,7 @@ def test_backends_data_es_data_backend_write_method_without_ignore_errors( # By default, we should raise an error and stop the importation. 
msg = ( r"Failed to decode JSON: Expecting value: line 1 column 1 \(char 0\), " - r"for document: b'This is invalid JSON'" + r"for document: b'This is invalid JSON', at line 1" ) with pytest.raises(BackendException, match=msg): with caplog.at_level(logging.ERROR): diff --git a/tests/backends/data/test_fs.py b/tests/backends/data/test_fs.py index e1591796c..8f1d778d8 100644 --- a/tests/backends/data/test_fs.py +++ b/tests/backends/data/test_fs.py @@ -637,7 +637,11 @@ def test_backends_data_fs_data_backend_read_method_without_ignore_errors( result = backend.read(ignore_errors=False) assert isinstance(result, Iterable) assert next(result) == valid_dictionary - with pytest.raises(BackendException, match="Raised error:"): + msg = ( + r"Failed to decode JSON: Expecting value: line 1 column 1 \(char 0\), " + r"for document: b'baz\\n', at line 1" + ) + with pytest.raises(BackendException, match=msg): next(result) # When the `read` method fails to read a file entirely, then no entry should be @@ -652,7 +656,11 @@ def test_backends_data_fs_data_backend_read_method_without_ignore_errors( result = backend.read(ignore_errors=False, target=absolute_path) assert isinstance(result, Iterable) assert next(result) == valid_dictionary - with pytest.raises(BackendException, match="Raised error:"): + msg = ( + r"Failed to decode JSON: Expecting value: line 1 column 1 \(char 0\), " + r"for document: b'baz', at line 0" + ) + with pytest.raises(BackendException, match=msg): next(result) # When the `read` method succeeds to read one file entirely, and fails to read @@ -675,7 +683,11 @@ def test_backends_data_fs_data_backend_read_method_without_ignore_errors( # line, the `read` method should raise a `BackendException`. result = backend.read(ignore_errors=False, target="bar") assert isinstance(result, Iterable) - with pytest.raises(BackendException, match="Raised error:"): + msg = ( + r"Failed to decode JSON: Expecting value: line 1 column 1 \(char 0\), " + r"for document: b'baz\\n', at line 0" + ) + with pytest.raises(BackendException, match=msg): next(result) # When the `read` method fails to read a file entirely, then no new entry should be @@ -992,11 +1004,15 @@ def test_backends_data_fs_data_backend_write_method_without_target( ] -def test_backends_data_fs_data_backend_close_method(fs_backend): - """Test that the `FSDataBackend.close` method raise an error.""" +def test_backends_data_fs_data_backend_close_method(fs_backend, caplog): + """Test that the `FSDataBackend.close` method write an info log.""" backend = fs_backend() - - error = "FS data backend does not support `close` method" - with pytest.raises(NotImplementedError, match=error): + with caplog.at_level(logging.INFO): backend.close() + + assert ( + "ralph.backends.data.fs", + logging.INFO, + "No open connections to close; skipping", + ) in caplog.record_tuples diff --git a/tests/backends/data/test_ldp.py b/tests/backends/data/test_ldp.py index c0ff932d4..8fa80ad53 100644 --- a/tests/backends/data/test_ldp.py +++ b/tests/backends/data/test_ldp.py @@ -691,7 +691,7 @@ def mock_post(url): def test_backends_data_ldp_data_backend_close_method(ldp_backend, caplog): - """Test that the `LDPDataBackend.close` method raise an error.""" + """Test that the `LDPDataBackend.close` method write an info log.""" backend = ldp_backend() diff --git a/tests/backends/data/test_mongo.py b/tests/backends/data/test_mongo.py index 12a0e56da..063dd0679 100644 --- a/tests/backends/data/test_mongo.py +++ b/tests/backends/data/test_mongo.py @@ -2,6 +2,7 @@ import json import logging 
+import re import pytest from bson.objectid import ObjectId @@ -264,9 +265,9 @@ def test_backends_data_mongo_data_backend_read_method_with_raw_output( {"_id": ObjectId("64945e530468d817b1f756db"), "id": "baz"}, ] expected = [ - b'{"_id": "64945e53a4ee2699573e0d6f", "id": "foo"}', - b'{"_id": "64945e530468d817b1f756da", "id": "bar"}', - b'{"_id": "64945e530468d817b1f756db", "id": "baz"}', + b'{"_id": "64945e53a4ee2699573e0d6f", "id": "foo"}\n', + b'{"_id": "64945e530468d817b1f756da", "id": "bar"}\n', + b'{"_id": "64945e530468d817b1f756db", "id": "baz"}\n', ] backend.collection.insert_many(documents) backend.database.foobar.insert_many(documents[:2]) @@ -362,14 +363,15 @@ def test_backends_data_mongo_data_backend_read_method_with_ignore_errors( """ backend = mongo_backend() + unparsable_value = ObjectId() documents = [ {"_id": ObjectId("64945e53a4ee2699573e0d6f"), "id": "foo"}, - {"_id": ObjectId("64945e530468d817b1f756da"), "id": ObjectId()}, + {"_id": ObjectId("64945e530468d817b1f756da"), "id": unparsable_value}, {"_id": ObjectId("64945e530468d817b1f756db"), "id": "baz"}, ] expected = [ - b'{"_id": "64945e53a4ee2699573e0d6f", "id": "foo"}', - b'{"_id": "64945e530468d817b1f756db", "id": "baz"}', + b'{"_id": "64945e53a4ee2699573e0d6f", "id": "foo"}\n', + b'{"_id": "64945e530468d817b1f756db", "id": "baz"}\n', ] backend.collection.insert_many(documents) backend.database.foobar.insert_many(documents[:2]) @@ -383,8 +385,9 @@ def test_backends_data_mongo_data_backend_read_method_with_ignore_errors( assert ( "ralph.backends.data.mongo", logging.WARNING, - "Failed to convert document to bytes: " - "Object of type ObjectId is not JSON serializable", + "Failed to encode JSON: Object of type ObjectId is not " + "JSON serializable, for document: {'_id': '64945e530468d817b1f756da', " + f"'id': ObjectId('{unparsable_value}')}}, at line 1", ) in caplog.record_tuples backend.close() @@ -398,33 +401,35 @@ def test_backends_data_mongo_data_backend_read_method_without_ignore_errors( """ backend = mongo_backend() + unparsable_value = ObjectId() documents = [ {"_id": ObjectId("64945e53a4ee2699573e0d6f"), "id": "foo"}, - {"_id": ObjectId("64945e530468d817b1f756da"), "id": ObjectId()}, + {"_id": ObjectId("64945e530468d817b1f756da"), "id": unparsable_value}, {"_id": ObjectId("64945e530468d817b1f756db"), "id": "baz"}, ] - expected = b'{"_id": "64945e53a4ee2699573e0d6f", "id": "foo"}' + expected = b'{"_id": "64945e53a4ee2699573e0d6f", "id": "foo"}\n' backend.collection.insert_many(documents) backend.database.foobar.insert_many(documents[:2]) kwargs = {"raw_output": True, "ignore_errors": False} msg = ( - "Failed to convert document to bytes: " - "Object of type ObjectId is not JSON serializable" + "Failed to encode JSON: Object of type ObjectId is not " + "JSON serializable, for document: {'_id': '64945e530468d817b1f756da', " + f"'id': ObjectId('{unparsable_value}')}}, at line 1" ) with caplog.at_level(logging.ERROR): - with pytest.raises(BackendException, match=msg): + with pytest.raises(BackendException, match=re.escape(msg)): result = backend.read(**kwargs) assert next(result) == expected next(result) - with pytest.raises(BackendException, match=msg): + with pytest.raises(BackendException, match=re.escape(msg)): result = backend.read(**kwargs, target="foobar") assert next(result) == expected next(result) - with pytest.raises(BackendException, match=msg): + with pytest.raises(BackendException, match=re.escape(msg)): result = backend.read(**kwargs, chunk_size=2) assert next(result) == expected next(result) - with 
pytest.raises(BackendException, match=msg): + with pytest.raises(BackendException, match=re.escape(msg)): result = backend.read(**kwargs, chunk_size=1000) assert next(result) == expected next(result) @@ -817,7 +822,7 @@ def test_backends_data_mongo_data_backend_write_method_with_unparsable_documents backend = mongo_backend() msg = ( "Failed to decode JSON: Expecting value: line 1 column 1 (char 0), " - "for document: b'not valid JSON!'" + "for document: b'not valid JSON!', at line 0" ) msg_regex = msg.replace("(", r"\(").replace(")", r"\)") with caplog.at_level(logging.ERROR): diff --git a/tests/backends/data/test_s3.py b/tests/backends/data/test_s3.py index a4cd6b574..82ee4cd35 100644 --- a/tests/backends/data/test_s3.py +++ b/tests/backends/data/test_s3.py @@ -3,6 +3,8 @@ import datetime import json import logging +import re +from collections import namedtuple import boto3 import pytest @@ -319,7 +321,8 @@ def test_backends_data_s3_read_with_invalid_output_should_log_the_error( assert ( "ralph.backends.data.s3", logging.ERROR, - "Raised error: Expecting value: line 1 column 1 (char 0)", + "Failed to decode JSON: Expecting value: line 1 column 1 (char 0)," + " for document: b'some contents in the body', at line 0", ) in caplog.record_tuples backend.clean_history(lambda *_: True) @@ -425,13 +428,18 @@ def test_backends_data_s3_read_with_iter_error_should_log_the_error( Body=body, ) - def mock_read_raw(*args, **kwargs): - raise ResponseStreamingError(error="error") + def mock_get_object(*args, **kwargs): # pylint: disable=unused-argument + """Mock the boto3 client.get_object method raising an exception on iteration.""" + + def raising_iter_chunks(*_, **__): # pylint: disable=unused-argument + raise ResponseStreamingError(error="error") + + return {"Body": namedtuple("_", "iter_chunks")(raising_iter_chunks)} with caplog.at_level(logging.ERROR): with pytest.raises(BackendException): backend = s3_backend() - monkeypatch.setattr(backend, "_read_raw", mock_read_raw) + monkeypatch.setattr(backend.client, "get_object", mock_get_object) backend.clean_history(lambda *_: True) list(backend.read(query=object_name, target=bucket_name, raw_output=True)) @@ -581,11 +589,13 @@ def test_backends_data_s3_write_method_with_create_index_operation( data = [{"some": "content", "datetime": date}] - error = "Object of type datetime is not JSON serializable" - - with caplog.at_level(logging.ERROR): + msg = ( + "Failed to encode JSON: Object of type datetime is not JSON serializable, " + f"for document: {data[0]}, at line 0" + ) + with caplog.at_level(logging.WARNING): # Without ignoring error - with pytest.raises(BackendException, match=error): + with pytest.raises(BackendException, match=re.escape(msg)): response = backend.write( data=data, target=object_name, @@ -603,18 +613,14 @@ def test_backends_data_s3_write_method_with_create_index_operation( assert list( filter( - lambda record: record[1] == logging.ERROR, + lambda record: record[1] in [logging.ERROR, logging.WARNING], caplog.record_tuples, ) ) == ( [ - ( - "ralph.backends.data.s3", - logging.ERROR, - f"Failed to encode JSON: {error}, for document {data[0]}", - ) + ("ralph.backends.data.s3", logging.ERROR, msg), + ("ralph.backends.data.s3", logging.WARNING, msg), ] - * 2 ) backend.close() diff --git a/tests/backends/data/test_swift.py b/tests/backends/data/test_swift.py index c31d29202..0d5e23c3e 100644 --- a/tests/backends/data/test_swift.py +++ b/tests/backends/data/test_swift.py @@ -461,7 +461,11 @@ def mock_get_object_1(*args, **kwargs): result = 
backend.read(ignore_errors=False, query="2020-06-02.gz") assert isinstance(result, Iterable) assert next(result) == valid_dictionary - with pytest.raises(BackendException, match="Raised error:"): + msg = ( + r"Failed to decode JSON: Expecting value: line 1 column 1 \(char 0\), " + r"for document: b'baz\\n', at line 1" + ) + with pytest.raises(BackendException, match=msg): next(result) # When the `read` method fails to read a file entirely, then no entry should be @@ -478,7 +482,11 @@ def mock_get_object_2(*args, **kwargs): # method should raise a `BackendException` at the second line. result = backend.read(ignore_errors=False, query="2020-06-03.gz") assert isinstance(result, Iterable) - with pytest.raises(BackendException, match="Raised error:"): + msg = ( + r"Failed to decode JSON: Expecting value: line 1 column 1 \(char 0\), " + r"for document: b'baz\\n', at line 0" + ) + with pytest.raises(BackendException, match=msg): next(result) backend.close() @@ -629,7 +637,8 @@ def test_backends_data_swift_data_backend_write_method_without_target( def mock_get_container(*args, **kwargs): return (None, [x["name"] for x in listing]) - def mock_put_object(*args, **kwargs): + def mock_put_object(container, obj, contents): + list(contents) return 1 def mock_head_object(*args, **kwargs): diff --git a/tests/backends/http/test_async_lrs.py b/tests/backends/http/test_async_lrs.py index 0b7080b32..be0601630 100644 --- a/tests/backends/http/test_async_lrs.py +++ b/tests/backends/http/test_async_lrs.py @@ -672,11 +672,14 @@ async def test_backends_http_lrs_write_without_operation( max_num_simultaneous=max_num_simultaneous, ) + # If no chunk_size is provided, a default value (500) should be used. + if chunk_size is None: + chunk_size = 500 + assert ( "ralph.backends.http.async_lrs", logging.DEBUG, - f"Start writing to the {base_url}{target} endpoint (chunk size: " - f"{chunk_size})", + f"Start writing to the {base_url}{target} endpoint (chunk size: {chunk_size})", ) in caplog.record_tuples assert (