Skip to content

Commit

Permalink
Merge branch 'master' into test_strategy
Browse files Browse the repository at this point in the history
  • Loading branch information
grusev authored Jan 7, 2025
2 parents 0f8b177 + ff4acaa commit 1038b6c
Show file tree
Hide file tree
Showing 17 changed files with 385 additions and 293 deletions.
8 changes: 8 additions & 0 deletions .github/workflows/static_analysis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,11 @@
polaris_reports_sarif_groupSCAIssues: true
polaris_upload_sarif_report: true
polaris_prComment_severities: "high,critical,medium,low"

# Publishes the .bridge directory from the workspace as a workflow artifact named "Bridge Logs".
- name: Upload .bridge artifact
uses: actions/upload-artifact@v4
with:
name: Bridge Logs
path: ${{github.workspace}}/.bridge
# Fail this step (instead of only warning) when the path matches no files.
if-no-files-found: error

6 changes: 6 additions & 0 deletions cpp/arcticdb/pipeline/index_segment_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ IndexSegmentReader get_index_reader(const AtomKey &prev_index, const std::shared
return index::IndexSegmentReader{std::move(seg)};
}

// Future-returning variant of get_index_reader.
// Issues a read of `prev_index` against `store` and, once the read completes, moves the
// segment half of the resulting (key, segment) pair into a new IndexSegmentReader.
// The key half of the pair is discarded.
folly::Future<IndexSegmentReader> async_get_index_reader(const AtomKey &prev_index, const std::shared_ptr<Store> &store) {
    // Continuation that turns the read result into the reader; attached inline to the read future.
    auto make_reader = [](std::pair<VariantKey, SegmentInMemory>&& read_result) {
        return IndexSegmentReader{std::move(read_result.second)};
    };
    return store->read(prev_index).thenValueInline(std::move(make_reader));
}

// Constructs a reader over the given in-memory segment; the segment is moved into seg_,
// so the caller's object is left in a moved-from state.
IndexSegmentReader::IndexSegmentReader(SegmentInMemory&& s) :
seg_(std::move(s)) {
}
Expand Down
10 changes: 5 additions & 5 deletions cpp/arcticdb/pipeline/index_segment_reader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,9 @@
#pragma once

#include <arcticdb/column_store/memory_segment.hpp>
#include <arcticdb/entity/protobufs.hpp>
#include <arcticdb/pipeline/frame_slice.hpp>
#include <arcticdb/pipeline/index_fields.hpp>

#include <boost/noncopyable.hpp>

#include <cstdint>
#include <folly/futures/Future.h>

namespace arcticdb {
class Store;
Expand Down Expand Up @@ -135,6 +131,10 @@ index::IndexSegmentReader get_index_reader(
const AtomKey &prev_index,
const std::shared_ptr<Store> &store);

// Asynchronous counterpart of get_index_reader: reads the index key `prev_index` through `store`
// and yields a future holding an IndexSegmentReader built from the segment that was read.
folly::Future<IndexSegmentReader> async_get_index_reader(
const AtomKey &prev_index,
const std::shared_ptr<Store> &store);

IndexRange get_index_segment_range(
const AtomKey &prev_index,
const std::shared_ptr<Store> &store);
Expand Down
4 changes: 4 additions & 0 deletions cpp/arcticdb/pipeline/index_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,4 +129,8 @@ TimeseriesDescriptor get_merged_tsd(
);
}

// Reports whether the index described by `index_desc` is treated as a timeseries index.
// Both the TIMESTAMP and the EMPTY index types qualify; every other type yields false.
bool is_timeseries_index(const IndexDescriptorImpl& index_desc) {
    if (index_desc.type() == IndexDescriptor::Type::TIMESTAMP) {
        return true;
    }
    return index_desc.type() == IndexDescriptor::Type::EMPTY;
}

} //namespace arcticdb::pipelines::index
2 changes: 2 additions & 0 deletions cpp/arcticdb/pipeline/index_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,4 +138,6 @@ TimeseriesDescriptor get_merged_tsd(
const TimeseriesDescriptor& existing_tsd,
const std::shared_ptr<pipelines::InputTensorFrame>& new_frame);

