diff --git a/include/limestone/api/cursor.h b/include/limestone/api/cursor.h index 6cf430d6..58bc57bf 100644 --- a/include/limestone/api/cursor.h +++ b/include/limestone/api/cursor.h @@ -17,6 +17,7 @@ #include #include +#include #include #include @@ -24,11 +25,17 @@ #include #include + +namespace limestone::internal { + class cursor_impl; +} + namespace limestone::api { class log_entry; class snapshot; + /** * @brief a cursor to scan entries on the snapshot */ @@ -56,7 +63,7 @@ class cursor { * @brief returns the storage ID of the entry at the current cursor position * @return the storage ID of the current entry */ - storage_id_type storage() const noexcept; + [[nodiscard]] storage_id_type storage() const noexcept; /** * @brief returns the key byte string of the entry at the current cursor position @@ -77,13 +84,14 @@ class cursor { std::vector& large_objects() noexcept; private: - boost::filesystem::ifstream istrm_{}; - std::unique_ptr log_entry_; + std::unique_ptr pimpl; + std::vector large_objects_{}; - explicit cursor(const boost::filesystem::path& file); - - friend class snapshot; + explicit cursor(const boost::filesystem::path& snapshot_file); + explicit cursor(const boost::filesystem::path& snapshot_file, const boost::filesystem::path& compacted_file); + + friend class internal::cursor_impl; }; } // namespace limestone::api diff --git a/include/limestone/api/datastore.h b/include/limestone/api/datastore.h index 08d86ff9..7af35d2f 100644 --- a/include/limestone/api/datastore.h +++ b/include/limestone/api/datastore.h @@ -23,6 +23,7 @@ #include #include #include +#include #include @@ -309,7 +310,7 @@ class datastore { * @param from the location of log files * @attention this function is not thread-safe. */ - void create_snapshot(const std::set& file_names); + void create_snapshot(); epoch_id_type last_durable_epoch_in_dir(); @@ -324,7 +325,8 @@ class datastore { void rotate_epoch_file(); int64_t current_unix_epoch_in_millis(); - + + std::map clear_storage; }; } // namespace limestone::api diff --git a/include/limestone/api/log_channel.h b/include/limestone/api/log_channel.h index 3d48eb8f..1c05b32e 100644 --- a/include/limestone/api/log_channel.h +++ b/include/limestone/api/log_channel.h @@ -41,7 +41,6 @@ class rotation_result; * @details this object is not thread-safe, assuming each thread uses its own log_channel */ class log_channel { - friend class rotation_task; public: /** @@ -180,6 +179,7 @@ class log_channel { rotation_result do_rotate_file(epoch_id_type epoch = 0); friend class datastore; + friend class rotation_task; }; } // namespace limestone::api diff --git a/include/limestone/api/snapshot.h b/include/limestone/api/snapshot.h index 03088cd7..197ffb33 100644 --- a/include/limestone/api/snapshot.h +++ b/include/limestone/api/snapshot.h @@ -17,10 +17,15 @@ #include #include - +#include #include #include +#include + +namespace limestone::internal { + class snapshot_impl; +} namespace limestone::api { @@ -28,8 +33,14 @@ namespace limestone::api { * @brief a snapshot of the data at a point in time on the data store */ class snapshot { + public: snapshot() noexcept = delete; + snapshot(const snapshot&) = delete; + snapshot& operator=(const snapshot&) = delete; + snapshot(snapshot&&) noexcept = delete; + snapshot& operator=(snapshot&&) noexcept = delete; + ~snapshot(); /** * @brief directory name of a snapshot @@ -73,11 +84,9 @@ class snapshot { [[nodiscard]] std::unique_ptr scan(storage_id_type storage_id, std::string_view entry_key, bool inclusive) const noexcept; private: - boost::filesystem::path dir_{}; - - [[nodiscard]] boost::filesystem::path file_path() const noexcept; + std::unique_ptr pimpl; - explicit snapshot(const boost::filesystem::path& location) noexcept; + explicit snapshot(boost::filesystem::path location, std::map clear_storage) noexcept; friend class datastore; }; diff --git a/src/limestone/cursor.cpp b/src/limestone/cursor.cpp index 96d23160..5fc0d30f 100644 --- a/src/limestone/cursor.cpp +++ b/src/limestone/cursor.cpp @@ -20,43 +20,37 @@ #include "logging_helper.h" #include "limestone_exception_helper.h" #include "log_entry.h" +#include "cursor_impl.h" + namespace limestone::api { -cursor::cursor(const boost::filesystem::path& file) : log_entry_(std::make_unique()) { - istrm_.open(file, std::ios_base::in | std::ios_base::binary ); - if (!istrm_.good()) { - LOG_AND_THROW_EXCEPTION("file stream of the cursor is not good (" + file.string() + ")"); - } -} + +cursor::cursor(const boost::filesystem::path& snapshot_file) + : pimpl(std::make_unique(snapshot_file)) {} + +cursor::cursor(const boost::filesystem::path& snapshot_file, const boost::filesystem::path& compacted_file) + : pimpl(std::make_unique(snapshot_file, compacted_file)) {} + cursor::~cursor() noexcept { - istrm_.close(); + // TODO: handle close failure + pimpl->close(); } bool cursor::next() { - if (!istrm_.good()) { - DVLOG_LP(log_trace) << "file stream of the cursor is not good"; - return false; - } - if (istrm_.eof()) { - DVLOG_LP(log_trace) << "already detected eof of the cursor"; - return false; - } - auto rv = log_entry_->read(istrm_); - DVLOG_LP(log_trace) << (rv ? "read an entry from the cursor" : "detect eof of the cursor"); - return rv; + return pimpl->next(); } storage_id_type cursor::storage() const noexcept { - return log_entry_->storage(); + return pimpl->storage(); } void cursor::key(std::string& buf) const noexcept { - log_entry_->key(buf); + pimpl->key(buf); } void cursor::value(std::string& buf) const noexcept { - log_entry_->value(buf); + pimpl->value(buf); } std::vector& cursor::large_objects() noexcept { diff --git a/src/limestone/cursor_impl.cpp b/src/limestone/cursor_impl.cpp new file mode 100644 index 00000000..d5b533dd --- /dev/null +++ b/src/limestone/cursor_impl.cpp @@ -0,0 +1,214 @@ +/* + * Copyright 2022-2023 Project Tsurugi. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include +#include "cursor_impl.h" +#include +#include "limestone_exception_helper.h" + +namespace limestone::internal { + +using limestone::api::log_entry; +using limestone::api::write_version_type; + +std::unique_ptr cursor_impl::create_cursor(const boost::filesystem::path& snapshot_file, + const std::map& clear_storage) { + auto cursor_instance = std::unique_ptr(new cursor(snapshot_file)); + cursor_instance->pimpl->set_clear_storage(clear_storage); + return cursor_instance; +} + +std::unique_ptr cursor_impl::create_cursor(const boost::filesystem::path& snapshot_file, const boost::filesystem::path& compacted_file, + const std::map& clear_storage) { + auto cursor_instance = std::unique_ptr(new cursor(snapshot_file, compacted_file)); + cursor_instance->pimpl->set_clear_storage(clear_storage); + return cursor_instance; +} + +cursor_impl::cursor_impl(const boost::filesystem::path& snapshot_file) + : compacted_istrm_(std::nullopt) { + open(snapshot_file, snapshot_istrm_); +} + +cursor_impl::cursor_impl(const boost::filesystem::path& snapshot_file, const boost::filesystem::path& compacted_file) { + open(snapshot_file, snapshot_istrm_); + open(compacted_file, compacted_istrm_); +} + +void cursor_impl::open(const boost::filesystem::path& file, std::optional& stream) { + stream.emplace(file, std::ios_base::in | std::ios_base::binary); + if (!stream->is_open() || !stream->good()) { + LOG_AND_THROW_EXCEPTION("Failed to open file: " + file.string()); + } +} + +void cursor_impl::close() { + if (snapshot_istrm_) snapshot_istrm_->close(); + if (compacted_istrm_) compacted_istrm_->close(); +} + +void cursor_impl::validate_and_read_stream(std::optional& stream, const std::string& stream_name, + std::optional& log_entry, std::string& previous_key_sid) { + while (stream) { + // If the stream is not in good condition, close it and exit + if (!stream->good()) { + DVLOG_LP(log_trace) << stream_name << " stream is not good, closing it."; + stream->close(); + stream = std::nullopt; + return; + } + + // If the stream has reached EOF, close it and exit + if (stream->eof()) { + DVLOG_LP(log_trace) << stream_name << " stream reached EOF, closing it."; + stream->close(); + stream = std::nullopt; + return; + } + + // If the entry is not yet read, read it + if (!log_entry) { + log_entry.emplace(); // Construct a new log_entry + if (!log_entry->read(*stream)) { + // If reading fails, close the stream and reset the log_entry + stream->close(); + stream = std::nullopt; + log_entry = std::nullopt; + return; + } + // Check if the key_sid is in ascending order + // TODO: Key order violation is detected here and the process is aborted. + // However, this check should be moved to an earlier point, and if the key order is invalid, + // a different processing method should be considered instead of aborting immediately. + if (!previous_key_sid.empty() && log_entry->key_sid() < previous_key_sid) { + LOG(ERROR) << "Key order violation in " << stream_name << ": current key_sid (" << log_entry->key_sid() + << ") is smaller than the previous key_sid (" << previous_key_sid << ")"; + THROW_LIMESTONE_EXCEPTION("Key order violation detected in " + stream_name); + } + + // Update the previous key_sid to the current one + previous_key_sid = log_entry->key_sid(); + } + + // Check the validity of the entry using the lambda function + if (is_relevant_entry(log_entry.value())) { + // If a valid entry is found, return + return; + } + + // If the entry is invalid, reset the log_entry and continue processing the stream + log_entry = std::nullopt; + } +} + +bool cursor_impl::is_relevant_entry(const limestone::api::log_entry& entry) { + // Step 1: Check if the entry is either normal_entry or remove_entry + if (entry.type() != limestone::api::log_entry::entry_type::normal_entry && + entry.type() != limestone::api::log_entry::entry_type::remove_entry) { + return false; // Skip this entry if it's not normal or remove entry + } + + // Step 2: Get the storage ID from log_entry + auto storage_id = entry.storage(); // Assuming storage() returns the storage ID + // Step 3: Check if clear_storage_ contains the same storage ID + auto it = clear_storage_.find(storage_id); + if (it != clear_storage_.end()) { + // Step 4: Retrieve the write_version from log_entry (only if the storage ID is found) + write_version_type wv; + entry.write_version(wv); + + // Step 5: Retrieve the write_version from clear_storage_ for the same storage ID + write_version_type range_ver = it->second; + + // Step 6: Compare the versions (only if the entry is normal_entry or remove_entry) + if (wv < range_ver) { + return false; // Skip this entry as it is outdated + } + } + + // If everything is valid, return true + return true; +} + +bool cursor_impl::next() { + while (true) { + // Read from the snapshot stream if the snapshot_log_entry_ is empty + if (!snapshot_log_entry_) { + validate_and_read_stream(snapshot_istrm_, "Snapshot", snapshot_log_entry_, previous_snapshot_key_sid); + } + + // Read from the compacted stream if the compacted_log_entry_ is empty + if (!compacted_log_entry_) { + validate_and_read_stream(compacted_istrm_, "Compacted", compacted_log_entry_, previous_compacted_key_sid); + } + + // Case 1: Both snapshot and compacted are empty, return false + if (!snapshot_log_entry_ && !compacted_log_entry_) { + DVLOG_LP(log_trace) << "Both snapshot and compacted streams are closed"; + return false; + } + + // Case 2: Either snapshot or compacted has a value, use the one that is not empty + if (snapshot_log_entry_ && !compacted_log_entry_) { + log_entry_ = std::move(snapshot_log_entry_.value()); + snapshot_log_entry_ = std::nullopt; + } else if (!snapshot_log_entry_ && compacted_log_entry_) { + log_entry_ = std::move(compacted_log_entry_.value()); + compacted_log_entry_ = std::nullopt; + } else { + // Case 3: Both snapshot and compacted have values + if (snapshot_log_entry_->key_sid() < compacted_log_entry_->key_sid()) { + log_entry_ = std::move(snapshot_log_entry_.value()); + snapshot_log_entry_ = std::nullopt; + } else if (snapshot_log_entry_->key_sid() > compacted_log_entry_->key_sid()) { + log_entry_ = std::move(compacted_log_entry_.value()); + compacted_log_entry_ = std::nullopt; + } else { + // If key_sid is equal, use snapshot_log_entry_, but reset both entries + log_entry_ = std::move(snapshot_log_entry_.value()); + snapshot_log_entry_ = std::nullopt; + compacted_log_entry_ = std::nullopt; + } + } + // Check if the current log_entry_ is a normal entry + if (log_entry_.type() == log_entry::entry_type::normal_entry) { + return true; + } + // If it's not a normal entry, continue the loop to skip it and read the next entry + } +} + + +limestone::api::storage_id_type cursor_impl::storage() const noexcept { + return log_entry_.storage(); +} + +void cursor_impl::key(std::string& buf) const noexcept { + log_entry_.key(buf); +} + +void cursor_impl::value(std::string& buf) const noexcept { + log_entry_.value(buf); +} + +log_entry::entry_type cursor_impl::type() const { + return log_entry_.type(); +} + +void cursor_impl::set_clear_storage(const std::map& clear_storage) { + clear_storage_ = clear_storage; +} + +} // namespace limestone::internal diff --git a/src/limestone/cursor_impl.h b/src/limestone/cursor_impl.h new file mode 100644 index 00000000..f41b9000 --- /dev/null +++ b/src/limestone/cursor_impl.h @@ -0,0 +1,68 @@ +/* + * Copyright 2022-2023 Project Tsurugi. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once +#include +#include +#include +#include +#include +#include +#include "log_entry.h" + +namespace limestone::internal { + +using limestone::api::cursor; +class cursor_impl { +public: + explicit cursor_impl(const boost::filesystem::path& snapshot_file); + explicit cursor_impl(const boost::filesystem::path& snapshot_file, const boost::filesystem::path& compacted_file); + + static std::unique_ptr create_cursor(const boost::filesystem::path& snapshot_file, + const std::map& clear_storage); + static std::unique_ptr create_cursor(const boost::filesystem::path& snapshot_file, const boost::filesystem::path& compacted_file, + const std::map& clear_storage); + + void set_clear_storage(const std::map& clear_storage); + +private: + limestone::api::log_entry log_entry_; + std::optional snapshot_log_entry_; + std::optional compacted_log_entry_; + std::optional snapshot_istrm_; + std::optional compacted_istrm_; + std::string previous_snapshot_key_sid; + std::string previous_compacted_key_sid; + std::map clear_storage_; + +protected: + void open(const boost::filesystem::path& file, std::optional& stream); + void close(); + + bool next(); + void validate_and_read_stream(std::optional& stream, const std::string& stream_name, + std::optional& log_entry, std::string& previous_key_sid); + + [[nodiscard]] limestone::api::storage_id_type storage() const noexcept; + void key(std::string& buf) const noexcept; + void value(std::string& buf) const noexcept; + [[nodiscard]] limestone::api::log_entry::entry_type type() const; + bool is_relevant_entry(const limestone::api::log_entry& entry); + // Making the cursor class a friend so that it can access protected members + friend class limestone::api::cursor; +}; + +} // namespace limestone::internal diff --git a/src/limestone/datastore.cpp b/src/limestone/datastore.cpp index 9f3450be..ecbfa273 100644 --- a/src/limestone/datastore.cpp +++ b/src/limestone/datastore.cpp @@ -104,41 +104,19 @@ void datastore::recover() const noexcept { } void datastore::ready() { - // create filename set for snapshot - std::set detached_pwals = compaction_catalog_->get_detached_pwals(); - - std::set filename_set; - boost::system::error_code error; - boost::filesystem::directory_iterator it(location_, error); - boost::filesystem::directory_iterator end; - if (error) { - LOG_AND_THROW_IO_EXCEPTION("Failed to initialize directory iterator, path: " + location_.string(), error); - } - for (; it != end; it.increment(error)) { - if (error) { - LOG_AND_THROW_IO_EXCEPTION("Failed to access directory entry: , path: " + location_.string(), error); - } - if (boost::filesystem::is_regular_file(it->path())) { - std::string filename = it->path().filename().string(); - if (detached_pwals.find(filename) == detached_pwals.end()) { - filename_set.insert(filename); - } - } - } - - create_snapshot(filename_set); + create_snapshot(); online_compaction_worker_future_ = std::async(std::launch::async, &datastore::online_compaction_worker, this); state_ = state::ready; } std::unique_ptr datastore::get_snapshot() const { check_after_ready(static_cast(__func__)); - return std::unique_ptr(new snapshot(location_)); + return std::unique_ptr(new snapshot(location_, clear_storage)); } std::shared_ptr datastore::shared_snapshot() const { check_after_ready(static_cast(__func__)); - return std::shared_ptr(new snapshot(location_)); + return std::shared_ptr(new snapshot(location_, clear_storage)); } log_channel& datastore::create_channel(const boost::filesystem::path& location) { diff --git a/src/limestone/datastore_snapshot.cpp b/src/limestone/datastore_snapshot.cpp index 273c5536..24853c8f 100644 --- a/src/limestone/datastore_snapshot.cpp +++ b/src/limestone/datastore_snapshot.cpp @@ -27,56 +27,19 @@ #include #include "limestone_exception_helper.h" +#include "compaction_catalog.h" #include "dblog_scan.h" #include "internal.h" #include "log_entry.h" #include "sortdb_wrapper.h" +#include "snapshot_impl.h" +#include "sorting_context.h" namespace limestone::internal { constexpr std::size_t write_version_size = sizeof(epoch_id_type) + sizeof(std::uint64_t); static_assert(write_version_size == 16); -class sorting_context { -public: - sorting_context(sorting_context&& obj) noexcept : sortdb(std::move(obj.sortdb)) { - std::unique_lock lk{obj.mtx_clear_storage}; - clear_storage = std::move(obj.clear_storage); // NOLINT(*-prefer-member-initializer): need lock - } - sorting_context(const sorting_context&) = delete; - sorting_context& operator=(const sorting_context&) = delete; - sorting_context& operator=(sorting_context&&) = delete; - sorting_context() = default; - ~sorting_context() = default; - explicit sorting_context(std::unique_ptr&& s) noexcept : sortdb(std::move(s)) { - } - - // point entries -private: - std::unique_ptr sortdb; -public: - sortdb_wrapper* get_sortdb() { return sortdb.get(); } - - // range delete entries -private: - std::mutex mtx_clear_storage; - std::map clear_storage; -public: - void clear_storage_update(const storage_id_type sid, const write_version_type wv) { - std::unique_lock lk{mtx_clear_storage}; - if (auto [it, inserted] = clear_storage.emplace(sid, wv); - !inserted) { - it->second = std::max(it->second, wv); - } - } - std::optional clear_storage_find(const storage_id_type sid) { - // no need to lock, for now - auto itr = clear_storage.find(sid); - if (itr == clear_storage.end()) return {}; - return {itr->second}; - } -}; - [[maybe_unused]] static void store_bswap64_value(void *dest, const void *src) { auto* p64_dest = reinterpret_cast(dest); // NOLINT(*-reinterpret-cast) @@ -185,10 +148,30 @@ static std::pair create_sorted_from_wals( } } -static void sortdb_foreach(sorting_context& sctx, std::function write_snapshot_entry) { + +[[maybe_unused]] +static write_version_type extract_write_version(const std::string_view& db_key) { + std::string wv(write_version_size, '\0'); + store_bswap64_value(&wv[0], &db_key[0]); // NOLINT(readability-container-data-pointer) + store_bswap64_value(&wv[8], &db_key[8]); + return write_version_type{wv}; +} + +[[maybe_unused]] +static std::string create_value_from_db_key_and_value(const std::string_view& db_key, const std::string_view& db_value) { + std::string value(write_version_size + db_value.size() - 1, '\0'); + store_bswap64_value(&value[0], &db_key[0]); // NOLINT(readability-container-data-pointer) + store_bswap64_value(&value[8], &db_key[8]); + std::memcpy(&value[write_version_size], &db_value[1], db_value.size() - 1); + return value; +} + +static void sortdb_foreach(sorting_context& sctx, + const std::function& write_snapshot_entry, + const std::function& write_snapshot_remove_entry) { static_assert(sizeof(log_entry::entry_type) == 1); #if defined SORT_METHOD_PUT_ONLY - sctx.get_sortdb()->each([&sctx, write_snapshot_entry, last_key = std::string{}](const std::string_view db_key, const std::string_view db_value) mutable { + sctx.get_sortdb()->each([&sctx, write_snapshot_entry, write_snapshot_remove_entry, last_key = std::string{}](const std::string_view db_key, const std::string_view db_value) mutable { // using the first entry in GROUP BY (original-)key // NB: max versions comes first (by the custom-comparator) std::string_view key(db_key.data() + write_version_size, db_key.size() - write_version_size); @@ -199,37 +182,31 @@ static void sortdb_foreach(sorting_context& sctx, std::function(&st_bytes), key.data(), sizeof(storage_id_type)); storage_id_type st = le64toh(st_bytes); + if (auto ret = sctx.clear_storage_find(st); ret) { // check range delete write_version_type range_ver = ret.value(); - std::string wv(write_version_size, '\0'); - store_bswap64_value(&wv[0], &db_key[0]); - store_bswap64_value(&wv[8], &db_key[8]); - write_version_type point_ver{wv}; - if (point_ver < range_ver) { + if (extract_write_version(db_key) < range_ver) { return; // skip } } auto entry_type = static_cast(db_value[0]); switch (entry_type) { - case log_entry::entry_type::normal_entry: { - std::string value(write_version_size + db_value.size() - 1, '\0'); - store_bswap64_value(&value[0], &db_key[0]); - store_bswap64_value(&value[8], &db_key[8]); - std::memcpy(&value[write_version_size], &db_value[1], db_value.size() - 1); - write_snapshot_entry(key, value); + case log_entry::entry_type::normal_entry: + write_snapshot_entry(key, create_value_from_db_key_and_value(db_key, db_value)); + break; + case log_entry::entry_type::remove_entry: { + write_snapshot_remove_entry(key, create_value_from_db_key_and_value(db_key, db_value)); break; } - case log_entry::entry_type::remove_entry: - break; // skip default: LOG(ERROR) << "never reach " << static_cast(entry_type); std::abort(); } }); #else - sctx.get_sortdb()->each([&sctx, &write_snapshot_entry](const std::string_view db_key, const std::string_view db_value) { + sctx.get_sortdb()->each([&sctx, &write_snapshot_entry, write_snapshot_remove_entry](const std::string_view db_key, const std::string_view db_value) { storage_id_type st_bytes{}; memcpy(static_cast(&st_bytes), db_key.data(), sizeof(storage_id_type)); storage_id_type st = le64toh(st_bytes); @@ -246,8 +223,9 @@ static void sortdb_foreach(sorting_context& sctx, std::function(entry_type); std::abort(); @@ -292,20 +270,62 @@ void create_compact_pwal( log_entry::write(ostrm, key_stid, value_etc); } }; - sortdb_foreach(sctx, write_snapshot_entry); + sortdb_foreach(sctx, write_snapshot_entry, [](std::string_view, std::string_view) {}); //log_entry::end_session(ostrm, epoch); if (fclose(ostrm) != 0) { // NOLINT(*-owning-memory) LOG_AND_THROW_IO_EXCEPTION("cannot close snapshot file (" + snapshot_file.string() + ")", errno); } } +std::set assemble_snapshot_input_filenames( + const std::unique_ptr& compaction_catalog, + const boost::filesystem::path& location, + file_operations& file_ops) { + std::set detached_pwals = compaction_catalog->get_detached_pwals(); + std::set filename_set; + boost::system::error_code error; + boost::filesystem::directory_iterator it(location, error); + boost::filesystem::directory_iterator end; + + if (error) { + LOG_AND_THROW_IO_EXCEPTION("Failed to initialize directory iterator, path: " + location.string(), error); + } + + for (; it != end; file_ops.directory_iterator_next(it, error)) { + if (error) { + LOG_AND_THROW_IO_EXCEPTION("Failed to access directory entry, path: " + location.string(), error); + } + if (boost::filesystem::is_regular_file(it->path())) { + std::string filename = it->path().filename().string(); + if (detached_pwals.find(filename) == detached_pwals.end() + && filename != compaction_catalog::get_catalog_filename() + && filename != compaction_catalog::get_compacted_filename()) { + filename_set.insert(filename); + } + } + } + return filename_set; +} + +std::set assemble_snapshot_input_filenames( + const std::unique_ptr& compaction_catalog, + const boost::filesystem::path& location) { + real_file_operations file_ops; + return assemble_snapshot_input_filenames(compaction_catalog, location, file_ops); } + + +} // namespace limestone::internal + namespace limestone::api { using namespace limestone::internal; + +snapshot::~snapshot() = default; -void datastore::create_snapshot(const std::set& file_names) { +void datastore::create_snapshot() { const auto& from_dir = location_; + std::set file_names = assemble_snapshot_input_filenames(compaction_catalog_, from_dir); auto [max_appeared_epoch, sctx] = create_sorted_from_wals(from_dir, recover_max_parallelism_, file_names); epoch_id_switched_.store(max_appeared_epoch); epoch_id_informed_.store(max_appeared_epoch); @@ -326,12 +346,24 @@ void datastore::create_snapshot(const std::set& file_names) { if (!ostrm) { LOG_AND_THROW_IO_EXCEPTION("cannot create snapshot file", errno); } + log_entry::begin_session(ostrm, 0); setvbuf(ostrm, nullptr, _IOFBF, 128L * 1024L); // NOLINT, NB. glibc may ignore size when _IOFBF and buffer=NULL - auto write_snapshot_entry = [&ostrm](std::string_view key, std::string_view value){log_entry::write(ostrm, key, value);}; - sortdb_foreach(sctx, write_snapshot_entry); + auto write_snapshot_entry = [&ostrm](std::string_view key_sid, std::string_view value_etc) { log_entry::write(ostrm, key_sid, value_etc); }; + + std::function write_snapshot_remove_entry; + if (compaction_catalog_->get_compacted_files().empty()) { + write_snapshot_remove_entry = [](std::string_view, std::string_view) {}; + } else { + write_snapshot_remove_entry = [&ostrm](std::string_view key, std::string_view value_etc) { + log_entry::write_remove(ostrm, key, value_etc); + }; + } + sortdb_foreach(sctx, write_snapshot_entry, write_snapshot_remove_entry); if (fclose(ostrm) != 0) { // NOLINT(*-owning-memory) LOG_AND_THROW_IO_EXCEPTION("cannot close snapshot file (" + snapshot_file.string() + ")", errno); } + + clear_storage = sctx.get_clear_storage(); } } // namespace limestone::api diff --git a/src/limestone/file_operations.cpp b/src/limestone/file_operations.cpp index 5711db49..e3bb3d91 100644 --- a/src/limestone/file_operations.cpp +++ b/src/limestone/file_operations.cpp @@ -101,6 +101,11 @@ bool real_file_operations::exists(const boost::filesystem::path& p, boost::syste return boost::filesystem::exists(p, ec); } +void real_file_operations::directory_iterator_next(boost::filesystem::directory_iterator& it, boost::system::error_code& ec) { + it.increment(ec); +} + + } // namespace limestone::internal diff --git a/src/limestone/file_operations.h b/src/limestone/file_operations.h index ea3b2b71..a9c8fd3b 100644 --- a/src/limestone/file_operations.h +++ b/src/limestone/file_operations.h @@ -104,6 +104,9 @@ class file_operations { // Checks if a file or directory exists (Boost) virtual bool exists(const boost::filesystem::path& p, boost::system::error_code& ec) = 0; + + // call directory_iterator::increment + virtual void directory_iterator_next(boost::filesystem::directory_iterator& it, boost::system::error_code& ec) = 0; }; class real_file_operations : public file_operations { @@ -125,6 +128,7 @@ class real_file_operations : public file_operations { bool has_error(std::ifstream& file) override; bool exists(const boost::filesystem::path& p, boost::system::error_code& ec) override; + void directory_iterator_next(boost::filesystem::directory_iterator& it, boost::system::error_code& ec) override; }; } // namespace limestone::internal diff --git a/src/limestone/snapshot.cpp b/src/limestone/snapshot.cpp index 12081a71..ab189e2f 100644 --- a/src/limestone/snapshot.cpp +++ b/src/limestone/snapshot.cpp @@ -14,18 +14,24 @@ * limitations under the License. */ #include - +#include #include #include +#include "compaction_catalog.h" #include "logging_helper.h" +#include "snapshot_impl.h" namespace limestone::api { // FIXME fill implementation -snapshot::snapshot(const boost::filesystem::path& location) noexcept : dir_(location / boost::filesystem::path(std::string(subdirectory_name_))) { -} +using limestone::internal::snapshot_impl; + +snapshot::snapshot(boost::filesystem::path location, + std::map clear_storage) noexcept + : pimpl(std::make_unique(std::move(location), std::move(clear_storage))) { +} std::unique_ptr snapshot::get_cursor() const { - return std::unique_ptr(new cursor(file_path())); + return pimpl->get_cursor(); } std::unique_ptr snapshot::find([[maybe_unused]] storage_id_type storage_id, [[maybe_unused]] std::string_view entry_key) const noexcept { @@ -38,8 +44,4 @@ std::unique_ptr snapshot::scan([[maybe_unused]] storage_id_type storage_ std::abort(); // FIXME should implement } -boost::filesystem::path snapshot::file_path() const noexcept { - return dir_ / boost::filesystem::path(std::string(file_name_)); -} - } // namespace limestone::api diff --git a/src/limestone/snapshot_impl.cpp b/src/limestone/snapshot_impl.cpp new file mode 100644 index 00000000..faf63ad6 --- /dev/null +++ b/src/limestone/snapshot_impl.cpp @@ -0,0 +1,43 @@ +/* + * Copyright 2022-2023 Project Tsurugi. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include +#include "snapshot_impl.h" +#include +#include +#include "compaction_catalog.h" +#include "logging_helper.h" +#include "cursor_impl.h" +#include + +namespace limestone::internal { + +snapshot_impl::snapshot_impl(boost::filesystem::path location, + std::map clear_storage) noexcept + : location_(std::move(location)), clear_storage(std::move(clear_storage)) { +} + +std::unique_ptr snapshot_impl::get_cursor() const { + boost::filesystem::path compacted_file = location_ / limestone::internal::compaction_catalog::get_compacted_filename(); + boost::filesystem::path snapshot_file = location_ / std::string(snapshot::subdirectory_name_) / std::string(snapshot::file_name_); + + if (boost::filesystem::exists(compacted_file)) { + return cursor_impl::create_cursor(snapshot_file, compacted_file, clear_storage); + } + return cursor_impl::create_cursor(snapshot_file, clear_storage); +} + + +} // namespace limestone::internal diff --git a/src/limestone/snapshot_impl.h b/src/limestone/snapshot_impl.h new file mode 100644 index 00000000..c3c1e948 --- /dev/null +++ b/src/limestone/snapshot_impl.h @@ -0,0 +1,42 @@ +/* + * Copyright 2022-2023 Project Tsurugi. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include + +namespace limestone::internal { + +using limestone::api::cursor; +using limestone::api::storage_id_type; +using limestone::api::write_version_type; + +class snapshot_impl { +public: + explicit snapshot_impl(boost::filesystem::path location, std::map clear_storage) noexcept; + [[nodiscard]] std::unique_ptr get_cursor() const; + +private: + boost::filesystem::path location_; + std::map clear_storage; +}; + +} // namespace limestone::internal \ No newline at end of file diff --git a/src/limestone/sortdb_wrapper.h b/src/limestone/sortdb_wrapper.h index 99ed3db9..3c4243dc 100644 --- a/src/limestone/sortdb_wrapper.h +++ b/src/limestone/sortdb_wrapper.h @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - +#pragma once #include #ifdef SORT_METHOD_USE_ROCKSDB diff --git a/src/limestone/sorting_context.cpp b/src/limestone/sorting_context.cpp new file mode 100644 index 00000000..ad4dc36e --- /dev/null +++ b/src/limestone/sorting_context.cpp @@ -0,0 +1,52 @@ +/* + * Copyright 2022-2024 Project Tsurugi. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include +#include "sorting_context.h" +#include + +namespace limestone::internal { + +sorting_context::sorting_context(sorting_context&& obj) noexcept : sortdb(std::move(obj.sortdb)) { + std::unique_lock lk{obj.mtx_clear_storage}; + clear_storage = std::move(obj.clear_storage); // NOLINT(*-prefer-member-initializer): need lock +} + +sorting_context::sorting_context(std::unique_ptr&& s) noexcept : sortdb(std::move(s)) { +} + +sortdb_wrapper* sorting_context::get_sortdb() { + return sortdb.get(); +} + +void sorting_context::clear_storage_update(storage_id_type sid, write_version_type wv) { + std::unique_lock lk{mtx_clear_storage}; + if (auto [it, inserted] = clear_storage.emplace(sid, wv); + !inserted) { + it->second = std::max(it->second, wv); + } +} + +std::optional sorting_context::clear_storage_find(storage_id_type sid) { + auto itr = clear_storage.find(sid); + if (itr == clear_storage.end()) return {}; + return {itr->second}; +} + +std::map sorting_context::get_clear_storage() const { + return clear_storage; +} + +} // namespace limestone::internal \ No newline at end of file diff --git a/src/limestone/sorting_context.h b/src/limestone/sorting_context.h new file mode 100644 index 00000000..6634c7c3 --- /dev/null +++ b/src/limestone/sorting_context.h @@ -0,0 +1,55 @@ +/* + * Copyright 2022-2024 Project Tsurugi. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once +#include +#include +#include +#include +#include "sortdb_wrapper.h" +#include +#include + +namespace limestone::internal { + +using api::sortdb_wrapper; +using api::storage_id_type; +using api::write_version_type; + +class sorting_context { +public: + sorting_context(sorting_context&& obj) noexcept; + sorting_context(const sorting_context&) = delete; + sorting_context& operator=(const sorting_context&) = delete; + sorting_context& operator=(sorting_context&&) = delete; + sorting_context() = default; + ~sorting_context() = default; + explicit sorting_context(std::unique_ptr&& s) noexcept; + + // public getter + sortdb_wrapper* get_sortdb(); + + // clear_storage methods + [[nodiscard]] std::map get_clear_storage() const; + std::optional clear_storage_find(storage_id_type sid); + void clear_storage_update(storage_id_type sid, write_version_type wv); + +private: + std::unique_ptr sortdb; + std::mutex mtx_clear_storage; + std::map clear_storage; +}; + +} // namespace limestone::internal \ No newline at end of file diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index d9441c0c..0555ec06 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -34,6 +34,7 @@ file(GLOB SRCS "limestone/epoch/*.cpp" "limestone/utils/*.cpp" "limestone/compaction/*.cpp" + "limestone/snapshot/*.cpp" ) foreach(file ${SRCS}) diff --git a/test/limestone/compaction/assemble_snapshot_input_filenames_test.cpp b/test/limestone/compaction/assemble_snapshot_input_filenames_test.cpp new file mode 100644 index 00000000..dac2b0bc --- /dev/null +++ b/test/limestone/compaction/assemble_snapshot_input_filenames_test.cpp @@ -0,0 +1,165 @@ +/* + * Copyright 2022-2024 Project Tsurugi. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include "compaction_catalog.h" +#include "limestone/api/epoch_id_type.h" +#include "limestone/api/limestone_exception.h" + +namespace limestone::internal { +// Forward declare the target function for testing +std::set assemble_snapshot_input_filenames( + const std::unique_ptr& compaction_catalog, + const boost::filesystem::path& location); +std::set assemble_snapshot_input_filenames( + const std::unique_ptr& compaction_catalog, + const boost::filesystem::path& location, + file_operations& file_ops); +} + +namespace limestone::testing { + +using limestone::internal::compacted_file_info; +using limestone::internal::compaction_catalog; +using limestone::api::limestone_io_exception; + +class mock_file_operations : public limestone::internal::real_file_operations { +public: + void directory_iterator_next(boost::filesystem::directory_iterator& it, boost::system::error_code& ec) { + ec = boost::system::errc::make_error_code(boost::system::errc::permission_denied); + }; +}; + +class assemble_snapshot_input_filenames_test : public ::testing::Test { +protected: + void SetUp() override { + // Set up temporary log directory for the test + if (system("rm -rf /tmp/assemble_snapshot_input_filenames_test") != 0) { + std::cerr << "Cannot remove directory" << std::endl; + } + if (system("mkdir -p /tmp/assemble_snapshot_input_filenames_test") != 0) { + std::cerr << "Cannot create directory" << std::endl; + } + + // Initialize the compaction catalog with a valid log directory path + log_location_ = "/tmp/assemble_snapshot_input_filenames_test"; + compaction_catalog_ = std::make_unique(log_location_); + } + + void TearDown() override { + // Clean up temporary log directory after the test + if (system("rm -rf /tmp/assemble_snapshot_input_filenames_test") != 0) { + std::cerr << "Cannot remove directory" << std::endl; + } + } + + void clear_compaction_catalog() { + compacted_files_.clear(); + detached_pwals_.clear(); + } + + void add_detached_pwals(const std::initializer_list& pwals) { + detached_pwals_.insert(pwals.begin(), pwals.end()); + compaction_catalog_->update_catalog_file(0, compacted_files_, detached_pwals_); + } + + std::unique_ptr compaction_catalog_; + boost::filesystem::path log_location_; + + std::set compacted_files_; + std::set detached_pwals_; +}; + + TEST_F(assemble_snapshot_input_filenames_test, retrieves_filenames_correctly) { + // Prepare some files in the log location directory + std::ofstream(log_location_ / "pwal_0001"); + std::ofstream(log_location_ / "pwal_0002"); + std::ofstream(log_location_ / "pwal_0003"); + std::ofstream(log_location_ / "pwal_0004"); + + // Simulate detached PWALs in the compaction catalog + add_detached_pwals({"pwal_0001", "pwal_0002"}); + + // Get the filenames that should be used for the snapshot + std::set filenames; + filenames = assemble_snapshot_input_filenames(compaction_catalog_, log_location_); + + // Ensure the correct files are retrieved + EXPECT_EQ(filenames.size(), 2); + EXPECT_NE(filenames.find("pwal_0003"), filenames.end()); + EXPECT_NE(filenames.find("pwal_0004"), filenames.end()); + + // Ensure the detached PWALs are not included + std::ofstream(log_location_ / compaction_catalog::get_compacted_filename()); + filenames = assemble_snapshot_input_filenames(compaction_catalog_, log_location_); + EXPECT_EQ(filenames.size(), 2); + EXPECT_NE(filenames.find("pwal_0003"), filenames.end()); + EXPECT_NE(filenames.find("pwal_0004"), filenames.end()); + } + + TEST_F(assemble_snapshot_input_filenames_test, throws_exception_when_directory_does_not_exist) { + // Set a non-existent directory + boost::filesystem::path non_existent_directory = "/tmp/non_existent_directory"; + + // Check if an exception is thrown + try { + auto filenames = assemble_snapshot_input_filenames(compaction_catalog_, non_existent_directory); + FAIL() << "Expected an exception to be thrown"; + } catch (const limestone_io_exception& e) { + // Check if the exception message contains the expected content + std::string expected_message = "Failed to initialize directory iterator, path:"; + std::string actual_message = e.what(); + std::cout << "Caught expected exception: " << actual_message << std::endl; + EXPECT_TRUE(actual_message.find(expected_message) != std::string::npos) << "Expected exception message to contain: " << expected_message; + } catch (...) { + FAIL() << "Expected a limestone_io_exception, but got a different exception"; + } + } + + TEST_F(assemble_snapshot_input_filenames_test, throws_exception_when_directory_iterator_increment) { + // Prepare some files in the log location directory + std::ofstream(log_location_ / "pwal_0001"); + std::ofstream(log_location_ / "pwal_0002"); + std::ofstream(log_location_ / "pwal_0003"); + std::ofstream(log_location_ / "pwal_0004"); + + // Check if an exception is thrown + try { + mock_file_operations file_ops; + auto filenames = assemble_snapshot_input_filenames(compaction_catalog_,log_location_, file_ops); + FAIL() << "Expected an exception to be thrown"; + } catch (const limestone_io_exception& e) { + // Check if the exception message contains the expected content + std::string expected_message = "Failed to access directory entry, path:"; + std::string actual_message = e.what(); + std::cout << "Caught expected exception: " << actual_message << std::endl; + EXPECT_TRUE(actual_message.find(expected_message) != std::string::npos) << "Expected exception message to contain: " << expected_message; + } catch (...) { + FAIL() << "Expected a limestone_io_exception, but got a different exception"; + } + } + + TEST_F(assemble_snapshot_input_filenames_test, handles_empty_directory) { + // No files are created in the directory + auto filenames = assemble_snapshot_input_filenames(compaction_catalog_, log_location_); + + // Ensure that no files are retrieved + EXPECT_TRUE(filenames.empty()); + } + +} // namespace limestone::testing diff --git a/test/limestone/compaction/online_compaction_test.cpp b/test/limestone/compaction/compaction_test.cpp similarity index 62% rename from test/limestone/compaction/online_compaction_test.cpp rename to test/limestone/compaction/compaction_test.cpp index 5dd0d46e..1a74b8de 100644 --- a/test/limestone/compaction/online_compaction_test.cpp +++ b/test/limestone/compaction/compaction_test.cpp @@ -42,9 +42,9 @@ extern std::string data_manifest(int persistent_format_version = 1); extern const std::string_view data_normal; extern const std::string_view data_nondurable; -class online_compaction_test : public ::testing::Test { +class compaction_test : public ::testing::Test { public: - static constexpr const char* location = "/tmp/online_compaction_test"; + static constexpr const char* location = "/tmp/compaction_test"; const boost::filesystem::path manifest_path = boost::filesystem::path(location) / std::string(limestone::internal::manifest_file_name); const boost::filesystem::path compaction_catalog_path = boost::filesystem::path(location) / "compaction_catalog"; const std::string compacted_filename = compaction_catalog::get_compacted_filename(); @@ -149,7 +149,105 @@ class online_compaction_test : public ::testing::Test { } return kv_list; } - + + void print_log_entry(const log_entry& entry) { + std::string key; + storage_id_type storage_id = entry.storage(); + log_entry::entry_type type = entry.type(); + + if (type == log_entry::entry_type::normal_entry || type == log_entry::entry_type::remove_entry) { + entry.key(key); + } + + switch (type) { + case log_entry::entry_type::normal_entry: { + std::string value; + entry.value(value); + std::cout << "Entry Type: normal_entry, Storage ID: " << storage_id + << ", Key: " << key << ", Value: " << value + << ", Write Version: Epoch: " << log_entry::write_version_epoch_number(entry.value_etc()) + << ", Minor: " << log_entry::write_version_minor_write_version(entry.value_etc()) + << std::endl; + break; + } + case log_entry::entry_type::remove_entry: { + std::cout << "Entry Type: remove_entry, Storage ID: " << storage_id << ", Key: " << key + << ", Write Version: Epoch: " << log_entry::write_version_epoch_number(entry.value_etc()) + << ", Minor: " << log_entry::write_version_minor_write_version(entry.value_etc()) + << std::endl; + break; + } + case log_entry::entry_type::clear_storage: + case log_entry::entry_type::add_storage: + case log_entry::entry_type::remove_storage: { + write_version_type write_version; + entry.write_version(write_version); + std::cout << "Entry Type: " << static_cast(type) << ", Storage ID: " << storage_id + << ", Write Version: Epoch: " << log_entry::write_version_epoch_number(entry.value_etc()) + << ", Minor: " << log_entry::write_version_minor_write_version(entry.value_etc()) + << std::endl; + break; + } + case log_entry::entry_type::marker_begin: + std::cout << "Entry Type: marker_begin, Epoch ID: " << entry.epoch_id() << std::endl; + break; + case log_entry::entry_type::marker_end: + std::cout << "Entry Type: marker_end, Epoch ID: " << entry.epoch_id() << std::endl; + break; + case log_entry::entry_type::marker_durable: + std::cout << "Entry Type: marker_durable, Epoch ID: " << entry.epoch_id() << std::endl; + break; + case log_entry::entry_type::marker_invalidated_begin: + std::cout << "Entry Type: marker_invalidated_begin, Epoch ID: " << entry.epoch_id() << std::endl; + break; + default: + std::cout << "Entry Type: unknown" << std::endl; + break; + } + } + + + + + /** + * @brief Reads a specified log file (PWAL, compacted_file, snapshot) and returns a list of log entries. + * @param log_file The path to the log file to be scanned. + * @return A vector of log_entry objects read from the log file. + */ + std::vector read_log_file(const std::string& log_file, const boost::filesystem::path& log_dir) { + boost::filesystem::path log_path = log_dir / log_file; + + std::vector log_entries; + limestone::internal::dblog_scan::parse_error pe; + + // Define a lambda function to capture and store log entries + auto add_entry = [&](log_entry& e) { log_entries.push_back(e); }; + + // Error reporting function, returning bool as expected by error_report_func_t + auto report_error = [](log_entry::read_error& error) -> bool { + std::cerr << "Error during log file scan: " << error.message() << std::endl; + return false; // Return false to indicate an error occurred + }; + + // Initialize a dblog_scan instance with the log directory + limestone::internal::dblog_scan scanner(log_dir); + + // Scan the specified log file + epoch_id_type max_epoch = scanner.scan_one_pwal_file(log_path, UINT64_MAX, add_entry, report_error, pe); + + if (pe.value() != limestone::internal::dblog_scan::parse_error::ok) { + std::cerr << "Parse error occurred while reading the log file: " << log_path.string() << std::endl; + } + + // Iterate over the log entries and print relevant information + std::cout << std::endl << "Log entries read from " << log_path.string() << ":" << std::endl; + for (const auto& entry : log_entries) { + print_log_entry(entry); + } + + return log_entries; + } + ::testing::AssertionResult ContainsPrefix(const char* files_expr, const char* prefix_expr, const char* expected_count_expr, const std::set& files, const std::string& prefix, int expected_count) { int match_count = 0; @@ -224,9 +322,70 @@ class online_compaction_test : public ::testing::Test { return pwal_file_names; } + + ::testing::AssertionResult AssertLogEntry(const log_entry& entry, const std::optional& expected_storage_id, + const std::optional& expected_key, const std::optional& expected_value, + const std::optional& expected_epoch_number, const std::optional& expected_minor_version, + log_entry::entry_type expected_type) { + // Check the entry type + if (entry.type() != expected_type) { + return ::testing::AssertionFailure() + << "Expected entry type: " << static_cast(expected_type) + << ", but got: " << static_cast(entry.type()); + } + + // Check the storage ID if it exists + if (expected_storage_id.has_value()) { + if (entry.storage() != expected_storage_id.value()) { + return ::testing::AssertionFailure() + << "Expected storage ID: " << expected_storage_id.value() + << ", but got: " << entry.storage(); + } + } + + // Check the key if it exists + if (expected_key.has_value()) { + std::string actual_key; + entry.key(actual_key); + if (actual_key != expected_key.value()) { + return ::testing::AssertionFailure() + << "Expected key: " << expected_key.value() + << ", but got: " << actual_key; + } + } + + // Check the value if it exists + if (expected_value.has_value()) { + std::string actual_value; + entry.value(actual_value); + if (actual_value != expected_value.value()) { + return ::testing::AssertionFailure() + << "Expected value: " << expected_value.value() + << ", but got: " << actual_value; + } + } + + // Check the write version if it exists + if (expected_epoch_number.has_value() && expected_minor_version.has_value()) { + epoch_id_type actual_epoch_number = log_entry::write_version_epoch_number(entry.value_etc()); + std::uint64_t actual_minor_version = log_entry::write_version_minor_write_version(entry.value_etc()); + + if (actual_epoch_number != expected_epoch_number.value() || + actual_minor_version != expected_minor_version.value()) { + return ::testing::AssertionFailure() + << "Expected write version (epoch_number: " << expected_epoch_number.value() + << ", minor_write_version: " << expected_minor_version.value() + << "), but got (epoch_number: " << actual_epoch_number + << ", minor_write_version: " << actual_minor_version << ")"; + } + } + + // If all checks pass, return success + return ::testing::AssertionSuccess(); + } }; -TEST_F(online_compaction_test, no_pwals) { +TEST_F(compaction_test, no_pwals) { gen_datastore(); auto pwals = extract_pwal_files_from_datastore(); EXPECT_TRUE(pwals.empty()); @@ -248,7 +407,7 @@ TEST_F(online_compaction_test, no_pwals) { EXPECT_TRUE(pwals.empty()); } -TEST_F(online_compaction_test, scenario01) { +TEST_F(compaction_test, scenario01) { gen_datastore(); datastore_->switch_epoch(1); auto pwals = extract_pwal_files_from_datastore(); @@ -598,7 +757,7 @@ TEST_F(online_compaction_test, scenario01) { // is not restarted, the timing of when the set of PWAL files maintained // by the datastore is updated differs from scenario01, and therefore the // test expectations have been changed. -TEST_F(online_compaction_test, scenario02) { +TEST_F(compaction_test, scenario02) { gen_datastore(); datastore_->switch_epoch(1); auto pwals = extract_pwal_files_from_datastore(); @@ -826,8 +985,357 @@ TEST_F(online_compaction_test, scenario02) { ASSERT_PRED_FORMAT3(ContainsPrefix, pwals, "pwal_0002.", 1); } +// This test case verifies the correct behavior of `remove_entry`. +TEST_F(compaction_test, scenario03) { + FLAGS_v = 50; // set VLOG level to 50 + + // 1. Create multiple PWALs using two different storage IDs + gen_datastore(); + datastore_->switch_epoch(1); + + // Storage ID 1: key1 added, then removed + lc0_->begin_session(); + lc0_->add_entry(1, "key1", "value1", {1, 0}); + lc0_->remove_entry(1, "key1", {1, 1}); // use remove_entry + lc0_->end_session(); + + // Storage ID 2: key2 added, no removal + lc1_->begin_session(); + lc1_->add_entry(2, "key2", "value2", {1, 0}); + lc1_->end_session(); + + // Storage ID 1: key3 removed first, then added + lc2_->begin_session(); + lc2_->remove_entry(1, "key3", {1, 0}); + lc2_->add_entry(1, "key3", "value3", {1, 3}); + lc2_->end_session(); + + // Storeage ID 1: key4 deleted witout adding + lc0_->begin_session(); + lc0_->remove_entry(1, "key4", {1, 0}); + lc0_->end_session(); + + datastore_->switch_epoch(2); + + // Check the created PWAL files + auto pwals = extract_pwal_files_from_datastore(); + EXPECT_EQ(pwals.size(), 3); + ASSERT_PRED_FORMAT2(ContainsString, pwals, "pwal_0000"); + ASSERT_PRED_FORMAT2(ContainsString, pwals, "pwal_0001"); + ASSERT_PRED_FORMAT2(ContainsString, pwals, "pwal_0002"); + + auto log_entries = read_log_file("pwal_0000", location); + ASSERT_EQ(log_entries.size(), 3); // Ensure that there are log entries + EXPECT_TRUE(AssertLogEntry(log_entries[0], 1, "key1", "value1", 1, 0, log_entry::entry_type::normal_entry)); + EXPECT_TRUE(AssertLogEntry(log_entries[1], 1, "key1", std::nullopt, 1, 1, log_entry::entry_type::remove_entry)); + EXPECT_TRUE(AssertLogEntry(log_entries[2], 1, "key4", std::nullopt, 1, 0, log_entry::entry_type::remove_entry)); + log_entries = read_log_file("pwal_0001", location); + ASSERT_EQ(log_entries.size(), 1); // Ensure that there are log entries + EXPECT_TRUE(AssertLogEntry(log_entries[0], 2, "key2", "value2", 1, 0, log_entry::entry_type::normal_entry)); + log_entries = read_log_file("pwal_0002", location); + ASSERT_EQ(log_entries.size(), 2); // Ensure that there are log entries + EXPECT_TRUE(AssertLogEntry(log_entries[0], 1, "key3", std::nullopt, 1, 0,log_entry ::entry_type::remove_entry)); + EXPECT_TRUE(AssertLogEntry(log_entries[1], 1, "key3", "value3", 1, 3, log_entry::entry_type::normal_entry)); + + // 2. Execute compaction + run_compact_with_epoch_switch(2); + + // Check the catalog and PWALs after compaction + compaction_catalog catalog = compaction_catalog::from_catalog_file(location); + EXPECT_EQ(catalog.get_max_epoch_id(), 1); + EXPECT_EQ(catalog.get_compacted_files().size(), 1); + EXPECT_EQ(catalog.get_detached_pwals().size(), 3); + + pwals = extract_pwal_files_from_datastore(); + EXPECT_EQ(pwals.size(), 4); // Includes the compacted file + ASSERT_PRED_FORMAT2(ContainsString, pwals, "pwal_0000.compacted"); + + log_entries = read_log_file("pwal_0000.compacted", location); + ASSERT_EQ(log_entries.size(), 2); // Ensure that there are log entries + EXPECT_TRUE(AssertLogEntry(log_entries[0], 1, "key3", "value3", 0, 0, log_entry::entry_type::normal_entry)); // write version changed to 0 + EXPECT_TRUE(AssertLogEntry(log_entries[1], 2, "key2", "value2", 0, 0, log_entry::entry_type::normal_entry)); // write version changed to 0 + + // 3. Add/Update PWALs (include remove_entry again) + + // Storage ID 1: key1 added, then removed + lc0_->begin_session(); + lc0_->add_entry(1, "key11", "value1", {2, 0}); + lc0_->remove_entry(1, "key11", {2, 1}); // use remove_entry + lc0_->end_session(); + + // Storage ID 2: key2 added, no removal + lc1_->begin_session(); + lc1_->add_entry(2, "key21", "value2", {2, 0}); + lc1_->end_session(); + + // Storage ID 1: key3 removed first, then added + lc2_->begin_session(); + lc2_->remove_entry(1, "key31", {2, 0}); + lc2_->add_entry(1, "key31", "value3", {2, 3}); + lc2_->end_session(); + + // Storeage ID 1: key4 deleted witout adding + lc0_->begin_session(); + lc0_->remove_entry(1, "key41", {2, 0}); + lc0_->end_session(); + + datastore_->switch_epoch(3); + pwals = extract_pwal_files_from_datastore(); + + // Check the created PWAL files + pwals = extract_pwal_files_from_datastore(); + EXPECT_EQ(pwals.size(), 7); // 3 new pwals and 3 rotaed pwals and 1 compacted file + ASSERT_PRED_FORMAT2(ContainsString, pwals, "pwal_0000"); + ASSERT_PRED_FORMAT2(ContainsString, pwals, "pwal_0001"); + ASSERT_PRED_FORMAT2(ContainsString, pwals, "pwal_0002"); + + log_entries = read_log_file("pwal_0000", location); + ASSERT_EQ(log_entries.size(), 3); // Ensure that there are log entries + EXPECT_TRUE(AssertLogEntry(log_entries[0], 1, "key11", "value1", 2, 0, log_entry::entry_type::normal_entry)); + EXPECT_TRUE(AssertLogEntry(log_entries[1], 1, "key11", std::nullopt, 2, 1, log_entry::entry_type::remove_entry)); + EXPECT_TRUE(AssertLogEntry(log_entries[2], 1, "key41", std::nullopt, 2, 0, log_entry::entry_type::remove_entry)); + log_entries = read_log_file("pwal_0001", location); + ASSERT_EQ(log_entries.size(), 1); // Ensure that there are log entries + EXPECT_TRUE(AssertLogEntry(log_entries[0], 2, "key21", "value2", 2, 0, log_entry::entry_type::normal_entry)); + log_entries = read_log_file("pwal_0002", location); + ASSERT_EQ(log_entries.size(), 2); // Ensure that there are log entries + EXPECT_TRUE(AssertLogEntry(log_entries[0], 1, "key31", std::nullopt, 2, 0, log_entry::entry_type::remove_entry)); + EXPECT_TRUE(AssertLogEntry(log_entries[1], 1, "key31", "value3", 2, 3, log_entry::entry_type::normal_entry)); + + // 4. Restart the datastore + datastore_->shutdown(); + datastore_ = nullptr; + gen_datastore(); // Regenerate datastore after restart + + // 5. check the compacted file and snapshot creating at the boot time + log_entries = read_log_file("pwal_0000.compacted", location); + ASSERT_EQ(log_entries.size(), 2); // Ensure that there are log entries + EXPECT_TRUE(AssertLogEntry(log_entries[0], 1, "key3", "value3", 0, 0, log_entry::entry_type::normal_entry)); // write version changed to 0 + EXPECT_TRUE(AssertLogEntry(log_entries[1], 2, "key2", "value2", 0, 0, log_entry::entry_type::normal_entry)); // write version changed to 0 + + log_entries = read_log_file("data/snapshot", location); + ASSERT_EQ(log_entries.size(), 4); // Ensure that there are log entries + EXPECT_TRUE(AssertLogEntry(log_entries[0], 1, "key11", std::nullopt, 2, 1, log_entry::entry_type::remove_entry)); + EXPECT_TRUE(AssertLogEntry(log_entries[1], 1, "key31", "value3", 2, 3, log_entry::entry_type::normal_entry)); + EXPECT_TRUE(AssertLogEntry(log_entries[2], 1, "key41", std::nullopt, 2, 0, log_entry::entry_type::remove_entry)); + EXPECT_TRUE(AssertLogEntry(log_entries[3], 2, "key21", "value2", 2, 0, log_entry::entry_type::normal_entry)); + + // 5. Verify the snapshot contents after restart + std::vector> kv_list = restart_datastore_and_read_snapshot(); + + // key1 should exist with its updated value, key2 and key3 should be removed + ASSERT_EQ(kv_list.size(), 4); + EXPECT_EQ(kv_list[0].first, "key3"); + EXPECT_EQ(kv_list[0].second, "value3"); + EXPECT_EQ(kv_list[1].first, "key31"); + EXPECT_EQ(kv_list[1].second, "value3"); + EXPECT_EQ(kv_list[2].first, "key2"); + EXPECT_EQ(kv_list[2].second, "value2"); + EXPECT_EQ(kv_list[3].first, "key21"); + EXPECT_EQ(kv_list[3].second, "value2"); +} + +// This test case verifies the correct behavior of `remove_storage`. +// This test case verifies the correct behavior of `remove_storage`. +TEST_F(compaction_test, scenario04) { + FLAGS_v = 50; // set VLOG level to 50 + + gen_datastore(); + datastore_->switch_epoch(1); + + // Storage ID 1: Add normal entries + lc0_->begin_session(); + lc0_->add_entry(1, "key1", "value1", {1, 0}); + lc0_->add_entry(1, "key2", "value2", {1, 1}); + lc0_->end_session(); + + // Storage ID 2: Add normal entries + lc1_->begin_session(); + lc1_->add_entry(2, "key3", "value3", {1, 0}); + lc1_->add_entry(2, "key4", "value4", {1, 1}); + lc1_->end_session(); + + // Storage ID 1: Add more normal entries + lc2_->begin_session(); + lc2_->add_entry(1, "key5", "value5", {1, 2}); + lc2_->add_entry(1, "key6", "value6", {1, 3}); + lc2_->end_session(); + + // Advance the epoch to 2 + datastore_->switch_epoch(2); + + // Remove storage for Storage ID 2 + lc1_->begin_session(); + lc1_->remove_storage(2, {2, 0}); // Removes the storage with ID 2 + lc1_->end_session(); + + // Advance the epoch to 3 + datastore_->switch_epoch(3); + + // Add an entry to Storage ID 1 + lc0_->begin_session(); + lc0_->add_entry(1, "key7", "value7", {3, 0}); + lc0_->end_session(); + + // Add an entry to Storage ID 2 + lc1_->begin_session(); + lc1_->add_entry(2, "key8", "value8", {3, 0}); + lc1_->end_session(); + + // Chek PWALs before compaction + auto pwals = extract_pwal_files_from_datastore(); + EXPECT_EQ(pwals.size(), 3); + + std::vector log_entries = read_log_file("pwal_0000", location); + ASSERT_EQ(log_entries.size(), 3); + EXPECT_TRUE(AssertLogEntry(log_entries[0], 1, "key1", "value1", 1, 0, log_entry::entry_type::normal_entry)); + EXPECT_TRUE(AssertLogEntry(log_entries[1], 1, "key2", "value2", 1, 1, log_entry::entry_type::normal_entry)); + EXPECT_TRUE(AssertLogEntry(log_entries[2], 1, "key7", "value7", 3, 0, log_entry::entry_type::normal_entry)); + + log_entries = read_log_file("pwal_0001", location); + ASSERT_EQ(log_entries.size(), 4); + EXPECT_TRUE(AssertLogEntry(log_entries[0], 2, "key3", "value3", 1, 0, log_entry::entry_type::normal_entry)); + EXPECT_TRUE(AssertLogEntry(log_entries[1], 2, "key4", "value4", 1, 1, log_entry::entry_type::normal_entry)); + EXPECT_TRUE(AssertLogEntry(log_entries[2], 2, "", "", 2, 0, log_entry::entry_type::remove_storage)); + EXPECT_TRUE(AssertLogEntry(log_entries[3], 2, "key8", "value8", 3, 0, log_entry::entry_type::normal_entry)); + + log_entries = read_log_file("pwal_0002", location); + ASSERT_EQ(log_entries.size(), 2); + EXPECT_TRUE(AssertLogEntry(log_entries[0], 1, "key5", "value5", 1, 2, log_entry::entry_type::normal_entry)); + EXPECT_TRUE(AssertLogEntry(log_entries[1], 1, "key6", "value6", 1, 3, log_entry::entry_type::normal_entry)); + + // online compaction + run_compact_with_epoch_switch(4); + + // Check PWALs after compaction + pwals = extract_pwal_files_from_datastore(); + EXPECT_EQ(pwals.size(), 4); + ASSERT_PRED_FORMAT2(ContainsString, pwals, "pwal_0000.compacted"); + ASSERT_PRED_FORMAT3(ContainsPrefix, pwals, "pwal_0000.", 2); + ASSERT_PRED_FORMAT3(ContainsPrefix, pwals, "pwal_0001.", 1); + ASSERT_PRED_FORMAT3(ContainsPrefix, pwals, "pwal_0002.", 1); + + log_entries = read_log_file("pwal_0000.compacted", location); + ASSERT_EQ(log_entries.size(), 6); + EXPECT_TRUE(AssertLogEntry(log_entries[0], 1, "key1", "value1", 0, 0, log_entry::entry_type::normal_entry)); + EXPECT_TRUE(AssertLogEntry(log_entries[1], 1, "key2", "value2", 0, 0, log_entry::entry_type::normal_entry)); + EXPECT_TRUE(AssertLogEntry(log_entries[2], 1, "key5", "value5", 0, 0, log_entry::entry_type::normal_entry)); + EXPECT_TRUE(AssertLogEntry(log_entries[3], 1, "key6", "value6", 0, 0, log_entry::entry_type::normal_entry)); + EXPECT_TRUE(AssertLogEntry(log_entries[4], 1, "key7", "value7", 0, 0, log_entry::entry_type::normal_entry)); + EXPECT_TRUE(AssertLogEntry(log_entries[5], 2, "key8", "value8", 0, 0, log_entry::entry_type::normal_entry)); + + + // Storage ID 1: Add normal entries + lc0_->begin_session(); + lc0_->add_entry(1, "key11", "value1", {4, 0}); + lc0_->add_entry(1, "key12", "value2", {4, 1}); + lc0_->end_session(); + + // Storage ID 2: Add normal entries + lc1_->begin_session(); + lc1_->add_entry(2, "key13", "value3", {4, 0}); + lc1_->add_entry(2, "key14", "value4", {4, 1}); + lc1_->end_session(); + + // Storage ID 1: Add more normal entries + lc2_->begin_session(); + lc2_->add_entry(1, "key15", "value5", {4, 2}); + lc2_->add_entry(1, "key16", "value6", {4, 3}); + lc2_->end_session(); + + // Advance the epoch to 5 + datastore_->switch_epoch(5); + + // Remove storage for Storage ID 1 + lc1_->begin_session(); + lc1_->remove_storage(1, {5, 0}); // Removes the storage with ID 2 + lc1_->end_session(); + + // Advance the epoch to 6 + datastore_->switch_epoch(6); + + // Add an entry to Storage ID 1 + lc0_->begin_session(); + lc0_->add_entry(1, "key17", "value7", {6, 0}); + lc0_->end_session(); + + // Add an entry to Storage ID 2 + lc1_->begin_session(); + lc1_->add_entry(2, "key18", "value8", {6, 0}); + lc1_->end_session(); + + // Advance the epoch to 6 + datastore_->switch_epoch(7); + + // Chek newly created PWALs + pwals = extract_pwal_files_from_datastore(); + EXPECT_EQ(pwals.size(), 7); + ASSERT_PRED_FORMAT2(ContainsString, pwals, "pwal_0000.compacted"); + ASSERT_PRED_FORMAT3(ContainsPrefix, pwals, "pwal_0000.", 2); + ASSERT_PRED_FORMAT3(ContainsPrefix, pwals, "pwal_0001.", 1); + ASSERT_PRED_FORMAT3(ContainsPrefix, pwals, "pwal_0002.", 1); + + log_entries = read_log_file("pwal_0000", location); + ASSERT_EQ(log_entries.size(), 3); + EXPECT_TRUE(AssertLogEntry(log_entries[0], 1, "key11", "value1", 4, 0, log_entry::entry_type::normal_entry)); + EXPECT_TRUE(AssertLogEntry(log_entries[1], 1, "key12", "value2", 4, 1, log_entry::entry_type::normal_entry)); + EXPECT_TRUE(AssertLogEntry(log_entries[2], 1, "key17", "value7", 6, 0, log_entry::entry_type::normal_entry)); + + log_entries = read_log_file("pwal_0001", location); + ASSERT_EQ(log_entries.size(), 4); + EXPECT_TRUE(AssertLogEntry(log_entries[0], 2, "key13", "value3", 4, 0, log_entry::entry_type::normal_entry)); + EXPECT_TRUE(AssertLogEntry(log_entries[1], 2, "key14", "value4", 4, 1, log_entry::entry_type::normal_entry)); + EXPECT_TRUE(AssertLogEntry(log_entries[2], 1, "", "", 5, 0, log_entry::entry_type::remove_storage)); + EXPECT_TRUE(AssertLogEntry(log_entries[3], 2, "key18", "value8", 6, 0, log_entry::entry_type::normal_entry)); + + log_entries = read_log_file("pwal_0002", location); + ASSERT_EQ(log_entries.size(), 2); + EXPECT_TRUE(AssertLogEntry(log_entries[0], 1, "key15", "value5", 4, 2, log_entry::entry_type::normal_entry)); + EXPECT_TRUE(AssertLogEntry(log_entries[1], 1, "key16", "value6", 4, 3, log_entry::entry_type::normal_entry)); + + // Restart the datastore + + std::vector> kv_list = restart_datastore_and_read_snapshot(); + + // check the compacted file and snapshot creating at the boot time + log_entries = read_log_file("pwal_0000.compacted", location); + ASSERT_EQ(log_entries.size(), 6); + EXPECT_TRUE(AssertLogEntry(log_entries[0], 1, "key1", "value1", 0, 0, log_entry::entry_type::normal_entry)); + EXPECT_TRUE(AssertLogEntry(log_entries[1], 1, "key2", "value2", 0, 0, log_entry::entry_type::normal_entry)); + EXPECT_TRUE(AssertLogEntry(log_entries[2], 1, "key5", "value5", 0, 0, log_entry::entry_type::normal_entry)); + EXPECT_TRUE(AssertLogEntry(log_entries[3], 1, "key6", "value6", 0, 0, log_entry::entry_type::normal_entry)); + EXPECT_TRUE(AssertLogEntry(log_entries[4], 1, "key7", "value7", 0, 0, log_entry::entry_type::normal_entry)); + EXPECT_TRUE(AssertLogEntry(log_entries[5], 2, "key8", "value8", 0, 0, log_entry::entry_type::normal_entry)); + + log_entries = read_log_file("data/snapshot", location); + ASSERT_EQ(log_entries.size(), 4); + EXPECT_TRUE(AssertLogEntry(log_entries[0], 1, "key17", "value7", 6, 0, log_entry::entry_type::normal_entry)); + EXPECT_TRUE(AssertLogEntry(log_entries[1], 2, "key13", "value3", 4, 0, log_entry::entry_type::normal_entry)); + EXPECT_TRUE(AssertLogEntry(log_entries[2], 2, "key14", "value4", 4, 1, log_entry::entry_type::normal_entry)); + EXPECT_TRUE(AssertLogEntry(log_entries[3], 2, "key18", "value8", 6, 0, log_entry::entry_type::normal_entry)); + + // 5. Verify the snapshot contents after restart + + // key1 should exist with its updated value, key2 and key3 should be removed + ASSERT_EQ(kv_list.size(), 5); + EXPECT_EQ(kv_list[0].first, "key17"); + EXPECT_EQ(kv_list[0].second, "value7"); + EXPECT_EQ(kv_list[1].first, "key13"); + EXPECT_EQ(kv_list[1].second, "value3"); + EXPECT_EQ(kv_list[2].first, "key14"); + EXPECT_EQ(kv_list[2].second, "value4"); + EXPECT_EQ(kv_list[3].first, "key18"); + EXPECT_EQ(kv_list[3].second, "value8"); + EXPECT_EQ(kv_list[4].first, "key8"); + EXPECT_EQ(kv_list[4].second, "value8"); +} + + + + + // This test is disabled because it is environment-dependent and may not work properly in CI environments. -TEST_F(online_compaction_test, DISABLED_fail_compact_with_io_error) { +TEST_F(compaction_test, DISABLED_fail_compact_with_io_error) { gen_datastore(); datastore_->switch_epoch(1); auto pwals = extract_pwal_files_from_datastore(); @@ -858,7 +1366,7 @@ TEST_F(online_compaction_test, DISABLED_fail_compact_with_io_error) { } -TEST_F(online_compaction_test, safe_rename_success) { +TEST_F(compaction_test, safe_rename_success) { boost::filesystem::path from = boost::filesystem::path(location) / "test_file.txt"; boost::filesystem::path to = boost::filesystem::path(location) / "renamed_file.txt"; @@ -873,14 +1381,14 @@ TEST_F(online_compaction_test, safe_rename_success) { boost::filesystem::remove(to); } -TEST_F(online_compaction_test, safe_rename_throws_exception) { +TEST_F(compaction_test, safe_rename_throws_exception) { boost::filesystem::path from = boost::filesystem::path(location) / "non_existent_file.txt"; boost::filesystem::path to = boost::filesystem::path(location) / "renamed_file.txt"; ASSERT_THROW(safe_rename(from, to), std::runtime_error); } -TEST_F(online_compaction_test, select_files_for_compaction) { +TEST_F(compaction_test, select_files_for_compaction) { std::set rotation_end_files = { boost::filesystem::path(location) / "pwal_0001.0123456", boost::filesystem::path(location) / "pwal_0002.0123456", @@ -895,21 +1403,21 @@ TEST_F(online_compaction_test, select_files_for_compaction) { } -TEST_F(online_compaction_test, ensure_directory_exists_directory_exists) { +TEST_F(compaction_test, ensure_directory_exists_directory_exists) { boost::filesystem::path dir = boost::filesystem::path(location) / "test_dir"; boost::filesystem::create_directory(dir); ASSERT_NO_THROW(ensure_directory_exists(dir)); } -TEST_F(online_compaction_test, ensure_directory_exists_directory_created) { +TEST_F(compaction_test, ensure_directory_exists_directory_created) { boost::filesystem::path dir = boost::filesystem::path(location) / "test_dir"; ASSERT_NO_THROW(ensure_directory_exists(dir)); ASSERT_TRUE(boost::filesystem::exists(dir)); } -TEST_F(online_compaction_test, ensure_directory_exists_throws_exception) { +TEST_F(compaction_test, ensure_directory_exists_throws_exception) { boost::filesystem::path file = boost::filesystem::path(location) / "test_file.txt"; boost::filesystem::ofstream ofs(file); @@ -918,19 +1426,19 @@ TEST_F(online_compaction_test, ensure_directory_exists_throws_exception) { ASSERT_THROW(ensure_directory_exists(file), std::runtime_error); } -TEST_F(online_compaction_test, ensure_directory_exists_parent_directory_missing) { +TEST_F(compaction_test, ensure_directory_exists_parent_directory_missing) { boost::filesystem::path dir = boost::filesystem::path(location) / "nonexistent_parent/test_dir"; ASSERT_THROW(ensure_directory_exists(dir), std::runtime_error); } -TEST_F(online_compaction_test, handle_existing_compacted_file_no_existing_files) { +TEST_F(compaction_test, handle_existing_compacted_file_no_existing_files) { boost::filesystem::path location_path = boost::filesystem::path(location); ASSERT_NO_THROW(handle_existing_compacted_file(location_path)); } -TEST_F(online_compaction_test, handle_existing_compacted_file_with_existing_file) { +TEST_F(compaction_test, handle_existing_compacted_file_with_existing_file) { boost::filesystem::path location_path = boost::filesystem::path(location); boost::filesystem::path compacted_file = location_path / "pwal_0000.compacted"; boost::filesystem::ofstream ofs(compacted_file); @@ -940,7 +1448,7 @@ TEST_F(online_compaction_test, handle_existing_compacted_file_with_existing_file ASSERT_TRUE(boost::filesystem::exists(location_path / "pwal_0000.compacted.prev")); } -TEST_F(online_compaction_test, handle_existing_compacted_file_throws_exception) { +TEST_F(compaction_test, handle_existing_compacted_file_throws_exception) { boost::filesystem::path location_path = boost::filesystem::path(location); boost::filesystem::path compacted_file = location_path / "pwal_0000.compacted"; boost::filesystem::path compacted_prev_file = location_path / "pwal_0000.compacted.prev"; @@ -952,7 +1460,7 @@ TEST_F(online_compaction_test, handle_existing_compacted_file_throws_exception) ASSERT_THROW(handle_existing_compacted_file(location_path), std::runtime_error); } -TEST_F(online_compaction_test, get_files_in_directory) { +TEST_F(compaction_test, get_files_in_directory) { boost::filesystem::path test_dir = boost::filesystem::path(location); boost::filesystem::path file1 = test_dir / "file1.txt"; boost::filesystem::path file2 = test_dir / "file2.txt"; @@ -967,12 +1475,12 @@ TEST_F(online_compaction_test, get_files_in_directory) { EXPECT_EQ(files, expected_files); } -TEST_F(online_compaction_test, get_files_in_directory_directory_not_exists) { +TEST_F(compaction_test, get_files_in_directory_directory_not_exists) { boost::filesystem::path non_existent_dir = boost::filesystem::path(location) / "non_existent_dir"; ASSERT_THROW(get_files_in_directory(non_existent_dir), std::runtime_error); } -TEST_F(online_compaction_test, get_files_in_directory_not_a_directory) { +TEST_F(compaction_test, get_files_in_directory_not_a_directory) { boost::filesystem::path file_path = boost::filesystem::path(location) / "test_file.txt"; boost::filesystem::ofstream ofs(file_path); ofs.close(); @@ -980,7 +1488,7 @@ TEST_F(online_compaction_test, get_files_in_directory_not_a_directory) { ASSERT_THROW(get_files_in_directory(file_path), std::runtime_error); } -TEST_F(online_compaction_test, get_files_in_directory_with_files) { +TEST_F(compaction_test, get_files_in_directory_with_files) { boost::filesystem::path test_dir = boost::filesystem::path(location) / "test_dir"; boost::filesystem::create_directory(test_dir); @@ -995,7 +1503,7 @@ TEST_F(online_compaction_test, get_files_in_directory_with_files) { EXPECT_EQ(files, expected_files); } -TEST_F(online_compaction_test, get_files_in_directory_empty_directory) { +TEST_F(compaction_test, get_files_in_directory_empty_directory) { boost::filesystem::path empty_dir = boost::filesystem::path(location) / "empty_test_dir"; boost::filesystem::create_directory(empty_dir); @@ -1004,7 +1512,7 @@ TEST_F(online_compaction_test, get_files_in_directory_empty_directory) { } -TEST_F(online_compaction_test, remove_file_safely_success) { +TEST_F(compaction_test, remove_file_safely_success) { boost::filesystem::path file = boost::filesystem::path(location) / "test_file_to_remove.txt"; { @@ -1017,14 +1525,14 @@ TEST_F(online_compaction_test, remove_file_safely_success) { ASSERT_FALSE(boost::filesystem::exists(file)); } -TEST_F(online_compaction_test, remove_file_safely_no_exception_for_nonexistent_file) { +TEST_F(compaction_test, remove_file_safely_no_exception_for_nonexistent_file) { boost::filesystem::path file = boost::filesystem::path(location) / "non_existent_file.txt"; ASSERT_NO_THROW(remove_file_safely(file)); } // This test is disabled because it is environment-dependent and may not work properly in CI environments. -TEST_F(online_compaction_test, DISABLED_remove_file_safely_fails_to_remove_file) { +TEST_F(compaction_test, DISABLED_remove_file_safely_fails_to_remove_file) { boost::filesystem::path test_dir = boost::filesystem::path(location); boost::filesystem::path file = test_dir / "protected_file.txt"; diff --git a/test/limestone/log/log_channel_test.cpp b/test/limestone/log/log_channel_test.cpp index 45813caa..d9d21d5a 100644 --- a/test/limestone/log/log_channel_test.cpp +++ b/test/limestone/log/log_channel_test.cpp @@ -13,6 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +#include #include #include "internal.h" #include "test_root.h" diff --git a/test/limestone/snapshot/cursor_impl_test.cpp b/test/limestone/snapshot/cursor_impl_test.cpp new file mode 100644 index 00000000..06e652bd --- /dev/null +++ b/test/limestone/snapshot/cursor_impl_test.cpp @@ -0,0 +1,244 @@ +/* + * Copyright 2022-2024 Project Tsurugi. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include "cursor_impl.h" + + +#include "test_root.h" +#include "limestone/api/log_channel.h" + +namespace limestone::testing { + +using limestone::api::log_channel; + +class cursor_impl_testable : public limestone::internal::cursor_impl { +public: + using cursor_impl::cursor_impl; + using cursor_impl::next; + using cursor_impl::validate_and_read_stream; + using cursor_impl::open; + using cursor_impl::close; + using cursor_impl::storage; + using cursor_impl::key; + using cursor_impl::value; + using cursor_impl::type; + + ~cursor_impl_testable() { + // Ensure that the close() method is called to release resources. + // Normally, resources would be released explicitly, but for this test, we ensure that close() + // is always called by invoking it in the destructor. This is important to avoid resource leaks + // in cases where the test does not explicitly call close(). + close(); + } +}; + +class entry_maker { +public: + entry_maker& init() { + entries_.clear(); + return *this; + } + + entry_maker& add_entry(limestone::api::storage_id_type storage_id, std::string key, std::string value, limestone::api::write_version_type write_version) { + entries_.emplace_back(storage_id, key, value, write_version); + return *this; + } + + std::vector> get_default_entries() { + return { + {1, "key1", "value1", {1, 0}}, + {1, "key2", "value2", {1, 1}} + }; + } + + const std::vector>& get_entries() const { + return entries_; + } + +private: + std::vector> entries_; +}; + +class cursor_impl_test : public ::testing::Test { +protected: + static constexpr const char* location = "/tmp/cursor_impl_test"; + std::unique_ptr datastore_; + log_channel* lc0_{}; + entry_maker entry_maker_; + + void SetUp() override { + if (boost::filesystem::exists(location)) { + boost::filesystem::permissions(location, boost::filesystem::owner_all); + } + boost::filesystem::remove_all(location); + if (!boost::filesystem::create_directory(location)) { + std::cerr << "cannot make directory" << std::endl; + } + gen_datastore(); + } + + void gen_datastore() { + std::vector data_locations{}; + data_locations.emplace_back(location); + boost::filesystem::path metadata_location{location}; + limestone::api::configuration conf(data_locations, metadata_location); + + datastore_ = std::make_unique(conf); + lc0_ = &datastore_->create_channel(location); + + datastore_->ready(); + } + + void TearDown() override { + datastore_ = nullptr; + if (boost::filesystem::exists(location)) { + boost::filesystem::permissions(location, boost::filesystem::owner_all); + } + boost::filesystem::remove_all(location); + } + + void create_log_file( + const std::string& new_filename, + const std::vector>& entries) { + + lc0_->begin_session(); + + for (const auto& entry : entries) { + lc0_->add_entry(std::get<0>(entry), std::get<1>(entry), std::get<2>(entry), std::get<3>(entry)); + } + + lc0_->end_session(); + + boost::filesystem::path pwal_file = boost::filesystem::path(location) / "pwal_0000"; + boost::filesystem::path new_file = boost::filesystem::path(location) / new_filename; + + if (boost::filesystem::exists(pwal_file)) { + boost::filesystem::rename(pwal_file, new_file); + } else { + std::cerr << "Error: pwal_0000 file not found for renaming." << std::endl; + } + } +}; + + + +// Test case 1: Only Snapshot exists +TEST_F(cursor_impl_test, snapshot_only) { + create_log_file("snapshot", entry_maker_.get_default_entries()); + boost::filesystem::path snapshot_file = boost::filesystem::path(location) / "snapshot"; + + cursor_impl_testable cursor(snapshot_file); + EXPECT_TRUE(cursor.next()) << "Should be able to read the snapshot"; +} + +// Test case 2: Both Snapshot and Compacted files exist +TEST_F(cursor_impl_test, snapshot_and_compacted) { + create_log_file("snapshot", entry_maker_.get_default_entries()); + create_log_file("compacted", entry_maker_.get_default_entries()); + + boost::filesystem::path snapshot_file = boost::filesystem::path(location) / "snapshot"; + boost::filesystem::path compacted_file = boost::filesystem::path(location) / "compacted"; + + cursor_impl_testable cursor(snapshot_file, compacted_file); + EXPECT_TRUE(cursor.next()) << "Should be able to read both snapshot and compacted files"; +} + +// Test case 3: Error cases +TEST_F(cursor_impl_test, error_case) { + // No files exist, should throw limestone_exception + boost::filesystem::path snapshot_file = boost::filesystem::path(location) / "not_existing_snapshot"; + EXPECT_THROW({ + cursor_impl_testable cursor{boost::filesystem::path(snapshot_file)}; + }, limestone::limestone_exception) << "No files should result in a limestone_exception being thrown"; + + // Expect the next() method to throw a limestone_exception + { + cursor_impl_testable cursor{boost::filesystem::path(location)}; + EXPECT_THROW({ + cursor.next(); + }, limestone::limestone_exception) << "No files should result in a limestone_exception being thrown"; + } + // invalid sort order + { + entry_maker_.init() + .add_entry(1, "key2", "value2", {1, 1}) + .add_entry(1, "key1", "value1", {1, 0}) + .add_entry(1, "key3", "value3", {1, 2}); + boost::filesystem::path snapshot_file = boost::filesystem::path(location) / "snapshot"; + create_log_file("snapshot", entry_maker_.get_entries()); + cursor_impl_testable cursor{boost::filesystem::path(snapshot_file)}; + EXPECT_THROW({ + while (cursor.next()); + }, limestone::limestone_exception) << "No files should result in a limestone_exception being thrown"; + } +} + +// Test Case 4: Verify the entry methods after reading from a snapshot file +TEST_F(cursor_impl_test, verify_entry_methods) { + // Create a snapshot file with default entries + create_log_file("snapshot", entry_maker_.get_default_entries()); + boost::filesystem::path snapshot_file = boost::filesystem::path(location) / "snapshot"; + + // Use cursor_impl_testable to read the file + cursor_impl_testable cursor(snapshot_file); + + // Verify the first entry + ASSERT_TRUE(cursor.next()) << "First entry should be read"; + + // Verify storage() method + EXPECT_EQ(cursor.storage(), 1) << "Storage ID should be 1"; + + // Verify key() method + std::string key; + cursor.key(key); + EXPECT_EQ(key, "key1") << "First key should be 'key1'"; + + // Verify value() method + std::string value; + cursor.value(value); + EXPECT_EQ(value, "value1") << "First value should be 'value1'"; + + // Verify type() method + EXPECT_EQ(cursor.type(), limestone::api::log_entry::entry_type::normal_entry) + << "First entry type should be normal_entry"; + + // Verify the second entry + ASSERT_TRUE(cursor.next()) << "Second entry should be read"; + + // Verify storage() method for the second entry + EXPECT_EQ(cursor.storage(), 1) << "Storage ID should be 1"; + + // Verify key() method for the second entry + cursor.key(key); + EXPECT_EQ(key, "key2") << "Second key should be 'key2'"; + + // Verify value() method for the second entry + cursor.value(value); + EXPECT_EQ(value, "value2") << "Second value should be 'value2'"; + + // Verify type() method for the second entry + EXPECT_EQ(cursor.type(), limestone::api::log_entry::entry_type::normal_entry) + << "Second entry type should be normal_entry"; + + // Verify that next() returns false when no more entries are available + EXPECT_FALSE(cursor.next()) << "No more entries should be available, next() should return false"; +} + + + +} // namespace limestone::testing