diff --git a/python/arcticdb/version_store/library.py b/python/arcticdb/version_store/library.py index 1f10b3d7d5..08756a10ff 100644 --- a/python/arcticdb/version_store/library.py +++ b/python/arcticdb/version_store/library.py @@ -622,8 +622,8 @@ def write_batch( ) def write_pickle_batch( - self, payloads: List[WritePayload], prune_previous_versions: bool = False, staged=False - ) -> List[VersionedItem]: + self, payloads: List[WritePayload], prune_previous_versions: bool = False + ) -> List[Union[VersionedItem, DataError]]: """ Write a batch of multiple symbols, pickling their data if necessary. @@ -633,14 +633,13 @@ def write_pickle_batch( Symbols and their corresponding data. There must not be any duplicate symbols in `payload`. prune_previous_versions: `bool`, default=False Removes previous (non-snapshotted) versions from the database. - staged: `bool`, default=False - See documentation on `write`. Returns ------- - List[VersionedItem] + List[Union[VersionedItem, DataError]] Structures containing metadata and version number of the written symbols in the store, in the - same order as `payload`. + same order as `payload`. If a key error or any other internal exception is raised, a DataError object is + returned, with symbol, error_code, error_category, and exception_string properties. Raises ------ @@ -654,13 +653,14 @@ def write_pickle_batch( """ self._raise_if_duplicate_symbols_in_batch(payloads) - return self._nvs.batch_write( + return self._nvs._batch_write_internal( [p.symbol for p in payloads], [p.data for p in payloads], [p.metadata for p in payloads], prune_previous_version=prune_previous_versions, pickle_on_failure=True, - parallel=staged, + validate_index=False, + throw_on_error=False, ) def append( diff --git a/python/tests/integration/arcticdb/test_arctic_batch.py b/python/tests/integration/arcticdb/test_arctic_batch.py index 93ff652332..4b3983867a 100644 --- a/python/tests/integration/arcticdb/test_arctic_batch.py +++ b/python/tests/integration/arcticdb/test_arctic_batch.py @@ -328,6 +328,41 @@ def test_write_pickle_batch_duplicate_symbols(arctic_library): assert not lib.list_symbols() +def test_write_pickle_batch_dataerror(library_factory): + """Only way to trigger a DataError response with write_pickle_batch is to enable dedup and delete previous version's + index key.""" + lib = library_factory(LibraryOptions(dedup=True)) + assert lib._nvs._lib_cfg.lib_desc.version.write_options.de_duplication + + lib.write_pickle("s1", 1) + lib.write_pickle("s2", 2) + + lib_tool = lib._nvs.library_tool() + s1_index_key = lib_tool.find_keys_for_id(KeyType.TABLE_INDEX, "s1")[0] + lib_tool.remove(s1_index_key) + + # When + batch = lib.write_pickle_batch( + [ + WritePayload("s1", 3, metadata="great_metadata_s1"), + WritePayload("s2", 4, metadata="great_metadata_s2"), + ] + ) + + # Then + assert isinstance(batch[0], DataError) + assert batch[0].symbol == "s1" + assert batch[0].version_request_type is None + assert batch[0].version_request_data is None + assert batch[0].error_code == ErrorCode.E_KEY_NOT_FOUND + assert batch[0].error_category == ErrorCategory.STORAGE + + assert not isinstance(batch[1], DataError) + vit = lib.read("s2") + assert vit.metadata == "great_metadata_s2" + assert vit.data == 4 + + def test_write_batch(library_factory): """Should be able to write different size of batch of data.""" lib = library_factory(LibraryOptions(rows_per_segment=10))