// Returns true when the index descriptor's type is TIMESTAMP or EMPTY, false otherwise.
[[nodiscard]] bool is_timeseries_index(const IndexDescriptorImpl& index_desc);

} //namespace arcticdb::pipelines::index
41 changes: 18 additions & 23 deletions cpp/arcticdb/pipeline/query.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,10 @@
#include <arcticdb/util/bitset.hpp>
#include <arcticdb/entity/index_range.hpp>
#include <arcticdb/processing/expression_context.hpp>
#include <arcticdb/entity/versioned_item.hpp>
#include <arcticdb/pipeline/python_output_frame.hpp>
#include <arcticdb/pipeline/write_frame.hpp>
#include <arcticdb/pipeline/frame_slice.hpp>
#include <arcticdb/util/constructors.hpp>
#include <arcticdb/util/variant.hpp>
#include <arcticdb/pipeline/index_segment_reader.hpp>
#include <arcticdb/pipeline/input_tensor_frame.hpp>
#include <arcticdb/pipeline/read_options.hpp>
#include <arcticdb/stream/stream_utils.hpp>
#include <arcticdb/processing/clause.hpp>
#include <arcticdb/util/simple_string_hash.hpp>
Expand All @@ -28,9 +23,13 @@
#include <vector>
#include <string>
#include <variant>
#include <ranges>
#include <span>

