From 7cdcebcf227ad7089d5414259aaa63ba984ba448 Mon Sep 17 00:00:00 2001
From: Vasil Danielov Pashov
Date: Mon, 6 Jan 2025 17:47:55 +0200
Subject: [PATCH 1/2] Improve the performance of update by parallelising
 reads. Implement internal async update method. (#2087)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

#### Reference Issues/PRs

#### What does this implement or fix?
Implement an `async_update_impl` function which returns a future. The
synchronous version of update simply calls it and waits on the future, just as
append does. This keeps most of the update code the same; however, instead of
calling `.get` on each future, the code now chains them and returns a future.
As part of this change, the reads needed by update are issued in parallel, so
the regular (synchronous) update gains improved performance as well.

Also a slight refactor of the C++ unit tests: use std::array instead of
std::vector for fixed-size collections, and add const and constexpr
specifiers where appropriate. No functional changes.

#### Any other comments?
As a reviewer aid, below is a minimal, self-contained sketch of the pattern
this PR applies. The `async_do_work`/`do_work` names are hypothetical
stand-ins, not part of the codebase: continuations are chained on a folly
future instead of blocking with `.get`, and a single synchronous wrapper
waits only at the API boundary.
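```cpp
#include <folly/executors/CPUThreadPoolExecutor.h>
#include <folly/futures/Future.h>

// Hypothetical stand-in for async_update_impl: schedule work on an executor
// and chain a continuation instead of calling .get() on the intermediate future.
folly::Future<int> async_do_work(folly::CPUThreadPoolExecutor& executor) {
    return folly::via(&executor, [] { return 20; })
        .thenValue([](int partial) { return partial + 22; });
}

// Hypothetical stand-in for update_impl: the synchronous variant simply calls
// the async variant and blocks once, at the outermost boundary.
int do_work(folly::CPUThreadPoolExecutor& executor) {
    return async_do_work(executor).get();
}

int main() {
    folly::CPUThreadPoolExecutor executor{2};
    return do_work(executor) == 42 ? 0 : 1;
}
```

#### Checklist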
Checklist for code changes...
- [ ] Have you updated the relevant docstrings, documentation and copyright notice?
- [ ] Is this contribution tested against [all ArcticDB's features](../docs/mkdocs/docs/technical/contributing.md)?
- [ ] Do all exceptions introduced raise appropriate [error messages](https://docs.arcticdb.io/error_messages/)?
- [ ] Are API changes highlighted in the PR description?
- [ ] Is the PR labelled as enhancement or bug so it appears in autogenerated release notes?
---
 .../pipeline/index_segment_reader.cpp    |   6 +
 .../pipeline/index_segment_reader.hpp    |  10 +-
 cpp/arcticdb/pipeline/index_utils.cpp    |   4 +
 cpp/arcticdb/pipeline/index_utils.hpp    |   2 +
 cpp/arcticdb/pipeline/query.hpp          |  41 ++-
 cpp/arcticdb/pipeline/read_pipeline.hpp  |  33 +-
 cpp/arcticdb/pipeline/write_frame.cpp    |  70 +++--
 cpp/arcticdb/pipeline/write_frame.hpp    |   2 +-
 .../processing/test/benchmark_clause.cpp |   4 +-
 cpp/arcticdb/stream/stream_source.hpp    |   1 -
 .../stream/test/stream_test_common.hpp   |  25 +-
 .../version/local_versioned_engine.cpp   |   4 +-
 .../version/test/test_version_store.cpp  | 179 +++++------
 cpp/arcticdb/version/version_core.cpp    | 285 ++++++++++++------
 cpp/arcticdb/version/version_core.hpp    |   2 +-
 15 files changed, 376 insertions(+), 292 deletions(-)

diff --git a/cpp/arcticdb/pipeline/index_segment_reader.cpp b/cpp/arcticdb/pipeline/index_segment_reader.cpp
index db77bf51d5..5a8a718efc 100644
--- a/cpp/arcticdb/pipeline/index_segment_reader.cpp
+++ b/cpp/arcticdb/pipeline/index_segment_reader.cpp
@@ -24,6 +24,12 @@ IndexSegmentReader get_index_reader(const AtomKey &prev_index, const std::shared
     return index::IndexSegmentReader{std::move(seg)};
 }
 
+folly::Future<IndexSegmentReader> async_get_index_reader(const AtomKey &prev_index, const std::shared_ptr<Store> &store) {
+    return store->read(prev_index).thenValueInline([](std::pair<VariantKey, SegmentInMemory>&& key_seg) {
+        return IndexSegmentReader{std::move(key_seg.second)};
+    });
+}
+
 IndexSegmentReader::IndexSegmentReader(SegmentInMemory&& s) :
     seg_(std::move(s)) {
 }
diff --git a/cpp/arcticdb/pipeline/index_segment_reader.hpp b/cpp/arcticdb/pipeline/index_segment_reader.hpp
index 14ad963df2..edfb10b803 100644
--- a/cpp/arcticdb/pipeline/index_segment_reader.hpp
+++ b/cpp/arcticdb/pipeline/index_segment_reader.hpp
@@ -8,13 +8,9 @@
 #pragma once
 
 #include
-#include
 #include
 #include
-
-#include
-
-#include
+#include <folly/futures/Future.h>
 
 namespace arcticdb {
 class Store;
@@ -135,6 +131,10 @@ index::IndexSegmentReader get_index_reader(
     const AtomKey &prev_index,
     const std::shared_ptr<Store> &store);
 
+folly::Future<index::IndexSegmentReader> async_get_index_reader(
+    const AtomKey &prev_index,
+    const std::shared_ptr<Store> &store);
+
 IndexRange get_index_segment_range(
     const AtomKey &prev_index,
     const std::shared_ptr<Store> &store);
diff --git a/cpp/arcticdb/pipeline/index_utils.cpp b/cpp/arcticdb/pipeline/index_utils.cpp
index 943147dcd1..401a0719e2 100644
--- a/cpp/arcticdb/pipeline/index_utils.cpp
+++ b/cpp/arcticdb/pipeline/index_utils.cpp
@@ -129,4 +129,8 @@ TimeseriesDescriptor get_merged_tsd(
     );
 }
 
+bool is_timeseries_index(const IndexDescriptorImpl& index_desc) {
+    return index_desc.type() == IndexDescriptor::Type::TIMESTAMP || index_desc.type() == IndexDescriptor::Type::EMPTY;
+}
+
 } //namespace arcticdb::pipelines::index
\ No newline at end of file
diff --git a/cpp/arcticdb/pipeline/index_utils.hpp b/cpp/arcticdb/pipeline/index_utils.hpp
index 931cbaf3ef..4641188b8c 100644
--- a/cpp/arcticdb/pipeline/index_utils.hpp
+++ b/cpp/arcticdb/pipeline/index_utils.hpp
@@ -138,4 +138,6 @@ TimeseriesDescriptor get_merged_tsd(
     const TimeseriesDescriptor& existing_tsd,
     const std::shared_ptr<InputTensorFrame>& new_frame);
 
+[[nodiscard]] bool is_timeseries_index(const IndexDescriptorImpl& index_desc);
+
 } //namespace arcticdb::pipelines::index
\ No newline at end of file
diff --git a/cpp/arcticdb/pipeline/query.hpp b/cpp/arcticdb/pipeline/query.hpp
index 9a3d55b183..a0d13cbf9e 100644
--- a/cpp/arcticdb/pipeline/query.hpp
+++ b/cpp/arcticdb/pipeline/query.hpp
@@ -10,15 +10,10 @@
 #include
 #include
 #include
-#include
-#include
 #include
 #include
-#include
 #include
 #include
-#include
-#include
 #include
 #include
 #include
@@ -28,9 +23,13 @@
 #include
 #include
 #include
+#include <ranges>
+#include <span>
 
 namespace arcticdb::pipelines {
 
+namespace ranges = std::ranges;
+
 using FilterRange = std::variant<std::monostate, RowRange, IndexRange>;
 
 /*
@@ -405,20 +404,18 @@ inline FilterRange get_query_index_range(
     return RowRange{std::get<timestamp>(index_range.start_), std::get<timestamp>(index_range.end_)};
 }
 
-inline std::vector<SliceAndKey> strictly_before(const FilterRange &range, const std::vector<SliceAndKey> &input) {
+inline std::vector<SliceAndKey> strictly_before(const FilterRange &range, std::span<const SliceAndKey> input) {
     std::vector<SliceAndKey> output;
     util::variant_match(range,
                         [&](const RowRange &row_range) {
-                            std::copy_if(std::begin(input), std::end(input), std::back_inserter(output),
-                                         [&](const auto &sk) {
-                                             return sk.slice_.row_range.second < row_range.first;
-                                         });
+                            ranges::copy_if(input, std::back_inserter(output), [&](const auto &sk) {
+                                return sk.slice_.row_range.second < row_range.first;
+                            });
                         },
                         [&](const IndexRange &index_range) {
-                            std::copy_if(std::begin(input), std::end(input), std::back_inserter(output),
-                                         [&](const auto &sk) {
-                                             return sk.key().index_range().end_ < index_range.start_;
-                                         });
+                            ranges::copy_if(input, std::back_inserter(output), [&](const auto &sk) {
+                                return sk.key().index_range().end_ < index_range.start_;
+                            });
                         },
                         [&](const auto &) {
                             util::raise_rte("Expected specified range ");
@@ -426,20 +423,18 @@ inline std::vector<SliceAndKey> strictly_before(const FilterRange &range, const
     return output;
 }
 
-inline std::vector<SliceAndKey> strictly_after(const FilterRange &range, const std::vector<SliceAndKey> &input) {
+inline std::vector<SliceAndKey> strictly_after(const FilterRange &range, std::span<const SliceAndKey> input) {
     std::vector<SliceAndKey> output;
     util::variant_match(range,
                         [&input, &output](const RowRange &row_range) {
-                            std::copy_if(std::begin(input), std::end(input), std::back_inserter(output),
-                                         [&](const auto &sk) {
-                                             return sk.slice_.row_range.first > row_range.second;
-                                         });
+                            ranges::copy_if(input, std::back_inserter(output), [&](const auto &sk) {
+                                return sk.slice_.row_range.first > row_range.second;
+                            });
                         },
                         [&input, &output](const IndexRange &index_range) {
-                            std::copy_if(std::begin(input), std::end(input), std::back_inserter(output),
-                                         [&](const auto &sk) {
-                                             return sk.key().index_range().start_ > index_range.end_;
-                                         });
+                            ranges::copy_if(input, std::back_inserter(output), [&](const auto &sk) {
+                                return sk.key().index_range().start_ > index_range.end_;
+                            });
                         },
                         [](const auto &) {
                             util::raise_rte("Expected specified range ");
diff --git a/cpp/arcticdb/pipeline/read_pipeline.hpp b/cpp/arcticdb/pipeline/read_pipeline.hpp
index 00ed5c207a..9bfffca85f 100644
--- a/cpp/arcticdb/pipeline/read_pipeline.hpp
+++ b/cpp/arcticdb/pipeline/read_pipeline.hpp
@@ -8,24 +8,9 @@
 #pragma once
 
 #include
-
-#include
-#include
-
 #include
-#include
-#include
-#include
-
-#include
-#include
-#include
 #include
-#include
 #include
-#include
-#include
-#include
 #include
 #include
 #include
@@ -61,22 +46,24 @@ void foreach_active_bit(const util::BitSet &bs, C &&visitor) {
     }
 }
 
-template<typename ContainerType>
-std::vector<SliceAndKey> filter_index(const ContainerType &container, std::optional<CombinedQuery<ContainerType>> &&query) {
+inline std::vector<SliceAndKey> filter_index(
+    const index::IndexSegmentReader& index_segment_reader,
+    std::optional<CombinedQuery<index::IndexSegmentReader>> &&query
+) {
     ARCTICDB_SAMPLE_DEFAULT(FilterIndex)
     std::vector<SliceAndKey> output{};
-    if (container.size()> 0) {
+    if (!index_segment_reader.empty()) {
         if(query) {
-            auto row_bitset = (*query)(container);
+            auto row_bitset = (*query)(index_segment_reader);
             ARCTICDB_DEBUG(log::version(), "Row bitset has {} bits set of {}", row_bitset->count(), row_bitset->size());
             output.reserve(row_bitset->count());
             foreach_active_bit(*row_bitset, [&](auto r) {
-                output.emplace_back(get_row(container, r));
+                output.emplace_back(get_row(index_segment_reader, r));
             });
         } else {
-            output.reserve(container.size());
-            for(auto i = 0u; i < container.size(); ++i) {
-                output.emplace_back(get_row(container, i));
+            output.reserve(index_segment_reader.size());
+            for(auto i = 0u; i < index_segment_reader.size(); ++i) {
+                output.emplace_back(get_row(index_segment_reader, i));
             }
         }
     }
diff --git a/cpp/arcticdb/pipeline/write_frame.cpp b/cpp/arcticdb/pipeline/write_frame.cpp
index 4cca47957c..29205880d1 100644
--- a/cpp/arcticdb/pipeline/write_frame.cpp
+++ b/cpp/arcticdb/pipeline/write_frame.cpp
@@ -16,7 +16,6 @@
 #include
 #include
 #include
-#include
 #include
 #include
 #include
@@ -24,12 +23,14 @@
 #include
 #include
 #include
+#include <ranges>
 
 namespace arcticdb::pipelines {
 
 using namespace arcticdb::entity;
 using namespace arcticdb::stream;
+namespace ranges = std::ranges;
 
 WriteToSegmentTask::WriteToSegmentTask(
     std::shared_ptr<InputTensorFrame> frame,
@@ -252,40 +253,46 @@ static RowRange partial_rewrite_row_range(
     }
 }
 
-std::optional<SliceAndKey> rewrite_partial_segment(
+folly::Future<std::optional<SliceAndKey>> async_rewrite_partial_segment(
     const SliceAndKey& existing,
     const IndexRange& index_range,
     VersionId version_id,
     AffectedSegmentPart affected_part,
     const std::shared_ptr<Store>& store) {
-    const auto& key = existing.key();
-    auto kv = store->read(key).get();
-    const SegmentInMemory& segment = kv.second;
-    const RowRange affected_row_range = partial_rewrite_row_range(segment, index_range, affected_part);
-    const int64_t num_rows = affected_row_range.end() - affected_row_range.start();
-    if (num_rows <= 0) {
-        return std::nullopt;
-    }
-    SegmentInMemory output = segment.truncate(affected_row_range.start(), affected_row_range.end(), true);
-    const IndexValue start_ts = TimeseriesIndex::start_value_for_segment(output);
-    // +1 as in the key we store one nanosecond greater than the last index value in the segment
-    const IndexValue end_ts = std::get<timestamp>(TimeseriesIndex::end_value_for_segment(output)) + 1;
-    FrameSlice new_slice{
-        std::make_shared<StreamDescriptor>(output.descriptor()),
-        existing.slice_.col_range,
-        RowRange{0, num_rows},
-        existing.slice_.hash_bucket(),
-        existing.slice_.num_buckets()};
-
-    auto fut_key = store->write(
-        key.type(),
+    return store->read(existing.key()).thenValueInline([
+        existing,
+        index_range,
         version_id,
-        key.id(),
-        start_ts,
-        end_ts,
-        std::move(output)
-    );
-    return SliceAndKey{std::move(new_slice), std::get<AtomKey>(std::move(fut_key).get())};
+        affected_part,
+        store](std::pair<VariantKey, SegmentInMemory>&& key_segment) -> folly::Future<std::optional<SliceAndKey>> {
+        const auto& key = existing.key();
+        const SegmentInMemory& segment = key_segment.second;
+        const RowRange affected_row_range = partial_rewrite_row_range(segment, index_range, affected_part);
+        const int64_t num_rows = affected_row_range.end() - affected_row_range.start();
+        if (num_rows <= 0) {
+            return std::nullopt;
+        }
+        SegmentInMemory output = segment.truncate(affected_row_range.start(), affected_row_range.end(), true);
+        const IndexValue start_ts = TimeseriesIndex::start_value_for_segment(output);
+        // +1 as in the key we store one nanosecond greater than the last index value in the segment
+        const IndexValue end_ts = std::get<timestamp>(TimeseriesIndex::end_value_for_segment(output)) + 1;
+        FrameSlice new_slice{
+            std::make_shared<StreamDescriptor>(output.descriptor()),
+            existing.slice_.col_range,
+            RowRange{0, num_rows},
+            existing.slice_.hash_bucket(),
+            existing.slice_.num_buckets()};
+        return store->write(
+            key.type(),
+            version_id,
+            key.id(),
+            start_ts,
+            end_ts,
+            std::move(output)
+        ).thenValueInline([new_slice=std::move(new_slice)](VariantKey&& k) {
+            return std::make_optional<SliceAndKey>(std::move(new_slice), std::get<AtomKey>(std::move(k)));
+        });
+    });
 }
 
 std::vector<SliceAndKey> flatten_and_fix_rows(const std::array<std::vector<SliceAndKey>, 5>& groups, size_t& global_count) {
@@ -301,10 +308,9 @@ std::vector<SliceAndKey> flatten_and_fix_rows(const std::array<std::vector<Slic
         auto group_start = group.begin()->slice_.row_range.first;
         auto group_end = std::accumulate(std::begin(group), std::end(group), 0ULL, [](size_t a, const SliceAndKey& sk) {
             return std::max(a, sk.slice_.row_range.second);
         });
-        std::transform(std::begin(group), std::end(group), std::back_inserter(output),
-                       [&](SliceAndKey sk) {
+        ranges::transform(group, std::back_inserter(output), [&](SliceAndKey sk) {
             auto range_start = global_count + (sk.slice_.row_range.first - group_start);
             auto new_range = RowRange{range_start, range_start + (sk.slice_.row_range.diff())};
             sk.slice_.row_range = new_range;
diff --git a/cpp/arcticdb/processing/test/benchmark_clause.cpp b/cpp/arcticdb/processing/test/benchmark_clause.cpp
index 748faaf3e7..d306e234f3 100644
--- a/cpp/arcticdb/processing/test/benchmark_clause.cpp
+++ b/cpp/arcticdb/processing/test/benchmark_clause.cpp
@@ -21,9 +21,7 @@ using namespace arcticdb;
 
 SegmentInMemory get_segment_for_merge(const StreamId &id, size_t num_rows, size_t start, size_t step){
     auto segment = SegmentInMemory{
-        get_test_descriptor<stream::TimeseriesIndex>(id, {
-            scalar_field(DataType::UINT8, "column")
-        }),
+        get_test_descriptor<stream::TimeseriesIndex>(id, std::array{scalar_field(DataType::UINT8, "column")}),
         num_rows
     };
     auto& index_col = segment.column(0);
diff --git a/cpp/arcticdb/stream/stream_source.hpp b/cpp/arcticdb/stream/stream_source.hpp
index 3d7487fb9e..5745f82686 100644
--- a/cpp/arcticdb/stream/stream_source.hpp
+++ b/cpp/arcticdb/stream/stream_source.hpp
@@ -8,7 +8,6 @@
 #pragma once
 
 #include
-#include
 #include
 #include
 #include
diff --git a/cpp/arcticdb/stream/test/stream_test_common.hpp b/cpp/arcticdb/stream/test/stream_test_common.hpp
index ee46ddaeec..764e5f633b 100644
--- a/cpp/arcticdb/stream/test/stream_test_common.hpp
+++ b/cpp/arcticdb/stream/test/stream_test_common.hpp
@@ -7,17 +7,9 @@
 
 #pragma once
 
-#include
 #include
 #include
-#include
 #include
-#include
-
-#include
-#include
-#include
-#include
 #include
 #include
 #include
@@ -27,7 +19,12 @@
 #include
 #include
 
+#include
+#include
+
 #include
+#include
+#include
 
 namespace fg = folly::gen;
 
@@ -173,10 +170,10 @@ NativeTensor test_string_column(ContainerType &vec, DTT, shape_t num_rows) {
     return NativeTensor{bytes, 1, &strides, &shapes, dt, elsize, vec.data(), 1};
 }
 
-inline std::vector<FieldRef> get_test_timeseries_fields() {
+inline auto get_test_timeseries_fields() {
     using namespace arcticdb::entity;
 
-    return {
+    return std::array {
         scalar_field(DataType::UINT8, "smallints"),
         scalar_field(DataType::INT64, "bigints"),
         scalar_field(DataType::FLOAT64, "floats"),
@@ -184,10 +181,10 @@ inline std::vector<FieldRef> get_test_timeseries_fields() {
     };
 }
 
-inline std::vector<FieldRef> get_test_simple_fields() {
+inline auto get_test_simple_fields() {
     using namespace arcticdb::entity;
 
-    return {
+    return std::array {
         scalar_field(DataType::UINT32, "index"),
         scalar_field(DataType::FLOAT64, "floats"),
     };
@@ -254,13 +251,13 @@ inline void fill_test_frame(SegmentInMemory &segment,
 }
 
 template<typename IndexType>
-StreamDescriptor get_test_descriptor(const StreamId &id, const std::vector<FieldRef> &fields) {
+StreamDescriptor get_test_descriptor(const StreamId &id, std::span<const FieldRef> fields) {
     return IndexType::default_index().create_stream_descriptor(id, folly::Range(fields.begin(), fields.end()));
 }
 
 template<typename IndexType>
 TestTensorFrame get_test_frame(const StreamId &id,
-                               const std::vector<FieldRef> &fields,
+                               std::span<const FieldRef> fields,
                                size_t num_rows,
                                size_t start_val,
                                size_t opt_row_offset = 0) {
diff --git a/cpp/arcticdb/version/local_versioned_engine.cpp b/cpp/arcticdb/version/local_versioned_engine.cpp
index cd2ee0e5a1..3c35ad92ff 100644
--- a/cpp/arcticdb/version/local_versioned_engine.cpp
+++ b/cpp/arcticdb/version/local_versioned_engine.cpp
@@ -560,9 +560,7 @@ VersionedItem LocalVersionedEngine::update_internal(
     bool prune_previous_versions) {
     ARCTICDB_RUNTIME_DEBUG(log::version(), "Command: update");
     py::gil_scoped_release release_gil;
-    auto update_info = get_latest_undeleted_version_and_next_version_id(store(),
-                                                                        version_map(),
-                                                                        stream_id);
+    auto update_info = get_latest_undeleted_version_and_next_version_id(store(), version_map(), stream_id);
     if (update_info.previous_index_key_.has_value()) {
         if (frame->empty()) {
             ARCTICDB_DEBUG(log::version(), "Updating existing data with an empty item has no effect. \n"
diff --git a/cpp/arcticdb/version/test/test_version_store.cpp b/cpp/arcticdb/version/test/test_version_store.cpp
index 060d3813a2..9f51ff98da 100644
--- a/cpp/arcticdb/version/test/test_version_store.cpp
+++ b/cpp/arcticdb/version/test/test_version_store.cpp
@@ -452,7 +452,7 @@ TEST(VersionStore, AppendRefKeyOptimisation) {
     size_t num_rows{5};
     size_t start_val{0};
 
-    std::vector<FieldRef> fields{
+    const std::array fields{
         scalar_field(DataType::UINT8, "thing1"),
         scalar_field(DataType::UINT8, "thing2"),
         scalar_field(DataType::UINT16, "thing3"),
@@ -512,24 +512,24 @@ TEST(VersionStore, UpdateWithin) {
     ScopedConfig reload_interval("VersionMap.ReloadInterval", 0);
     PilotedClock::reset();
-    StreamId symbol("update_schema");
+    const StreamId symbol("update_schema");
     auto version_store = get_test_engine();
-    size_t num_rows{100};
-    size_t start_val{0};
+    constexpr size_t num_rows{100};
+    constexpr size_t start_val{0};
 
-    std::vector<FieldRef> fields{
+    const std::array fields{
         scalar_field(DataType::UINT8, "thing1"),
         scalar_field(DataType::UINT8, "thing2"),
         scalar_field(DataType::UINT16, "thing3"),
         scalar_field(DataType::UINT16, "thing4")
     };
 
-    auto test_frame = get_test_frame<stream::TimeseriesIndex>(symbol, fields, num_rows, start_val);
+    auto test_frame = get_test_frame<stream::TimeseriesIndex>(symbol, fields, num_rows, start_val);
     version_store.write_versioned_dataframe_internal(symbol, std::move(test_frame.frame_), false, false, false);
 
-    RowRange update_range{10, 15};
-    size_t update_val{1};
-    auto update_frame = get_test_frame<stream::TimeseriesIndex>(symbol, fields, update_range.diff(), update_range.first, update_val);
+    constexpr RowRange update_range{10, 15};
+    constexpr size_t update_val{100};
+    auto update_frame = get_test_frame<stream::TimeseriesIndex>(symbol, fields, update_range.diff(), update_range.first, update_val);
     version_store.update_internal(symbol, UpdateQuery{}, std::move(update_frame.frame_), false, false, false);
 
     auto read_query = std::make_shared<ReadQuery>();
@@ -539,12 +539,9 @@ TEST(VersionStore, UpdateWithin) {
     const auto& seg = read_result.frame_and_descriptor_.frame_;
 
     for(auto i = 0u; i < num_rows; ++i) {
-        auto expected = i;
-        if(update_range.contains(i))
-            expected += update_val;
-
-        auto val1 = seg.scalar_at<uint8_t>(i, 1);
-        ASSERT_EQ(val1.value(), expected);
+        const uint8_t expected = update_range.contains(i) ? i + update_val : i;
+        const auto value = seg.scalar_at<uint8_t>(i, 1).value();
+        EXPECT_EQ(expected, value);
     }
 }
 
@@ -555,12 +552,12 @@ TEST(VersionStore, UpdateBefore) {
     using namespace arcticdb::pipelines;
 
     PilotedClock::reset();
-    StreamId symbol("update_schema");
+    const StreamId symbol("update_schema");
     auto version_store = get_test_engine();
-    size_t num_rows{100};
-    size_t start_val{10};
+    constexpr size_t num_rows{100};
+    constexpr size_t start_val{10};
 
-    std::vector<FieldRef> fields{
+    const std::array fields{
         scalar_field(DataType::UINT8, "thing1"),
         scalar_field(DataType::UINT8, "thing2"),
         scalar_field(DataType::UINT16, "thing3"),
@@ -570,8 +567,8 @@ TEST(VersionStore, UpdateBefore) {
     auto test_frame = get_test_frame<stream::TimeseriesIndex>(symbol, fields, num_rows, start_val);
     version_store.write_versioned_dataframe_internal(symbol, std::move(test_frame.frame_), false, false, false);
 
-    RowRange update_range{0, 10};
-    size_t update_val{1};
+    constexpr RowRange update_range{0, 10};
+    constexpr size_t update_val{1};
     auto update_frame = get_test_frame<stream::TimeseriesIndex>(symbol, fields, update_range.diff(), update_range.first, update_val);
     version_store.update_internal(symbol, UpdateQuery{}, std::move(update_frame.frame_), false, false, false);
 
@@ -582,12 +579,9 @@ TEST(VersionStore, UpdateBefore) {
     const auto& seg = read_result.frame_and_descriptor_.frame_;
 
     for(auto i = 0u; i < num_rows + update_range.diff(); ++i) {
-        auto expected = i;
-        if(update_range.contains(i))
-            expected += update_val;
-
-        auto val1 = seg.scalar_at<uint8_t>(i, 1);
-        ASSERT_EQ(val1.value(), expected);
+        const auto expected = update_range.contains(i) ? i + update_val : i;
+        const auto value = seg.scalar_at<uint8_t>(i, 1).value();
+        ASSERT_EQ(value, expected);
     }
 }
 
@@ -598,12 +592,12 @@ TEST(VersionStore, UpdateAfter) {
     using namespace arcticdb::pipelines;
 
     PilotedClock::reset();
-    StreamId symbol("update_schema");
+    const StreamId symbol("update_schema");
    auto version_store = get_test_engine();
-    size_t num_rows{100};
-    size_t start_val{0};
+    constexpr size_t num_rows{100};
+    constexpr size_t start_val{0};
 
-    std::vector<FieldRef> fields{
+    const std::array fields{
         scalar_field(DataType::UINT8, "thing1"),
         scalar_field(DataType::UINT8, "thing2"),
         scalar_field(DataType::UINT16, "thing3"),
@@ -613,8 +607,8 @@ TEST(VersionStore, UpdateAfter) {
     auto test_frame = get_test_frame<stream::TimeseriesIndex>(symbol, fields, num_rows, start_val);
     version_store.write_versioned_dataframe_internal(symbol, std::move(test_frame.frame_), false, false, false);
 
-    RowRange update_range{100, 110};
-    size_t update_val{1};
+    constexpr RowRange update_range{100, 110};
+    constexpr size_t update_val{1};
     auto update_frame = get_test_frame<stream::TimeseriesIndex>(symbol, fields, update_range.diff(), update_range.first, update_val);
     version_store.update_internal(symbol, UpdateQuery{}, std::move(update_frame.frame_), false, false, false);
 
@@ -625,12 +619,9 @@ TEST(VersionStore, UpdateAfter) {
     const auto& seg = read_result.frame_and_descriptor_.frame_;
 
     for(auto i = 0u; i < num_rows + update_range.diff(); ++i) {
-        auto expected = i;
-        if(update_range.contains(i))
-            expected += update_val;
-
-        auto val1 = seg.scalar_at<uint8_t>(i, 1);
-        ASSERT_EQ(val1.value(), expected);
+        const auto expected = update_range.contains(i) ? i + update_val : i;
+        const auto value = seg.scalar_at<uint8_t>(i, 1).value();
+        ASSERT_EQ(value, expected);
     }
 }
 
@@ -641,12 +632,12 @@ TEST(VersionStore, UpdateIntersectBefore) {
     using namespace arcticdb::pipelines;
 
     PilotedClock::reset();
-    StreamId symbol("update_schema");
+    const StreamId symbol("update_schema");
     auto version_store = get_test_engine();
-    size_t num_rows{100};
-    size_t start_val{5};
+    constexpr size_t num_rows{100};
+    constexpr size_t start_val{5};
 
-    std::vector<FieldRef> fields{
+    const std::array fields{
         scalar_field(DataType::UINT8, "thing1"),
         scalar_field(DataType::UINT8, "thing2"),
         scalar_field(DataType::UINT16, "thing3"),
@@ -656,8 +647,8 @@ TEST(VersionStore, UpdateIntersectBefore) {
     auto test_frame = get_test_frame<stream::TimeseriesIndex>(symbol, fields, num_rows, start_val);
     version_store.write_versioned_dataframe_internal(symbol, std::move(test_frame.frame_), false, false, false);
 
-    RowRange update_range{0, 10};
-    size_t update_val{1};
+    constexpr RowRange update_range{0, 10};
+    constexpr size_t update_val{1};
     auto update_frame = get_test_frame<stream::TimeseriesIndex>(symbol, fields, update_range.diff(), update_range.first, update_val);
     version_store.update_internal(symbol, UpdateQuery{}, std::move(update_frame.frame_), false, false, false);
 
@@ -669,12 +660,9 @@ TEST(VersionStore, UpdateIntersectBefore) {
     const auto &seg = read_result.frame_and_descriptor_.frame_;
 
     for (auto i = 0u; i < num_rows + 5; ++i) {
-        auto expected = i;
-        if (update_range.contains(i))
-            expected += update_val;
-
-        auto val1 = seg.scalar_at<uint8_t>(i, 1);
-        ASSERT_EQ(val1.value(), expected);
+        const auto expected = update_range.contains(i) ? i + update_val : i;
+        const auto value = seg.scalar_at<uint8_t>(i, 1).value();
+        ASSERT_EQ(value, expected);
     }
 }
 
@@ -685,12 +673,12 @@ TEST(VersionStore, UpdateIntersectAfter) {
     using namespace arcticdb::pipelines;
 
     PilotedClock::reset();
-    StreamId symbol("update_schema");
+    const StreamId symbol("update_schema");
     auto version_store = get_test_engine();
-    size_t num_rows{100};
-    size_t start_val{0};
+    constexpr size_t num_rows{100};
+    constexpr size_t start_val{0};
 
-    std::vector<FieldRef> fields{
+    const std::array fields{
         scalar_field(DataType::UINT8, "thing1"),
         scalar_field(DataType::UINT8, "thing2"),
         scalar_field(DataType::UINT16, "thing3"),
@@ -700,8 +688,8 @@ TEST(VersionStore, UpdateIntersectAfter) {
     auto test_frame = get_test_frame<stream::TimeseriesIndex>(symbol, fields, num_rows, start_val);
     version_store.write_versioned_dataframe_internal(symbol, std::move(test_frame.frame_), false, false, false);
 
-    RowRange update_range{95, 105};
-    size_t update_val{1};
+    constexpr RowRange update_range{95, 105};
+    constexpr size_t update_val{1};
     auto update_frame = get_test_frame<stream::TimeseriesIndex>(symbol, fields, update_range.diff(), update_range.first, update_val);
     version_store.update_internal(symbol, UpdateQuery{}, std::move(update_frame.frame_), false, false, false);
 
@@ -713,12 +701,9 @@ TEST(VersionStore, UpdateIntersectAfter) {
     const auto &seg = read_result.frame_and_descriptor_.frame_;
 
     for (auto i = 0u; i < num_rows + 5; ++i) {
-        auto expected = i;
-        if (update_range.contains(i))
-            expected += update_val;
-
-        auto val1 = seg.scalar_at<uint8_t>(i, 1);
-        ASSERT_EQ(val1.value(), expected);
+        const auto expected = update_range.contains(i) ? i + update_val : i;
+        const auto value = seg.scalar_at<uint8_t>(i, 1).value();
+        ASSERT_EQ(value, expected);
     }
 }
 
@@ -729,12 +714,12 @@ TEST(VersionStore, UpdateWithinSchemaChange) {
     using namespace arcticdb::pipelines;
 
     PilotedClock::reset();
-    StreamId symbol("update_schema");
+    const StreamId symbol("update_schema");
     auto version_store = get_test_engine();
-    size_t num_rows{100};
-    size_t start_val{0};
+    constexpr size_t num_rows{100};
+    constexpr size_t start_val{0};
 
-    std::vector<FieldRef> fields{
+    const std::array fields{
         scalar_field(DataType::UINT8, "thing1"),
         scalar_field(DataType::UINT8, "thing2"),
         scalar_field(DataType::UINT16, "thing3"),
@@ -745,10 +730,10 @@ TEST(VersionStore, UpdateWithinSchemaChange) {
     version_store.
         write_versioned_dataframe_internal(symbol, std::move(test_frame.frame_), false, false, false);
 
-    RowRange update_range{10, 15};
-    size_t update_val{1};
+    constexpr RowRange update_range{10, 15};
+    constexpr size_t update_val{1};
 
-    std::vector<FieldRef> update_fields{
+    const std::array update_fields{
         scalar_field(DataType::UINT8, "thing1"),
         scalar_field(DataType::UINT8, "thing2"),
         scalar_field(DataType::UINT16, "thing3"),
@@ -767,20 +752,17 @@ TEST(VersionStore, UpdateWithinSchemaChange) {
     const auto &seg = read_result.frame_and_descriptor_.frame_;
 
     for (auto i = 0u;i < num_rows; ++i) {
-        auto expected = i;
-        if (update_range.contains(i))
-            expected += update_val;
-
-        auto val1 = seg.scalar_at<uint8_t>(i, 1);
-        check_value(val1.value(), expected);
+        auto expected = update_range.contains(i) ? i + update_val : i;
+        const auto val1 = seg.scalar_at<uint8_t>(i, 1).value();
+        check_value(val1, expected);
 
         expected = update_range.contains(i) ? 0 : i;
-        auto val2 = seg.scalar_at<uint16_t>(i, 4);
-        check_value(val2.value(), expected);
+        const auto val2 = seg.scalar_at<uint16_t>(i, 4).value();
+        check_value(val2, expected);
         expected = !update_range.contains(i) ? 0 : i + update_val;
-        auto val3 = seg.scalar_at<uint16_t>(i, 5);
-        check_value(val3.value(), expected);
+        const auto val3 = seg.scalar_at<uint16_t>(i, 5).value();
+        check_value(val3, expected);
     }
 }
 
@@ -791,14 +773,14 @@ TEST(VersionStore, UpdateWithinTypeAndSchemaChange) {
     using namespace arcticdb::pipelines;
 
     PilotedClock::reset();
-    StreamId symbol("update_schema");
+    const StreamId symbol("update_schema");
     arcticdb::proto::storage::VersionStoreConfig version_store_cfg;
     version_store_cfg.mutable_write_options()->set_dynamic_schema(true);
     auto version_store = get_test_engine({version_store_cfg});
-    size_t num_rows{100};
-    size_t start_val{0};
+    constexpr size_t num_rows{100};
+    constexpr size_t start_val{0};
 
-    std::vector<FieldRef> fields{
+    const std::array fields{
         scalar_field(DataType::UINT8, "thing1"),
         scalar_field(DataType::UINT8, "thing2"),
         scalar_field(DataType::UINT16, "thing3"),
@@ -808,10 +790,10 @@ TEST(VersionStore, UpdateWithinTypeAndSchemaChange) {
     auto test_frame = get_test_frame<stream::TimeseriesIndex>(symbol, fields, num_rows, start_val);
     version_store.write_versioned_dataframe_internal(symbol, std::move(test_frame.frame_), false, false, false);
 
-    RowRange update_range{10, 15};
-    size_t update_val{1};
+    constexpr RowRange update_range{10, 15};
+    constexpr size_t update_val{1};
 
-    std::vector<FieldRef> update_fields{
+    const std::array update_fields{
         scalar_field(DataType::UINT8, "thing1"),
         scalar_field(DataType::UINT16, "thing2"),
         scalar_field(DataType::UINT32, "thing3"),
@@ -830,20 +812,17 @@ TEST(VersionStore, UpdateWithinTypeAndSchemaChange) {
     const auto &seg = read_result.frame_and_descriptor_.frame_;
 
     for (auto i = 0u;i < num_rows; ++i) {
-        auto expected = i;
-        if (update_range.contains(i))
-            expected += update_val;
-
-        auto val1 = seg.scalar_at<uint8_t>(i, 1);
-        check_value(val1.value(), expected);
+        auto expected = update_range.contains(i) ? i + update_val : i;
+        const auto val1 = seg.scalar_at<uint8_t>(i, 1).value();
+        check_value(val1, expected);
 
         expected = update_range.contains(i) ? 0 : i;
-        auto val2 = seg.scalar_at<uint16_t>(i, 4);
-        check_value(val2.value(), expected);
+        const auto val2 = seg.scalar_at<uint16_t>(i, 4).value();
+        check_value(val2, expected);
         expected = !update_range.contains(i) ? 0 : i + update_val;
-        auto val3 = seg.scalar_at<uint16_t>(i, 5);
-        check_value(val3.value(), expected);
+        const auto val3 = seg.scalar_at<uint16_t>(i, 5).value();
+        check_value(val3, expected);
     }
 }
 
@@ -855,11 +834,11 @@ TEST(VersionStore, TestWriteAppendMapHead) {
     using namespace arcticdb::pipelines;
 
     PilotedClock::reset();
-    StreamId symbol("append");
+    const StreamId symbol("append");
     auto version_store = get_test_engine();
-    size_t num_rows{100};
+    constexpr size_t num_rows{100};
 
-    std::vector<FieldRef> fields{
+    const std::array fields{
         scalar_field(DataType::UINT8, "thing1"),
         scalar_field(DataType::UINT8, "thing2"),
         scalar_field(DataType::UINT16, "thing3"),
diff --git a/cpp/arcticdb/version/version_core.cpp b/cpp/arcticdb/version/version_core.cpp
index b5e5be90eb..56113f9851 100644
--- a/cpp/arcticdb/version/version_core.cpp
+++ b/cpp/arcticdb/version/version_core.cpp
@@ -190,45 +190,65 @@ template
     }
 }
 
-inline std::pair<std::vector<SliceAndKey>, std::vector<SliceAndKey>> intersecting_segments(
+std::vector<SliceAndKey> filter_existing_slices(std::vector<std::optional<SliceAndKey>>&& maybe_slices) {
+    std::vector<SliceAndKey> result;
+    for (auto& maybe_slice : maybe_slices) {
+        if (maybe_slice.has_value()) {
+            result.push_back(std::move(*maybe_slice));
+        }
+    }
+    return result;
+}
+
+/// Represents all slices which are intersecting (but not overlapping) with range passed to update
+/// First member is a vector of all segments intersecting with the first row-slice of the update range
+/// Second member is a vector of all segments intersecting with the last row-slice of the update range
+using IntersectingSegments = std::tuple<std::vector<SliceAndKey>, std::vector<SliceAndKey>>;
+
+[[nodiscard]] folly::Future<IntersectingSegments> async_intersecting_segments(
     const std::vector<SliceAndKey>& affected_keys,
     const IndexRange& front_range,
     const IndexRange& back_range,
     VersionId version_id,
    const std::shared_ptr<Store>& store
 ) {
-    std::vector<SliceAndKey> intersect_before;
-    std::vector<SliceAndKey> intersect_after;
+    if (!front_range.specified_ && !back_range.specified_) {
+        return folly::makeFuture(IntersectingSegments{});
+    }
+    internal::check<ErrorCode::E_ASSERTION_FAILURE>(
+        front_range.specified_ && back_range.specified_,
+        "Both first and last index range of the update range must intersect with at least one of the slices in the dataframe");
+    std::vector<folly::Future<std::optional<SliceAndKey>>> maybe_intersect_before_fut;
+    std::vector<folly::Future<std::optional<SliceAndKey>>> maybe_intersect_after_fut;
 
     for (const auto& affected_slice_and_key : affected_keys) {
         const auto& affected_range = affected_slice_and_key.key().index_range();
         if (intersects(affected_range, front_range) && !overlaps(affected_range, front_range) &&
             is_before(affected_range, front_range)) {
-            auto front_overlap_key = rewrite_partial_segment(
+            maybe_intersect_before_fut.emplace_back(async_rewrite_partial_segment(
                 affected_slice_and_key,
                 front_range,
                 version_id,
                 AffectedSegmentPart::START,
                 store
-            );
-            if (front_overlap_key)
-                intersect_before.push_back(*front_overlap_key);
+            ));
         }
 
         if (intersects(affected_range, back_range) && !overlaps(affected_range, back_range) &&
             is_after(affected_range, back_range)) {
-            auto back_overlap_key = rewrite_partial_segment(
+            maybe_intersect_after_fut.emplace_back(async_rewrite_partial_segment(
                 affected_slice_and_key,
                 back_range,
                 version_id,
                 AffectedSegmentPart::END,
                 store
-            );
-            if (back_overlap_key)
-                intersect_after.push_back(*back_overlap_key);
+            ));
         }
     }
-    return std::make_pair(std::move(intersect_before), std::move(intersect_after));
+    return collect(
+        collect(maybe_intersect_before_fut).via(&async::io_executor()).thenValueInline(filter_existing_slices),
+        collect(maybe_intersect_after_fut).via(&async::io_executor()).thenValueInline(filter_existing_slices)
+    ).via(&async::io_executor());
 }
 
 } // namespace
@@ -263,7 +283,7 @@ VersionedItem delete_range_impl(
                         std::end(affected_keys),
                         std::back_inserter(unaffected_keys));
 
-    auto [intersect_before, intersect_after] = intersecting_segments(affected_keys, index_range, index_range, update_info.next_version_id_, store);
+    auto [intersect_before, intersect_after] = async_intersecting_segments(affected_keys, index_range, index_range, update_info.next_version_id_, store).get();
 
     auto orig_filter_range = std::holds_alternative<std::monostate>(query.row_filter) ? get_query_index_range(index, index_range) : query.row_filter;
 
@@ -286,7 +306,7 @@ VersionedItem delete_range_impl(
     return versioned_item;
 }
 
-void check_update_data_is_sorted(InputTensorFrame& frame, const index::IndexSegmentReader& index_segment_reader){
+void check_update_data_is_sorted(const InputTensorFrame& frame, const index::IndexSegmentReader& index_segment_reader){
     bool is_time_series = std::holds_alternative<stream::TimeseriesIndex>(frame.index);
     sorting::check<ErrorCode::E_UNSORTED_DATA>(
         is_time_series,
@@ -304,92 +324,185 @@ void check_update_data_is_sorted(const InputTensorFrame& frame, const index::IndexSegm
         "When calling update, the existing data must be sorted.");
 }
 
-VersionedItem update_impl(
-    const std::shared_ptr<Store>& store,
+struct UpdateRanges {
+    IndexRange front;
+    IndexRange back;
+    IndexRange original_index_range;
+};
+
+static UpdateRanges compute_update_ranges(const FilterRange& row_filter, const InputTensorFrame& update_frame, std::span<const SliceAndKey> update_slice_and_keys) {
+    return util::variant_match(row_filter,
+        [&](std::monostate) -> UpdateRanges {
+            util::check(std::holds_alternative<stream::TimeseriesIndex>(update_frame.index), "Update with row count index is not permitted");
+            if (update_slice_and_keys.empty()) {
+                // If there are no new keys, then we can't intersect with the existing data.
+                return UpdateRanges{{}, {}, update_frame.index_range};
+            }
+            IndexRange back_range = update_slice_and_keys.back().key().index_range();
+            back_range.adjust_open_closed_interval();
+            return UpdateRanges{
+                update_slice_and_keys.front().key().index_range(),
+                std::move(back_range),
+                update_frame.index_range};
+        },
+        [&](const IndexRange& idx_range) {
+            return UpdateRanges{idx_range, idx_range, idx_range};
+        },
+        [](const RowRange&) -> UpdateRanges {
+            util::raise_rte("Unexpected row_range in update query");
+            return {};
+        }
+    );
+}
+
+static void check_can_update(
+    const InputTensorFrame& frame,
+    const index::IndexSegmentReader& index_segment_reader,
     const UpdateInfo& update_info,
-    const UpdateQuery& query,
-    const std::shared_ptr<InputTensorFrame>& frame,
-    const WriteOptions&& options,
     bool dynamic_schema,
-    bool empty_types) {
+    bool empty_types
+) {
     util::check(update_info.previous_index_key_.has_value(), "Cannot update as there is no previous index key to update into");
-    const StreamId stream_id = frame->desc.id();
-    ARCTICDB_DEBUG(log::version(), "Update versioned dataframe for stream_id: {} , version_id = {}", stream_id, update_info.previous_index_key_->version_id());
-    auto index_segment_reader = index::get_index_reader(*(update_info.previous_index_key_), store);
     util::check_rte(!index_segment_reader.is_pickled(), "Cannot update pickled data");
-    auto index_desc = check_index_match(frame->index, index_segment_reader.tsd().index());
-    util::check(
-        index_desc.type() == IndexDescriptor::Type::TIMESTAMP || index_desc.type() == IndexDescriptor::Type::EMPTY,
-        "Update not supported for non-timeseries indexes"
-    );
-    check_update_data_is_sorted(*frame, index_segment_reader);
-    bool bucketize_dynamic = index_segment_reader.bucketize_dynamic();
-    (void)check_and_mark_slices(index_segment_reader, dynamic_schema, false, std::nullopt, bucketize_dynamic);
-    fix_descriptor_mismatch_or_throw(UPDATE, dynamic_schema, index_segment_reader, *frame, empty_types);
-
-    std::vector<FilterQuery<index::IndexSegmentReader>> queries =
-        build_update_query_filters<index::IndexSegmentReader>(query.row_filter, frame->index, frame->index_range, dynamic_schema, index_segment_reader.bucketize_dynamic());
-    auto combined = combine_filter_functions(queries);
-    auto affected_keys = filter_index(index_segment_reader, std::move(combined));
-    std::vector<SliceAndKey> unaffected_keys;
-    std::set_difference(std::begin(index_segment_reader),
-                        std::end(index_segment_reader),
-                        std::begin(affected_keys),
-                        std::end(affected_keys),
-                        std::back_inserter(unaffected_keys));
-
-    util::check(affected_keys.size() + unaffected_keys.size() == index_segment_reader.size(), "Unaffected vs affected keys split was inconsistent {} + {} != {}",
-                affected_keys.size(), unaffected_keys.size(), index_segment_reader.size());
+    const auto index_desc = check_index_match(frame.index, index_segment_reader.tsd().index());
+    util::check(index::is_timeseries_index(index_desc), "Update not supported for non-timeseries indexes");
+    check_update_data_is_sorted(frame, index_segment_reader);
+    (void)check_and_mark_slices(index_segment_reader, dynamic_schema, false, std::nullopt, index_segment_reader.bucketize_dynamic());
+    fix_descriptor_mismatch_or_throw(UPDATE, dynamic_schema, index_segment_reader, frame, empty_types);
+}
 
-    frame->set_bucketize_dynamic(bucketize_dynamic);
-    auto slicing_arg = get_slicing_policy(options, *frame);
+static std::vector<SliceAndKey> get_keys_affected_by_update(
+    const index::IndexSegmentReader& index_segment_reader,
+    const InputTensorFrame& frame,
+    const UpdateQuery& query,
+    bool dynamic_schema
+) {
+    std::vector<FilterQuery<index::IndexSegmentReader>> queries = build_update_query_filters<index::IndexSegmentReader>(
+        query.row_filter,
+        frame.index,
+        frame.index_range,
+        dynamic_schema,
+        index_segment_reader.bucketize_dynamic());
+    return filter_index(index_segment_reader, combine_filter_functions(queries));
+}
 
-    auto new_slice_and_keys = slice_and_write(frame, slicing_arg, IndexPartialKey{stream_id, update_info.next_version_id_}, store).wait().value();
-    std::sort(std::begin(new_slice_and_keys), std::end(new_slice_and_keys));
-
-    IndexRange orig_filter_range;
-    auto[intersect_before, intersect_after] = util::variant_match(query.row_filter,
-        [&](std::monostate) {
-            util::check(std::holds_alternative<stream::TimeseriesIndex>(frame->index), "Update with row count index is not permitted");
-            orig_filter_range = frame->index_range;
-            if (new_slice_and_keys.empty()) {
-                // If there are no new keys, then we can't intersect with the existing data.
-                return std::make_pair(std::vector<SliceAndKey>{}, std::vector<SliceAndKey>{});
-            }
-            auto front_range = new_slice_and_keys.begin()->key().index_range();
-            auto back_range = new_slice_and_keys.rbegin()->key().index_range();
-            back_range.adjust_open_closed_interval();
-            return intersecting_segments(affected_keys, front_range, back_range, update_info.next_version_id_, store);
-        },
-        [&](const IndexRange& idx_range) {
-            orig_filter_range = idx_range;
-            return intersecting_segments(affected_keys, idx_range, idx_range, update_info.next_version_id_, store);
-        },
-        [](const RowRange&)-> std::pair<std::vector<SliceAndKey>, std::vector<SliceAndKey>> {
-            util::raise_rte("Unexpected row_range in update query");
-        }
-    );
+static std::vector<SliceAndKey> get_keys_not_affected_by_update(
+    const index::IndexSegmentReader& index_segment_reader,
+    std::span<const SliceAndKey> affected_keys
+) {
+    std::vector<SliceAndKey> unaffected_keys;
+    std::set_difference(index_segment_reader.begin(),
+                        index_segment_reader.end(),
+                        affected_keys.begin(),
+                        affected_keys.end(),
+                        std::back_inserter(unaffected_keys));
+    return unaffected_keys;
+}
 
-    size_t row_count = 0;
+static std::pair<std::vector<SliceAndKey>, size_t> get_slice_and_keys_for_update(
+    const UpdateRanges& update_ranges,
+    std::span<const SliceAndKey> unaffected_keys,
+    std::span<const SliceAndKey> affected_keys,
+    const IntersectingSegments& segments_intersecting_with_update_range,
+    std::vector<SliceAndKey>&& new_slice_and_keys
+) {
     const size_t new_keys_size = new_slice_and_keys.size();
+    size_t row_count = 0;
     const std::array<std::vector<SliceAndKey>, 5> groups{
-        strictly_before(orig_filter_range, unaffected_keys),
-        std::move(intersect_before),
+        strictly_before(update_ranges.original_index_range, unaffected_keys),
+        std::move(std::get<0>(segments_intersecting_with_update_range)),
         std::move(new_slice_and_keys),
-        std::move(intersect_after),
-        strictly_after(orig_filter_range, unaffected_keys)};
+        std::move(std::get<1>(segments_intersecting_with_update_range)),
+        strictly_after(update_ranges.original_index_range, unaffected_keys)};
     auto flattened_slice_and_keys = flatten_and_fix_rows(groups, row_count);
 
     util::check(unaffected_keys.size() + new_keys_size + (affected_keys.size() * 2) >= flattened_slice_and_keys.size(),
-                "Output size mismatch: {} + {} + (2 * {}) < {}",
-                unaffected_keys.size(), new_keys_size, affected_keys.size(), flattened_slice_and_keys.size());
-
+        "Output size mismatch: {} + {} + (2 * {}) < {}",
+        unaffected_keys.size(), new_keys_size, affected_keys.size(), flattened_slice_and_keys.size());
     std::sort(std::begin(flattened_slice_and_keys), std::end(flattened_slice_and_keys));
-    auto tsd = index::get_merged_tsd(row_count, dynamic_schema, index_segment_reader.tsd(), frame);
-    auto version_key_fut = index::write_index(stream::index_type_from_descriptor(tsd.as_stream_descriptor()), std::move(tsd), std::move(flattened_slice_and_keys), IndexPartialKey{stream_id, update_info.next_version_id_}, store);
-    auto version_key = std::move(version_key_fut).get();
+    return {flattened_slice_and_keys, row_count};
+}
+
+folly::Future<VariantKey> async_update_impl(
+    const std::shared_ptr<Store>& store,
+    const UpdateInfo& update_info,
+    const UpdateQuery& query,
+    const std::shared_ptr<InputTensorFrame>& frame,
+    WriteOptions&& options,
+    bool dynamic_schema,
+    bool empty_types) {
+    return index::async_get_index_reader(*(update_info.previous_index_key_), store).thenValue([
+        store,
+        update_info,
+        query,
+        frame,
+        options=std::move(options),
+        dynamic_schema,
+        empty_types
+    ](index::IndexSegmentReader&& index_segment_reader) {
+        check_can_update(*frame, index_segment_reader, update_info, dynamic_schema, empty_types);
+        ARCTICDB_DEBUG(log::version(), "Update versioned dataframe for stream_id: {} , version_id = {}", frame->desc.id(), update_info.previous_index_key_->version_id());
+        frame->set_bucketize_dynamic(index_segment_reader.bucketize_dynamic());
+        return slice_and_write(frame, get_slicing_policy(options, *frame), IndexPartialKey{frame->desc.id(), update_info.next_version_id_} , store
+        ).via(&async::cpu_executor()).thenValue([
+            store,
+            update_info,
+            query,
+            frame,
+            dynamic_schema,
+            index_segment_reader=std::move(index_segment_reader)
+        ](std::vector<SliceAndKey>&& new_slice_and_keys) mutable {
+            std::sort(std::begin(new_slice_and_keys), std::end(new_slice_and_keys));
+            auto affected_keys = get_keys_affected_by_update(index_segment_reader, *frame, query, dynamic_schema);
+            auto unaffected_keys = get_keys_not_affected_by_update(index_segment_reader, affected_keys);
+            util::check(
+                affected_keys.size() + unaffected_keys.size() == index_segment_reader.size(),
+                "The sum of affected keys and unaffected keys must be equal to the total number of keys {} + {} != {}",
+                affected_keys.size(), unaffected_keys.size(), index_segment_reader.size());
+            const UpdateRanges update_ranges = compute_update_ranges(query.row_filter, *frame, new_slice_and_keys);
+            return async_intersecting_segments(
+                affected_keys,
+                update_ranges.front,
+                update_ranges.back,
+                update_info.next_version_id_,
+                store).thenValue([new_slice_and_keys=std::move(new_slice_and_keys),
+                                  update_ranges=update_ranges,
+                                  unaffected_keys=std::move(unaffected_keys),
+                                  affected_keys=std::move(affected_keys),
+                                  index_segment_reader=std::move(index_segment_reader),
+                                  frame,
+                                  dynamic_schema,
+                                  update_info,
+                                  store](IntersectingSegments&& intersecting_segments) mutable {
+                auto [flattened_slice_and_keys, row_count] = get_slice_and_keys_for_update(
+                    update_ranges,
+                    unaffected_keys,
+                    affected_keys,
+                    intersecting_segments,
+                    std::move(new_slice_and_keys));
+                auto tsd = index::get_merged_tsd(row_count, dynamic_schema, index_segment_reader.tsd(), frame);
+                return index::write_index(
+                    index_type_from_descriptor(tsd.as_stream_descriptor()),
+                    std::move(tsd),
+                    std::move(flattened_slice_and_keys),
+                    IndexPartialKey{frame->desc.id(), update_info.next_version_id_},
+                    store
+                );
+            });
+        });
+    });
+}
+
+VersionedItem update_impl(
+    const std::shared_ptr<Store>& store,
+    const UpdateInfo& update_info,
+    const UpdateQuery& query,
+    const std::shared_ptr<InputTensorFrame>& frame,
+    WriteOptions&& options,
+    bool dynamic_schema,
+    bool empty_types) {
+    auto version_key = async_update_impl(store, update_info, query, frame, std::move(options), dynamic_schema, empty_types).get();
     auto versioned_item = VersionedItem(to_atom(std::move(version_key)));
-    ARCTICDB_DEBUG(log::version(), "updated stream_id: {} , version_id: {}", stream_id, update_info.next_version_id_);
+    ARCTICDB_DEBUG(log::version(), "updated stream_id: {} , version_id: {}", frame->desc.id(), update_info.next_version_id_);
     return versioned_item;
 }
diff --git a/cpp/arcticdb/version/version_core.hpp b/cpp/arcticdb/version/version_core.hpp
index a60e80ddbf..7a26c69946 100644
--- a/cpp/arcticdb/version/version_core.hpp
+++ b/cpp/arcticdb/version/version_core.hpp
@@ -93,7 +93,7 @@ VersionedItem update_impl(
     const UpdateInfo& update_info,
     const UpdateQuery & query,
     const std::shared_ptr<InputTensorFrame>& frame,
-    const WriteOptions&& options,
+    WriteOptions&& options,
     bool dynamic_schema,
     bool empty_types);

From ff4acaab7b20dd3b487253f39fe16833a026f6eb Mon Sep 17 00:00:00 2001
From: Vasil Pashov
Date: Tue, 7 Jan 2025 12:53:54 +0200
Subject: [PATCH 2/2] Upload additional logs

---
 .github/workflows/static_analysis.yml | 8 ++++++++
 cpp/third_party/pybind11              | 2 +-
 2 files changed, 9 insertions(+), 1 deletion(-)

diff --git a/.github/workflows/static_analysis.yml b/.github/workflows/static_analysis.yml
index 7c439946fc..557128c441 100644
--- a/.github/workflows/static_analysis.yml
+++ b/.github/workflows/static_analysis.yml
@@ -71,3 +71,11 @@
           polaris_reports_sarif_groupSCAIssues: true
           polaris_upload_sarif_report: true
           polaris_prComment_severities: "high,critical,medium,low"
+
+      - name: Upload .bridge artifact
+        uses: actions/upload-artifact@v4
+        with:
+          name: Bridge Logs
+          path: ${{github.workspace}}/.bridge
+          if-no-files-found: error
+
diff --git a/cpp/third_party/pybind11 b/cpp/third_party/pybind11
index 80dc998efc..a2e59f0e70 160000
--- a/cpp/third_party/pybind11
+++ b/cpp/third_party/pybind11
@@ -1 +1 @@
-Subproject commit 80dc998efced8ceb2be59756668a7e90e8bef917
+Subproject commit a2e59f0e7065404b44dfe92a28aca47ba1378dc4