From c1a3460c7e0dba4e28c035c6071a9e594bf3122e Mon Sep 17 00:00:00 2001 From: Shinichi Umegane Date: Mon, 21 Oct 2024 19:06:38 +0900 Subject: [PATCH] Wip/log 0.7 (#57) feat(performance): optimize startup time by eliminating unnecessary sorting This commit improves startup performance by removing unnecessary sorting operations during system startup. Redundant file handling has been excluded, and the snapshot and compacted file processing logic has been optimized. --- include/limestone/api/cursor.h | 20 +- include/limestone/api/datastore.h | 6 +- include/limestone/api/log_channel.h | 2 +- include/limestone/api/snapshot.h | 19 +- src/limestone/cursor.cpp | 36 +- src/limestone/cursor_impl.cpp | 214 +++++++ src/limestone/cursor_impl.h | 68 +++ src/limestone/datastore.cpp | 28 +- src/limestone/datastore_snapshot.cpp | 156 +++-- src/limestone/file_operations.cpp | 5 + src/limestone/file_operations.h | 4 + src/limestone/snapshot.cpp | 18 +- src/limestone/snapshot_impl.cpp | 43 ++ src/limestone/snapshot_impl.h | 42 ++ src/limestone/sortdb_wrapper.h | 2 +- src/limestone/sorting_context.cpp | 52 ++ src/limestone/sorting_context.h | 55 ++ test/CMakeLists.txt | 1 + ...assemble_snapshot_input_filenames_test.cpp | 165 ++++++ ...ompaction_test.cpp => compaction_test.cpp} | 558 +++++++++++++++++- test/limestone/log/log_channel_test.cpp | 1 + test/limestone/snapshot/cursor_impl_test.cpp | 244 ++++++++ 22 files changed, 1583 insertions(+), 156 deletions(-) create mode 100644 src/limestone/cursor_impl.cpp create mode 100644 src/limestone/cursor_impl.h create mode 100644 src/limestone/snapshot_impl.cpp create mode 100644 src/limestone/snapshot_impl.h create mode 100644 src/limestone/sorting_context.cpp create mode 100644 src/limestone/sorting_context.h create mode 100644 test/limestone/compaction/assemble_snapshot_input_filenames_test.cpp rename test/limestone/compaction/{online_compaction_test.cpp => compaction_test.cpp} (62%) create mode 100644 test/limestone/snapshot/cursor_impl_test.cpp diff --git a/include/limestone/api/cursor.h b/include/limestone/api/cursor.h index 6cf430d6..58bc57bf 100644 --- a/include/limestone/api/cursor.h +++ b/include/limestone/api/cursor.h @@ -17,6 +17,7 @@ #include #include +#include #include #include @@ -24,11 +25,17 @@ #include #include + +namespace limestone::internal { + class cursor_impl; +} + namespace limestone::api { class log_entry; class snapshot; + /** * @brief a cursor to scan entries on the snapshot */ @@ -56,7 +63,7 @@ class cursor { * @brief returns the storage ID of the entry at the current cursor position * @return the storage ID of the current entry */ - storage_id_type storage() const noexcept; + [[nodiscard]] storage_id_type storage() const noexcept; /** * @brief returns the key byte string of the entry at the current cursor position @@ -77,13 +84,14 @@ class cursor { std::vector& large_objects() noexcept; private: - boost::filesystem::ifstream istrm_{}; - std::unique_ptr log_entry_; + std::unique_ptr pimpl; + std::vector large_objects_{}; - explicit cursor(const boost::filesystem::path& file); - - friend class snapshot; + explicit cursor(const boost::filesystem::path& snapshot_file); + explicit cursor(const boost::filesystem::path& snapshot_file, const boost::filesystem::path& compacted_file); + + friend class internal::cursor_impl; }; } // namespace limestone::api diff --git a/include/limestone/api/datastore.h b/include/limestone/api/datastore.h index 08d86ff9..7af35d2f 100644 --- a/include/limestone/api/datastore.h +++ b/include/limestone/api/datastore.h @@ -23,6 +23,7 @@ #include #include #include +#include #include @@ -309,7 +310,7 @@ class datastore { * @param from the location of log files * @attention this function is not thread-safe. */ - void create_snapshot(const std::set& file_names); + void create_snapshot(); epoch_id_type last_durable_epoch_in_dir(); @@ -324,7 +325,8 @@ class datastore { void rotate_epoch_file(); int64_t current_unix_epoch_in_millis(); - + + std::map clear_storage; }; } // namespace limestone::api diff --git a/include/limestone/api/log_channel.h b/include/limestone/api/log_channel.h index 3d48eb8f..1c05b32e 100644 --- a/include/limestone/api/log_channel.h +++ b/include/limestone/api/log_channel.h @@ -41,7 +41,6 @@ class rotation_result; * @details this object is not thread-safe, assuming each thread uses its own log_channel */ class log_channel { - friend class rotation_task; public: /** @@ -180,6 +179,7 @@ class log_channel { rotation_result do_rotate_file(epoch_id_type epoch = 0); friend class datastore; + friend class rotation_task; }; } // namespace limestone::api diff --git a/include/limestone/api/snapshot.h b/include/limestone/api/snapshot.h index 03088cd7..197ffb33 100644 --- a/include/limestone/api/snapshot.h +++ b/include/limestone/api/snapshot.h @@ -17,10 +17,15 @@ #include #include - +#include #include #include +#include + +namespace limestone::internal { + class snapshot_impl; +} namespace limestone::api { @@ -28,8 +33,14 @@ namespace limestone::api { * @brief a snapshot of the data at a point in time on the data store */ class snapshot { + public: snapshot() noexcept = delete; + snapshot(const snapshot&) = delete; + snapshot& operator=(const snapshot&) = delete; + snapshot(snapshot&&) noexcept = delete; + snapshot& operator=(snapshot&&) noexcept = delete; + ~snapshot(); /** * @brief directory name of a snapshot @@ -73,11 +84,9 @@ class snapshot { [[nodiscard]] std::unique_ptr scan(storage_id_type storage_id, std::string_view entry_key, bool inclusive) const noexcept; private: - boost::filesystem::path dir_{}; - - [[nodiscard]] boost::filesystem::path file_path() const noexcept; + std::unique_ptr pimpl; - explicit snapshot(const boost::filesystem::path& location) noexcept; + explicit snapshot(boost::filesystem::path location, std::map clear_storage) noexcept; friend class datastore; }; diff --git a/src/limestone/cursor.cpp b/src/limestone/cursor.cpp index 96d23160..5fc0d30f 100644 --- a/src/limestone/cursor.cpp +++ b/src/limestone/cursor.cpp @@ -20,43 +20,37 @@ #include "logging_helper.h" #include "limestone_exception_helper.h" #include "log_entry.h" +#include "cursor_impl.h" + namespace limestone::api { -cursor::cursor(const boost::filesystem::path& file) : log_entry_(std::make_unique()) { - istrm_.open(file, std::ios_base::in | std::ios_base::binary ); - if (!istrm_.good()) { - LOG_AND_THROW_EXCEPTION("file stream of the cursor is not good (" + file.string() + ")"); - } -} + +cursor::cursor(const boost::filesystem::path& snapshot_file) + : pimpl(std::make_unique(snapshot_file)) {} + +cursor::cursor(const boost::filesystem::path& snapshot_file, const boost::filesystem::path& compacted_file) + : pimpl(std::make_unique(snapshot_file, compacted_file)) {} + cursor::~cursor() noexcept { - istrm_.close(); + // TODO: handle close failure + pimpl->close(); } bool cursor::next() { - if (!istrm_.good()) { - DVLOG_LP(log_trace) << "file stream of the cursor is not good"; - return false; - } - if (istrm_.eof()) { - DVLOG_LP(log_trace) << "already detected eof of the cursor"; - return false; - } - auto rv = log_entry_->read(istrm_); - DVLOG_LP(log_trace) << (rv ? "read an entry from the cursor" : "detect eof of the cursor"); - return rv; + return pimpl->next(); } storage_id_type cursor::storage() const noexcept { - return log_entry_->storage(); + return pimpl->storage(); } void cursor::key(std::string& buf) const noexcept { - log_entry_->key(buf); + pimpl->key(buf); } void cursor::value(std::string& buf) const noexcept { - log_entry_->value(buf); + pimpl->value(buf); } std::vector& cursor::large_objects() noexcept { diff --git a/src/limestone/cursor_impl.cpp b/src/limestone/cursor_impl.cpp new file mode 100644 index 00000000..d5b533dd --- /dev/null +++ b/src/limestone/cursor_impl.cpp @@ -0,0 +1,214 @@ +/* + * Copyright 2022-2023 Project Tsurugi. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include +#include "cursor_impl.h" +#include +#include "limestone_exception_helper.h" + +namespace limestone::internal { + +using limestone::api::log_entry; +using limestone::api::write_version_type; + +std::unique_ptr cursor_impl::create_cursor(const boost::filesystem::path& snapshot_file, + const std::map& clear_storage) { + auto cursor_instance = std::unique_ptr(new cursor(snapshot_file)); + cursor_instance->pimpl->set_clear_storage(clear_storage); + return cursor_instance; +} + +std::unique_ptr cursor_impl::create_cursor(const boost::filesystem::path& snapshot_file, const boost::filesystem::path& compacted_file, + const std::map& clear_storage) { + auto cursor_instance = std::unique_ptr(new cursor(snapshot_file, compacted_file)); + cursor_instance->pimpl->set_clear_storage(clear_storage); + return cursor_instance; +} + +cursor_impl::cursor_impl(const boost::filesystem::path& snapshot_file) + : compacted_istrm_(std::nullopt) { + open(snapshot_file, snapshot_istrm_); +} + +cursor_impl::cursor_impl(const boost::filesystem::path& snapshot_file, const boost::filesystem::path& compacted_file) { + open(snapshot_file, snapshot_istrm_); + open(compacted_file, compacted_istrm_); +} + +void cursor_impl::open(const boost::filesystem::path& file, std::optional& stream) { + stream.emplace(file, std::ios_base::in | std::ios_base::binary); + if (!stream->is_open() || !stream->good()) { + LOG_AND_THROW_EXCEPTION("Failed to open file: " + file.string()); + } +} + +void cursor_impl::close() { + if (snapshot_istrm_) snapshot_istrm_->close(); + if (compacted_istrm_) compacted_istrm_->close(); +} + +void cursor_impl::validate_and_read_stream(std::optional& stream, const std::string& stream_name, + std::optional& log_entry, std::string& previous_key_sid) { + while (stream) { + // If the stream is not in good condition, close it and exit + if (!stream->good()) { + DVLOG_LP(log_trace) << stream_name << " stream is not good, closing it."; + stream->close(); + stream = std::nullopt; + return; + } + + // If the stream has reached EOF, close it and exit + if (stream->eof()) { + DVLOG_LP(log_trace) << stream_name << " stream reached EOF, closing it."; + stream->close(); + stream = std::nullopt; + return; + } + + // If the entry is not yet read, read it + if (!log_entry) { + log_entry.emplace(); // Construct a new log_entry + if (!log_entry->read(*stream)) { + // If reading fails, close the stream and reset the log_entry + stream->close(); + stream = std::nullopt; + log_entry = std::nullopt; + return; + } + // Check if the key_sid is in ascending order + // TODO: Key order violation is detected here and the process is aborted. + // However, this check should be moved to an earlier point, and if the key order is invalid, + // a different processing method should be considered instead of aborting immediately. + if (!previous_key_sid.empty() && log_entry->key_sid() < previous_key_sid) { + LOG(ERROR) << "Key order violation in " << stream_name << ": current key_sid (" << log_entry->key_sid() + << ") is smaller than the previous key_sid (" << previous_key_sid << ")"; + THROW_LIMESTONE_EXCEPTION("Key order violation detected in " + stream_name); + } + + // Update the previous key_sid to the current one + previous_key_sid = log_entry->key_sid(); + } + + // Check the validity of the entry using the lambda function + if (is_relevant_entry(log_entry.value())) { + // If a valid entry is found, return + return; + } + + // If the entry is invalid, reset the log_entry and continue processing the stream + log_entry = std::nullopt; + } +} + +bool cursor_impl::is_relevant_entry(const limestone::api::log_entry& entry) { + // Step 1: Check if the entry is either normal_entry or remove_entry + if (entry.type() != limestone::api::log_entry::entry_type::normal_entry && + entry.type() != limestone::api::log_entry::entry_type::remove_entry) { + return false; // Skip this entry if it's not normal or remove entry + } + + // Step 2: Get the storage ID from log_entry + auto storage_id = entry.storage(); // Assuming storage() returns the storage ID + // Step 3: Check if clear_storage_ contains the same storage ID + auto it = clear_storage_.find(storage_id); + if (it != clear_storage_.end()) { + // Step 4: Retrieve the write_version from log_entry (only if the storage ID is found) + write_version_type wv; + entry.write_version(wv); + + // Step 5: Retrieve the write_version from clear_storage_ for the same storage ID + write_version_type range_ver = it->second; + + // Step 6: Compare the versions (only if the entry is normal_entry or remove_entry) + if (wv < range_ver) { + return false; // Skip this entry as it is outdated + } + } + + // If everything is valid, return true + return true; +} + +bool cursor_impl::next() { + while (true) { + // Read from the snapshot stream if the snapshot_log_entry_ is empty + if (!snapshot_log_entry_) { + validate_and_read_stream(snapshot_istrm_, "Snapshot", snapshot_log_entry_, previous_snapshot_key_sid); + } + + // Read from the compacted stream if the compacted_log_entry_ is empty + if (!compacted_log_entry_) { + validate_and_read_stream(compacted_istrm_, "Compacted", compacted_log_entry_, previous_compacted_key_sid); + } + + // Case 1: Both snapshot and compacted are empty, return false + if (!snapshot_log_entry_ && !compacted_log_entry_) { + DVLOG_LP(log_trace) << "Both snapshot and compacted streams are closed"; + return false; + } + + // Case 2: Either snapshot or compacted has a value, use the one that is not empty + if (snapshot_log_entry_ && !compacted_log_entry_) { + log_entry_ = std::move(snapshot_log_entry_.value()); + snapshot_log_entry_ = std::nullopt; + } else if (!snapshot_log_entry_ && compacted_log_entry_) { + log_entry_ = std::move(compacted_log_entry_.value()); + compacted_log_entry_ = std::nullopt; + } else { + // Case 3: Both snapshot and compacted have values + if (snapshot_log_entry_->key_sid() < compacted_log_entry_->key_sid()) { + log_entry_ = std::move(snapshot_log_entry_.value()); + snapshot_log_entry_ = std::nullopt; + } else if (snapshot_log_entry_->key_sid() > compacted_log_entry_->key_sid()) { + log_entry_ = std::move(compacted_log_entry_.value()); + compacted_log_entry_ = std::nullopt; + } else { + // If key_sid is equal, use snapshot_log_entry_, but reset both entries + log_entry_ = std::move(snapshot_log_entry_.value()); + snapshot_log_entry_ = std::nullopt; + compacted_log_entry_ = std::nullopt; + } + } + // Check if the current log_entry_ is a normal entry + if (log_entry_.type() == log_entry::entry_type::normal_entry) { + return true; + } + // If it's not a normal entry, continue the loop to skip it and read the next entry + } +} + + +limestone::api::storage_id_type cursor_impl::storage() const noexcept { + return log_entry_.storage(); +} + +void cursor_impl::key(std::string& buf) const noexcept { + log_entry_.key(buf); +} + +void cursor_impl::value(std::string& buf) const noexcept { + log_entry_.value(buf); +} + +log_entry::entry_type cursor_impl::type() const { + return log_entry_.type(); +} + +void cursor_impl::set_clear_storage(const std::map& clear_storage) { + clear_storage_ = clear_storage; +} + +} // namespace limestone::internal diff --git a/src/limestone/cursor_impl.h b/src/limestone/cursor_impl.h new file mode 100644 index 00000000..f41b9000 --- /dev/null +++ b/src/limestone/cursor_impl.h @@ -0,0 +1,68 @@ +/* + * Copyright 2022-2023 Project Tsurugi. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once +#include +#include +#include +#include +#include +#include +#include "log_entry.h" + +namespace limestone::internal { + +using limestone::api::cursor; +class cursor_impl { +public: + explicit cursor_impl(const boost::filesystem::path& snapshot_file); + explicit cursor_impl(const boost::filesystem::path& snapshot_file, const boost::filesystem::path& compacted_file); + + static std::unique_ptr create_cursor(const boost::filesystem::path& snapshot_file, + const std::map& clear_storage); + static std::unique_ptr create_cursor(const boost::filesystem::path& snapshot_file, const boost::filesystem::path& compacted_file, + const std::map& clear_storage); + + void set_clear_storage(const std::map& clear_storage); + +private: + limestone::api::log_entry log_entry_; + std::optional snapshot_log_entry_; + std::optional compacted_log_entry_; + std::optional snapshot_istrm_; + std::optional compacted_istrm_; + std::string previous_snapshot_key_sid; + std::string previous_compacted_key_sid; + std::map clear_storage_; + +protected: + void open(const boost::filesystem::path& file, std::optional& stream); + void close(); + + bool next(); + void validate_and_read_stream(std::optional& stream, const std::string& stream_name, + std::optional& log_entry, std::string& previous_key_sid); + + [[nodiscard]] limestone::api::storage_id_type storage() const noexcept; + void key(std::string& buf) const noexcept; + void value(std::string& buf) const noexcept; + [[nodiscard]] limestone::api::log_entry::entry_type type() const; + bool is_relevant_entry(const limestone::api::log_entry& entry); + // Making the cursor class a friend so that it can access protected members + friend class limestone::api::cursor; +}; + +} // namespace limestone::internal diff --git a/src/limestone/datastore.cpp b/src/limestone/datastore.cpp index 9f3450be..ecbfa273 100644 --- a/src/limestone/datastore.cpp +++ b/src/limestone/datastore.cpp @@ -104,41 +104,19 @@ void datastore::recover() const noexcept { } void datastore::ready() { - // create filename set for snapshot - std::set detached_pwals = compaction_catalog_->get_detached_pwals(); - - std::set filename_set; - boost::system::error_code error; - boost::filesystem::directory_iterator it(location_, error); - boost::filesystem::directory_iterator end; - if (error) { - LOG_AND_THROW_IO_EXCEPTION("Failed to initialize directory iterator, path: " + location_.string(), error); - } - for (; it != end; it.increment(error)) { - if (error) { - LOG_AND_THROW_IO_EXCEPTION("Failed to access directory entry: , path: " + location_.string(), error); - } - if (boost::filesystem::is_regular_file(it->path())) { - std::string filename = it->path().filename().string(); - if (detached_pwals.find(filename) == detached_pwals.end()) { - filename_set.insert(filename); - } - } - } - - create_snapshot(filename_set); + create_snapshot(); online_compaction_worker_future_ = std::async(std::launch::async, &datastore::online_compaction_worker, this); state_ = state::ready; } std::unique_ptr datastore::get_snapshot() const { check_after_ready(static_cast(__func__)); - return std::unique_ptr(new snapshot(location_)); + return std::unique_ptr(new snapshot(location_, clear_storage)); } std::shared_ptr datastore::shared_snapshot() const { check_after_ready(static_cast(__func__)); - return std::shared_ptr(new snapshot(location_)); + return std::shared_ptr(new snapshot(location_, clear_storage)); } log_channel& datastore::create_channel(const boost::filesystem::path& location) { diff --git a/src/limestone/datastore_snapshot.cpp b/src/limestone/datastore_snapshot.cpp index 273c5536..24853c8f 100644 --- a/src/limestone/datastore_snapshot.cpp +++ b/src/limestone/datastore_snapshot.cpp @@ -27,56 +27,19 @@ #include #include "limestone_exception_helper.h" +#include "compaction_catalog.h" #include "dblog_scan.h" #include "internal.h" #include "log_entry.h" #include "sortdb_wrapper.h" +#include "snapshot_impl.h" +#include "sorting_context.h" namespace limestone::internal { constexpr std::size_t write_version_size = sizeof(epoch_id_type) + sizeof(std::uint64_t); static_assert(write_version_size == 16); -class sorting_context { -public: - sorting_context(sorting_context&& obj) noexcept : sortdb(std::move(obj.sortdb)) { - std::unique_lock lk{obj.mtx_clear_storage}; - clear_storage = std::move(obj.clear_storage); // NOLINT(*-prefer-member-initializer): need lock - } - sorting_context(const sorting_context&) = delete; - sorting_context& operator=(const sorting_context&) = delete; - sorting_context& operator=(sorting_context&&) = delete; - sorting_context() = default; - ~sorting_context() = default; - explicit sorting_context(std::unique_ptr&& s) noexcept : sortdb(std::move(s)) { - } - - // point entries -private: - std::unique_ptr sortdb; -public: - sortdb_wrapper* get_sortdb() { return sortdb.get(); } - - // range delete entries -private: - std::mutex mtx_clear_storage; - std::map clear_storage; -public: - void clear_storage_update(const storage_id_type sid, const write_version_type wv) { - std::unique_lock lk{mtx_clear_storage}; - if (auto [it, inserted] = clear_storage.emplace(sid, wv); - !inserted) { - it->second = std::max(it->second, wv); - } - } - std::optional clear_storage_find(const storage_id_type sid) { - // no need to lock, for now - auto itr = clear_storage.find(sid); - if (itr == clear_storage.end()) return {}; - return {itr->second}; - } -}; - [[maybe_unused]] static void store_bswap64_value(void *dest, const void *src) { auto* p64_dest = reinterpret_cast(dest); // NOLINT(*-reinterpret-cast) @@ -185,10 +148,30 @@ static std::pair create_sorted_from_wals( } } -static void sortdb_foreach(sorting_context& sctx, std::function write_snapshot_entry) { + +[[maybe_unused]] +static write_version_type extract_write_version(const std::string_view& db_key) { + std::string wv(write_version_size, '\0'); + store_bswap64_value(&wv[0], &db_key[0]); // NOLINT(readability-container-data-pointer) + store_bswap64_value(&wv[8], &db_key[8]); + return write_version_type{wv}; +} + +[[maybe_unused]] +static std::string create_value_from_db_key_and_value(const std::string_view& db_key, const std::string_view& db_value) { + std::string value(write_version_size + db_value.size() - 1, '\0'); + store_bswap64_value(&value[0], &db_key[0]); // NOLINT(readability-container-data-pointer) + store_bswap64_value(&value[8], &db_key[8]); + std::memcpy(&value[write_version_size], &db_value[1], db_value.size() - 1); + return value; +} + +static void sortdb_foreach(sorting_context& sctx, + const std::function& write_snapshot_entry, + const std::function& write_snapshot_remove_entry) { static_assert(sizeof(log_entry::entry_type) == 1); #if defined SORT_METHOD_PUT_ONLY - sctx.get_sortdb()->each([&sctx, write_snapshot_entry, last_key = std::string{}](const std::string_view db_key, const std::string_view db_value) mutable { + sctx.get_sortdb()->each([&sctx, write_snapshot_entry, write_snapshot_remove_entry, last_key = std::string{}](const std::string_view db_key, const std::string_view db_value) mutable { // using the first entry in GROUP BY (original-)key // NB: max versions comes first (by the custom-comparator) std::string_view key(db_key.data() + write_version_size, db_key.size() - write_version_size); @@ -199,37 +182,31 @@ static void sortdb_foreach(sorting_context& sctx, std::function(&st_bytes), key.data(), sizeof(storage_id_type)); storage_id_type st = le64toh(st_bytes); + if (auto ret = sctx.clear_storage_find(st); ret) { // check range delete write_version_type range_ver = ret.value(); - std::string wv(write_version_size, '\0'); - store_bswap64_value(&wv[0], &db_key[0]); - store_bswap64_value(&wv[8], &db_key[8]); - write_version_type point_ver{wv}; - if (point_ver < range_ver) { + if (extract_write_version(db_key) < range_ver) { return; // skip } } auto entry_type = static_cast(db_value[0]); switch (entry_type) { - case log_entry::entry_type::normal_entry: { - std::string value(write_version_size + db_value.size() - 1, '\0'); - store_bswap64_value(&value[0], &db_key[0]); - store_bswap64_value(&value[8], &db_key[8]); - std::memcpy(&value[write_version_size], &db_value[1], db_value.size() - 1); - write_snapshot_entry(key, value); + case log_entry::entry_type::normal_entry: + write_snapshot_entry(key, create_value_from_db_key_and_value(db_key, db_value)); + break; + case log_entry::entry_type::remove_entry: { + write_snapshot_remove_entry(key, create_value_from_db_key_and_value(db_key, db_value)); break; } - case log_entry::entry_type::remove_entry: - break; // skip default: LOG(ERROR) << "never reach " << static_cast(entry_type); std::abort(); } }); #else - sctx.get_sortdb()->each([&sctx, &write_snapshot_entry](const std::string_view db_key, const std::string_view db_value) { + sctx.get_sortdb()->each([&sctx, &write_snapshot_entry, write_snapshot_remove_entry](const std::string_view db_key, const std::string_view db_value) { storage_id_type st_bytes{}; memcpy(static_cast(&st_bytes), db_key.data(), sizeof(storage_id_type)); storage_id_type st = le64toh(st_bytes); @@ -246,8 +223,9 @@ static void sortdb_foreach(sorting_context& sctx, std::function(entry_type); std::abort(); @@ -292,20 +270,62 @@ void create_compact_pwal( log_entry::write(ostrm, key_stid, value_etc); } }; - sortdb_foreach(sctx, write_snapshot_entry); + sortdb_foreach(sctx, write_snapshot_entry, [](std::string_view, std::string_view) {}); //log_entry::end_session(ostrm, epoch); if (fclose(ostrm) != 0) { // NOLINT(*-owning-memory) LOG_AND_THROW_IO_EXCEPTION("cannot close snapshot file (" + snapshot_file.string() + ")", errno); } } +std::set assemble_snapshot_input_filenames( + const std::unique_ptr& compaction_catalog, + const boost::filesystem::path& location, + file_operations& file_ops) { + std::set detached_pwals = compaction_catalog->get_detached_pwals(); + std::set filename_set; + boost::system::error_code error; + boost::filesystem::directory_iterator it(location, error); + boost::filesystem::directory_iterator end; + + if (error) { + LOG_AND_THROW_IO_EXCEPTION("Failed to initialize directory iterator, path: " + location.string(), error); + } + + for (; it != end; file_ops.directory_iterator_next(it, error)) { + if (error) { + LOG_AND_THROW_IO_EXCEPTION("Failed to access directory entry, path: " + location.string(), error); + } + if (boost::filesystem::is_regular_file(it->path())) { + std::string filename = it->path().filename().string(); + if (detached_pwals.find(filename) == detached_pwals.end() + && filename != compaction_catalog::get_catalog_filename() + && filename != compaction_catalog::get_compacted_filename()) { + filename_set.insert(filename); + } + } + } + return filename_set; +} + +std::set assemble_snapshot_input_filenames( + const std::unique_ptr& compaction_catalog, + const boost::filesystem::path& location) { + real_file_operations file_ops; + return assemble_snapshot_input_filenames(compaction_catalog, location, file_ops); } + + +} // namespace limestone::internal + namespace limestone::api { using namespace limestone::internal; + +snapshot::~snapshot() = default; -void datastore::create_snapshot(const std::set& file_names) { +void datastore::create_snapshot() { const auto& from_dir = location_; + std::set file_names = assemble_snapshot_input_filenames(compaction_catalog_, from_dir); auto [max_appeared_epoch, sctx] = create_sorted_from_wals(from_dir, recover_max_parallelism_, file_names); epoch_id_switched_.store(max_appeared_epoch); epoch_id_informed_.store(max_appeared_epoch); @@ -326,12 +346,24 @@ void datastore::create_snapshot(const std::set& file_names) { if (!ostrm) { LOG_AND_THROW_IO_EXCEPTION("cannot create snapshot file", errno); } + log_entry::begin_session(ostrm, 0); setvbuf(ostrm, nullptr, _IOFBF, 128L * 1024L); // NOLINT, NB. glibc may ignore size when _IOFBF and buffer=NULL - auto write_snapshot_entry = [&ostrm](std::string_view key, std::string_view value){log_entry::write(ostrm, key, value);}; - sortdb_foreach(sctx, write_snapshot_entry); + auto write_snapshot_entry = [&ostrm](std::string_view key_sid, std::string_view value_etc) { log_entry::write(ostrm, key_sid, value_etc); }; + + std::function write_snapshot_remove_entry; + if (compaction_catalog_->get_compacted_files().empty()) { + write_snapshot_remove_entry = [](std::string_view, std::string_view) {}; + } else { + write_snapshot_remove_entry = [&ostrm](std::string_view key, std::string_view value_etc) { + log_entry::write_remove(ostrm, key, value_etc); + }; + } + sortdb_foreach(sctx, write_snapshot_entry, write_snapshot_remove_entry); if (fclose(ostrm) != 0) { // NOLINT(*-owning-memory) LOG_AND_THROW_IO_EXCEPTION("cannot close snapshot file (" + snapshot_file.string() + ")", errno); } + + clear_storage = sctx.get_clear_storage(); } } // namespace limestone::api diff --git a/src/limestone/file_operations.cpp b/src/limestone/file_operations.cpp index 5711db49..e3bb3d91 100644 --- a/src/limestone/file_operations.cpp +++ b/src/limestone/file_operations.cpp @@ -101,6 +101,11 @@ bool real_file_operations::exists(const boost::filesystem::path& p, boost::syste return boost::filesystem::exists(p, ec); } +void real_file_operations::directory_iterator_next(boost::filesystem::directory_iterator& it, boost::system::error_code& ec) { + it.increment(ec); +} + + } // namespace limestone::internal diff --git a/src/limestone/file_operations.h b/src/limestone/file_operations.h index ea3b2b71..a9c8fd3b 100644 --- a/src/limestone/file_operations.h +++ b/src/limestone/file_operations.h @@ -104,6 +104,9 @@ class file_operations { // Checks if a file or directory exists (Boost) virtual bool exists(const boost::filesystem::path& p, boost::system::error_code& ec) = 0; + + // call directory_iterator::increment + virtual void directory_iterator_next(boost::filesystem::directory_iterator& it, boost::system::error_code& ec) = 0; }; class real_file_operations : public file_operations { @@ -125,6 +128,7 @@ class real_file_operations : public file_operations { bool has_error(std::ifstream& file) override; bool exists(const boost::filesystem::path& p, boost::system::error_code& ec) override; + void directory_iterator_next(boost::filesystem::directory_iterator& it, boost::system::error_code& ec) override; }; } // namespace limestone::internal diff --git a/src/limestone/snapshot.cpp b/src/limestone/snapshot.cpp index 12081a71..ab189e2f 100644 --- a/src/limestone/snapshot.cpp +++ b/src/limestone/snapshot.cpp @@ -14,18 +14,24 @@ * limitations under the License. */ #include - +#include #include #include +#include "compaction_catalog.h" #include "logging_helper.h" +#include "snapshot_impl.h" namespace limestone::api { // FIXME fill implementation -snapshot::snapshot(const boost::filesystem::path& location) noexcept : dir_(location / boost::filesystem::path(std::string(subdirectory_name_))) { -} +using limestone::internal::snapshot_impl; + +snapshot::snapshot(boost::filesystem::path location, + std::map clear_storage) noexcept + : pimpl(std::make_unique(std::move(location), std::move(clear_storage))) { +} std::unique_ptr snapshot::get_cursor() const { - return std::unique_ptr(new cursor(file_path())); + return pimpl->get_cursor(); } std::unique_ptr snapshot::find([[maybe_unused]] storage_id_type storage_id, [[maybe_unused]] std::string_view entry_key) const noexcept { @@ -38,8 +44,4 @@ std::unique_ptr snapshot::scan([[maybe_unused]] storage_id_type storage_ std::abort(); // FIXME should implement } -boost::filesystem::path snapshot::file_path() const noexcept { - return dir_ / boost::filesystem::path(std::string(file_name_)); -} - } // namespace limestone::api diff --git a/src/limestone/snapshot_impl.cpp b/src/limestone/snapshot_impl.cpp new file mode 100644 index 00000000..faf63ad6 --- /dev/null +++ b/src/limestone/snapshot_impl.cpp @@ -0,0 +1,43 @@ +/* + * Copyright 2022-2023 Project Tsurugi. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include +#include "snapshot_impl.h" +#include +#include +#include "compaction_catalog.h" +#include "logging_helper.h" +#include "cursor_impl.h" +#include + +namespace limestone::internal { + +snapshot_impl::snapshot_impl(boost::filesystem::path location, + std::map clear_storage) noexcept + : location_(std::move(location)), clear_storage(std::move(clear_storage)) { +} + +std::unique_ptr snapshot_impl::get_cursor() const { + boost::filesystem::path compacted_file = location_ / limestone::internal::compaction_catalog::get_compacted_filename(); + boost::filesystem::path snapshot_file = location_ / std::string(snapshot::subdirectory_name_) / std::string(snapshot::file_name_); + + if (boost::filesystem::exists(compacted_file)) { + return cursor_impl::create_cursor(snapshot_file, compacted_file, clear_storage); + } + return cursor_impl::create_cursor(snapshot_file, clear_storage); +} + + +} // namespace limestone::internal diff --git a/src/limestone/snapshot_impl.h b/src/limestone/snapshot_impl.h new file mode 100644 index 00000000..c3c1e948 --- /dev/null +++ b/src/limestone/snapshot_impl.h @@ -0,0 +1,42 @@ +/* + * Copyright 2022-2023 Project Tsurugi. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include + +namespace limestone::internal { + +using limestone::api::cursor; +using limestone::api::storage_id_type; +using limestone::api::write_version_type; + +class snapshot_impl { +public: + explicit snapshot_impl(boost::filesystem::path location, std::map clear_storage) noexcept; + [[nodiscard]] std::unique_ptr get_cursor() const; + +private: + boost::filesystem::path location_; + std::map clear_storage; +}; + +} // namespace limestone::internal \ No newline at end of file diff --git a/src/limestone/sortdb_wrapper.h b/src/limestone/sortdb_wrapper.h index 99ed3db9..3c4243dc 100644 --- a/src/limestone/sortdb_wrapper.h +++ b/src/limestone/sortdb_wrapper.h @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - +#pragma once #include #ifdef SORT_METHOD_USE_ROCKSDB diff --git a/src/limestone/sorting_context.cpp b/src/limestone/sorting_context.cpp new file mode 100644 index 00000000..ad4dc36e --- /dev/null +++ b/src/limestone/sorting_context.cpp @@ -0,0 +1,52 @@ +/* + * Copyright 2022-2024 Project Tsurugi. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include +#include "sorting_context.h" +#include + +namespace limestone::internal { + +sorting_context::sorting_context(sorting_context&& obj) noexcept : sortdb(std::move(obj.sortdb)) { + std::unique_lock lk{obj.mtx_clear_storage}; + clear_storage = std::move(obj.clear_storage); // NOLINT(*-prefer-member-initializer): need lock +} + +sorting_context::sorting_context(std::unique_ptr&& s) noexcept : sortdb(std::move(s)) { +} + +sortdb_wrapper* sorting_context::get_sortdb() { + return sortdb.get(); +} + +void sorting_context::clear_storage_update(storage_id_type sid, write_version_type wv) { + std::unique_lock lk{mtx_clear_storage}; + if (auto [it, inserted] = clear_storage.emplace(sid, wv); + !inserted) { + it->second = std::max(it->second, wv); + } +} + +std::optional sorting_context::clear_storage_find(storage_id_type sid) { + auto itr = clear_storage.find(sid); + if (itr == clear_storage.end()) return {}; + return {itr->second}; +} + +std::map sorting_context::get_clear_storage() const { + return clear_storage; +} + +} // namespace limestone::internal \ No newline at end of file diff --git a/src/limestone/sorting_context.h b/src/limestone/sorting_context.h new file mode 100644 index 00000000..6634c7c3 --- /dev/null +++ b/src/limestone/sorting_context.h @@ -0,0 +1,55 @@ +/* + * Copyright 2022-2024 Project Tsurugi. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once +#include +#include +#include +#include +#include "sortdb_wrapper.h" +#include +#include + +namespace limestone::internal { + +using api::sortdb_wrapper; +using api::storage_id_type; +using api::write_version_type; + +class sorting_context { +public: + sorting_context(sorting_context&& obj) noexcept; + sorting_context(const sorting_context&) = delete; + sorting_context& operator=(const sorting_context&) = delete; + sorting_context& operator=(sorting_context&&) = delete; + sorting_context() = default; + ~sorting_context() = default; + explicit sorting_context(std::unique_ptr&& s) noexcept; + + // public getter + sortdb_wrapper* get_sortdb(); + + // clear_storage methods + [[nodiscard]] std::map get_clear_storage() const; + std::optional clear_storage_find(storage_id_type sid); + void clear_storage_update(storage_id_type sid, write_version_type wv); + +private: + std::unique_ptr sortdb; + std::mutex mtx_clear_storage; + std::map clear_storage; +}; + +} // namespace limestone::internal \ No newline at end of file diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index d9441c0c..0555ec06 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -34,6 +34,7 @@ file(GLOB SRCS "limestone/epoch/*.cpp" "limestone/utils/*.cpp" "limestone/compaction/*.cpp" + "limestone/snapshot/*.cpp" ) foreach(file ${SRCS}) diff --git a/test/limestone/compaction/assemble_snapshot_input_filenames_test.cpp b/test/limestone/compaction/assemble_snapshot_input_filenames_test.cpp new file mode 100644 index 00000000..dac2b0bc --- /dev/null +++ b/test/limestone/compaction/assemble_snapshot_input_filenames_test.cpp @@ -0,0 +1,165 @@ +/* + * Copyright 2022-2024 Project Tsurugi. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include "compaction_catalog.h" +#include "limestone/api/epoch_id_type.h" +#include "limestone/api/limestone_exception.h" + +namespace limestone::internal { +// Forward declare the target function for testing +std::set assemble_snapshot_input_filenames( + const std::unique_ptr& compaction_catalog, + const boost::filesystem::path& location); +std::set assemble_snapshot_input_filenames( + const std::unique_ptr& compaction_catalog, + const boost::filesystem::path& location, + file_operations& file_ops); +} + +namespace limestone::testing { + +using limestone::internal::compacted_file_info; +using limestone::internal::compaction_catalog; +using limestone::api::limestone_io_exception; + +class mock_file_operations : public limestone::internal::real_file_operations { +public: + void directory_iterator_next(boost::filesystem::directory_iterator& it, boost::system::error_code& ec) { + ec = boost::system::errc::make_error_code(boost::system::errc::permission_denied); + }; +}; + +class assemble_snapshot_input_filenames_test : public ::testing::Test { +protected: + void SetUp() override { + // Set up temporary log directory for the test + if (system("rm -rf /tmp/assemble_snapshot_input_filenames_test") != 0) { + std::cerr << "Cannot remove directory" << std::endl; + } + if (system("mkdir -p /tmp/assemble_snapshot_input_filenames_test") != 0) { + std::cerr << "Cannot create directory" << std::endl; + } + + // Initialize the compaction catalog with a valid log directory path + log_location_ = "/tmp/assemble_snapshot_input_filenames_test"; + compaction_catalog_ = std::make_unique(log_location_); + } + + void TearDown() override { + // Clean up temporary log directory after the test + if (system("rm -rf /tmp/assemble_snapshot_input_filenames_test") != 0) { + std::cerr << "Cannot remove directory" << std::endl; + } + } + + void clear_compaction_catalog() { + compacted_files_.clear(); + detached_pwals_.clear(); + } + + void add_detached_pwals(const std::initializer_list& pwals) { + detached_pwals_.insert(pwals.begin(), pwals.end()); + compaction_catalog_->update_catalog_file(0, compacted_files_, detached_pwals_); + } + + std::unique_ptr compaction_catalog_; + boost::filesystem::path log_location_; + + std::set compacted_files_; + std::set detached_pwals_; +}; + + TEST_F(assemble_snapshot_input_filenames_test, retrieves_filenames_correctly) { + // Prepare some files in the log location directory + std::ofstream(log_location_ / "pwal_0001"); + std::ofstream(log_location_ / "pwal_0002"); + std::ofstream(log_location_ / "pwal_0003"); + std::ofstream(log_location_ / "pwal_0004"); + + // Simulate detached PWALs in the compaction catalog + add_detached_pwals({"pwal_0001", "pwal_0002"}); + + // Get the filenames that should be used for the snapshot + std::set filenames; + filenames = assemble_snapshot_input_filenames(compaction_catalog_, log_location_); + + // Ensure the correct files are retrieved + EXPECT_EQ(filenames.size(), 2); + EXPECT_NE(filenames.find("pwal_0003"), filenames.end()); + EXPECT_NE(filenames.find("pwal_0004"), filenames.end()); + + // Ensure the detached PWALs are not included + std::ofstream(log_location_ / compaction_catalog::get_compacted_filename()); + filenames = assemble_snapshot_input_filenames(compaction_catalog_, log_location_); + EXPECT_EQ(filenames.size(), 2); + EXPECT_NE(filenames.find("pwal_0003"), filenames.end()); + EXPECT_NE(filenames.find("pwal_0004"), filenames.end()); + } + + TEST_F(assemble_snapshot_input_filenames_test, throws_exception_when_directory_does_not_exist) { + // Set a non-existent directory + boost::filesystem::path non_existent_directory = "/tmp/non_existent_directory"; + + // Check if an exception is thrown + try { + auto filenames = assemble_snapshot_input_filenames(compaction_catalog_, non_existent_directory); + FAIL() << "Expected an exception to be thrown"; + } catch (const limestone_io_exception& e) { + // Check if the exception message contains the expected content + std::string expected_message = "Failed to initialize directory iterator, path:"; + std::string actual_message = e.what(); + std::cout << "Caught expected exception: " << actual_message << std::endl; + EXPECT_TRUE(actual_message.find(expected_message) != std::string::npos) << "Expected exception message to contain: " << expected_message; + } catch (...) { + FAIL() << "Expected a limestone_io_exception, but got a different exception"; + } + } + + TEST_F(assemble_snapshot_input_filenames_test, throws_exception_when_directory_iterator_increment) { + // Prepare some files in the log location directory + std::ofstream(log_location_ / "pwal_0001"); + std::ofstream(log_location_ / "pwal_0002"); + std::ofstream(log_location_ / "pwal_0003"); + std::ofstream(log_location_ / "pwal_0004"); + + // Check if an exception is thrown + try { + mock_file_operations file_ops; + auto filenames = assemble_snapshot_input_filenames(compaction_catalog_,log_location_, file_ops); + FAIL() << "Expected an exception to be thrown"; + } catch (const limestone_io_exception& e) { + // Check if the exception message contains the expected content + std::string expected_message = "Failed to access directory entry, path:"; + std::string actual_message = e.what(); + std::cout << "Caught expected exception: " << actual_message << std::endl; + EXPECT_TRUE(actual_message.find(expected_message) != std::string::npos) << "Expected exception message to contain: " << expected_message; + } catch (...) { + FAIL() << "Expected a limestone_io_exception, but got a different exception"; + } + } + + TEST_F(assemble_snapshot_input_filenames_test, handles_empty_directory) { + // No files are created in the directory + auto filenames = assemble_snapshot_input_filenames(compaction_catalog_, log_location_); + + // Ensure that no files are retrieved + EXPECT_TRUE(filenames.empty()); + } + +} // namespace limestone::testing diff --git a/test/limestone/compaction/online_compaction_test.cpp b/test/limestone/compaction/compaction_test.cpp similarity index 62% rename from test/limestone/compaction/online_compaction_test.cpp rename to test/limestone/compaction/compaction_test.cpp index 5dd0d46e..1a74b8de 100644 --- a/test/limestone/compaction/online_compaction_test.cpp +++ b/test/limestone/compaction/compaction_test.cpp @@ -42,9 +42,9 @@ extern std::string data_manifest(int persistent_format_version = 1); extern const std::string_view data_normal; extern const std::string_view data_nondurable; -class online_compaction_test : public ::testing::Test { +class compaction_test : public ::testing::Test { public: - static constexpr const char* location = "/tmp/online_compaction_test"; + static constexpr const char* location = "/tmp/compaction_test"; const boost::filesystem::path manifest_path = boost::filesystem::path(location) / std::string(limestone::internal::manifest_file_name); const boost::filesystem::path compaction_catalog_path = boost::filesystem::path(location) / "compaction_catalog"; const std::string compacted_filename = compaction_catalog::get_compacted_filename(); @@ -149,7 +149,105 @@ class online_compaction_test : public ::testing::Test { } return kv_list; } - + + void print_log_entry(const log_entry& entry) { + std::string key; + storage_id_type storage_id = entry.storage(); + log_entry::entry_type type = entry.type(); + + if (type == log_entry::entry_type::normal_entry || type == log_entry::entry_type::remove_entry) { + entry.key(key); + } + + switch (type) { + case log_entry::entry_type::normal_entry: { + std::string value; + entry.value(value); + std::cout << "Entry Type: normal_entry, Storage ID: " << storage_id + << ", Key: " << key << ", Value: " << value + << ", Write Version: Epoch: " << log_entry::write_version_epoch_number(entry.value_etc()) + << ", Minor: " << log_entry::write_version_minor_write_version(entry.value_etc()) + << std::endl; + break; + } + case log_entry::entry_type::remove_entry: { + std::cout << "Entry Type: remove_entry, Storage ID: " << storage_id << ", Key: " << key + << ", Write Version: Epoch: " << log_entry::write_version_epoch_number(entry.value_etc()) + << ", Minor: " << log_entry::write_version_minor_write_version(entry.value_etc()) + << std::endl; + break; + } + case log_entry::entry_type::clear_storage: + case log_entry::entry_type::add_storage: + case log_entry::entry_type::remove_storage: { + write_version_type write_version; + entry.write_version(write_version); + std::cout << "Entry Type: " << static_cast(type) << ", Storage ID: " << storage_id + << ", Write Version: Epoch: " << log_entry::write_version_epoch_number(entry.value_etc()) + << ", Minor: " << log_entry::write_version_minor_write_version(entry.value_etc()) + << std::endl; + break; + } + case log_entry::entry_type::marker_begin: + std::cout << "Entry Type: marker_begin, Epoch ID: " << entry.epoch_id() << std::endl; + break; + case log_entry::entry_type::marker_end: + std::cout << "Entry Type: marker_end, Epoch ID: " << entry.epoch_id() << std::endl; + break; + case log_entry::entry_type::marker_durable: + std::cout << "Entry Type: marker_durable, Epoch ID: " << entry.epoch_id() << std::endl; + break; + case log_entry::entry_type::marker_invalidated_begin: + std::cout << "Entry Type: marker_invalidated_begin, Epoch ID: " << entry.epoch_id() << std::endl; + break; + default: + std::cout << "Entry Type: unknown" << std::endl; + break; + } + } + + + + + /** + * @brief Reads a specified log file (PWAL, compacted_file, snapshot) and returns a list of log entries. + * @param log_file The path to the log file to be scanned. + * @return A vector of log_entry objects read from the log file. + */ + std::vector read_log_file(const std::string& log_file, const boost::filesystem::path& log_dir) { + boost::filesystem::path log_path = log_dir / log_file; + + std::vector log_entries; + limestone::internal::dblog_scan::parse_error pe; + + // Define a lambda function to capture and store log entries + auto add_entry = [&](log_entry& e) { log_entries.push_back(e); }; + + // Error reporting function, returning bool as expected by error_report_func_t + auto report_error = [](log_entry::read_error& error) -> bool { + std::cerr << "Error during log file scan: " << error.message() << std::endl; + return false; // Return false to indicate an error occurred + }; + + // Initialize a dblog_scan instance with the log directory + limestone::internal::dblog_scan scanner(log_dir); + + // Scan the specified log file + epoch_id_type max_epoch = scanner.scan_one_pwal_file(log_path, UINT64_MAX, add_entry, report_error, pe); + + if (pe.value() != limestone::internal::dblog_scan::parse_error::ok) { + std::cerr << "Parse error occurred while reading the log file: " << log_path.string() << std::endl; + } + + // Iterate over the log entries and print relevant information + std::cout << std::endl << "Log entries read from " << log_path.string() << ":" << std::endl; + for (const auto& entry : log_entries) { + print_log_entry(entry); + } + + return log_entries; + } + ::testing::AssertionResult ContainsPrefix(const char* files_expr, const char* prefix_expr, const char* expected_count_expr, const std::set& files, const std::string& prefix, int expected_count) { int match_count = 0; @@ -224,9 +322,70 @@ class online_compaction_test : public ::testing::Test { return pwal_file_names; } + + ::testing::AssertionResult AssertLogEntry(const log_entry& entry, const std::optional& expected_storage_id, + const std::optional& expected_key, const std::optional& expected_value, + const std::optional& expected_epoch_number, const std::optional& expected_minor_version, + log_entry::entry_type expected_type) { + // Check the entry type + if (entry.type() != expected_type) { + return ::testing::AssertionFailure() + << "Expected entry type: " << static_cast(expected_type) + << ", but got: " << static_cast(entry.type()); + } + + // Check the storage ID if it exists + if (expected_storage_id.has_value()) { + if (entry.storage() != expected_storage_id.value()) { + return ::testing::AssertionFailure() + << "Expected storage ID: " << expected_storage_id.value() + << ", but got: " << entry.storage(); + } + } + + // Check the key if it exists + if (expected_key.has_value()) { + std::string actual_key; + entry.key(actual_key); + if (actual_key != expected_key.value()) { + return ::testing::AssertionFailure() + << "Expected key: " << expected_key.value() + << ", but got: " << actual_key; + } + } + + // Check the value if it exists + if (expected_value.has_value()) { + std::string actual_value; + entry.value(actual_value); + if (actual_value != expected_value.value()) { + return ::testing::AssertionFailure() + << "Expected value: " << expected_value.value() + << ", but got: " << actual_value; + } + } + + // Check the write version if it exists + if (expected_epoch_number.has_value() && expected_minor_version.has_value()) { + epoch_id_type actual_epoch_number = log_entry::write_version_epoch_number(entry.value_etc()); + std::uint64_t actual_minor_version = log_entry::write_version_minor_write_version(entry.value_etc()); + + if (actual_epoch_number != expected_epoch_number.value() || + actual_minor_version != expected_minor_version.value()) { + return ::testing::AssertionFailure() + << "Expected write version (epoch_number: " << expected_epoch_number.value() + << ", minor_write_version: " << expected_minor_version.value() + << "), but got (epoch_number: " << actual_epoch_number + << ", minor_write_version: " << actual_minor_version << ")"; + } + } + + // If all checks pass, return success + return ::testing::AssertionSuccess(); + } }; -TEST_F(online_compaction_test, no_pwals) { +TEST_F(compaction_test, no_pwals) { gen_datastore(); auto pwals = extract_pwal_files_from_datastore(); EXPECT_TRUE(pwals.empty()); @@ -248,7 +407,7 @@ TEST_F(online_compaction_test, no_pwals) { EXPECT_TRUE(pwals.empty()); } -TEST_F(online_compaction_test, scenario01) { +TEST_F(compaction_test, scenario01) { gen_datastore(); datastore_->switch_epoch(1); auto pwals = extract_pwal_files_from_datastore(); @@ -598,7 +757,7 @@ TEST_F(online_compaction_test, scenario01) { // is not restarted, the timing of when the set of PWAL files maintained // by the datastore is updated differs from scenario01, and therefore the // test expectations have been changed. -TEST_F(online_compaction_test, scenario02) { +TEST_F(compaction_test, scenario02) { gen_datastore(); datastore_->switch_epoch(1); auto pwals = extract_pwal_files_from_datastore(); @@ -826,8 +985,357 @@ TEST_F(online_compaction_test, scenario02) { ASSERT_PRED_FORMAT3(ContainsPrefix, pwals, "pwal_0002.", 1); } +// This test case verifies the correct behavior of `remove_entry`. +TEST_F(compaction_test, scenario03) { + FLAGS_v = 50; // set VLOG level to 50 + + // 1. Create multiple PWALs using two different storage IDs + gen_datastore(); + datastore_->switch_epoch(1); + + // Storage ID 1: key1 added, then removed + lc0_->begin_session(); + lc0_->add_entry(1, "key1", "value1", {1, 0}); + lc0_->remove_entry(1, "key1", {1, 1}); // use remove_entry + lc0_->end_session(); + + // Storage ID 2: key2 added, no removal + lc1_->begin_session(); + lc1_->add_entry(2, "key2", "value2", {1, 0}); + lc1_->end_session(); + + // Storage ID 1: key3 removed first, then added + lc2_->begin_session(); + lc2_->remove_entry(1, "key3", {1, 0}); + lc2_->add_entry(1, "key3", "value3", {1, 3}); + lc2_->end_session(); + + // Storeage ID 1: key4 deleted witout adding + lc0_->begin_session(); + lc0_->remove_entry(1, "key4", {1, 0}); + lc0_->end_session(); + + datastore_->switch_epoch(2); + + // Check the created PWAL files + auto pwals = extract_pwal_files_from_datastore(); + EXPECT_EQ(pwals.size(), 3); + ASSERT_PRED_FORMAT2(ContainsString, pwals, "pwal_0000"); + ASSERT_PRED_FORMAT2(ContainsString, pwals, "pwal_0001"); + ASSERT_PRED_FORMAT2(ContainsString, pwals, "pwal_0002"); + + auto log_entries = read_log_file("pwal_0000", location); + ASSERT_EQ(log_entries.size(), 3); // Ensure that there are log entries + EXPECT_TRUE(AssertLogEntry(log_entries[0], 1, "key1", "value1", 1, 0, log_entry::entry_type::normal_entry)); + EXPECT_TRUE(AssertLogEntry(log_entries[1], 1, "key1", std::nullopt, 1, 1, log_entry::entry_type::remove_entry)); + EXPECT_TRUE(AssertLogEntry(log_entries[2], 1, "key4", std::nullopt, 1, 0, log_entry::entry_type::remove_entry)); + log_entries = read_log_file("pwal_0001", location); + ASSERT_EQ(log_entries.size(), 1); // Ensure that there are log entries + EXPECT_TRUE(AssertLogEntry(log_entries[0], 2, "key2", "value2", 1, 0, log_entry::entry_type::normal_entry)); + log_entries = read_log_file("pwal_0002", location); + ASSERT_EQ(log_entries.size(), 2); // Ensure that there are log entries + EXPECT_TRUE(AssertLogEntry(log_entries[0], 1, "key3", std::nullopt, 1, 0,log_entry ::entry_type::remove_entry)); + EXPECT_TRUE(AssertLogEntry(log_entries[1], 1, "key3", "value3", 1, 3, log_entry::entry_type::normal_entry)); + + // 2. Execute compaction + run_compact_with_epoch_switch(2); + + // Check the catalog and PWALs after compaction + compaction_catalog catalog = compaction_catalog::from_catalog_file(location); + EXPECT_EQ(catalog.get_max_epoch_id(), 1); + EXPECT_EQ(catalog.get_compacted_files().size(), 1); + EXPECT_EQ(catalog.get_detached_pwals().size(), 3); + + pwals = extract_pwal_files_from_datastore(); + EXPECT_EQ(pwals.size(), 4); // Includes the compacted file + ASSERT_PRED_FORMAT2(ContainsString, pwals, "pwal_0000.compacted"); + + log_entries = read_log_file("pwal_0000.compacted", location); + ASSERT_EQ(log_entries.size(), 2); // Ensure that there are log entries + EXPECT_TRUE(AssertLogEntry(log_entries[0], 1, "key3", "value3", 0, 0, log_entry::entry_type::normal_entry)); // write version changed to 0 + EXPECT_TRUE(AssertLogEntry(log_entries[1], 2, "key2", "value2", 0, 0, log_entry::entry_type::normal_entry)); // write version changed to 0 + + // 3. Add/Update PWALs (include remove_entry again) + + // Storage ID 1: key1 added, then removed + lc0_->begin_session(); + lc0_->add_entry(1, "key11", "value1", {2, 0}); + lc0_->remove_entry(1, "key11", {2, 1}); // use remove_entry + lc0_->end_session(); + + // Storage ID 2: key2 added, no removal + lc1_->begin_session(); + lc1_->add_entry(2, "key21", "value2", {2, 0}); + lc1_->end_session(); + + // Storage ID 1: key3 removed first, then added + lc2_->begin_session(); + lc2_->remove_entry(1, "key31", {2, 0}); + lc2_->add_entry(1, "key31", "value3", {2, 3}); + lc2_->end_session(); + + // Storeage ID 1: key4 deleted witout adding + lc0_->begin_session(); + lc0_->remove_entry(1, "key41", {2, 0}); + lc0_->end_session(); + + datastore_->switch_epoch(3); + pwals = extract_pwal_files_from_datastore(); + + // Check the created PWAL files + pwals = extract_pwal_files_from_datastore(); + EXPECT_EQ(pwals.size(), 7); // 3 new pwals and 3 rotaed pwals and 1 compacted file + ASSERT_PRED_FORMAT2(ContainsString, pwals, "pwal_0000"); + ASSERT_PRED_FORMAT2(ContainsString, pwals, "pwal_0001"); + ASSERT_PRED_FORMAT2(ContainsString, pwals, "pwal_0002"); + + log_entries = read_log_file("pwal_0000", location); + ASSERT_EQ(log_entries.size(), 3); // Ensure that there are log entries + EXPECT_TRUE(AssertLogEntry(log_entries[0], 1, "key11", "value1", 2, 0, log_entry::entry_type::normal_entry)); + EXPECT_TRUE(AssertLogEntry(log_entries[1], 1, "key11", std::nullopt, 2, 1, log_entry::entry_type::remove_entry)); + EXPECT_TRUE(AssertLogEntry(log_entries[2], 1, "key41", std::nullopt, 2, 0, log_entry::entry_type::remove_entry)); + log_entries = read_log_file("pwal_0001", location); + ASSERT_EQ(log_entries.size(), 1); // Ensure that there are log entries + EXPECT_TRUE(AssertLogEntry(log_entries[0], 2, "key21", "value2", 2, 0, log_entry::entry_type::normal_entry)); + log_entries = read_log_file("pwal_0002", location); + ASSERT_EQ(log_entries.size(), 2); // Ensure that there are log entries + EXPECT_TRUE(AssertLogEntry(log_entries[0], 1, "key31", std::nullopt, 2, 0, log_entry::entry_type::remove_entry)); + EXPECT_TRUE(AssertLogEntry(log_entries[1], 1, "key31", "value3", 2, 3, log_entry::entry_type::normal_entry)); + + // 4. Restart the datastore + datastore_->shutdown(); + datastore_ = nullptr; + gen_datastore(); // Regenerate datastore after restart + + // 5. check the compacted file and snapshot creating at the boot time + log_entries = read_log_file("pwal_0000.compacted", location); + ASSERT_EQ(log_entries.size(), 2); // Ensure that there are log entries + EXPECT_TRUE(AssertLogEntry(log_entries[0], 1, "key3", "value3", 0, 0, log_entry::entry_type::normal_entry)); // write version changed to 0 + EXPECT_TRUE(AssertLogEntry(log_entries[1], 2, "key2", "value2", 0, 0, log_entry::entry_type::normal_entry)); // write version changed to 0 + + log_entries = read_log_file("data/snapshot", location); + ASSERT_EQ(log_entries.size(), 4); // Ensure that there are log entries + EXPECT_TRUE(AssertLogEntry(log_entries[0], 1, "key11", std::nullopt, 2, 1, log_entry::entry_type::remove_entry)); + EXPECT_TRUE(AssertLogEntry(log_entries[1], 1, "key31", "value3", 2, 3, log_entry::entry_type::normal_entry)); + EXPECT_TRUE(AssertLogEntry(log_entries[2], 1, "key41", std::nullopt, 2, 0, log_entry::entry_type::remove_entry)); + EXPECT_TRUE(AssertLogEntry(log_entries[3], 2, "key21", "value2", 2, 0, log_entry::entry_type::normal_entry)); + + // 5. Verify the snapshot contents after restart + std::vector> kv_list = restart_datastore_and_read_snapshot(); + + // key1 should exist with its updated value, key2 and key3 should be removed + ASSERT_EQ(kv_list.size(), 4); + EXPECT_EQ(kv_list[0].first, "key3"); + EXPECT_EQ(kv_list[0].second, "value3"); + EXPECT_EQ(kv_list[1].first, "key31"); + EXPECT_EQ(kv_list[1].second, "value3"); + EXPECT_EQ(kv_list[2].first, "key2"); + EXPECT_EQ(kv_list[2].second, "value2"); + EXPECT_EQ(kv_list[3].first, "key21"); + EXPECT_EQ(kv_list[3].second, "value2"); +} + +// This test case verifies the correct behavior of `remove_storage`. +// This test case verifies the correct behavior of `remove_storage`. +TEST_F(compaction_test, scenario04) { + FLAGS_v = 50; // set VLOG level to 50 + + gen_datastore(); + datastore_->switch_epoch(1); + + // Storage ID 1: Add normal entries + lc0_->begin_session(); + lc0_->add_entry(1, "key1", "value1", {1, 0}); + lc0_->add_entry(1, "key2", "value2", {1, 1}); + lc0_->end_session(); + + // Storage ID 2: Add normal entries + lc1_->begin_session(); + lc1_->add_entry(2, "key3", "value3", {1, 0}); + lc1_->add_entry(2, "key4", "value4", {1, 1}); + lc1_->end_session(); + + // Storage ID 1: Add more normal entries + lc2_->begin_session(); + lc2_->add_entry(1, "key5", "value5", {1, 2}); + lc2_->add_entry(1, "key6", "value6", {1, 3}); + lc2_->end_session(); + + // Advance the epoch to 2 + datastore_->switch_epoch(2); + + // Remove storage for Storage ID 2 + lc1_->begin_session(); + lc1_->remove_storage(2, {2, 0}); // Removes the storage with ID 2 + lc1_->end_session(); + + // Advance the epoch to 3 + datastore_->switch_epoch(3); + + // Add an entry to Storage ID 1 + lc0_->begin_session(); + lc0_->add_entry(1, "key7", "value7", {3, 0}); + lc0_->end_session(); + + // Add an entry to Storage ID 2 + lc1_->begin_session(); + lc1_->add_entry(2, "key8", "value8", {3, 0}); + lc1_->end_session(); + + // Chek PWALs before compaction + auto pwals = extract_pwal_files_from_datastore(); + EXPECT_EQ(pwals.size(), 3); + + std::vector log_entries = read_log_file("pwal_0000", location); + ASSERT_EQ(log_entries.size(), 3); + EXPECT_TRUE(AssertLogEntry(log_entries[0], 1, "key1", "value1", 1, 0, log_entry::entry_type::normal_entry)); + EXPECT_TRUE(AssertLogEntry(log_entries[1], 1, "key2", "value2", 1, 1, log_entry::entry_type::normal_entry)); + EXPECT_TRUE(AssertLogEntry(log_entries[2], 1, "key7", "value7", 3, 0, log_entry::entry_type::normal_entry)); + + log_entries = read_log_file("pwal_0001", location); + ASSERT_EQ(log_entries.size(), 4); + EXPECT_TRUE(AssertLogEntry(log_entries[0], 2, "key3", "value3", 1, 0, log_entry::entry_type::normal_entry)); + EXPECT_TRUE(AssertLogEntry(log_entries[1], 2, "key4", "value4", 1, 1, log_entry::entry_type::normal_entry)); + EXPECT_TRUE(AssertLogEntry(log_entries[2], 2, "", "", 2, 0, log_entry::entry_type::remove_storage)); + EXPECT_TRUE(AssertLogEntry(log_entries[3], 2, "key8", "value8", 3, 0, log_entry::entry_type::normal_entry)); + + log_entries = read_log_file("pwal_0002", location); + ASSERT_EQ(log_entries.size(), 2); + EXPECT_TRUE(AssertLogEntry(log_entries[0], 1, "key5", "value5", 1, 2, log_entry::entry_type::normal_entry)); + EXPECT_TRUE(AssertLogEntry(log_entries[1], 1, "key6", "value6", 1, 3, log_entry::entry_type::normal_entry)); + + // online compaction + run_compact_with_epoch_switch(4); + + // Check PWALs after compaction + pwals = extract_pwal_files_from_datastore(); + EXPECT_EQ(pwals.size(), 4); + ASSERT_PRED_FORMAT2(ContainsString, pwals, "pwal_0000.compacted"); + ASSERT_PRED_FORMAT3(ContainsPrefix, pwals, "pwal_0000.", 2); + ASSERT_PRED_FORMAT3(ContainsPrefix, pwals, "pwal_0001.", 1); + ASSERT_PRED_FORMAT3(ContainsPrefix, pwals, "pwal_0002.", 1); + + log_entries = read_log_file("pwal_0000.compacted", location); + ASSERT_EQ(log_entries.size(), 6); + EXPECT_TRUE(AssertLogEntry(log_entries[0], 1, "key1", "value1", 0, 0, log_entry::entry_type::normal_entry)); + EXPECT_TRUE(AssertLogEntry(log_entries[1], 1, "key2", "value2", 0, 0, log_entry::entry_type::normal_entry)); + EXPECT_TRUE(AssertLogEntry(log_entries[2], 1, "key5", "value5", 0, 0, log_entry::entry_type::normal_entry)); + EXPECT_TRUE(AssertLogEntry(log_entries[3], 1, "key6", "value6", 0, 0, log_entry::entry_type::normal_entry)); + EXPECT_TRUE(AssertLogEntry(log_entries[4], 1, "key7", "value7", 0, 0, log_entry::entry_type::normal_entry)); + EXPECT_TRUE(AssertLogEntry(log_entries[5], 2, "key8", "value8", 0, 0, log_entry::entry_type::normal_entry)); + + + // Storage ID 1: Add normal entries + lc0_->begin_session(); + lc0_->add_entry(1, "key11", "value1", {4, 0}); + lc0_->add_entry(1, "key12", "value2", {4, 1}); + lc0_->end_session(); + + // Storage ID 2: Add normal entries + lc1_->begin_session(); + lc1_->add_entry(2, "key13", "value3", {4, 0}); + lc1_->add_entry(2, "key14", "value4", {4, 1}); + lc1_->end_session(); + + // Storage ID 1: Add more normal entries + lc2_->begin_session(); + lc2_->add_entry(1, "key15", "value5", {4, 2}); + lc2_->add_entry(1, "key16", "value6", {4, 3}); + lc2_->end_session(); + + // Advance the epoch to 5 + datastore_->switch_epoch(5); + + // Remove storage for Storage ID 1 + lc1_->begin_session(); + lc1_->remove_storage(1, {5, 0}); // Removes the storage with ID 2 + lc1_->end_session(); + + // Advance the epoch to 6 + datastore_->switch_epoch(6); + + // Add an entry to Storage ID 1 + lc0_->begin_session(); + lc0_->add_entry(1, "key17", "value7", {6, 0}); + lc0_->end_session(); + + // Add an entry to Storage ID 2 + lc1_->begin_session(); + lc1_->add_entry(2, "key18", "value8", {6, 0}); + lc1_->end_session(); + + // Advance the epoch to 6 + datastore_->switch_epoch(7); + + // Chek newly created PWALs + pwals = extract_pwal_files_from_datastore(); + EXPECT_EQ(pwals.size(), 7); + ASSERT_PRED_FORMAT2(ContainsString, pwals, "pwal_0000.compacted"); + ASSERT_PRED_FORMAT3(ContainsPrefix, pwals, "pwal_0000.", 2); + ASSERT_PRED_FORMAT3(ContainsPrefix, pwals, "pwal_0001.", 1); + ASSERT_PRED_FORMAT3(ContainsPrefix, pwals, "pwal_0002.", 1); + + log_entries = read_log_file("pwal_0000", location); + ASSERT_EQ(log_entries.size(), 3); + EXPECT_TRUE(AssertLogEntry(log_entries[0], 1, "key11", "value1", 4, 0, log_entry::entry_type::normal_entry)); + EXPECT_TRUE(AssertLogEntry(log_entries[1], 1, "key12", "value2", 4, 1, log_entry::entry_type::normal_entry)); + EXPECT_TRUE(AssertLogEntry(log_entries[2], 1, "key17", "value7", 6, 0, log_entry::entry_type::normal_entry)); + + log_entries = read_log_file("pwal_0001", location); + ASSERT_EQ(log_entries.size(), 4); + EXPECT_TRUE(AssertLogEntry(log_entries[0], 2, "key13", "value3", 4, 0, log_entry::entry_type::normal_entry)); + EXPECT_TRUE(AssertLogEntry(log_entries[1], 2, "key14", "value4", 4, 1, log_entry::entry_type::normal_entry)); + EXPECT_TRUE(AssertLogEntry(log_entries[2], 1, "", "", 5, 0, log_entry::entry_type::remove_storage)); + EXPECT_TRUE(AssertLogEntry(log_entries[3], 2, "key18", "value8", 6, 0, log_entry::entry_type::normal_entry)); + + log_entries = read_log_file("pwal_0002", location); + ASSERT_EQ(log_entries.size(), 2); + EXPECT_TRUE(AssertLogEntry(log_entries[0], 1, "key15", "value5", 4, 2, log_entry::entry_type::normal_entry)); + EXPECT_TRUE(AssertLogEntry(log_entries[1], 1, "key16", "value6", 4, 3, log_entry::entry_type::normal_entry)); + + // Restart the datastore + + std::vector> kv_list = restart_datastore_and_read_snapshot(); + + // check the compacted file and snapshot creating at the boot time + log_entries = read_log_file("pwal_0000.compacted", location); + ASSERT_EQ(log_entries.size(), 6); + EXPECT_TRUE(AssertLogEntry(log_entries[0], 1, "key1", "value1", 0, 0, log_entry::entry_type::normal_entry)); + EXPECT_TRUE(AssertLogEntry(log_entries[1], 1, "key2", "value2", 0, 0, log_entry::entry_type::normal_entry)); + EXPECT_TRUE(AssertLogEntry(log_entries[2], 1, "key5", "value5", 0, 0, log_entry::entry_type::normal_entry)); + EXPECT_TRUE(AssertLogEntry(log_entries[3], 1, "key6", "value6", 0, 0, log_entry::entry_type::normal_entry)); + EXPECT_TRUE(AssertLogEntry(log_entries[4], 1, "key7", "value7", 0, 0, log_entry::entry_type::normal_entry)); + EXPECT_TRUE(AssertLogEntry(log_entries[5], 2, "key8", "value8", 0, 0, log_entry::entry_type::normal_entry)); + + log_entries = read_log_file("data/snapshot", location); + ASSERT_EQ(log_entries.size(), 4); + EXPECT_TRUE(AssertLogEntry(log_entries[0], 1, "key17", "value7", 6, 0, log_entry::entry_type::normal_entry)); + EXPECT_TRUE(AssertLogEntry(log_entries[1], 2, "key13", "value3", 4, 0, log_entry::entry_type::normal_entry)); + EXPECT_TRUE(AssertLogEntry(log_entries[2], 2, "key14", "value4", 4, 1, log_entry::entry_type::normal_entry)); + EXPECT_TRUE(AssertLogEntry(log_entries[3], 2, "key18", "value8", 6, 0, log_entry::entry_type::normal_entry)); + + // 5. Verify the snapshot contents after restart + + // key1 should exist with its updated value, key2 and key3 should be removed + ASSERT_EQ(kv_list.size(), 5); + EXPECT_EQ(kv_list[0].first, "key17"); + EXPECT_EQ(kv_list[0].second, "value7"); + EXPECT_EQ(kv_list[1].first, "key13"); + EXPECT_EQ(kv_list[1].second, "value3"); + EXPECT_EQ(kv_list[2].first, "key14"); + EXPECT_EQ(kv_list[2].second, "value4"); + EXPECT_EQ(kv_list[3].first, "key18"); + EXPECT_EQ(kv_list[3].second, "value8"); + EXPECT_EQ(kv_list[4].first, "key8"); + EXPECT_EQ(kv_list[4].second, "value8"); +} + + + + + // This test is disabled because it is environment-dependent and may not work properly in CI environments. -TEST_F(online_compaction_test, DISABLED_fail_compact_with_io_error) { +TEST_F(compaction_test, DISABLED_fail_compact_with_io_error) { gen_datastore(); datastore_->switch_epoch(1); auto pwals = extract_pwal_files_from_datastore(); @@ -858,7 +1366,7 @@ TEST_F(online_compaction_test, DISABLED_fail_compact_with_io_error) { } -TEST_F(online_compaction_test, safe_rename_success) { +TEST_F(compaction_test, safe_rename_success) { boost::filesystem::path from = boost::filesystem::path(location) / "test_file.txt"; boost::filesystem::path to = boost::filesystem::path(location) / "renamed_file.txt"; @@ -873,14 +1381,14 @@ TEST_F(online_compaction_test, safe_rename_success) { boost::filesystem::remove(to); } -TEST_F(online_compaction_test, safe_rename_throws_exception) { +TEST_F(compaction_test, safe_rename_throws_exception) { boost::filesystem::path from = boost::filesystem::path(location) / "non_existent_file.txt"; boost::filesystem::path to = boost::filesystem::path(location) / "renamed_file.txt"; ASSERT_THROW(safe_rename(from, to), std::runtime_error); } -TEST_F(online_compaction_test, select_files_for_compaction) { +TEST_F(compaction_test, select_files_for_compaction) { std::set rotation_end_files = { boost::filesystem::path(location) / "pwal_0001.0123456", boost::filesystem::path(location) / "pwal_0002.0123456", @@ -895,21 +1403,21 @@ TEST_F(online_compaction_test, select_files_for_compaction) { } -TEST_F(online_compaction_test, ensure_directory_exists_directory_exists) { +TEST_F(compaction_test, ensure_directory_exists_directory_exists) { boost::filesystem::path dir = boost::filesystem::path(location) / "test_dir"; boost::filesystem::create_directory(dir); ASSERT_NO_THROW(ensure_directory_exists(dir)); } -TEST_F(online_compaction_test, ensure_directory_exists_directory_created) { +TEST_F(compaction_test, ensure_directory_exists_directory_created) { boost::filesystem::path dir = boost::filesystem::path(location) / "test_dir"; ASSERT_NO_THROW(ensure_directory_exists(dir)); ASSERT_TRUE(boost::filesystem::exists(dir)); } -TEST_F(online_compaction_test, ensure_directory_exists_throws_exception) { +TEST_F(compaction_test, ensure_directory_exists_throws_exception) { boost::filesystem::path file = boost::filesystem::path(location) / "test_file.txt"; boost::filesystem::ofstream ofs(file); @@ -918,19 +1426,19 @@ TEST_F(online_compaction_test, ensure_directory_exists_throws_exception) { ASSERT_THROW(ensure_directory_exists(file), std::runtime_error); } -TEST_F(online_compaction_test, ensure_directory_exists_parent_directory_missing) { +TEST_F(compaction_test, ensure_directory_exists_parent_directory_missing) { boost::filesystem::path dir = boost::filesystem::path(location) / "nonexistent_parent/test_dir"; ASSERT_THROW(ensure_directory_exists(dir), std::runtime_error); } -TEST_F(online_compaction_test, handle_existing_compacted_file_no_existing_files) { +TEST_F(compaction_test, handle_existing_compacted_file_no_existing_files) { boost::filesystem::path location_path = boost::filesystem::path(location); ASSERT_NO_THROW(handle_existing_compacted_file(location_path)); } -TEST_F(online_compaction_test, handle_existing_compacted_file_with_existing_file) { +TEST_F(compaction_test, handle_existing_compacted_file_with_existing_file) { boost::filesystem::path location_path = boost::filesystem::path(location); boost::filesystem::path compacted_file = location_path / "pwal_0000.compacted"; boost::filesystem::ofstream ofs(compacted_file); @@ -940,7 +1448,7 @@ TEST_F(online_compaction_test, handle_existing_compacted_file_with_existing_file ASSERT_TRUE(boost::filesystem::exists(location_path / "pwal_0000.compacted.prev")); } -TEST_F(online_compaction_test, handle_existing_compacted_file_throws_exception) { +TEST_F(compaction_test, handle_existing_compacted_file_throws_exception) { boost::filesystem::path location_path = boost::filesystem::path(location); boost::filesystem::path compacted_file = location_path / "pwal_0000.compacted"; boost::filesystem::path compacted_prev_file = location_path / "pwal_0000.compacted.prev"; @@ -952,7 +1460,7 @@ TEST_F(online_compaction_test, handle_existing_compacted_file_throws_exception) ASSERT_THROW(handle_existing_compacted_file(location_path), std::runtime_error); } -TEST_F(online_compaction_test, get_files_in_directory) { +TEST_F(compaction_test, get_files_in_directory) { boost::filesystem::path test_dir = boost::filesystem::path(location); boost::filesystem::path file1 = test_dir / "file1.txt"; boost::filesystem::path file2 = test_dir / "file2.txt"; @@ -967,12 +1475,12 @@ TEST_F(online_compaction_test, get_files_in_directory) { EXPECT_EQ(files, expected_files); } -TEST_F(online_compaction_test, get_files_in_directory_directory_not_exists) { +TEST_F(compaction_test, get_files_in_directory_directory_not_exists) { boost::filesystem::path non_existent_dir = boost::filesystem::path(location) / "non_existent_dir"; ASSERT_THROW(get_files_in_directory(non_existent_dir), std::runtime_error); } -TEST_F(online_compaction_test, get_files_in_directory_not_a_directory) { +TEST_F(compaction_test, get_files_in_directory_not_a_directory) { boost::filesystem::path file_path = boost::filesystem::path(location) / "test_file.txt"; boost::filesystem::ofstream ofs(file_path); ofs.close(); @@ -980,7 +1488,7 @@ TEST_F(online_compaction_test, get_files_in_directory_not_a_directory) { ASSERT_THROW(get_files_in_directory(file_path), std::runtime_error); } -TEST_F(online_compaction_test, get_files_in_directory_with_files) { +TEST_F(compaction_test, get_files_in_directory_with_files) { boost::filesystem::path test_dir = boost::filesystem::path(location) / "test_dir"; boost::filesystem::create_directory(test_dir); @@ -995,7 +1503,7 @@ TEST_F(online_compaction_test, get_files_in_directory_with_files) { EXPECT_EQ(files, expected_files); } -TEST_F(online_compaction_test, get_files_in_directory_empty_directory) { +TEST_F(compaction_test, get_files_in_directory_empty_directory) { boost::filesystem::path empty_dir = boost::filesystem::path(location) / "empty_test_dir"; boost::filesystem::create_directory(empty_dir); @@ -1004,7 +1512,7 @@ TEST_F(online_compaction_test, get_files_in_directory_empty_directory) { } -TEST_F(online_compaction_test, remove_file_safely_success) { +TEST_F(compaction_test, remove_file_safely_success) { boost::filesystem::path file = boost::filesystem::path(location) / "test_file_to_remove.txt"; { @@ -1017,14 +1525,14 @@ TEST_F(online_compaction_test, remove_file_safely_success) { ASSERT_FALSE(boost::filesystem::exists(file)); } -TEST_F(online_compaction_test, remove_file_safely_no_exception_for_nonexistent_file) { +TEST_F(compaction_test, remove_file_safely_no_exception_for_nonexistent_file) { boost::filesystem::path file = boost::filesystem::path(location) / "non_existent_file.txt"; ASSERT_NO_THROW(remove_file_safely(file)); } // This test is disabled because it is environment-dependent and may not work properly in CI environments. -TEST_F(online_compaction_test, DISABLED_remove_file_safely_fails_to_remove_file) { +TEST_F(compaction_test, DISABLED_remove_file_safely_fails_to_remove_file) { boost::filesystem::path test_dir = boost::filesystem::path(location); boost::filesystem::path file = test_dir / "protected_file.txt"; diff --git a/test/limestone/log/log_channel_test.cpp b/test/limestone/log/log_channel_test.cpp index 45813caa..d9d21d5a 100644 --- a/test/limestone/log/log_channel_test.cpp +++ b/test/limestone/log/log_channel_test.cpp @@ -13,6 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +#include #include #include "internal.h" #include "test_root.h" diff --git a/test/limestone/snapshot/cursor_impl_test.cpp b/test/limestone/snapshot/cursor_impl_test.cpp new file mode 100644 index 00000000..06e652bd --- /dev/null +++ b/test/limestone/snapshot/cursor_impl_test.cpp @@ -0,0 +1,244 @@ +/* + * Copyright 2022-2024 Project Tsurugi. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include "cursor_impl.h" + + +#include "test_root.h" +#include "limestone/api/log_channel.h" + +namespace limestone::testing { + +using limestone::api::log_channel; + +class cursor_impl_testable : public limestone::internal::cursor_impl { +public: + using cursor_impl::cursor_impl; + using cursor_impl::next; + using cursor_impl::validate_and_read_stream; + using cursor_impl::open; + using cursor_impl::close; + using cursor_impl::storage; + using cursor_impl::key; + using cursor_impl::value; + using cursor_impl::type; + + ~cursor_impl_testable() { + // Ensure that the close() method is called to release resources. + // Normally, resources would be released explicitly, but for this test, we ensure that close() + // is always called by invoking it in the destructor. This is important to avoid resource leaks + // in cases where the test does not explicitly call close(). + close(); + } +}; + +class entry_maker { +public: + entry_maker& init() { + entries_.clear(); + return *this; + } + + entry_maker& add_entry(limestone::api::storage_id_type storage_id, std::string key, std::string value, limestone::api::write_version_type write_version) { + entries_.emplace_back(storage_id, key, value, write_version); + return *this; + } + + std::vector> get_default_entries() { + return { + {1, "key1", "value1", {1, 0}}, + {1, "key2", "value2", {1, 1}} + }; + } + + const std::vector>& get_entries() const { + return entries_; + } + +private: + std::vector> entries_; +}; + +class cursor_impl_test : public ::testing::Test { +protected: + static constexpr const char* location = "/tmp/cursor_impl_test"; + std::unique_ptr datastore_; + log_channel* lc0_{}; + entry_maker entry_maker_; + + void SetUp() override { + if (boost::filesystem::exists(location)) { + boost::filesystem::permissions(location, boost::filesystem::owner_all); + } + boost::filesystem::remove_all(location); + if (!boost::filesystem::create_directory(location)) { + std::cerr << "cannot make directory" << std::endl; + } + gen_datastore(); + } + + void gen_datastore() { + std::vector data_locations{}; + data_locations.emplace_back(location); + boost::filesystem::path metadata_location{location}; + limestone::api::configuration conf(data_locations, metadata_location); + + datastore_ = std::make_unique(conf); + lc0_ = &datastore_->create_channel(location); + + datastore_->ready(); + } + + void TearDown() override { + datastore_ = nullptr; + if (boost::filesystem::exists(location)) { + boost::filesystem::permissions(location, boost::filesystem::owner_all); + } + boost::filesystem::remove_all(location); + } + + void create_log_file( + const std::string& new_filename, + const std::vector>& entries) { + + lc0_->begin_session(); + + for (const auto& entry : entries) { + lc0_->add_entry(std::get<0>(entry), std::get<1>(entry), std::get<2>(entry), std::get<3>(entry)); + } + + lc0_->end_session(); + + boost::filesystem::path pwal_file = boost::filesystem::path(location) / "pwal_0000"; + boost::filesystem::path new_file = boost::filesystem::path(location) / new_filename; + + if (boost::filesystem::exists(pwal_file)) { + boost::filesystem::rename(pwal_file, new_file); + } else { + std::cerr << "Error: pwal_0000 file not found for renaming." << std::endl; + } + } +}; + + + +// Test case 1: Only Snapshot exists +TEST_F(cursor_impl_test, snapshot_only) { + create_log_file("snapshot", entry_maker_.get_default_entries()); + boost::filesystem::path snapshot_file = boost::filesystem::path(location) / "snapshot"; + + cursor_impl_testable cursor(snapshot_file); + EXPECT_TRUE(cursor.next()) << "Should be able to read the snapshot"; +} + +// Test case 2: Both Snapshot and Compacted files exist +TEST_F(cursor_impl_test, snapshot_and_compacted) { + create_log_file("snapshot", entry_maker_.get_default_entries()); + create_log_file("compacted", entry_maker_.get_default_entries()); + + boost::filesystem::path snapshot_file = boost::filesystem::path(location) / "snapshot"; + boost::filesystem::path compacted_file = boost::filesystem::path(location) / "compacted"; + + cursor_impl_testable cursor(snapshot_file, compacted_file); + EXPECT_TRUE(cursor.next()) << "Should be able to read both snapshot and compacted files"; +} + +// Test case 3: Error cases +TEST_F(cursor_impl_test, error_case) { + // No files exist, should throw limestone_exception + boost::filesystem::path snapshot_file = boost::filesystem::path(location) / "not_existing_snapshot"; + EXPECT_THROW({ + cursor_impl_testable cursor{boost::filesystem::path(snapshot_file)}; + }, limestone::limestone_exception) << "No files should result in a limestone_exception being thrown"; + + // Expect the next() method to throw a limestone_exception + { + cursor_impl_testable cursor{boost::filesystem::path(location)}; + EXPECT_THROW({ + cursor.next(); + }, limestone::limestone_exception) << "No files should result in a limestone_exception being thrown"; + } + // invalid sort order + { + entry_maker_.init() + .add_entry(1, "key2", "value2", {1, 1}) + .add_entry(1, "key1", "value1", {1, 0}) + .add_entry(1, "key3", "value3", {1, 2}); + boost::filesystem::path snapshot_file = boost::filesystem::path(location) / "snapshot"; + create_log_file("snapshot", entry_maker_.get_entries()); + cursor_impl_testable cursor{boost::filesystem::path(snapshot_file)}; + EXPECT_THROW({ + while (cursor.next()); + }, limestone::limestone_exception) << "No files should result in a limestone_exception being thrown"; + } +} + +// Test Case 4: Verify the entry methods after reading from a snapshot file +TEST_F(cursor_impl_test, verify_entry_methods) { + // Create a snapshot file with default entries + create_log_file("snapshot", entry_maker_.get_default_entries()); + boost::filesystem::path snapshot_file = boost::filesystem::path(location) / "snapshot"; + + // Use cursor_impl_testable to read the file + cursor_impl_testable cursor(snapshot_file); + + // Verify the first entry + ASSERT_TRUE(cursor.next()) << "First entry should be read"; + + // Verify storage() method + EXPECT_EQ(cursor.storage(), 1) << "Storage ID should be 1"; + + // Verify key() method + std::string key; + cursor.key(key); + EXPECT_EQ(key, "key1") << "First key should be 'key1'"; + + // Verify value() method + std::string value; + cursor.value(value); + EXPECT_EQ(value, "value1") << "First value should be 'value1'"; + + // Verify type() method + EXPECT_EQ(cursor.type(), limestone::api::log_entry::entry_type::normal_entry) + << "First entry type should be normal_entry"; + + // Verify the second entry + ASSERT_TRUE(cursor.next()) << "Second entry should be read"; + + // Verify storage() method for the second entry + EXPECT_EQ(cursor.storage(), 1) << "Storage ID should be 1"; + + // Verify key() method for the second entry + cursor.key(key); + EXPECT_EQ(key, "key2") << "Second key should be 'key2'"; + + // Verify value() method for the second entry + cursor.value(value); + EXPECT_EQ(value, "value2") << "Second value should be 'value2'"; + + // Verify type() method for the second entry + EXPECT_EQ(cursor.type(), limestone::api::log_entry::entry_type::normal_entry) + << "Second entry type should be normal_entry"; + + // Verify that next() returns false when no more entries are available + EXPECT_FALSE(cursor.next()) << "No more entries should be available, next() should return false"; +} + + + +} // namespace limestone::testing