namespace arcticdb::pipelines {

namespace ranges = std::ranges;

using FilterRange = std::variant<std::monostate, IndexRange, RowRange>;

/*
Expand Down Expand Up @@ -405,41 +404,37 @@ inline FilterRange get_query_index_range(
return RowRange{std::get<NumericIndex>(index_range.start_), std::get<NumericIndex>(index_range.end_)};
}

// Returns copies of the entries of `input` that lie entirely before `range`.
// NOTE: this span is a rendered diff hunk. The removed lines (the `const std::vector<SliceAndKey>&`
// signature and the std::copy_if calls) and the added lines (the `std::span<const SliceAndKey>`
// signature and the ranges::copy_if calls) appear together without +/- markers.
// Behaviour per alternative held by `range`:
//   - RowRange:   keeps entries whose slice row_range.second is less than the filter's first row.
//   - IndexRange: keeps entries whose key index_range().end_ is less than the filter's start_.
//   - otherwise:  calls util::raise_rte, i.e. an unspecified range is treated as an error.
inline std::vector<SliceAndKey> strictly_before(const FilterRange &range, const std::vector<SliceAndKey> &input) {
inline std::vector<SliceAndKey> strictly_before(const FilterRange &range, std::span<const SliceAndKey> input) {
std::vector<SliceAndKey> output;
util::variant_match(range,
[&](const RowRange &row_range) {
std::copy_if(std::begin(input), std::end(input), std::back_inserter(output),
[&](const auto &sk) {
return sk.slice_.row_range.second < row_range.first;
});
ranges::copy_if(input, std::back_inserter(output), [&](const auto &sk) {
return sk.slice_.row_range.second < row_range.first;
});
},
[&](const IndexRange &index_range) {
std::copy_if(std::begin(input), std::end(input), std::back_inserter(output),
[&](const auto &sk) {
return sk.key().index_range().end_ < index_range.start_;
});
ranges::copy_if(input, std::back_inserter(output), [&](const auto &sk) {
return sk.key().index_range().end_ < index_range.start_;
});
},
// Fallback for any alternative other than RowRange / IndexRange.
[&](const auto &) {
util::raise_rte("Expected specified range ");
});
return output;
}

inline std::vector<SliceAndKey> strictly_after(const FilterRange &range, const std::vector<SliceAndKey> &input) {
inline std::vector<SliceAndKey> strictly_after(const FilterRange &range, std::span<const SliceAndKey> input) {
std::vector<SliceAndKey> output;
util::variant_match(range,
[&input, &output](const RowRange &row_range) {
std::copy_if(std::begin(input), std::end(input), std::back_inserter(output),
[&](const auto &sk) {
return sk.slice_.row_range.first > row_range.second;
});
ranges::copy_if(input, std::back_inserter(output), [&](const auto &sk) {
return sk.slice_.row_range.first > row_range.second;
});
},
[&input, &output](const IndexRange &index_range) {
std::copy_if(std::begin(input), std::end(input), std::back_inserter(output),
[&](const auto &sk) {
return sk.key().index_range().start_ > index_range.end_;
});
ranges::copy_if(input, std::back_inserter(output), [&](const auto &sk) {
return sk.key().index_range().start_ > index_range.end_;
});
},
[](const auto &) {
util::raise_rte("Expected specified range ");
Expand Down
33 changes: 10 additions & 23 deletions cpp/arcticdb/pipeline/read_pipeline.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,9 @@
#pragma once

#include <variant>

#include <folly/futures/Future.h>
#include <boost/noncopyable.hpp>

#include <arcticdb/entity/types.hpp>
#include <arcticdb/stream/index.hpp>
#include <arcticdb/entity/protobufs.hpp>
#include <pybind11/pybind11.h>

#include <arcticdb/stream/stream_sink.hpp>
#include <arcticdb/stream/stream_source.hpp>
#include <arcticdb/entity/native_tensor.hpp>
#include <arcticdb/entity/performance_tracing.hpp>
#include <arcticdb/entity/atom_key.hpp>
#include <arcticdb/util/bitset.hpp>
#include <arcticdb/util/constructors.hpp>
#include <folly/executors/FutureExecutor.h>
#include <folly/executors/CPUThreadPoolExecutor.h>
#include <arcticdb/pipeline/frame_slice.hpp>
#include <arcticdb/pipeline/python_output_frame.hpp>
#include <arcticdb/pipeline/query.hpp>
Expand Down Expand Up @@ -61,22 +46,24 @@ void foreach_active_bit(const util::BitSet &bs, C &&visitor) {
}
}

template<typename ContainerType>
std::vector<SliceAndKey> filter_index(const ContainerType &container, std::optional<CombinedQuery<ContainerType>> &&query) {
inline std::vector<SliceAndKey> filter_index(
const index::IndexSegmentReader& index_segment_reader,
std::optional<CombinedQuery<index::IndexSegmentReader>> &&query
) {
ARCTICDB_SAMPLE_DEFAULT(FilterIndex)
std::vector<SliceAndKey> output{};
if (container.size()> 0) {
if (!index_segment_reader.empty()) {
if(query) {
auto row_bitset = (*query)(container);
auto row_bitset = (*query)(index_segment_reader);
ARCTICDB_DEBUG(log::version(), "Row bitset has {} bits set of {}", row_bitset->count(), row_bitset->size());
output.reserve(row_bitset->count());
foreach_active_bit(*row_bitset, [&](auto r) {
output.emplace_back(get_row(container, r));
output.emplace_back(get_row(index_segment_reader, r));
});
} else {
output.reserve(container.size());
for(auto i = 0u; i < container.size(); ++i) {
output.emplace_back(get_row(container, i));
output.reserve(index_segment_reader.size());
for(auto i = 0u; i < index_segment_reader.size(); ++i) {
output.emplace_back(get_row(index_segment_reader, i));
}
}
}
Expand Down
70 changes: 38 additions & 32 deletions cpp/arcticdb/pipeline/write_frame.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,21 @@
#include <arcticdb/stream/aggregator.hpp>
#include <arcticdb/entity/protobufs.hpp>
#include <arcticdb/util/variant.hpp>
#include <arcticdb/python/python_types.hpp>
#include <arcticdb/pipeline/frame_utils.hpp>
#include <arcticdb/pipeline/write_frame.hpp>
#include <arcticdb/stream/append_map.hpp>
#include <arcticdb/async/task_scheduler.hpp>
#include <arcticdb/util/format_date.hpp>
#include <vector>
#include <array>
#include <ranges>


namespace arcticdb::pipelines {

using namespace arcticdb::entity;
using namespace arcticdb::stream;
namespace ranges = std::ranges;

WriteToSegmentTask::WriteToSegmentTask(
std::shared_ptr<InputTensorFrame> frame,
Expand Down Expand Up @@ -252,40 +253,46 @@ static RowRange partial_rewrite_row_range(
}
}

// Reads the segment stored under `existing.key()`, keeps only the rows selected by
// partial_rewrite_row_range(segment, index_range, affected_part), and writes the truncated segment
// back with the same key type and stream id but with `version_id` and the recomputed start/end
// index values. Yields the new SliceAndKey, or std::nullopt when the selected row range is empty.
// NOTE: this span is a rendered diff hunk. The removed synchronous rewrite_partial_segment (which
// blocks on .get()) and the added future-based async_rewrite_partial_segment appear together
// without +/- markers, so old and new lines are interleaved below.
std::optional<SliceAndKey> rewrite_partial_segment(
folly::Future<std::optional<SliceAndKey>> async_rewrite_partial_segment(
const SliceAndKey& existing,
const IndexRange& index_range,
VersionId version_id,
AffectedSegmentPart affected_part,
const std::shared_ptr<Store>& store) {
const auto& key = existing.key();
auto kv = store->read(key).get();
const SegmentInMemory& segment = kv.second;
const RowRange affected_row_range = partial_rewrite_row_range(segment, index_range, affected_part);
const int64_t num_rows = affected_row_range.end() - affected_row_range.start();
if (num_rows <= 0) {
return std::nullopt;
}
SegmentInMemory output = segment.truncate(affected_row_range.start(), affected_row_range.end(), true);
const IndexValue start_ts = TimeseriesIndex::start_value_for_segment(output);
// +1 as in the key we store one nanosecond greater than the last index value in the segment
const IndexValue end_ts = std::get<NumericIndex>(TimeseriesIndex::end_value_for_segment(output)) + 1;
FrameSlice new_slice{
std::make_shared<StreamDescriptor>(output.descriptor()),
existing.slice_.col_range,
RowRange{0, num_rows},
existing.slice_.hash_bucket(),
existing.slice_.num_buckets()};

auto fut_key = store->write(
key.type(),
// The continuation captures its inputs by value (including a copy of `existing` and of the
// `store` shared_ptr) — presumably so it does not depend on the caller's references once
// this function has returned; confirm against callers.
return store->read(existing.key()).thenValueInline([
existing,
index_range,
version_id,
key.id(),
start_ts,
end_ts,
std::move(output)
);
return SliceAndKey{std::move(new_slice), std::get<AtomKey>(std::move(fut_key).get())};
affected_part,
store](std::pair<VariantKey, SegmentInMemory>&& key_segment) -> folly::Future<std::optional<SliceAndKey>> {
const auto& key = existing.key();
const SegmentInMemory& segment = key_segment.second;
const RowRange affected_row_range = partial_rewrite_row_range(segment, index_range, affected_part);
const int64_t num_rows = affected_row_range.end() - affected_row_range.start();
// Nothing of the segment falls in the selected range: no write is issued.
if (num_rows <= 0) {
return std::nullopt;
}
SegmentInMemory output = segment.truncate(affected_row_range.start(), affected_row_range.end(), true);
const IndexValue start_ts = TimeseriesIndex::start_value_for_segment(output);
// +1 as in the key we store one nanosecond greater than the last index value in the segment
const IndexValue end_ts = std::get<NumericIndex>(TimeseriesIndex::end_value_for_segment(output)) + 1;
// The rewritten slice keeps the original column range and hash bucket info; its row range is
// reset to start at 0 and span num_rows.
FrameSlice new_slice{
std::make_shared<StreamDescriptor>(output.descriptor()),
existing.slice_.col_range,
RowRange{0, num_rows},
existing.slice_.hash_bucket(),
existing.slice_.num_buckets()};
// NOTE(review): the write continuation below is not declared `mutable`, so its captured
// new_slice is const inside the body and std::move(new_slice) will copy rather than move.
// Consider marking that lambda `mutable` if the extra FrameSlice copy matters.
return store->write(
key.type(),
version_id,
key.id(),
start_ts,
end_ts,
std::move(output)
).thenValueInline([new_slice=std::move(new_slice)](VariantKey&& k) {
return std::make_optional<SliceAndKey>(std::move(new_slice), std::get<AtomKey>(std::move(k)));
});
});
}

std::vector<SliceAndKey> flatten_and_fix_rows(const std::array<std::vector<SliceAndKey>, 5>& groups, size_t& global_count) {
Expand All @@ -301,10 +308,9 @@ std::vector<SliceAndKey> flatten_and_fix_rows(const std::array<std::vector<Slice
return std::max(a, sk.slice_.row_range.second);
});

std::transform(std::begin(group), std::end(group), std::back_inserter(output), [&](SliceAndKey sk) {
ranges::transform(group, std::back_inserter(output), [&](SliceAndKey sk) {
auto range_start = global_count + (sk.slice_.row_range.first - group_start);
auto new_range = RowRange{range_start, range_start + (sk.slice_.row_range.diff())};
sk.slice_.row_range = new_range;
sk.slice_.row_range = RowRange{range_start, range_start + sk.slice_.row_range.diff()};
return sk;
});

Expand Down
2 changes: 1 addition & 1 deletion cpp/arcticdb/pipeline/write_frame.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ enum class AffectedSegmentPart {
END
};

std::optional<SliceAndKey> rewrite_partial_segment(
folly::Future<std::optional<SliceAndKey>> async_rewrite_partial_segment(
const SliceAndKey& existing,
const IndexRange& index_range,
VersionId version_id,
Expand Down
4 changes: 1 addition & 3 deletions cpp/arcticdb/processing/test/benchmark_clause.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@ using namespace arcticdb;

SegmentInMemory get_segment_for_merge(const StreamId &id, size_t num_rows, size_t start, size_t step){
auto segment = SegmentInMemory{
get_test_descriptor<stream::TimeseriesIndex>(id, {
scalar_field(DataType::UINT8, "column")
}),
get_test_descriptor<stream::TimeseriesIndex>(id, std::array{scalar_field(DataType::UINT8, "column")}),
num_rows
};
auto& index_col = segment.column(0);
Expand Down
1 change: 0 additions & 1 deletion cpp/arcticdb/stream/stream_source.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
#pragma once

#include <arcticdb/column_store/memory_segment.hpp>
#include <arcticdb/codec/segment.hpp>
#include <arcticdb/storage/storage.hpp>
#include <arcticdb/storage/storage_options.hpp>
#include <arcticdb/async/batch_read_args.hpp>
Expand Down
Loading

0 comments on commit 1038b6c

Please sign in to comment.