diff --git a/.github/workflows/static_analysis.yml b/.github/workflows/static_analysis.yml
index 7c439946fc..557128c441 100644
--- a/.github/workflows/static_analysis.yml
+++ b/.github/workflows/static_analysis.yml
@@ -71,3 +71,11 @@
           polaris_reports_sarif_groupSCAIssues: true
           polaris_upload_sarif_report: true
           polaris_prComment_severities: "high,critical,medium,low"
+
+      - name: Upload .bridge artifact
+        uses: actions/upload-artifact@v4
+        with:
+          name: Bridge Logs
+          path: ${{github.workspace}}/.bridge
+          if-no-files-found: error
+
diff --git a/cpp/arcticdb/pipeline/index_segment_reader.cpp b/cpp/arcticdb/pipeline/index_segment_reader.cpp
index db77bf51d5..5a8a718efc 100644
--- a/cpp/arcticdb/pipeline/index_segment_reader.cpp
+++ b/cpp/arcticdb/pipeline/index_segment_reader.cpp
@@ -24,6 +24,12 @@ IndexSegmentReader get_index_reader(const AtomKey &prev_index, const std::shared_ptr<Store> &store) {
     return index::IndexSegmentReader{std::move(seg)};
 }
 
+folly::Future<IndexSegmentReader> async_get_index_reader(const AtomKey &prev_index, const std::shared_ptr<Store> &store) {
+    return store->read(prev_index).thenValueInline([](std::pair<VariantKey, SegmentInMemory>&& key_seg) {
+        return IndexSegmentReader{std::move(key_seg.second)};
+    });
+}
+
 IndexSegmentReader::IndexSegmentReader(SegmentInMemory&& s) :
     seg_(std::move(s)) {
 }
diff --git a/cpp/arcticdb/pipeline/index_segment_reader.hpp b/cpp/arcticdb/pipeline/index_segment_reader.hpp
index 14ad963df2..edfb10b803 100644
--- a/cpp/arcticdb/pipeline/index_segment_reader.hpp
+++ b/cpp/arcticdb/pipeline/index_segment_reader.hpp
@@ -8,13 +8,9 @@
 #pragma once
 
 #include
-#include
 #include
 #include
-
-#include
-
-#include
+#include
 
 namespace arcticdb {
 class Store;
@@ -135,6 +131,10 @@ index::IndexSegmentReader get_index_reader(
     const AtomKey &prev_index,
     const std::shared_ptr<Store> &store);
 
+folly::Future<index::IndexSegmentReader> async_get_index_reader(
+    const AtomKey &prev_index,
+    const std::shared_ptr<Store> &store);
+
 IndexRange get_index_segment_range(
     const AtomKey &prev_index,
     const std::shared_ptr<Store> &store);
diff --git a/cpp/arcticdb/pipeline/index_utils.cpp b/cpp/arcticdb/pipeline/index_utils.cpp
index 943147dcd1..401a0719e2 100644
--- a/cpp/arcticdb/pipeline/index_utils.cpp
+++ b/cpp/arcticdb/pipeline/index_utils.cpp
@@ -129,4 +129,8 @@ TimeseriesDescriptor get_merged_tsd(
     );
 }
 
+bool is_timeseries_index(const IndexDescriptorImpl& index_desc) {
+    return index_desc.type() == IndexDescriptor::Type::TIMESTAMP || index_desc.type() == IndexDescriptor::Type::EMPTY;
+}
+
 } //namespace arcticdb::pipelines::index
\ No newline at end of file
diff --git a/cpp/arcticdb/pipeline/index_utils.hpp b/cpp/arcticdb/pipeline/index_utils.hpp
index 931cbaf3ef..4641188b8c 100644
--- a/cpp/arcticdb/pipeline/index_utils.hpp
+++ b/cpp/arcticdb/pipeline/index_utils.hpp
@@ -138,4 +138,6 @@ TimeseriesDescriptor get_merged_tsd(
     const TimeseriesDescriptor& existing_tsd,
     const std::shared_ptr<InputTensorFrame>& new_frame);
 
+[[nodiscard]] bool is_timeseries_index(const IndexDescriptorImpl& index_desc);
+
 } //namespace arcticdb::pipelines::index
\ No newline at end of file
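
Note: `async_get_index_reader` replaces the blocking `store->read(key).get()` pattern with a folly continuation; `thenValueInline` runs the cheap transform on whichever thread completes the read, avoiding an executor hop. A minimal usage sketch — the caller function and its name are illustrative, not part of this diff:

```cpp
// Hypothetical caller: fetch the index segment and count its rows without
// blocking until the outermost .get() at the API boundary.
folly::Future<size_t> async_index_row_count(
        const AtomKey& index_key,
        const std::shared_ptr<Store>& store) {
    return index::async_get_index_reader(index_key, store)
        // Inline continuation: trivial work, no need to re-schedule.
        .thenValueInline([](index::IndexSegmentReader&& reader) {
            return reader.size();
        });
}
```
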
diff --git a/cpp/arcticdb/pipeline/query.hpp b/cpp/arcticdb/pipeline/query.hpp
index 9a3d55b183..a0d13cbf9e 100644
--- a/cpp/arcticdb/pipeline/query.hpp
+++ b/cpp/arcticdb/pipeline/query.hpp
@@ -10,15 +10,10 @@
 #include
 #include
 #include
-#include
-#include
 #include
 #include
-#include
 #include
 #include
-#include
-#include
 #include
 #include
 #include
@@ -28,9 +23,13 @@
 #include
 #include
 #include
+#include
+#include
 
 namespace arcticdb::pipelines {
 
+namespace ranges = std::ranges;
+
 using FilterRange = std::variant<std::monostate, RowRange, IndexRange>;
 
 /*
@@ -405,20 +404,18 @@ inline FilterRange get_query_index_range(
         return RowRange{std::get(index_range.start_), std::get(index_range.end_)};
 }
 
-inline std::vector<SliceAndKey> strictly_before(const FilterRange &range, const std::vector<SliceAndKey> &input) {
+inline std::vector<SliceAndKey> strictly_before(const FilterRange &range, std::span<const SliceAndKey> input) {
     std::vector<SliceAndKey> output;
     util::variant_match(range,
         [&](const RowRange &row_range) {
-            std::copy_if(std::begin(input), std::end(input), std::back_inserter(output),
-                         [&](const auto &sk) {
-                             return sk.slice_.row_range.second < row_range.first;
-                         });
+            ranges::copy_if(input, std::back_inserter(output), [&](const auto &sk) {
+                return sk.slice_.row_range.second < row_range.first;
+            });
         },
         [&](const IndexRange &index_range) {
-            std::copy_if(std::begin(input), std::end(input), std::back_inserter(output),
-                         [&](const auto &sk) {
-                             return sk.key().index_range().end_ < index_range.start_;
-                         });
+            ranges::copy_if(input, std::back_inserter(output), [&](const auto &sk) {
+                return sk.key().index_range().end_ < index_range.start_;
+            });
         },
         [&](const auto &) {
             util::raise_rte("Expected specified range ");
@@ -426,20 +423,18 @@ inline std::vector<SliceAndKey> strictly_before(const FilterRange &range, const
     return output;
 }
 
-inline std::vector<SliceAndKey> strictly_after(const FilterRange &range, const std::vector<SliceAndKey> &input) {
+inline std::vector<SliceAndKey> strictly_after(const FilterRange &range, std::span<const SliceAndKey> input) {
     std::vector<SliceAndKey> output;
     util::variant_match(range,
         [&input, &output](const RowRange &row_range) {
-            std::copy_if(std::begin(input), std::end(input), std::back_inserter(output),
-                         [&](const auto &sk) {
-                             return sk.slice_.row_range.first > row_range.second;
-                         });
+            ranges::copy_if(input, std::back_inserter(output), [&](const auto &sk) {
+                return sk.slice_.row_range.first > row_range.second;
+            });
         },
         [&input, &output](const IndexRange &index_range) {
-            std::copy_if(std::begin(input), std::end(input), std::back_inserter(output),
-                         [&](const auto &sk) {
-                             return sk.key().index_range().start_ > index_range.end_;
-                         });
+            ranges::copy_if(input, std::back_inserter(output), [&](const auto &sk) {
+                return sk.key().index_range().start_ > index_range.end_;
+            });
         },
         [](const auto &) {
             util::raise_rte("Expected specified range ");
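
Note: swapping `const std::vector<T>&` for `std::span<const T>` plus `std::ranges::copy_if` is what lets these helpers accept either the old vectors or the new `std::array` field lists without copies. A self-contained C++20 illustration of the same pattern (toy types, not ArcticDB code):

```cpp
#include <algorithm>
#include <array>
#include <span>
#include <vector>

// Same shape as strictly_before/strictly_after after this diff:
// take a read-only view, return the filtered copy.
std::vector<int> keep_below(std::span<const int> input, int limit) {
    std::vector<int> output;
    std::ranges::copy_if(input, std::back_inserter(output),
                         [limit](int v) { return v < limit; });
    return output;
}

int main() {
    std::vector<int> v{1, 5, 3};
    std::array<int, 3> a{2, 9, 4};
    auto x = keep_below(v, 4);  // a vector binds to span<const int>
    auto y = keep_below(a, 4);  // so does an array, with no conversion cost
    return x.size() == 2 && y.size() == 1 ? 0 : 1;
}
```
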
diff --git a/cpp/arcticdb/pipeline/read_pipeline.hpp b/cpp/arcticdb/pipeline/read_pipeline.hpp
index 00ed5c207a..9bfffca85f 100644
--- a/cpp/arcticdb/pipeline/read_pipeline.hpp
+++ b/cpp/arcticdb/pipeline/read_pipeline.hpp
@@ -8,24 +8,9 @@
 #pragma once
 
 #include
-
-#include
-#include
-
 #include
-#include
-#include
-#include
-
-#include
-#include
-#include
 #include
-#include
 #include
-#include
-#include
-#include
 #include
 #include
 #include
@@ -61,22 +46,24 @@ void foreach_active_bit(const util::BitSet &bs, C &&visitor) {
     }
 }
 
-template<typename ContainerType>
-std::vector<SliceAndKey> filter_index(const ContainerType &container, std::optional<CombinedQuery<ContainerType>> &&query) {
+inline std::vector<SliceAndKey> filter_index(
+    const index::IndexSegmentReader& index_segment_reader,
+    std::optional<CombinedQuery<index::IndexSegmentReader>> &&query
+) {
     ARCTICDB_SAMPLE_DEFAULT(FilterIndex)
     std::vector<SliceAndKey> output{};
-    if (container.size() > 0) {
+    if (!index_segment_reader.empty()) {
         if(query) {
-            auto row_bitset = (*query)(container);
+            auto row_bitset = (*query)(index_segment_reader);
             ARCTICDB_DEBUG(log::version(), "Row bitset has {} bits set of {}", row_bitset->count(), row_bitset->size());
             output.reserve(row_bitset->count());
             foreach_active_bit(*row_bitset, [&](auto r) {
-                output.emplace_back(get_row(container, r));
+                output.emplace_back(get_row(index_segment_reader, r));
             });
         } else {
-            output.reserve(container.size());
-            for(auto i = 0u; i < container.size(); ++i) {
-                output.emplace_back(get_row(container, i));
+            output.reserve(index_segment_reader.size());
+            for(auto i = 0u; i < index_segment_reader.size(); ++i) {
+                output.emplace_back(get_row(index_segment_reader, i));
             }
         }
     }
diff --git a/cpp/arcticdb/pipeline/write_frame.cpp b/cpp/arcticdb/pipeline/write_frame.cpp
index 4cca47957c..29205880d1 100644
--- a/cpp/arcticdb/pipeline/write_frame.cpp
+++ b/cpp/arcticdb/pipeline/write_frame.cpp
@@ -16,7 +16,6 @@
 #include
 #include
 #include
-#include
 #include
 #include
 #include
@@ -24,12 +23,14 @@
 #include
 #include
 #include
+#include
 
 namespace arcticdb::pipelines {
 
 using namespace arcticdb::entity;
 using namespace arcticdb::stream;
 
+namespace ranges = std::ranges;
 
 WriteToSegmentTask::WriteToSegmentTask(
     std::shared_ptr<InputTensorFrame> frame,
@@ -252,40 +253,46 @@ static RowRange partial_rewrite_row_range(
     }
 }
 
-std::optional<SliceAndKey> rewrite_partial_segment(
+folly::Future<std::optional<SliceAndKey>> async_rewrite_partial_segment(
     const SliceAndKey& existing,
     const IndexRange& index_range,
     VersionId version_id,
     AffectedSegmentPart affected_part,
     const std::shared_ptr<Store>& store) {
-    const auto& key = existing.key();
-    auto kv = store->read(key).get();
-    const SegmentInMemory& segment = kv.second;
-    const RowRange affected_row_range = partial_rewrite_row_range(segment, index_range, affected_part);
-    const int64_t num_rows = affected_row_range.end() - affected_row_range.start();
-    if (num_rows <= 0) {
-        return std::nullopt;
-    }
-    SegmentInMemory output = segment.truncate(affected_row_range.start(), affected_row_range.end(), true);
-    const IndexValue start_ts = TimeseriesIndex::start_value_for_segment(output);
-    // +1 as in the key we store one nanosecond greater than the last index value in the segment
-    const IndexValue end_ts = std::get(TimeseriesIndex::end_value_for_segment(output)) + 1;
-    FrameSlice new_slice{
-        std::make_shared<StreamDescriptor>(output.descriptor()),
-        existing.slice_.col_range,
-        RowRange{0, num_rows},
-        existing.slice_.hash_bucket(),
-        existing.slice_.num_buckets()};
-
-    auto fut_key = store->write(
-        key.type(),
+    return store->read(existing.key()).thenValueInline([
+        existing,
+        index_range,
         version_id,
-        key.id(),
-        start_ts,
-        end_ts,
-        std::move(output)
-    );
-    return SliceAndKey{std::move(new_slice), std::get<AtomKey>(std::move(fut_key).get())};
+        affected_part,
+        store](std::pair<VariantKey, SegmentInMemory>&& key_segment) -> folly::Future<std::optional<SliceAndKey>> {
+        const auto& key = existing.key();
+        const SegmentInMemory& segment = key_segment.second;
+        const RowRange affected_row_range = partial_rewrite_row_range(segment, index_range, affected_part);
+        const int64_t num_rows = affected_row_range.end() - affected_row_range.start();
+        if (num_rows <= 0) {
+            return std::nullopt;
+        }
+        SegmentInMemory output = segment.truncate(affected_row_range.start(), affected_row_range.end(), true);
+        const IndexValue start_ts = TimeseriesIndex::start_value_for_segment(output);
+        // +1 as in the key we store one nanosecond greater than the last index value in the segment
+        const IndexValue end_ts = std::get(TimeseriesIndex::end_value_for_segment(output)) + 1;
+        FrameSlice new_slice{
+            std::make_shared<StreamDescriptor>(output.descriptor()),
+            existing.slice_.col_range,
+            RowRange{0, num_rows},
+            existing.slice_.hash_bucket(),
+            existing.slice_.num_buckets()};
+        return store->write(
+            key.type(),
+            version_id,
+            key.id(),
+            start_ts,
+            end_ts,
+            std::move(output)
+        ).thenValueInline([new_slice=std::move(new_slice)](VariantKey&& k) {
+            return std::make_optional<SliceAndKey>(std::move(new_slice), std::get<AtomKey>(std::move(k)));
+        });
+    });
 }
 
 std::vector<SliceAndKey> flatten_and_fix_rows(const std::array<std::vector<SliceAndKey>, 5>& groups, size_t& global_count) {
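
Note: the rewrite above is read → maybe-transform → write, expressed entirely as continuations, with `std::optional` signalling "nothing to rewrite". A compressed sketch of that control-flow shape — plain folly with stand-in types, not the real store API:

```cpp
#include <folly/futures/Future.h>
#include <optional>

// Stand-in for "read a blob, maybe produce a trimmed copy, write it back".
folly::Future<std::optional<int>> async_rewrite(folly::Future<int> read_fut) {
    return std::move(read_fut).thenValueInline(
        [](int v) -> folly::Future<std::optional<int>> {
            if (v <= 0)
                return std::nullopt;  // nothing to rewrite: short-circuit
            // A nested async step (the "write"), chained without blocking;
            // thenValueInline flattens the inner future into the outer one.
            return folly::makeFuture(v * 2).thenValueInline(
                [](int written) { return std::make_optional(written); });
        });
}
```
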
@@ -301,10 +308,9 @@ std::vector<SliceAndKey> flatten_and_fix_rows(const std::array<std::vector<SliceAndKey>, 5>& groups, size_t& global_count) {
-std::optional<SliceAndKey> rewrite_partial_segment(
+folly::Future<std::optional<SliceAndKey>> async_rewrite_partial_segment(
     const SliceAndKey& existing,
     const IndexRange& index_range,
     VersionId version_id,
diff --git a/cpp/arcticdb/processing/test/benchmark_clause.cpp b/cpp/arcticdb/processing/test/benchmark_clause.cpp
index 748faaf3e7..d306e234f3 100644
--- a/cpp/arcticdb/processing/test/benchmark_clause.cpp
+++ b/cpp/arcticdb/processing/test/benchmark_clause.cpp
@@ -21,9 +21,7 @@ using namespace arcticdb;
 
 SegmentInMemory get_segment_for_merge(const StreamId &id, size_t num_rows, size_t start, size_t step){
     auto segment = SegmentInMemory{
-        get_test_descriptor(id, {
-            scalar_field(DataType::UINT8, "column")
-        }),
+        get_test_descriptor(id, std::array{scalar_field(DataType::UINT8, "column")}),
         num_rows
     };
     auto& index_col = segment.column(0);
diff --git a/cpp/arcticdb/stream/stream_source.hpp b/cpp/arcticdb/stream/stream_source.hpp
index 3d7487fb9e..5745f82686 100644
--- a/cpp/arcticdb/stream/stream_source.hpp
+++ b/cpp/arcticdb/stream/stream_source.hpp
@@ -8,7 +8,6 @@
 #pragma once
 
 #include
-#include
 #include
 #include
 #include
diff --git a/cpp/arcticdb/stream/test/stream_test_common.hpp b/cpp/arcticdb/stream/test/stream_test_common.hpp
index ee46ddaeec..764e5f633b 100644
--- a/cpp/arcticdb/stream/test/stream_test_common.hpp
+++ b/cpp/arcticdb/stream/test/stream_test_common.hpp
@@ -7,17 +7,9 @@
 
 #pragma once
 
-#include
 #include
 #include
-#include
 #include
-#include
-
-#include
-#include
-#include
-#include
 #include
 #include
 #include
@@ -27,7 +19,12 @@
 #include
 #include
 
+#include
+#include
+
 #include
+#include
+#include
 
 namespace fg = folly::gen;
 
@@ -173,10 +170,10 @@ NativeTensor test_string_column(ContainerType &vec, DTT, shape_t num_rows) {
     return NativeTensor{bytes, 1, &strides, &shapes, dt, elsize, vec.data(), 1};
 }
 
-inline std::vector<FieldRef> get_test_timeseries_fields() {
+inline auto get_test_timeseries_fields() {
     using namespace arcticdb::entity;
 
-    return {
+    return std::array {
         scalar_field(DataType::UINT8, "smallints"),
         scalar_field(DataType::INT64, "bigints"),
         scalar_field(DataType::FLOAT64, "floats"),
@@ -184,10 +181,10 @@ inline auto get_test_timeseries_fields() {
     };
 }
 
-inline std::vector<FieldRef> get_test_simple_fields() {
+inline auto get_test_simple_fields() {
     using namespace arcticdb::entity;
 
-    return {
+    return std::array {
         scalar_field(DataType::UINT32, "index"),
         scalar_field(DataType::FLOAT64, "floats"),
     };
 }
@@ -254,13 +251,13 @@ inline void fill_test_frame(SegmentInMemory &segment,
 }
 
 template<typename IndexType>
-StreamDescriptor get_test_descriptor(const StreamId &id, const std::vector<FieldRef> &fields) {
+StreamDescriptor get_test_descriptor(const StreamId &id, std::span<const FieldRef> fields) {
     return IndexType::default_index().create_stream_descriptor(id, folly::Range(fields.begin(), fields.end()));
 }
 
 template<typename IndexType>
 TestTensorFrame get_test_frame(const StreamId &id,
-                               const std::vector<FieldRef> &fields,
+                               std::span<const FieldRef> fields,
                                size_t num_rows,
                                size_t start_val,
                                size_t opt_row_offset = 0) {
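
Note: the test helpers now build their field lists as stack-allocated `std::array`s via class-template argument deduction instead of heap-allocated vectors, and every consumer takes `std::span<const ...>`, so call sites are unchanged. A self-contained illustration with a toy `Field` type (not the ArcticDB definitions):

```cpp
#include <array>
#include <span>
#include <string_view>
#include <vector>

struct Field { int type; std::string_view name; };

// CTAD deduces std::array<Field, 2>: no allocation, size known at compile time.
inline auto test_fields() {
    return std::array{Field{1, "index"}, Field{2, "floats"}};
}

// A span<const Field> parameter accepts the array, a vector, or a sub-range,
// which is why the return-type change did not ripple through the tests.
inline std::size_t count_fields(std::span<const Field> fields) { return fields.size(); }

int main() {
    auto arr = test_fields();
    std::vector<Field> vec{arr.begin(), arr.end()};
    return count_fields(arr) == count_fields(vec) ? 0 : 1;
}
```
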
diff --git a/cpp/arcticdb/version/local_versioned_engine.cpp b/cpp/arcticdb/version/local_versioned_engine.cpp
index cd2ee0e5a1..3c35ad92ff 100644
--- a/cpp/arcticdb/version/local_versioned_engine.cpp
+++ b/cpp/arcticdb/version/local_versioned_engine.cpp
@@ -560,9 +560,7 @@ VersionedItem LocalVersionedEngine::update_internal(
     bool prune_previous_versions) {
     ARCTICDB_RUNTIME_DEBUG(log::version(), "Command: update");
     py::gil_scoped_release release_gil;
-    auto update_info = get_latest_undeleted_version_and_next_version_id(store(),
-                                                                        version_map(),
-                                                                        stream_id);
+    auto update_info = get_latest_undeleted_version_and_next_version_id(store(), version_map(), stream_id);
     if (update_info.previous_index_key_.has_value()) {
         if (frame->empty()) {
             ARCTICDB_DEBUG(log::version(), "Updating existing data with an empty item has no effect. \n"
diff --git a/cpp/arcticdb/version/test/test_version_store.cpp b/cpp/arcticdb/version/test/test_version_store.cpp
index 060d3813a2..9f51ff98da 100644
--- a/cpp/arcticdb/version/test/test_version_store.cpp
+++ b/cpp/arcticdb/version/test/test_version_store.cpp
@@ -452,7 +452,7 @@ TEST(VersionStore, AppendRefKeyOptimisation) {
     size_t num_rows{5};
     size_t start_val{0};
 
-    std::vector<FieldRef> fields{
+    const std::array fields{
        scalar_field(DataType::UINT8, "thing1"),
        scalar_field(DataType::UINT8, "thing2"),
        scalar_field(DataType::UINT16, "thing3"),
@@ -512,24 +512,24 @@ TEST(VersionStore, UpdateWithin) {
     ScopedConfig reload_interval("VersionMap.ReloadInterval", 0);
     PilotedClock::reset();
-    StreamId symbol("update_schema");
+    const StreamId symbol("update_schema");
     auto version_store = get_test_engine();
-    size_t num_rows{100};
-    size_t start_val{0};
+    constexpr size_t num_rows{100};
+    constexpr size_t start_val{0};
 
-    std::vector<FieldRef> fields{
+    const std::array fields{
         scalar_field(DataType::UINT8, "thing1"),
         scalar_field(DataType::UINT8, "thing2"),
         scalar_field(DataType::UINT16, "thing3"),
         scalar_field(DataType::UINT16, "thing4")
     };
-    auto test_frame = get_test_frame(symbol, fields, num_rows, start_val);
+    auto test_frame = get_test_frame(symbol, fields, num_rows, start_val);
     version_store.write_versioned_dataframe_internal(symbol, std::move(test_frame.frame_), false, false, false);
 
-    RowRange update_range{10, 15};
-    size_t update_val{1};
-    auto update_frame = get_test_frame(symbol, fields, update_range.diff(), update_range.first, update_val);
+    constexpr RowRange update_range{10, 15};
+    constexpr size_t update_val{100};
+    auto update_frame = get_test_frame(symbol, fields, update_range.diff(), update_range.first, update_val);
     version_store.update_internal(symbol, UpdateQuery{}, std::move(update_frame.frame_), false, false, false);
 
     auto read_query = std::make_shared<ReadQuery>();
@@ -539,12 +536,9 @@ TEST(VersionStore, UpdateWithin) {
     const auto& seg = read_result.frame_and_descriptor_.frame_;
     for(auto i = 0u; i < num_rows; ++i) {
-        auto expected = i;
-        if(update_range.contains(i))
-            expected += update_val;
-
-        auto val1 = seg.scalar_at(i, 1);
-        ASSERT_EQ(val1.value(), expected);
+        const uint8_t expected = update_range.contains(i) ? i + update_val : i;
+        const auto value = seg.scalar_at(i, 1).value();
+        EXPECT_EQ(expected, value);
     }
 }
@@ -555,12 +552,12 @@ TEST(VersionStore, UpdateBefore) {
     using namespace arcticdb::pipelines;
 
     PilotedClock::reset();
-    StreamId symbol("update_schema");
+    const StreamId symbol("update_schema");
     auto version_store = get_test_engine();
-    size_t num_rows{100};
-    size_t start_val{10};
+    constexpr size_t num_rows{100};
+    constexpr size_t start_val{10};
 
-    std::vector<FieldRef> fields{
+    const std::array fields{
         scalar_field(DataType::UINT8, "thing1"),
         scalar_field(DataType::UINT8, "thing2"),
         scalar_field(DataType::UINT16, "thing3"),
@@ -570,8 +567,8 @@ TEST(VersionStore, UpdateBefore) {
     auto test_frame = get_test_frame(symbol, fields, num_rows, start_val);
     version_store.write_versioned_dataframe_internal(symbol, std::move(test_frame.frame_), false, false, false);
 
-    RowRange update_range{0, 10};
-    size_t update_val{1};
+    constexpr RowRange update_range{0, 10};
+    constexpr size_t update_val{1};
     auto update_frame = get_test_frame(symbol, fields, update_range.diff(), update_range.first, update_val);
     version_store.update_internal(symbol, UpdateQuery{}, std::move(update_frame.frame_), false, false, false);
 
@@ -582,12 +579,9 @@ TEST(VersionStore, UpdateBefore) {
     const auto& seg = read_result.frame_and_descriptor_.frame_;
     for(auto i = 0u; i < num_rows + update_range.diff(); ++i) {
-        auto expected = i;
-        if(update_range.contains(i))
-            expected += update_val;
-
-        auto val1 = seg.scalar_at(i, 1);
-        ASSERT_EQ(val1.value(), expected);
+        const auto expected = update_range.contains(i) ? i + update_val : i;
+        const auto value = seg.scalar_at(i, 1).value();
+        ASSERT_EQ(value, expected);
     }
 }
@@ -598,12 +592,12 @@ TEST(VersionStore, UpdateAfter) {
     using namespace arcticdb::pipelines;
 
     PilotedClock::reset();
-    StreamId symbol("update_schema");
+    const StreamId symbol("update_schema");
    auto version_store = get_test_engine();
-    size_t num_rows{100};
-    size_t start_val{0};
+    constexpr size_t num_rows{100};
+    constexpr size_t start_val{0};
 
-    std::vector<FieldRef> fields{
+    const std::array fields{
         scalar_field(DataType::UINT8, "thing1"),
         scalar_field(DataType::UINT8, "thing2"),
         scalar_field(DataType::UINT16, "thing3"),
@@ -613,8 +607,8 @@ TEST(VersionStore, UpdateAfter) {
     auto test_frame = get_test_frame(symbol, fields, num_rows, start_val);
     version_store.write_versioned_dataframe_internal(symbol, std::move(test_frame.frame_), false, false, false);
 
-    RowRange update_range{100, 110};
-    size_t update_val{1};
+    constexpr RowRange update_range{100, 110};
+    constexpr size_t update_val{1};
     auto update_frame = get_test_frame(symbol, fields, update_range.diff(), update_range.first, update_val);
     version_store.update_internal(symbol, UpdateQuery{}, std::move(update_frame.frame_), false, false, false);
 
@@ -625,12 +619,9 @@ TEST(VersionStore, UpdateAfter) {
     const auto& seg = read_result.frame_and_descriptor_.frame_;
     for(auto i = 0u; i < num_rows + update_range.diff(); ++i) {
-        auto expected = i;
-        if(update_range.contains(i))
-            expected += update_val;
-
-        auto val1 = seg.scalar_at(i, 1);
-        ASSERT_EQ(val1.value(), expected);
+        const auto expected = update_range.contains(i) ? i + update_val : i;
+        const auto value = seg.scalar_at(i, 1).value();
+        ASSERT_EQ(value, expected);
     }
 }
@@ -641,12 +632,12 @@ TEST(VersionStore, UpdateIntersectBefore) {
     using namespace arcticdb::pipelines;
 
     PilotedClock::reset();
-    StreamId symbol("update_schema");
+    const StreamId symbol("update_schema");
     auto version_store = get_test_engine();
-    size_t num_rows{100};
-    size_t start_val{5};
+    constexpr size_t num_rows{100};
+    constexpr size_t start_val{5};
 
-    std::vector<FieldRef> fields{
+    const std::array fields{
         scalar_field(DataType::UINT8, "thing1"),
         scalar_field(DataType::UINT8, "thing2"),
         scalar_field(DataType::UINT16, "thing3"),
@@ -656,8 +647,8 @@ TEST(VersionStore, UpdateIntersectBefore) {
     auto test_frame = get_test_frame(symbol, fields, num_rows, start_val);
     version_store.write_versioned_dataframe_internal(symbol, std::move(test_frame.frame_), false, false, false);
 
-    RowRange update_range{0, 10};
-    size_t update_val{1};
+    constexpr RowRange update_range{0, 10};
+    constexpr size_t update_val{1};
     auto update_frame = get_test_frame(symbol, fields, update_range.diff(), update_range.first, update_val);
     version_store.update_internal(symbol, UpdateQuery{}, std::move(update_frame.frame_), false, false, false);
 
@@ -669,12 +660,9 @@ TEST(VersionStore, UpdateIntersectBefore) {
     const auto &seg = read_result.frame_and_descriptor_.frame_;
     for (auto i = 0u; i < num_rows + 5; ++i) {
-        auto expected = i;
-        if (update_range.contains(i))
-            expected += update_val;
-
-        auto val1 = seg.scalar_at(i, 1);
-        ASSERT_EQ(val1.value(), expected);
+        const auto expected = update_range.contains(i) ? i + update_val : i;
+        const auto value = seg.scalar_at(i, 1).value();
+        ASSERT_EQ(value, expected);
     }
 }
@@ -685,12 +673,12 @@ TEST(VersionStore, UpdateIntersectAfter) {
     using namespace arcticdb::pipelines;
 
     PilotedClock::reset();
-    StreamId symbol("update_schema");
+    const StreamId symbol("update_schema");
     auto version_store = get_test_engine();
-    size_t num_rows{100};
-    size_t start_val{0};
+    constexpr size_t num_rows{100};
+    constexpr size_t start_val{0};
 
-    std::vector<FieldRef> fields{
+    const std::array fields{
         scalar_field(DataType::UINT8, "thing1"),
         scalar_field(DataType::UINT8, "thing2"),
         scalar_field(DataType::UINT16, "thing3"),
@@ -700,8 +688,8 @@ TEST(VersionStore, UpdateIntersectAfter) {
     auto test_frame = get_test_frame(symbol, fields, num_rows, start_val);
     version_store.write_versioned_dataframe_internal(symbol, std::move(test_frame.frame_), false, false, false);
 
-    RowRange update_range{95, 105};
-    size_t update_val{1};
+    constexpr RowRange update_range{95, 105};
+    constexpr size_t update_val{1};
     auto update_frame = get_test_frame(symbol, fields, update_range.diff(), update_range.first, update_val);
     version_store.update_internal(symbol, UpdateQuery{}, std::move(update_frame.frame_), false, false, false);
 
@@ -713,12 +701,9 @@ TEST(VersionStore, UpdateIntersectAfter) {
     const auto &seg = read_result.frame_and_descriptor_.frame_;
     for (auto i = 0u; i < num_rows + 5; ++i) {
-        auto expected = i;
-        if (update_range.contains(i))
-            expected += update_val;
-
-        auto val1 = seg.scalar_at(i, 1);
-        ASSERT_EQ(val1.value(), expected);
+        const auto expected = update_range.contains(i) ? i + update_val : i;
+        const auto value = seg.scalar_at(i, 1).value();
+        ASSERT_EQ(value, expected);
     }
 }
@@ -729,12 +714,12 @@ TEST(VersionStore, UpdateWithinSchemaChange) {
     using namespace arcticdb::pipelines;
 
     PilotedClock::reset();
-    StreamId symbol("update_schema");
+    const StreamId symbol("update_schema");
     auto version_store = get_test_engine();
-    size_t num_rows{100};
-    size_t start_val{0};
+    constexpr size_t num_rows{100};
+    constexpr size_t start_val{0};
 
-    std::vector<FieldRef> fields{
+    const std::array fields{
         scalar_field(DataType::UINT8, "thing1"),
         scalar_field(DataType::UINT8, "thing2"),
         scalar_field(DataType::UINT16, "thing3"),
@@ -745,10 +730,10 @@ TEST(VersionStore, UpdateWithinSchemaChange) {
     auto test_frame = get_test_frame(symbol, fields, num_rows, start_val);
     version_store.write_versioned_dataframe_internal(symbol, std::move(test_frame.frame_), false, false, false);
 
-    RowRange update_range{10, 15};
-    size_t update_val{1};
+    constexpr RowRange update_range{10, 15};
+    constexpr size_t update_val{1};
 
-    std::vector<FieldRef> update_fields{
+    const std::array update_fields{
         scalar_field(DataType::UINT8, "thing1"),
         scalar_field(DataType::UINT8, "thing2"),
         scalar_field(DataType::UINT16, "thing3"),
@@ -767,20 +752,17 @@ TEST(VersionStore, UpdateWithinSchemaChange) {
     const auto &seg = read_result.frame_and_descriptor_.frame_;
 
     for (auto i = 0u;i < num_rows; ++i) {
-        auto expected = i;
-        if (update_range.contains(i))
-            expected += update_val;
-
-        auto val1 = seg.scalar_at(i, 1);
-        check_value(val1.value(), expected);
+        auto expected = update_range.contains(i) ? i + update_val : i;
+        const auto val1 = seg.scalar_at(i, 1).value();
+        check_value(val1, expected);
 
         expected = update_range.contains(i) ? 0 : i;
-        auto val2 = seg.scalar_at(i, 4);
-        check_value(val2.value(), expected);
+        const auto val2 = seg.scalar_at(i, 4).value();
+        check_value(val2, expected);
 
         expected = !update_range.contains(i) ? 0 : i + update_val;
-        auto val3 = seg.scalar_at(i, 5);
-        check_value(val3.value(), expected);
+        const auto val3 = seg.scalar_at(i, 5).value();
+        check_value(val3, expected);
     }
 }
@@ -791,14 +773,14 @@ TEST(VersionStore, UpdateWithinTypeAndSchemaChange) {
     using namespace arcticdb::pipelines;
 
     PilotedClock::reset();
-    StreamId symbol("update_schema");
+    const StreamId symbol("update_schema");
     arcticdb::proto::storage::VersionStoreConfig version_store_cfg;
     version_store_cfg.mutable_write_options()->set_dynamic_schema(true);
     auto version_store = get_test_engine({version_store_cfg});
-    size_t num_rows{100};
-    size_t start_val{0};
+    constexpr size_t num_rows{100};
+    constexpr size_t start_val{0};
 
-    std::vector<FieldRef> fields{
+    const std::array fields{
         scalar_field(DataType::UINT8, "thing1"),
         scalar_field(DataType::UINT8, "thing2"),
         scalar_field(DataType::UINT16, "thing3"),
@@ -808,10 +790,10 @@ TEST(VersionStore, UpdateWithinTypeAndSchemaChange) {
     auto test_frame = get_test_frame(symbol, fields, num_rows, start_val);
     version_store.write_versioned_dataframe_internal(symbol, std::move(test_frame.frame_), false, false, false);
 
-    RowRange update_range{10, 15};
-    size_t update_val{1};
+    constexpr RowRange update_range{10, 15};
+    constexpr size_t update_val{1};
 
-    std::vector<FieldRef> update_fields{
+    const std::array update_fields{
         scalar_field(DataType::UINT8, "thing1"),
         scalar_field(DataType::UINT16, "thing2"),
         scalar_field(DataType::UINT32, "thing3"),
@@ -830,20 +812,17 @@ TEST(VersionStore, UpdateWithinTypeAndSchemaChange) {
     const auto &seg = read_result.frame_and_descriptor_.frame_;
 
     for (auto i = 0u;i < num_rows; ++i) {
-        auto expected = i;
-        if (update_range.contains(i))
-            expected += update_val;
-
-        auto val1 = seg.scalar_at(i, 1);
-        check_value(val1.value(), expected);
+        auto expected = update_range.contains(i) ? i + update_val : i;
+        const auto val1 = seg.scalar_at(i, 1).value();
+        check_value(val1, expected);
 
         expected = update_range.contains(i) ? 0 : i;
-        auto val2 = seg.scalar_at(i, 4);
-        check_value(val2.value(), expected);
+        const auto val2 = seg.scalar_at(i, 4).value();
+        check_value(val2, expected);
 
         expected = !update_range.contains(i) ? 0 : i + update_val;
-        auto val3 = seg.scalar_at(i, 5);
-        check_value(val3.value(), expected);
+        const auto val3 = seg.scalar_at(i, 5).value();
+        check_value(val3, expected);
     }
 }
@@ -855,11 +834,11 @@ TEST(VersionStore, TestWriteAppendMapHead) {
     using namespace arcticdb::pipelines;
 
     PilotedClock::reset();
-    StreamId symbol("append");
+    const StreamId symbol("append");
     auto version_store = get_test_engine();
-    size_t num_rows{100};
+    constexpr size_t num_rows{100};
 
-    std::vector<FieldRef> fields{
+    const std::array fields{
         scalar_field(DataType::UINT8, "thing1"),
         scalar_field(DataType::UINT8, "thing2"),
         scalar_field(DataType::UINT16, "thing3"),
diff --git a/cpp/arcticdb/version/version_core.cpp b/cpp/arcticdb/version/version_core.cpp
index b5e5be90eb..56113f9851 100644
--- a/cpp/arcticdb/version/version_core.cpp
+++ b/cpp/arcticdb/version/version_core.cpp
@@ -190,45 +190,65 @@ template
     }
 }
 
-inline std::pair<std::vector<SliceAndKey>, std::vector<SliceAndKey>> intersecting_segments(
+std::vector<SliceAndKey> filter_existing_slices(std::vector<std::optional<SliceAndKey>>&& maybe_slices) {
+    std::vector<SliceAndKey> result;
+    for (auto& maybe_slice : maybe_slices) {
+        if (maybe_slice.has_value()) {
+            result.push_back(std::move(*maybe_slice));
+        }
+    }
+    return result;
+}
+
+/// Represents all slices that intersect (but do not overlap) with the range passed to update.
+/// The first member is a vector of all segments intersecting with the first row-slice of the update range.
+/// The second member is a vector of all segments intersecting with the last row-slice of the update range.
+using IntersectingSegments = std::tuple<std::vector<SliceAndKey>, std::vector<SliceAndKey>>;
+
+[[nodiscard]] folly::Future<IntersectingSegments> async_intersecting_segments(
     const std::vector<SliceAndKey>& affected_keys,
     const IndexRange& front_range,
     const IndexRange& back_range,
     VersionId version_id,
     const std::shared_ptr<Store>& store
 ) {
-    std::vector<SliceAndKey> intersect_before;
-    std::vector<SliceAndKey> intersect_after;
+    if (!front_range.specified_ && !back_range.specified_) {
+        return folly::makeFuture(IntersectingSegments{});
+    }
+    internal::check(
+        front_range.specified_ && back_range.specified_,
+        "Both first and last index range of the update range must intersect with at least one of the slices in the dataframe");
+    std::vector<folly::Future<std::optional<SliceAndKey>>> maybe_intersect_before_fut;
+    std::vector<folly::Future<std::optional<SliceAndKey>>> maybe_intersect_after_fut;
     for (const auto& affected_slice_and_key : affected_keys) {
         const auto& affected_range = affected_slice_and_key.key().index_range();
         if (intersects(affected_range, front_range) && !overlaps(affected_range, front_range) &&
             is_before(affected_range, front_range)) {
-            auto front_overlap_key = rewrite_partial_segment(
+            maybe_intersect_before_fut.emplace_back(async_rewrite_partial_segment(
                 affected_slice_and_key,
                 front_range,
                 version_id,
                 AffectedSegmentPart::START,
                 store
-            );
-            if (front_overlap_key)
-                intersect_before.push_back(*front_overlap_key);
+            ));
         }
         if (intersects(affected_range, back_range) && !overlaps(affected_range, back_range) &&
             is_after(affected_range, back_range)) {
-            auto back_overlap_key = rewrite_partial_segment(
+            maybe_intersect_after_fut.emplace_back(async_rewrite_partial_segment(
                 affected_slice_and_key,
                 back_range,
                 version_id,
                 AffectedSegmentPart::END,
                 store
-            );
-            if (back_overlap_key)
-                intersect_after.push_back(*back_overlap_key);
+            ));
         }
     }
-    return std::make_pair(std::move(intersect_before), std::move(intersect_after));
+    return collect(
+        collect(maybe_intersect_before_fut).via(&async::io_executor()).thenValueInline(filter_existing_slices),
+        collect(maybe_intersect_after_fut).via(&async::io_executor()).thenValueInline(filter_existing_slices)
+    ).via(&async::io_executor());
 }
 
 } // namespace
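
Note: `async_intersecting_segments` fans out one future per partial rewrite and then joins the "before" and "after" groups with nested `folly::collect`. A reduced sketch of that join shape — plain folly with illustrative `int` payloads, executors simplified (depending on folly version, `collect` returns `Future` or `SemiFuture`; `.via()` normalises either to `Future`):

```cpp
#include <folly/executors/InlineExecutor.h>
#include <folly/futures/Future.h>
#include <utility>
#include <vector>

folly::Future<std::pair<std::vector<int>, std::vector<int>>>
collect_two_groups(std::vector<folly::Future<int>> before,
                   std::vector<folly::Future<int>> after) {
    auto& exec = folly::InlineExecutor::instance();
    // collect(vector<Future<T>>) -> future of vector<T>;
    // collect(f1, f2)            -> future of tuple<T1, T2>.
    return folly::collect(
               folly::collect(std::move(before)).via(&exec),
               folly::collect(std::move(after)).via(&exec))
        .via(&exec)
        .thenValue([](std::tuple<std::vector<int>, std::vector<int>>&& t) {
            return std::make_pair(std::move(std::get<0>(t)),
                                  std::move(std::get<1>(t)));
        });
}
```
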
@@ -263,7 +283,7 @@ VersionedItem delete_range_impl(
         std::end(affected_keys),
         std::back_inserter(unaffected_keys));
 
-    auto [intersect_before, intersect_after] = intersecting_segments(affected_keys, index_range, index_range, update_info.next_version_id_, store);
+    auto [intersect_before, intersect_after] = async_intersecting_segments(affected_keys, index_range, index_range, update_info.next_version_id_, store).get();
     auto orig_filter_range = std::holds_alternative<std::monostate>(query.row_filter) ? get_query_index_range(index, index_range) : query.row_filter;
@@ -286,7 +306,7 @@ VersionedItem delete_range_impl(
     return versioned_item;
 }
 
-void check_update_data_is_sorted(InputTensorFrame& frame, const index::IndexSegmentReader& index_segment_reader){
+void check_update_data_is_sorted(const InputTensorFrame& frame, const index::IndexSegmentReader& index_segment_reader){
     bool is_time_series = std::holds_alternative<TimeseriesIndex>(frame.index);
     sorting::check(
         is_time_series,
@@ -304,92 +324,185 @@ void check_update_data_is_sorted(const InputTensorFrame& frame, const index::IndexSegm
         "When calling update, the existing data must be sorted.");
 }
 
-VersionedItem update_impl(
-    const std::shared_ptr<Store>& store,
+struct UpdateRanges {
+    IndexRange front;
+    IndexRange back;
+    IndexRange original_index_range;
+};
+
+static UpdateRanges compute_update_ranges(const FilterRange& row_filter, const InputTensorFrame& update_frame, std::span<const SliceAndKey> update_slice_and_keys) {
+    return util::variant_match(row_filter,
+        [&](std::monostate) -> UpdateRanges {
+            util::check(std::holds_alternative<TimeseriesIndex>(update_frame.index), "Update with row count index is not permitted");
+            if (update_slice_and_keys.empty()) {
+                // If there are no new keys, then we can't intersect with the existing data.
+                return UpdateRanges{{}, {}, update_frame.index_range};
+            }
+            IndexRange back_range = update_slice_and_keys.back().key().index_range();
+            back_range.adjust_open_closed_interval();
+            return UpdateRanges{
+                update_slice_and_keys.front().key().index_range(),
+                std::move(back_range),
+                update_frame.index_range};
+        },
+        [&](const IndexRange& idx_range) {
+            return UpdateRanges{idx_range, idx_range, idx_range};
+        },
+        [](const RowRange&) -> UpdateRanges {
+            util::raise_rte("Unexpected row_range in update query");
+            return {};
+        }
    );
+}
+
+static void check_can_update(
+    const InputTensorFrame& frame,
+    const index::IndexSegmentReader& index_segment_reader,
     const UpdateInfo& update_info,
-    const UpdateQuery& query,
-    const std::shared_ptr<InputTensorFrame>& frame,
-    const WriteOptions&& options,
     bool dynamic_schema,
-    bool empty_types) {
+    bool empty_types
+) {
     util::check(update_info.previous_index_key_.has_value(), "Cannot update as there is no previous index key to update into");
-    const StreamId stream_id = frame->desc.id();
-    ARCTICDB_DEBUG(log::version(), "Update versioned dataframe for stream_id: {} , version_id = {}", stream_id, update_info.previous_index_key_->version_id());
-    auto index_segment_reader = index::get_index_reader(*(update_info.previous_index_key_), store);
     util::check_rte(!index_segment_reader.is_pickled(), "Cannot update pickled data");
-    auto index_desc = check_index_match(frame->index, index_segment_reader.tsd().index());
-    util::check(
-        index_desc.type() == IndexDescriptor::Type::TIMESTAMP || index_desc.type() == IndexDescriptor::Type::EMPTY,
-        "Update not supported for non-timeseries indexes"
-    );
-    check_update_data_is_sorted(*frame, index_segment_reader);
-    bool bucketize_dynamic = index_segment_reader.bucketize_dynamic();
-    (void)check_and_mark_slices(index_segment_reader, dynamic_schema, false, std::nullopt, bucketize_dynamic);
-    fix_descriptor_mismatch_or_throw(UPDATE, dynamic_schema, index_segment_reader, *frame, empty_types);
-
-    std::vector<FilterQuery<index::IndexSegmentReader>> queries =
-        build_update_query_filters(query.row_filter, frame->index, frame->index_range, dynamic_schema, index_segment_reader.bucketize_dynamic());
-    auto combined = combine_filter_functions(queries);
-    auto affected_keys = filter_index(index_segment_reader, std::move(combined));
-    std::vector<SliceAndKey> unaffected_keys;
-    std::set_difference(std::begin(index_segment_reader),
-                        std::end(index_segment_reader),
-                        std::begin(affected_keys),
-                        std::end(affected_keys),
-                        std::back_inserter(unaffected_keys));
-
-    util::check(affected_keys.size() + unaffected_keys.size() == index_segment_reader.size(), "Unaffected vs affected keys split was inconsistent {} + {} != {}",
-                affected_keys.size(), unaffected_keys.size(), index_segment_reader.size());
+    const auto index_desc = check_index_match(frame.index, index_segment_reader.tsd().index());
+    util::check(index::is_timeseries_index(index_desc), "Update not supported for non-timeseries indexes");
+    check_update_data_is_sorted(frame, index_segment_reader);
+    (void)check_and_mark_slices(index_segment_reader, dynamic_schema, false, std::nullopt, index_segment_reader.bucketize_dynamic());
+    fix_descriptor_mismatch_or_throw(UPDATE, dynamic_schema, index_segment_reader, frame, empty_types);
+}
 
-    frame->set_bucketize_dynamic(bucketize_dynamic);
-    auto slicing_arg = get_slicing_policy(options, *frame);
+static std::vector<SliceAndKey> get_keys_affected_by_update(
+    const index::IndexSegmentReader& index_segment_reader,
+    const InputTensorFrame& frame,
+    const UpdateQuery& query,
+    bool dynamic_schema
+) {
+    std::vector<FilterQuery<index::IndexSegmentReader>> queries = build_update_query_filters(
+        query.row_filter,
+        frame.index,
+        frame.index_range,
+        dynamic_schema,
+        index_segment_reader.bucketize_dynamic());
+    return filter_index(index_segment_reader, combine_filter_functions(queries));
+}
 
-    auto new_slice_and_keys = slice_and_write(frame, slicing_arg, IndexPartialKey{stream_id, update_info.next_version_id_}, store).wait().value();
-    std::sort(std::begin(new_slice_and_keys), std::end(new_slice_and_keys));
-
-    IndexRange orig_filter_range;
-    auto[intersect_before, intersect_after] = util::variant_match(query.row_filter,
-        [&](std::monostate) {
-            util::check(std::holds_alternative<TimeseriesIndex>(frame->index), "Update with row count index is not permitted");
-            orig_filter_range = frame->index_range;
-            if (new_slice_and_keys.empty()) {
-                // If there are no new keys, then we can't intersect with the existing data.
-                return std::make_pair(std::vector<SliceAndKey>{}, std::vector<SliceAndKey>{});
-            }
-            auto front_range = new_slice_and_keys.begin()->key().index_range();
-            auto back_range = new_slice_and_keys.rbegin()->key().index_range();
-            back_range.adjust_open_closed_interval();
-            return intersecting_segments(affected_keys, front_range, back_range, update_info.next_version_id_, store);
-        },
-        [&](const IndexRange& idx_range) {
-            orig_filter_range = idx_range;
-            return intersecting_segments(affected_keys, idx_range, idx_range, update_info.next_version_id_, store);
-        },
-        [](const RowRange&)-> std::pair<std::vector<SliceAndKey>, std::vector<SliceAndKey>> {
-            util::raise_rte("Unexpected row_range in update query");
-        }
-    );
+static std::vector<SliceAndKey> get_keys_not_affected_by_update(
+    const index::IndexSegmentReader& index_segment_reader,
+    std::span<const SliceAndKey> affected_keys
+) {
+    std::vector<SliceAndKey> unaffected_keys;
+    std::set_difference(index_segment_reader.begin(),
+                        index_segment_reader.end(),
+                        affected_keys.begin(),
+                        affected_keys.end(),
+                        std::back_inserter(unaffected_keys));
+    return unaffected_keys;
+}
 
-    size_t row_count = 0;
+static std::pair<std::vector<SliceAndKey>, size_t> get_slice_and_keys_for_update(
+    const UpdateRanges& update_ranges,
+    std::span<const SliceAndKey> unaffected_keys,
+    std::span<const SliceAndKey> affected_keys,
+    const IntersectingSegments& segments_intersecting_with_update_range,
+    std::vector<SliceAndKey>&& new_slice_and_keys
+) {
     const size_t new_keys_size = new_slice_and_keys.size();
+    size_t row_count = 0;
     const std::array<std::vector<SliceAndKey>, 5> groups{
-        strictly_before(orig_filter_range, unaffected_keys),
-        std::move(intersect_before),
+        strictly_before(update_ranges.original_index_range, unaffected_keys),
+        std::move(std::get<0>(segments_intersecting_with_update_range)),
         std::move(new_slice_and_keys),
-        std::move(intersect_after),
-        strictly_after(orig_filter_range, unaffected_keys)};
+        std::move(std::get<1>(segments_intersecting_with_update_range)),
+        strictly_after(update_ranges.original_index_range, unaffected_keys)};
     auto flattened_slice_and_keys = flatten_and_fix_rows(groups, row_count);
     util::check(unaffected_keys.size() + new_keys_size + (affected_keys.size() * 2) >= flattened_slice_and_keys.size(),
-                "Output size mismatch: {} + {} + (2 * {}) < {}",
-                unaffected_keys.size(), new_keys_size, affected_keys.size(), flattened_slice_and_keys.size());
-
+        "Output size mismatch: {} + {} + (2 * {}) < {}",
+        unaffected_keys.size(), new_keys_size, affected_keys.size(), flattened_slice_and_keys.size());
     std::sort(std::begin(flattened_slice_and_keys), std::end(flattened_slice_and_keys));
-    auto tsd = index::get_merged_tsd(row_count, dynamic_schema, index_segment_reader.tsd(), frame);
-    auto version_key_fut = index::write_index(stream::index_type_from_descriptor(tsd.as_stream_descriptor()), std::move(tsd), std::move(flattened_slice_and_keys), IndexPartialKey{stream_id, update_info.next_version_id_}, store);
-    auto version_key = std::move(version_key_fut).get();
+    return {flattened_slice_and_keys, row_count};
+}
+
+folly::Future<VariantKey> async_update_impl(
+    const std::shared_ptr<Store>& store,
+    const UpdateInfo& update_info,
+    const UpdateQuery& query,
+    const std::shared_ptr<InputTensorFrame>& frame,
+    WriteOptions&& options,
+    bool dynamic_schema,
+    bool empty_types) {
+    return index::async_get_index_reader(*(update_info.previous_index_key_), store).thenValue([
+        store,
+        update_info,
+        query,
+        frame,
+        options=std::move(options),
+        dynamic_schema,
+        empty_types
+    ](index::IndexSegmentReader&& index_segment_reader) {
+        check_can_update(*frame, index_segment_reader, update_info, dynamic_schema, empty_types);
+        ARCTICDB_DEBUG(log::version(), "Update versioned dataframe for stream_id: {} , version_id = {}", frame->desc.id(), update_info.previous_index_key_->version_id());
+        frame->set_bucketize_dynamic(index_segment_reader.bucketize_dynamic());
+        return slice_and_write(frame, get_slicing_policy(options, *frame), IndexPartialKey{frame->desc.id(), update_info.next_version_id_}, store
+        ).via(&async::cpu_executor()).thenValue([
+            store,
+            update_info,
+            query,
+            frame,
+            dynamic_schema,
+            index_segment_reader=std::move(index_segment_reader)
+        ](std::vector<SliceAndKey>&& new_slice_and_keys) mutable {
+            std::sort(std::begin(new_slice_and_keys), std::end(new_slice_and_keys));
+            auto affected_keys = get_keys_affected_by_update(index_segment_reader, *frame, query, dynamic_schema);
+            auto unaffected_keys = get_keys_not_affected_by_update(index_segment_reader, affected_keys);
+            util::check(
+                affected_keys.size() + unaffected_keys.size() == index_segment_reader.size(),
+                "The sum of affected keys and unaffected keys must be equal to the total number of keys {} + {} != {}",
+                affected_keys.size(), unaffected_keys.size(), index_segment_reader.size());
+            const UpdateRanges update_ranges = compute_update_ranges(query.row_filter, *frame, new_slice_and_keys);
+            return async_intersecting_segments(
+                affected_keys,
+                update_ranges.front,
+                update_ranges.back,
+                update_info.next_version_id_,
+                store).thenValue([new_slice_and_keys=std::move(new_slice_and_keys),
+                                  update_ranges=update_ranges,
+                                  unaffected_keys=std::move(unaffected_keys),
+                                  affected_keys=std::move(affected_keys),
+                                  index_segment_reader=std::move(index_segment_reader),
+                                  frame,
+                                  dynamic_schema,
+                                  update_info,
+                                  store](IntersectingSegments&& intersecting_segments) mutable {
+                auto [flattened_slice_and_keys, row_count] = get_slice_and_keys_for_update(
+                    update_ranges,
+                    unaffected_keys,
+                    affected_keys,
+                    intersecting_segments,
+                    std::move(new_slice_and_keys));
+                auto tsd = index::get_merged_tsd(row_count, dynamic_schema, index_segment_reader.tsd(), frame);
+                return index::write_index(
+                    index_type_from_descriptor(tsd.as_stream_descriptor()),
+                    std::move(tsd),
+                    std::move(flattened_slice_and_keys),
+                    IndexPartialKey{frame->desc.id(), update_info.next_version_id_},
+                    store
+                );
+            });
+        });
+    });
+}
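
Note: the refactor threads state from stage to stage through lambda captures, moving heavy objects (`index_segment_reader`, `new_slice_and_keys`) rather than copying, and the synchronous entry point just blocks once at the outermost edge. A toy sketch of that continuation shape (stand-in stages, not the real signatures):

```cpp
#include <folly/executors/InlineExecutor.h>
#include <folly/futures/Future.h>
#include <string>

folly::Future<std::string> async_pipeline(int version) {
    auto& exec = folly::InlineExecutor::instance();
    return folly::makeFuture(version)
        .via(&exec)
        // Stage 1: validate + transform; inputs travel in the capture list.
        .thenValue([version](int v) { return v + 1; })
        // Stage 2: a mutable lambda lets captured state be moved onward
        // instead of copied, as the diff does with index_segment_reader.
        .thenValue([prefix = std::string("v")](int v) mutable {
            return std::move(prefix) + std::to_string(v);
        });
}

// The synchronous API survives by calling .get() only at the boundary:
std::string update(int version) { return async_pipeline(version).get(); }
```
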
+
+VersionedItem update_impl(
+    const std::shared_ptr<Store>& store,
+    const UpdateInfo& update_info,
+    const UpdateQuery& query,
+    const std::shared_ptr<InputTensorFrame>& frame,
+    WriteOptions&& options,
+    bool dynamic_schema,
+    bool empty_types) {
+    auto version_key = async_update_impl(store, update_info, query, frame, std::move(options), dynamic_schema, empty_types).get();
     auto versioned_item = VersionedItem(to_atom(std::move(version_key)));
-    ARCTICDB_DEBUG(log::version(), "updated stream_id: {} , version_id: {}", stream_id, update_info.next_version_id_);
+    ARCTICDB_DEBUG(log::version(), "updated stream_id: {} , version_id: {}", frame->desc.id(), update_info.next_version_id_);
     return versioned_item;
 }
diff --git a/cpp/arcticdb/version/version_core.hpp b/cpp/arcticdb/version/version_core.hpp
index a60e80ddbf..7a26c69946 100644
--- a/cpp/arcticdb/version/version_core.hpp
+++ b/cpp/arcticdb/version/version_core.hpp
@@ -93,7 +93,7 @@ VersionedItem update_impl(
     const UpdateInfo& update_info,
     const UpdateQuery & query,
     const std::shared_ptr<InputTensorFrame>& frame,
-    const WriteOptions&& options,
+    WriteOptions&& options,
     bool dynamic_schema,
     bool empty_types);
diff --git a/cpp/third_party/pybind11 b/cpp/third_party/pybind11
index 80dc998efc..a2e59f0e70 160000
--- a/cpp/third_party/pybind11
+++ b/cpp/third_party/pybind11
@@ -1 +1 @@
-Subproject commit 80dc998efced8ceb2be59756668a7e90e8bef917
+Subproject commit a2e59f0e7065404b44dfe92a28aca47ba1378dc4