
Commit

Refactor storages to async
willdealtry committed Jan 6, 2025
1 parent 7cdcebc commit 0486da9
Showing 87 changed files with 2,440 additions and 1,852 deletions.
12 changes: 12 additions & 0 deletions cpp/CMakeLists.txt
@@ -26,6 +26,7 @@ option(ARCTICDB_USING_ADDRESS_SANITIZER "Enable address sanitizer." OFF)
option(ARCTICDB_USING_THREAD_SANITIZER "Enable thread sanitizer." OFF)
option(ARCTICDB_USING_UB_SANITIZER "Enable undefined behavior sanitizer." OFF)
option(ARCTICDB_LOG_PERFORMANCE "Whether to log performance timings." OFF)
option(ARCTICDB_COUNT_ALLOCATIONS "Override new and delete to count allocations." OFF)

set(ARCTICDB_SANITIZER_FLAGS)
if(${ARCTICDB_USING_ADDRESS_SANITIZER})
@@ -47,6 +48,17 @@ if(${ARCTICDB_LOG_PERFORMANCE})
add_compile_definitions(ARCTICDB_LOG_PERFORMANCE)
endif()

if(${ARCTICDB_COUNT_ALLOCATIONS})
add_compile_definitions(ARCTICDB_COUNT_ALLOCATIONS)
include(FetchContent)
FetchContent_Declare(
cpptrace
GIT_REPOSITORY https://github.com/jeremy-rifkin/cpptrace.git
GIT_TAG v0.7.3 # <HASH or TAG>
)
FetchContent_MakeAvailable(cpptrace)
endif()

if(ARCTICDB_SANITIZER_FLAGS)
list(APPEND ARCTICDB_SANITIZER_FLAGS "-g;-fno-omit-frame-pointer;-fno-optimize-sibling-calls;")
if (CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
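The new ARCTICDB_COUNT_ALLOCATIONS option pulls in cpptrace and enables the allocation-tracing code added under util/ further down. The actual util/allocation_tracing implementation is not part of this excerpt; as a rough sketch only, the general technique the option's description refers to (overriding the global new and delete to count allocations) could look like the following, with every name here hypothetical:

#include <atomic>
#include <cstdint>
#include <cstdlib>
#include <new>

// Hypothetical illustration, not the ArcticDB implementation: process-wide
// counters bumped by the overridden global operator new/delete.
static std::atomic<std::uint64_t> g_allocations{0};
static std::atomic<std::uint64_t> g_deallocations{0};

void* operator new(std::size_t size) {
    g_allocations.fetch_add(1, std::memory_order_relaxed);
    if (void* ptr = std::malloc(size))
        return ptr;
    throw std::bad_alloc{};
}

void operator delete(void* ptr) noexcept {
    g_deallocations.fetch_add(1, std::memory_order_relaxed);
    std::free(ptr);
}

void operator delete(void* ptr, std::size_t) noexcept {
    g_deallocations.fetch_add(1, std::memory_order_relaxed);
    std::free(ptr);
}

Configuring with -DARCTICDB_COUNT_ALLOCATIONS=ON would then switch on both the compile definition and the cpptrace dependency declared above.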
43 changes: 26 additions & 17 deletions cpp/arcticdb/CMakeLists.txt
@@ -268,6 +268,7 @@ set(arcticdb_srcs
processing/query_planner.hpp
processing/sorted_aggregation.hpp
processing/unsorted_aggregation.hpp
storage/async_storage.hpp
storage/constants.hpp
storage/common.hpp
storage/config_resolvers.hpp
@@ -277,30 +278,30 @@
storage/library.hpp
storage/library_index.hpp
storage/library_manager.hpp
storage/storage_mock_client.hpp
storage/azure/azure_client_wrapper.hpp
storage/azure/azure_mock_client.hpp
storage/azure/azure_real_client.hpp
storage/mock/storage_mock_client.hpp
storage/azure/azure_client_interface.hpp
storage/mock/azure_mock_client.hpp
storage/azure/azure_client_impl.hpp
storage/azure/azure_storage.hpp
storage/lmdb/lmdb.hpp
storage/lmdb/lmdb_client_wrapper.hpp
storage/lmdb/lmdb_mock_client.hpp
storage/lmdb/lmdb_real_client.hpp
storage/lmdb/lmdb_client_interface.hpp
storage/mock/lmdb_mock_client.hpp
storage/lmdb/lmdb_client_impl.hpp
storage/lmdb/lmdb_storage.hpp
storage/memory/memory_storage.hpp
storage/memory/memory_storage.cpp
storage/mongo/mongo_client.hpp
storage/mongo/mongo_instance.hpp
storage/mongo/mongo_client_wrapper.hpp
storage/mongo/mongo_mock_client.hpp
storage/mongo/mongo_client_interface.hpp
storage/mock/mongo_mock_client.hpp
storage/mongo/mongo_storage.hpp
storage/object_store_utils.hpp
storage/file/file_store.hpp
storage/file/mapped_file_storage.hpp
storage/file/file_store.hpp
storage/single_file_storage.hpp
storage/s3/nfs_backed_storage.hpp
storage/s3/s3_client_wrapper.hpp
storage/s3/s3_client_interface.hpp
storage/s3/s3_storage_tool.hpp
storage/s3/s3_settings.hpp
storage/storage_factory.hpp
@@ -331,6 +332,7 @@
toolbox/library_tool.hpp
toolbox/storage_mover.hpp
util/allocator.hpp
util/allocation_tracing.hpp
util/bitset.hpp
util/buffer.hpp
util/buffer_pool.hpp
@@ -470,21 +472,21 @@ set(arcticdb_srcs
storage/failure_simulation.cpp
storage/library_manager.cpp
storage/azure/azure_storage.cpp
storage/azure/azure_real_client.cpp
storage/azure/azure_mock_client.cpp
storage/lmdb/lmdb_mock_client.cpp
storage/lmdb/lmdb_real_client.cpp
storage/azure/azure_client_impl.cpp
storage/mock/azure_mock_client.cpp
storage/mock/lmdb_mock_client.cpp
storage/lmdb/lmdb_client_impl.cpp
storage/lmdb/lmdb_storage.cpp
storage/file/mapped_file_storage.cpp
storage/mongo/mongo_client.cpp
storage/mongo/mongo_instance.cpp
storage/mongo/mongo_mock_client.cpp
storage/mock/mongo_mock_client.cpp
storage/mongo/mongo_storage.cpp
storage/s3/nfs_backed_storage.cpp
storage/s3/ec2_utils.cpp
storage/s3/s3_api.cpp
storage/s3/s3_real_client.cpp
storage/s3/s3_mock_client.cpp
storage/s3/s3_client_impl.cpp
storage/mock/s3_mock_client.cpp
storage/s3/s3_storage.cpp
storage/s3/s3_storage_tool.cpp
storage/storage_factory.cpp
@@ -497,6 +499,7 @@
toolbox/library_tool.cpp
util/allocator.cpp
util/buffer_holder.cpp
util/allocation_tracing.cpp
util/buffer_pool.cpp
util/configs_map.cpp
util/decimal.cpp
@@ -675,6 +678,12 @@ set (arcticdb_core_libraries
Azure::azure-storage-blobs
)

if(${ARCTICDB_COUNT_ALLOCATIONS})
list(APPEND arcticdb_core_libraries
cpptrace::cpptrace
)
endif()

if(${ARCTICDB_PYTHON_EXPLICIT_LINK})
# Even though python is transitive dependency MSVS builds fail if we don't link against python explicitly
# TODO: Figure out why
59 changes: 25 additions & 34 deletions cpp/arcticdb/async/async_store.hpp
@@ -31,10 +31,9 @@ std::pair<VariantKey, std::optional<Segment>> lookup_match_in_dedup_map(
template <typename Callable>
auto read_and_continue(const VariantKey& key, std::shared_ptr<storage::Library> library, const storage::ReadKeyOpts& opts, Callable&& c) {
return async::submit_io_task(ReadCompressedTask{key, library, opts, std::forward<decltype(c)>(c)})
.via(&async::cpu_executor())
.thenValue([](auto &&result) mutable {
auto&& [key_seg, continuation] = std::forward<decltype(result)>(result);
return continuation(std::move(key_seg));
.thenValueInline([](auto &&result) mutable {
auto&& [key_seg_fut, continuation] = std::forward<decltype(result)>(result);
return std::move(key_seg_fut).thenValueInline([continuation=std::move(continuation)] (storage::KeySegmentPair&& key_seg) mutable { return continuation(std::move(key_seg)); });
}
);
}
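The reworked read_and_continue above now gets a future for the key/segment pair back from the storage layer and chains the caller's continuation with thenValueInline instead of re-routing through the CPU executor. A minimal, self-contained sketch of that folly pattern, using placeholder types rather than the ArcticDB task classes:

#include <folly/futures/Future.h>
#include <string>
#include <utility>

// Stand-in for decoding a compressed segment; returns its own future.
folly::Future<std::size_t> decode(std::string&& payload) {
    return folly::makeFuture(payload.size());
}

// Run the continuation inline on whichever thread completes the read;
// folly collapses the nested Future<Future<...>> into a single future.
folly::Future<std::size_t> read_then_decode(folly::Future<std::string>&& read_fut) {
    return std::move(read_fut).thenValueInline([](std::string&& payload) {
        return decode(std::move(payload));
    });
}

Because the lambda itself returns a future, the storage layer can hand back asynchronous results without an extra executor hop.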
@@ -67,17 +66,13 @@ class AsyncStore : public Store {
IndexValue end_index,
SegmentInMemory &&segment) override {

util::check(segment.descriptor().id() == stream_id,
"Descriptor id mismatch in atom key {} != {}",
stream_id,
segment.descriptor().id());
util::check(segment.descriptor().id() == stream_id, "Descriptor id mismatch in atom key {} != {}", stream_id, segment.descriptor().id());

return async::submit_cpu_task(EncodeAtomTask{
key_type, version_id, stream_id, start_index, end_index, current_timestamp(),
std::move(segment), codec_, encoding_version_
})
.via(&async::io_executor())
.thenValue(WriteSegmentTask{library_});
}).via(&async::io_executor())
.thenValue(WriteSegmentTask{library_});
}

folly::Future<entity::VariantKey> write(
@@ -89,10 +84,7 @@ folly::Future<entity::VariantKey> write(
IndexValue end_index,
SegmentInMemory &&segment) override {

util::check(segment.descriptor().id() == stream_id,
"Descriptor id mismatch in atom key {} != {}",
stream_id,
segment.descriptor().id());
util::check(segment.descriptor().id() == stream_id, "Descriptor id mismatch in atom key {} != {}", stream_id, segment.descriptor().id());

return async::submit_cpu_task(EncodeAtomTask{
key_type, version_id, stream_id, start_index, end_index, creation_ts,
@@ -169,7 +161,7 @@ folly::Future<folly::Unit> write_compressed(storage::KeySegmentPair ks) override
}

void write_compressed_sync(storage::KeySegmentPair ks) override {
library_->write(Composite<storage::KeySegmentPair>(std::move(ks)));
library_->write(std::move(ks));
}

folly::Future<entity::VariantKey> update(const entity::VariantKey &key,
@@ -226,9 +218,8 @@ folly::Future<std::pair<entity::VariantKey, SegmentInMemory>> read(
return read_and_continue(key, library_, opts, DecodeSegmentTask{});
}

std::pair<entity::VariantKey, SegmentInMemory> read_sync(const entity::VariantKey &key,
storage::ReadKeyOpts opts) override {
return DecodeSegmentTask{}(read_dispatch(key, library_, opts));
std::pair<entity::VariantKey, SegmentInMemory> read_sync(const entity::VariantKey& key, storage::ReadKeyOpts opts) override {
return DecodeSegmentTask{}(read_sync_dispatch(key, library_, opts));
}

folly::Future<storage::KeySegmentPair> read_compressed(
@@ -237,11 +228,8 @@ folly::Future<storage::KeySegmentPair> read_compressed(
return read_and_continue(key, library_, opts, PassThroughTask{});
}

storage::KeySegmentPair read_compressed_sync(
const entity::VariantKey& key,
storage::ReadKeyOpts opts
) override {
return read_dispatch( key, library_, opts );
storage::KeySegmentPair read_compressed_sync(const entity::VariantKey& key, storage::ReadKeyOpts opts) override {
return read_sync_dispatch(key, library_, opts);
}

folly::Future<std::pair<std::optional<VariantKey>, std::optional<google::protobuf::Any>>> read_metadata(const entity::VariantKey &key, storage::ReadKeyOpts opts) override {
@@ -256,7 +244,7 @@ folly::Future<std::tuple<VariantKey, std::optional<google::protobuf::Any>, Strea

folly::Future<std::pair<VariantKey, TimeseriesDescriptor>> read_timeseries_descriptor(
const entity::VariantKey &key,
storage::ReadKeyOpts opts = storage::ReadKeyOpts{}) override {
storage::ReadKeyOpts opts) override {
return read_and_continue(key, library_, opts, DecodeTimeseriesDescriptorTask{});
}

@@ -349,12 +337,17 @@ folly::Future<std::vector<VariantKey>> batch_read_compressed(
std::vector<folly::Future<pipelines::SegmentAndSlice>> batch_read_uncompressed(
std::vector<pipelines::RangesAndKey>&& ranges_and_keys,
std::shared_ptr<std::unordered_set<std::string>> columns_to_decode) override {
return folly::window(
std::move(ranges_and_keys),
[this, columns_to_decode](auto&& ranges_and_key) {
const auto key = ranges_and_key.key_;
return read_and_continue(key, library_, storage::ReadKeyOpts{}, DecodeSliceTask{std::move(ranges_and_key), columns_to_decode});
}, async::TaskScheduler::instance()->io_thread_count() * 2);
ARCTICDB_RUNTIME_DEBUG(log::version(), "Reading {} keys", ranges_and_keys.size());
std::vector<folly::Future<pipelines::SegmentAndSlice>> output;
for(auto&& ranges_and_key : ranges_and_keys) {
const auto key = ranges_and_key.key_;
output.emplace_back(read_and_continue(
key,
library_,
storage::ReadKeyOpts{},
DecodeSliceTask{std::move(ranges_and_key), columns_to_decode}));
}
return output;
}

std::vector<folly::Future<bool>> batch_key_exists(
@@ -367,7 +360,6 @@ std::vector<folly::Future<bool>> batch_key_exists(
return res;
}


folly::Future<SliceAndKey> async_write(
folly::Future<std::tuple<PartialKey, SegmentInMemory, pipelines::FrameSlice>> &&input_fut,
const std::shared_ptr<DeDupMap> &de_dup_map) override {
@@ -390,8 +382,7 @@ folly::Future<SliceAndKey> async_write(
.thenValue([lib=library_](auto &&item) {
auto [key_opt_segment, slice] = std::forward<decltype(item)>(item);
if (key_opt_segment.second)
lib->write(Composite<storage::KeySegmentPair>({VariantKey{key_opt_segment.first},
std::move(*key_opt_segment.second)}));
lib->write({VariantKey{key_opt_segment.first}, std::move(*key_opt_segment.second)});

return SliceAndKey{slice, to_atom(key_opt_segment.first)};
});
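For reference, the plain loop now used in batch_read_uncompressed replaces a folly::window call that bounded the number of in-flight reads to twice the IO thread count. A compact sketch of window's shape, with placeholder types rather than the ArcticDB ones:

#include <folly/futures/Future.h>
#include <cstddef>
#include <vector>

// Bounded fan-out: at most max_in_flight of the per-key futures are
// outstanding at any time; each completion starts the next key.
std::vector<folly::Future<int>> read_all_bounded(std::vector<int>&& keys, std::size_t max_in_flight) {
    return folly::window(
        std::move(keys),
        [](int key) { return folly::makeFuture(key * 2); },  // stand-in for a storage read
        max_in_flight);
}

The rewritten version submits every read up front, presumably leaving throttling to the schedulers underneath.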
9 changes: 2 additions & 7 deletions cpp/arcticdb/async/task_scheduler.hpp
@@ -174,11 +174,6 @@ inline auto get_default_num_cpus([[maybe_unused]] const std::string& cgroup_fold
* 3/ Priority: How to assign priorities to task in order to treat the most pressing first.
* 4/ Throttling: (similar to priority) how to absorb work spikes and apply memory backpressure
*/

inline int64_t default_io_thread_count(uint64_t cpu_count) {
return std::min(int64_t(100L), static_cast<int64_t>(static_cast<double>(cpu_count) * 1.5));
}

class TaskScheduler {
public:
using CPUSchedulerType = folly::FutureExecutor<folly::CPUThreadPoolExecutor>;
@@ -199,7 +194,7 @@ class TaskScheduler {
auto task = std::forward<decltype(t)>(t);
static_assert(std::is_base_of_v<BaseTask, std::decay_t<Task>>, "Only supports Task derived from BaseTask");
ARCTICDB_DEBUG(log::schedule(), "{} Submitting CPU task {}: {} of {}", uintptr_t(this), typeid(task).name(), cpu_exec_.getTaskQueueSize(), cpu_exec_.kDefaultMaxQueueSize);
std::lock_guard lock{cpu_mutex_};
//std::lock_guard lock{cpu_mutex_};
return cpu_exec_.addFuture(std::move(task));
}

@@ -208,7 +203,7 @@
auto task = std::forward<decltype(t)>(t);
static_assert(std::is_base_of_v<BaseTask, std::decay_t<Task>>, "Only support Tasks derived from BaseTask");
ARCTICDB_DEBUG(log::schedule(), "{} Submitting IO task {}: {}", uintptr_t(this), typeid(task).name(), io_exec_.getPendingTaskCount());
std::lock_guard lock{io_mutex_};
//std::lock_guard lock{io_mutex_};
return io_exec_.addFuture(std::move(task));
}

19 changes: 0 additions & 19 deletions cpp/arcticdb/async/tasks.cpp
@@ -50,23 +50,4 @@ namespace arcticdb::async {
decode_into_memory_segment(seg, hdr, segment_in_memory, desc);
return pipelines::SegmentAndSlice(std::move(ranges_and_key_), std::move(segment_in_memory));
}

pipelines::SliceAndKey DecodeSlicesTask::decode_into_slice(std::pair<Segment, pipelines::SliceAndKey>&& sk_pair) const {
auto [seg, sk] = std::move(sk_pair);
ARCTICDB_DEBUG(log::storage(), "ReadAndDecodeAtomTask decoding segment with key {}",
variant_key_view(sk.key()));

auto &hdr = seg.header();
const auto& desc = seg.descriptor();
auto descriptor = async::get_filtered_descriptor(desc, filter_columns_);
sk.slice_.adjust_columns(descriptor.field_count() - descriptor.index().field_count());

ARCTICDB_TRACE(log::codec(), "Creating segment");
SegmentInMemory res(std::move(descriptor));

decode_into_memory_segment(seg, hdr, res, desc);
sk.set_segment(std::move(res));
return sk;
}

} //namespace arcticdb::async