From e62a048c8f1473790e196450dbb427dd5e3a2177 Mon Sep 17 00:00:00 2001
From: Vasil Danielov Pashov
Date: Wed, 21 Aug 2024 17:32:09 +0300
Subject: [PATCH] Fixes for sort and finalize (#1763)

#### Reference Issues/PRs
Fixes: #1737 #1736 #1735

#### What does this implement or fix?
Handles empty staged segments when finalizing staged data: `finalize_staged_data` and `sort_and_finalize_staged_data` now skip empty segments instead of failing, raise a `UserInputException` when the staging area is empty, and raise a `SortingException` when appended staged data starts before the end of the existing data. Both now return a `VersionedItem`, and `sort_and_finalize_staged_data` gains an optional `metadata` argument. A usage sketch of the new behaviour follows the checklist below.

#### Any other comments?

#### Checklist
Checklist for code changes...
- [ ] Have you updated the relevant docstrings, documentation and copyright notice?
- [ ] Is this contribution tested against [all ArcticDB's features](../docs/mkdocs/docs/technical/contributing.md)?
- [ ] Do all exceptions introduced raise appropriate [error messages](https://docs.arcticdb.io/error_messages/)?
- [ ] Are API changes highlighted in the PR description?
- [ ] Is the PR labelled as enhancement or bug so it appears in autogenerated release notes?
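A minimal usage sketch of the behaviour changed here, assuming an LMDB-backed `Arctic` instance; the storage URI, library name and symbol names are illustrative only, and the calls mirror the tests added in this PR:

```python
import numpy as np
import pandas as pd

from arcticdb import Arctic
from arcticdb.version_store.library import StagedDataFinalizeMethod
from arcticdb.exceptions import SortingException, UserInputException

ac = Arctic("lmdb:///tmp/sort_finalize_example")  # illustrative storage URI
lib = ac.get_library("demo", create_if_missing=True)

# Finalizing with nothing staged now raises a clear user-input error
# instead of failing on an internal assertion.
try:
    lib.finalize_staged_data("sym", mode=StagedDataFinalizeMethod.WRITE)
except UserInputException:
    pass

# Empty staged segments are now tolerated alongside non-empty ones.
df = pd.DataFrame(
    {"col": [1, 2]},
    index=pd.DatetimeIndex([np.datetime64("2024-01-01"), np.datetime64("2024-01-02")]),
)
lib.write("sym", df, staged=True)
lib.write("sym", pd.DataFrame({}), staged=True)

# sort_and_finalize_staged_data now accepts metadata and, like
# finalize_staged_data, returns a VersionedItem (its data member is None).
vit = lib.sort_and_finalize_staged_data(
    "sym", mode=StagedDataFinalizeMethod.WRITE, metadata={"hello": "world"}
)
print(vit.symbol, vit.version, vit.metadata)

# Appending staged data that starts before the end of the existing data
# now raises a SortingException.
older = pd.DataFrame({"col": [0]}, index=pd.DatetimeIndex([np.datetime64("2023-12-31")]))
lib.write("sym", older, staged=True)
try:
    lib.sort_and_finalize_staged_data("sym", mode=StagedDataFinalizeMethod.APPEND)
except SortingException:
    pass
```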
--------- Co-authored-by: Vasil Pashov --- cpp/arcticdb/pipeline/frame_slice.hpp | 2 - cpp/arcticdb/pipeline/frame_utils.cpp | 3 + cpp/arcticdb/pipeline/index_utils.hpp | 20 +++--- cpp/arcticdb/pipeline/read_pipeline.hpp | 2 +- cpp/arcticdb/processing/clause.cpp | 7 +- cpp/arcticdb/stream/append_map.cpp | 17 +++-- cpp/arcticdb/stream/merge.hpp | 3 + cpp/arcticdb/util/error_code.hpp | 1 + cpp/arcticdb/version/version_core-inl.hpp | 5 +- cpp/arcticdb/version/version_core.cpp | 46 +++++++++---- python/arcticdb/version_store/_store.py | 5 +- python/arcticdb/version_store/library.py | 29 ++++++-- .../tests/integration/arcticdb/test_arctic.py | 68 ++++++++++++++++++- .../arcticdb/version_store/test_sort_merge.py | 38 ++++++++--- 14 files changed, 191 insertions(+), 55 deletions(-) diff --git a/cpp/arcticdb/pipeline/frame_slice.hpp b/cpp/arcticdb/pipeline/frame_slice.hpp index a0fb8c65b7..35630222ad 100644 --- a/cpp/arcticdb/pipeline/frame_slice.hpp +++ b/cpp/arcticdb/pipeline/frame_slice.hpp @@ -83,7 +83,6 @@ struct FrameSlice { hash_bucket_(hash), num_buckets_(num_buckets), indices_(std::move(indices)) { - util::check(col_range.diff() > 0 || row_range.diff() > 0, "Expected non-zero column or row range"); } FrameSlice(const ColRange& col_range, const RowRange& row_range, @@ -93,7 +92,6 @@ struct FrameSlice { row_range(row_range), hash_bucket_(hash_bucket), num_buckets_(num_buckets) { - util::check(col_range.diff() > 0 || row_range.diff() > 0, "Expected non-zero column or row range"); } explicit FrameSlice(const SegmentInMemory& seg); diff --git a/cpp/arcticdb/pipeline/frame_utils.cpp b/cpp/arcticdb/pipeline/frame_utils.cpp index efbb03924b..948251666f 100644 --- a/cpp/arcticdb/pipeline/frame_utils.cpp +++ b/cpp/arcticdb/pipeline/frame_utils.cpp @@ -100,6 +100,9 @@ size_t adjust_slice_rowcounts(std::vector & slice_and_ke } size_t get_slice_rowcounts(std::vector & slice_and_keys) { + if (slice_and_keys.empty()) { + return 0; + } auto current_col = slice_and_keys[0].slice_.col_range.first; size_t rowcount = 0u; for (auto& slice_and_key : slice_and_keys) { diff --git a/cpp/arcticdb/pipeline/index_utils.hpp b/cpp/arcticdb/pipeline/index_utils.hpp index be2a633cd7..931cbaf3ef 100644 --- a/cpp/arcticdb/pipeline/index_utils.hpp +++ b/cpp/arcticdb/pipeline/index_utils.hpp @@ -36,21 +36,17 @@ inline std::vector unfiltered_index(const index::IndexSegmentReader template std::optional index_value_from_row(const RowType &row, IndexDescriptorImpl::Type index_type, int field_num) { - std::optional index_value; switch (index_type) { case IndexDescriptorImpl::Type::TIMESTAMP: - case IndexDescriptorImpl::Type::ROWCOUNT: - index_value = row.template scalar_at(field_num); - break; - case IndexDescriptorImpl::Type::STRING: { - auto opt = row.string_at(field_num); - index_value = opt ? std::make_optional(std::string(opt.value())) : std::nullopt; - break; - } - default: - util::raise_rte("Unknown index type {} for column {}", int(index_type), field_num); + case IndexDescriptorImpl::Type::ROWCOUNT: return row.template scalar_at(field_num); + case IndexDescriptorImpl::Type::STRING: { + auto opt = row.string_at(field_num); + return opt ? 
std::make_optional(std::string(opt.value())) : std::nullopt; } - return index_value; + default: + util::raise_rte("Unknown index type {} for column {}", int(index_type), field_num); + } + return std::nullopt; } template diff --git a/cpp/arcticdb/pipeline/read_pipeline.hpp b/cpp/arcticdb/pipeline/read_pipeline.hpp index 576fdc60d8..686833c223 100644 --- a/cpp/arcticdb/pipeline/read_pipeline.hpp +++ b/cpp/arcticdb/pipeline/read_pipeline.hpp @@ -173,7 +173,7 @@ inline std::optional overall_column_bitset( inline void generate_filtered_field_descriptors(PipelineContext& context, const std::optional>& columns) { if (columns.has_value()) { - std::unordered_set column_set{std::begin(*columns), std::end(*columns)}; + const ankerl::unordered_dense::set column_set{std::begin(*columns), std::end(*columns)}; context.filter_columns_ = std::make_shared(); const auto& desc = context.descriptor(); diff --git a/cpp/arcticdb/processing/clause.cpp b/cpp/arcticdb/processing/clause.cpp index 661e513e73..8490737b30 100644 --- a/cpp/arcticdb/processing/clause.cpp +++ b/cpp/arcticdb/processing/clause.cpp @@ -34,7 +34,7 @@ std::vector> structure_by_row_slice( ranges_and_keys.erase(ranges_and_keys.begin(), ranges_and_keys.begin() + start_from); std::vector> res; - RowRange previous_row_range; + RowRange previous_row_range{-1, -1}; for (const auto& [idx, ranges_and_key]: folly::enumerate(ranges_and_keys)) { RowRange current_row_range{ranges_and_key.row_range_}; if (current_row_range != previous_row_range) { @@ -941,6 +941,11 @@ std::optional>> MergeClause::repartition(std:: auto compare = [](const std::unique_ptr &left, const std::unique_ptr &right) { + if (left->seg_.row_count() == 0) { + return false; + } else if (right->seg_.row_count() == 0) { + return true; + } const auto left_index = index::index_value_from_row(left->row(), IndexDescriptorImpl::Type::TIMESTAMP, 0); const auto right_index = index::index_value_from_row(right->row(), IndexDescriptorImpl::Type::TIMESTAMP, 0); return left_index > right_index; diff --git a/cpp/arcticdb/stream/append_map.cpp b/cpp/arcticdb/stream/append_map.cpp index 4a1d6d08b3..b3b8b55969 100644 --- a/cpp/arcticdb/stream/append_map.cpp +++ b/cpp/arcticdb/stream/append_map.cpp @@ -147,10 +147,9 @@ SegmentInMemory incomplete_segment_from_frame( auto index_tensor = std::move(frame->index_tensor); const bool has_index = frame->has_index(); const auto index = std::move(frame->index); - SegmentInMemory output; - auto field_tensors = std::move(frame->field_tensors); - std::visit([&](const auto& idx) { + auto field_tensors = std::move(frame->field_tensors); + auto output = std::visit([&](const auto& idx) { using IdxType = std::decay_t; using SingleSegmentAggregator = Aggregator; auto copy_prev_key = prev_key; @@ -158,6 +157,14 @@ SegmentInMemory incomplete_segment_from_frame( util::check(!timeseries_desc.fields().empty(), "Expected fields not to be empty in incomplete segment"); auto norm_meta = timeseries_desc.proto().normalization(); auto descriptor = timeseries_desc.as_stream_descriptor(); + + SegmentInMemory output; + if (num_rows == 0) { + output = SegmentInMemory(FixedSchema{descriptor, index}.default_descriptor(), 0, false, false); + output.set_timeseries_descriptor(pack_timeseries_descriptor(descriptor, existing_rows, std::move(copy_prev_key), std::move(norm_meta))); + return output; + } + SingleSegmentAggregator agg{FixedSchema{descriptor, index}, [&](auto&& segment) { auto tsd = pack_timeseries_descriptor(descriptor, existing_rows + num_rows, std::move(copy_prev_key), 
std::move(norm_meta)); segment.set_timeseries_descriptor(tsd); @@ -194,7 +201,8 @@ SegmentInMemory incomplete_segment_from_frame( agg.end_block_write(num_rows); agg.commit(); - }, index); + return output; + }, index); ARCTICDB_DEBUG(log::version(), "Constructed segment from frame of {} rows and {} columns at offset {}", output.row_count(), output.num_columns(), output.offset()); return output; @@ -241,7 +249,6 @@ std::vector get_incomplete( bool load_data) { using namespace arcticdb::pipelines; - std::unique_ptr unused; auto entries = get_incomplete_append_slices_for_stream_id(store, stream_id, via_iteration, load_data); util::variant_match(range, diff --git a/cpp/arcticdb/stream/merge.hpp b/cpp/arcticdb/stream/merge.hpp index 61a4ebeec7..0be30bfedf 100644 --- a/cpp/arcticdb/stream/merge.hpp +++ b/cpp/arcticdb/stream/merge.hpp @@ -19,6 +19,9 @@ void do_merge( while (!input_streams.empty()) { auto next = input_streams.pop_top(); + if (next->row().parent_->row_count() == 0) { + continue; + } agg.start_row(pipelines::index::index_value_from_row(next->row(), IndexDescriptorImpl::Type::TIMESTAMP, 0).value()) ([&next, add_symbol_column](auto &rb) { if(add_symbol_column) rb.set_scalar_by_name("symbol", std::string_view(std::get(next->id())), DataType::UTF_DYNAMIC64); diff --git a/cpp/arcticdb/util/error_code.hpp b/cpp/arcticdb/util/error_code.hpp index edbba4534a..0ac81779ec 100644 --- a/cpp/arcticdb/util/error_code.hpp +++ b/cpp/arcticdb/util/error_code.hpp @@ -91,6 +91,7 @@ inline std::unordered_map get_error_category_names() ERROR_CODE(7001, E_INVALID_DECIMAL_STRING) \ ERROR_CODE(7002, E_INVALID_CHAR_IN_NAME) \ ERROR_CODE(7003, E_NAME_TOO_LONG) \ + ERROR_CODE(7004, E_NO_STAGED_SEGMENTS) \ ERROR_CODE(8000, E_UNRECOGNISED_COLUMN_STATS_VERSION) \ ERROR_CODE(9000, E_DECODE_ERROR) \ ERROR_CODE(9001, E_UNKNOWN_CODEC) \ diff --git a/cpp/arcticdb/version/version_core-inl.hpp b/cpp/arcticdb/version/version_core-inl.hpp index 7fb742072f..6f210f6968 100644 --- a/cpp/arcticdb/version/version_core-inl.hpp +++ b/cpp/arcticdb/version/version_core-inl.hpp @@ -127,12 +127,15 @@ void do_compact( }; for(auto it = target_start; it != target_end; ++it) { - decltype(auto) sk = [&it](){ + auto sk = [&it](){ if constexpr(std::is_same_v) return it->slice_and_key(); else return *it; }(); + if (sk.slice().rows().diff() == 0) { + continue; + } aggregator.add_segment( std::move(sk.segment(store)), sk.slice(), diff --git a/cpp/arcticdb/version/version_core.cpp b/cpp/arcticdb/version/version_core.cpp index 5fb7e0eef1..e2e86ff975 100644 --- a/cpp/arcticdb/version/version_core.cpp +++ b/cpp/arcticdb/version/version_core.cpp @@ -789,22 +789,32 @@ void read_incompletes_to_pipeline( if(incomplete_segments.empty()) return; + // In order to have the right normalization metadata and descriptor we need to find the first non-empty segment. + // Picking an empty segment when there are non-empty ones will impact the index type and column names. + // If all segments are empty we will proceed as if we were appending/writing an empty dataframe. 
+ debug::check(!incomplete_segments.empty(), "Incomplete segments must be non-empty"); + const auto* seg = &incomplete_segments.front().segment(store); + for (auto& slice : incomplete_segments) { + if (slice.segment(store).row_count() > 0) { + seg = &slice.segment_.value(); + break; + } + } // Mark the start point of the incompletes, so we know that there is no column slicing after this point pipeline_context->incompletes_after_ = pipeline_context->slice_and_keys_.size(); // If there are only incompletes we need to add the index here if(pipeline_context->slice_and_keys_.empty()) { - add_index_columns_to_query(read_query, incomplete_segments.begin()->segment(store).index_descriptor()); + add_index_columns_to_query(read_query, seg->index_descriptor()); } - auto first_seg = incomplete_segments.begin()->segment(store); + if (!pipeline_context->desc_) - pipeline_context->desc_ = first_seg.descriptor(); + pipeline_context->desc_ = seg->descriptor(); if (!pipeline_context->norm_meta_) { pipeline_context->norm_meta_ = std::make_unique(); - auto segment_tsd = first_seg.index_descriptor(); - pipeline_context->norm_meta_->CopyFrom(segment_tsd.proto().normalization()); + pipeline_context->norm_meta_->CopyFrom(seg->index_descriptor().proto().normalization()); ensure_timeseries_norm_meta(*pipeline_context->norm_meta_, pipeline_context->stream_id_, sparsify); } @@ -847,11 +857,16 @@ void check_incompletes_index_ranges_dont_overlap(const std::shared_ptr unique_timestamp_ranges; for (auto it = pipeline_context->incompletes_begin(); it!= pipeline_context->end(); it++) { + if (it->slice_and_key().slice().rows().diff() == 0) { + continue; + } sorting::check( !last_existing_index_value.has_value() || it->slice_and_key().key().start_time() >= *last_existing_index_value, "Cannot append staged segments to existing data as incomplete segment contains index value < existing data (in UTC): {} <= {}", date_and_time(it->slice_and_key().key().start_time()), - date_and_time(*last_existing_index_value)); + // Should never reach "" but the standard mandates that all function arguments are evaluated + last_existing_index_value ? 
date_and_time(*last_existing_index_value) : "" + ); auto [_, inserted] = unique_timestamp_ranges.insert({it->slice_and_key().key().start_time(), it->slice_and_key().key().end_time()}); // This is correct because incomplete segments aren't column sliced sorting::check( @@ -1278,6 +1293,10 @@ VersionedItem sort_merge_impl( options.convert_int_to_float_, options.via_iteration_, options.sparsify_); + user_input::check( + pipeline_context->slice_and_keys_.size() > 0, + "Finalizing staged data is not allowed with empty staging area" + ); std::vector delete_keys; for(auto sk = pipeline_context->incompletes_begin(); sk != pipeline_context->end(); ++sk) { @@ -1301,10 +1320,10 @@ VersionedItem sort_merge_impl( if (options.append_ && update_info.previous_index_key_.has_value() && update_info.previous_index_key_->end_time() - 1 > std::get(TimeseriesIndex::start_value_for_segment(segments[0].segment_.value()))) { store->remove_keys(delete_keys).get(); - user_input::raise( - "Cannot append index starting at {} to data frame with index ending at {}", - std::get(TimeseriesIndex::start_value_for_segment(segments[0].segment_.value())), - update_info.previous_index_key_->end_time() - 1 + sorting::raise( + "Cannot append staged segments to existing data as incomplete segment contains index value < existing data (in UTC): {} <= {}", + date_and_time(std::get(TimeseriesIndex::start_value_for_segment(segments[0].segment_.value()))), + date_and_time(update_info.previous_index_key_->end_time() - 1) ); } pipeline_context->total_rows_ = num_versioned_rows + get_slice_rowcounts(segments); @@ -1383,9 +1402,10 @@ VersionedItem compact_incomplete_impl( options.convert_int_to_float_, options.via_iteration_, options.sparsify_); - if (pipeline_context->slice_and_keys_.size() == prev_size) { - util::raise_rte("No incomplete segments found for {}", stream_id); - } + user_input::check( + pipeline_context->slice_and_keys_.size() != prev_size, + "Finalizing staged data is not allowed with empty staging area" + ); if (options.validate_index_) { check_incompletes_index_ranges_dont_overlap(pipeline_context, previous_sorted_value); } diff --git a/python/arcticdb/version_store/_store.py b/python/arcticdb/version_store/_store.py index 27503df03a..3e407c8d26 100644 --- a/python/arcticdb/version_store/_store.py +++ b/python/arcticdb/version_store/_store.py @@ -1951,7 +1951,7 @@ def compact_incomplete( metadata: Optional[Any] = None, prune_previous_version: Optional[bool] = None, validate_index: bool = False, - ): + ) -> VersionedItem: """ Compact previously written un-indexed chunks of data, produced by a tick collector or parallel writes/appends. 
@@ -1988,9 +1988,10 @@ def compact_incomplete( "prune_previous_version", self._write_options(), global_default=False, existing_value=prune_previous_version ) udm = normalize_metadata(metadata) if metadata is not None else None - return self.version_store.compact_incomplete( + vit = self.version_store.compact_incomplete( symbol, append, convert_int_to_float, via_iteration, sparsify, udm, prune_previous_version, validate_index ) + return self._convert_thin_cxx_item_to_python(vit, metadata) @staticmethod def _get_index_columns_from_descriptor(descriptor): diff --git a/python/arcticdb/version_store/library.py b/python/arcticdb/version_store/library.py index 34c7a24a00..11451ed73c 100644 --- a/python/arcticdb/version_store/library.py +++ b/python/arcticdb/version_store/library.py @@ -25,6 +25,7 @@ import pandas as pd import numpy as np import logging +from arcticdb.version_store._normalization import normalize_metadata logger = logging.getLogger(__name__) @@ -902,7 +903,7 @@ def finalize_staged_data( prune_previous_versions: bool = False, metadata: Any = None, validate_index=True, - ): + ) -> VersionedItem: """ Finalizes staged data, making it available for reads. @@ -924,12 +925,18 @@ def finalize_staged_data( are non-overlapping with each other, and, in the case of `StagedDataFinalizeMethod.APPEND`, fall after the last index value in the previous version. + Returns + ------- + VersionedItem + Structure containing metadata and version number of the written symbol in the store. + The data member will be None. + See Also -------- write Documentation on the ``staged`` parameter explains the concept of staged data in more detail. """ - self._nvs.compact_incomplete( + return self._nvs.compact_incomplete( symbol, append=mode == StagedDataFinalizeMethod.APPEND, convert_int_to_float=False, @@ -943,7 +950,8 @@ def sort_and_finalize_staged_data( symbol: str, mode: Optional[StagedDataFinalizeMethod] = StagedDataFinalizeMethod.WRITE, prune_previous_versions: bool = False, - ): + metadata: Any = None + ) -> VersionedItem: """ sort_merge will sort and finalize staged data. This differs from `finalize_staged_data` in that it can support staged segments with interleaved time periods - the end result will be ordered. This requires @@ -961,18 +969,27 @@ def sort_and_finalize_staged_data( prune_previous_versions : bool, default=False Removes previous (non-snapshotted) versions from the database. + metadata : Any, default=None + Optional metadata to persist along with the symbol. + + Returns + ------- + VersionedItem + Structure containing metadata and version number of the written symbol in the store. + The data member will be None. + See Also -------- write Documentation on the ``staged`` parameter explains the concept of staged data in more detail. 
""" - - self._nvs.version_store.sort_merge( + vit = self._nvs.version_store.sort_merge( symbol, - None, + normalize_metadata(metadata) if metadata is not None else None, mode == StagedDataFinalizeMethod.APPEND, prune_previous_versions=prune_previous_versions, ) + return self._nvs._convert_thin_cxx_item_to_python(vit, metadata) def get_staged_symbols(self) -> List[str]: """ diff --git a/python/tests/integration/arcticdb/test_arctic.py b/python/tests/integration/arcticdb/test_arctic.py index a146693179..bb8decde24 100644 --- a/python/tests/integration/arcticdb/test_arctic.py +++ b/python/tests/integration/arcticdb/test_arctic.py @@ -246,7 +246,12 @@ def test_staged_data(arctic_library, finalize_method): lib.write(sym_unfinalized, df_2, staged=True) metadata = {"hello": "world"} - lib.finalize_staged_data(sym_with_metadata, finalize_method, metadata=metadata) + finalize_result_meta = lib.finalize_staged_data(sym_with_metadata, finalize_method, metadata=metadata) + assert finalize_result_meta.metadata == metadata + assert finalize_result_meta.symbol == sym_with_metadata + assert finalize_result_meta.library == lib.name + assert finalize_result_meta.version == (1 if finalize_method == StagedDataFinalizeMethod.APPEND else 0) + lib.finalize_staged_data(sym_without_metadata, finalize_method) assert set(lib.list_symbols()) == {sym_with_metadata, sym_without_metadata} @@ -285,6 +290,67 @@ def test_parallel_writes_and_appends_index_validation(arctic_library, finalize_m expected = pd.concat([df_0, df_1, df_2]) if finalize_method == StagedDataFinalizeMethod.APPEND else pd.concat([df_1, df_2]) assert_frame_equal(received, expected) +@pytest.mark.parametrize("finalize_method", (StagedDataFinalizeMethod.APPEND, StagedDataFinalizeMethod.WRITE)) +class TestFinalizeWithEmptySegments: + def test_staged_segment_is_only_empty_dfs(self, lmdb_library, finalize_method): + lib = lmdb_library + lib.write("sym", pd.DataFrame([]), staged=True) + lib.write("sym", pd.DataFrame([]), staged=True) + lib.finalize_staged_data("sym", mode=finalize_method) + assert_frame_equal(lib.read("sym").data, pd.DataFrame([], index=pd.DatetimeIndex([]))) + + def test_staged_segment_has_empty_df(self, lmdb_library, finalize_method): + lib = lmdb_library + index = pd.DatetimeIndex([pd.Timestamp(2024, 1, 1), pd.Timestamp(2024, 1, 3), pd.Timestamp(2024, 1, 4)]) + df1 = pd.DataFrame({"col": [1, 2, 3]}, index=index) + df2 = pd.DataFrame({}) + df3 = pd.DataFrame({"col": [4]}, index=pd.DatetimeIndex([pd.Timestamp(2024, 1, 5)])) + lib.write("sym", df1, staged=True) + lib.write("sym", df2, staged=True) + lib.write("sym", df3, staged=True) + lib.finalize_staged_data("sym", mode=finalize_method) + assert_frame_equal(lib.read("sym").data, pd.concat([df1, df2, df3])) + + def test_df_without_rows(self, lmdb_library, finalize_method): + lib = lmdb_library + df = pd.DataFrame({"col": []}, index=pd.DatetimeIndex([])) + lib.write("sym", df, staged=True) + lib.finalize_staged_data("sym", mode=finalize_method) + assert_frame_equal(lib.read("sym").data, df) + +@pytest.mark.parametrize("finalize_method", (StagedDataFinalizeMethod.APPEND, StagedDataFinalizeMethod.WRITE)) +def test_finalize_without_adding_segments(lmdb_library, finalize_method): + lib = lmdb_library + with pytest.raises(UserInputException) as exception_info: + lib.finalize_staged_data("sym", mode=finalize_method) + +class TestAppendStagedData: + def test_appended_df_interleaves_with_storage(self, lmdb_library): + lib = lmdb_library + initial_df = pd.DataFrame({"col": [1, 3]}, 
index=pd.DatetimeIndex([np.datetime64('2023-01-01'), np.datetime64('2023-01-03')], dtype="datetime64[ns]")) + lib.write("sym", initial_df) + df1 = pd.DataFrame({"col": [2]}, index=pd.DatetimeIndex([np.datetime64('2023-01-02')], dtype="datetime64[ns]")) + lib.write("sym", df1, staged=True) + with pytest.raises(SortingException) as exception_info: + lib.finalize_staged_data("sym", mode=StagedDataFinalizeMethod.APPEND) + assert "append" in str(exception_info.value) + + def test_appended_df_start_same_as_df_end(self, lmdb_library): + lib = lmdb_library + df = pd.DataFrame( + {"col": [1, 2, 3]}, + index=pd.DatetimeIndex([np.datetime64('2023-01-01'), np.datetime64('2023-01-02'), np.datetime64('2023-01-03')], dtype="datetime64[ns]") + ) + lib.write("sym", df) + df_to_append = pd.DataFrame( + {"col": [4, 5, 6]}, + index=pd.DatetimeIndex([np.datetime64('2023-01-03'), np.datetime64('2023-01-04'), np.datetime64('2023-01-05')], dtype="datetime64[ns]") + ) + lib.write("sym", df_to_append, staged=True) + lib.finalize_staged_data("sym", mode=StagedDataFinalizeMethod.APPEND) + res = lib.read("sym").data + expected_df = pd.concat([df, df_to_append]) + assert_frame_equal(lib.read("sym").data, expected_df) def test_snapshots_and_deletes(arctic_library): lib = arctic_library diff --git a/python/tests/unit/arcticdb/version_store/test_sort_merge.py b/python/tests/unit/arcticdb/version_store/test_sort_merge.py index 237f3faafd..9687156fe1 100644 --- a/python/tests/unit/arcticdb/version_store/test_sort_merge.py +++ b/python/tests/unit/arcticdb/version_store/test_sort_merge.py @@ -3,7 +3,7 @@ from pandas.testing import assert_frame_equal import pytest from arcticdb.version_store.library import StagedDataFinalizeMethod -from arcticdb.exceptions import UserInputException +from arcticdb.exceptions import UserInputException, SortingException def test_merge_single_column(lmdb_library): lib = lmdb_library @@ -20,7 +20,8 @@ def test_merge_single_column(lmdb_library): sym1 = "symbol_1" lib.write(sym1, df1, staged=True) lib.write(sym1, df2, staged=True) - lib.sort_and_finalize_staged_data(sym1) + metadata = {"meta": ["data"]} + sort_and_finalize_res = lib.sort_and_finalize_staged_data(sym1, metadata=metadata) expected_dates = [np.datetime64('2023-01-01'), np.datetime64('2023-01-02'), np.datetime64('2023-01-03'), np.datetime64('2023-01-04'), np.datetime64('2023-01-05'), np.datetime64('2023-01-06')] @@ -28,7 +29,11 @@ def test_merge_single_column(lmdb_library): expected_values = {"x": [1, 2, 3, 4, 5, 6]} expected_df = pd.DataFrame(expected_values, index=expected_dates) assert_frame_equal(lib.read(sym1).data, expected_df) - + assert sort_and_finalize_res.metadata == {"meta": ["data"]} + assert sort_and_finalize_res.symbol == sym1 + assert sort_and_finalize_res.library == lib.name + assert sort_and_finalize_res.version == 0 + assert lib.read(sym1).metadata == metadata def test_merge_two_column(lmdb_library): lib = lmdb_library @@ -80,6 +85,7 @@ def test_merge_dynamic(lmdb_library): assert_frame_equal(lib.read(sym1).data, expected_df) + def test_merge_strings(lmdb_library): lib = lmdb_library @@ -183,7 +189,7 @@ def test_appended_df_interleaves_with_storage(self, lmdb_library): lib.write("sym", initial_df) df1 = pd.DataFrame({"col": [2]}, index=pd.DatetimeIndex([np.datetime64('2023-01-02')], dtype="datetime64[ns]")) lib.write("sym", df1, staged=True) - with pytest.raises(UserInputException) as exception_info: + with pytest.raises(SortingException) as exception_info: lib.sort_and_finalize_staged_data("sym", 
mode=StagedDataFinalizeMethod.APPEND) assert "append" in str(exception_info.value) @@ -217,14 +223,25 @@ def test_prune_previous(lmdb_library): assert len(lib.list_versions("sym")) == 1 class TestEmptySegments: - @pytest.mark.xfail(reason="Bug. Throws: `E_ASSERTION_FAILURE Descriptor id mismatch in atom key sym != 0") - def test_empty_df_in_staged_segment(self, lmdb_library): + def test_staged_segment_is_only_empty_dfs(self, lmdb_library): lib = lmdb_library lib.write("sym", pd.DataFrame([]), staged=True) + lib.write("sym", pd.DataFrame([]), staged=True) lib.sort_and_finalize_staged_data("sym") - assert_frame_equal(lib.read("sym").data, pd.DataFrame([])) + assert_frame_equal(lib.read("sym").data, pd.DataFrame([], index=pd.DatetimeIndex([]))) + + def test_staged_segment_has_empty_df(self, lmdb_library): + lib = lmdb_library + index = pd.DatetimeIndex([pd.Timestamp(2024, 1, 1), pd.Timestamp(2024, 1, 3), pd.Timestamp(2024, 1, 4)]) + df1 = pd.DataFrame({"col": [1, 2, 3]}, index=index) + df2 = pd.DataFrame({}) + df3 = pd.DataFrame({"col": [5]}, index=pd.DatetimeIndex([pd.Timestamp(2024, 1, 2)])) + lib.write("sym", df1, staged=True) + lib.write("sym", df2, staged=True) + lib.write("sym", df3, staged=True) + lib.sort_and_finalize_staged_data("sym") + assert_frame_equal(lib.read("sym").data, pd.concat([df1, df2, df3]).sort_index()) - @pytest.mark.xfail(reason="Bug. Throws: E_ASSERTION_FAILURE Allocate data called with zero size") def test_df_without_rows(self, lmdb_library): lib = lmdb_library df = pd.DataFrame({"col": []}, index=pd.DatetimeIndex([])) @@ -233,11 +250,10 @@ def test_df_without_rows(self, lmdb_library): assert_frame_equal(lib.read("sym").data, df) -@pytest.mark.xfail(reason="Throws: E_ASSERTION_FAILURE Stream descriptor not found in pipeline context") def test_finalize_without_adding_segments(lmdb_library): lib = lmdb_library - lib.write("sym", pd.DataFrame({"col": [1]}, index=pd.DatetimeIndex([np.datetime64('2023-01-01')]))) - lib.sort_and_finalize_staged_data("sym") + with pytest.raises(UserInputException) as exception_info: + lib.sort_and_finalize_staged_data("sym") def test_type_mismatch_throws(lmdb_library): lib = lmdb_library