From d514252248daa2f1a35d7d17e9604437dd626b2a Mon Sep 17 00:00:00 2001 From: Shinichi Umegane Date: Thu, 10 Oct 2024 16:50:43 +0900 Subject: [PATCH] Refactor: Separate snapshot_tracker class into standalone files --- include/limestone/api/cursor.h | 9 +- src/limestone/cursor.cpp | 146 ++--------------------------- src/limestone/snapshot_tracker.cpp | 138 +++++++++++++++++++++++++++ src/limestone/snapshot_tracker.h | 62 ++++++++++++ 4 files changed, 214 insertions(+), 141 deletions(-) create mode 100644 src/limestone/snapshot_tracker.cpp create mode 100644 src/limestone/snapshot_tracker.h diff --git a/include/limestone/api/cursor.h b/include/limestone/api/cursor.h index 15e17795..4e3aefda 100644 --- a/include/limestone/api/cursor.h +++ b/include/limestone/api/cursor.h @@ -25,11 +25,16 @@ #include #include + +namespace limestone::internal { + class snapshot_tracker; +} + namespace limestone::api { class log_entry; class snapshot; -class snapshot_tracker; + /** * @brief a cursor to scan entries on the snapshot @@ -79,7 +84,7 @@ class cursor { std::vector& large_objects() noexcept; private: - std::unique_ptr log_entry_tracker_; + std::unique_ptr log_entry_tracker_; std::vector large_objects_{}; diff --git a/src/limestone/cursor.cpp b/src/limestone/cursor.cpp index 5fe7f766..ea45792b 100644 --- a/src/limestone/cursor.cpp +++ b/src/limestone/cursor.cpp @@ -20,163 +20,31 @@ #include "logging_helper.h" #include "limestone_exception_helper.h" #include "log_entry.h" -#include +#include "snapshot_tracker.h" -namespace limestone::api { - -// snapshot_tracker handles the retrieval and comparison of log entries from snapshot and compacted streams. -// It ensures that the entries are read in the correct order and manages the state of both streams. -class snapshot_tracker { - -private: - log_entry log_entry_; - std::optional snapshot_log_entry_; - std::optional compacted_log_entry_; - std::optional snapshot_istrm_; - std::optional compacted_istrm_; - std::string previous_snapshot_key_sid; - std::string previous_compacted_key_sid; - -public: - explicit snapshot_tracker(const boost::filesystem::path& snapshot_file) - : compacted_istrm_(std::nullopt) { - open(snapshot_file, snapshot_istrm_); - } - - explicit snapshot_tracker(const boost::filesystem::path& snapshot_file, const boost::filesystem::path& compacted_file) { - open(snapshot_file, snapshot_istrm_); - open(compacted_file, compacted_istrm_); - } - - void open(const boost::filesystem::path& file, std::optional& stream) { - stream.emplace(file, std::ios_base::in | std::ios_base::binary); - if (!stream->good()) { - LOG_AND_THROW_EXCEPTION("Failed to open file: " + file.string()); - } - } - - void close() { - if (snapshot_istrm_) snapshot_istrm_->close(); - if (compacted_istrm_) compacted_istrm_->close(); - } - - void validate_and_read_stream(std::optional& stream, const std::string& stream_name, std::optional& log_entry, - std::string& previous_key_sid) { - if (stream) { - if (!stream->good()) { - DVLOG_LP(log_trace) << stream_name << " stream is not good, closing it."; - stream->close(); - stream = std::nullopt; - } else if (stream->eof()) { - DVLOG_LP(log_trace) << stream_name << " stream reached EOF, closing it."; - stream->close(); - stream = std::nullopt; - } else if (!log_entry) { - log_entry.emplace(); // Construct a new log_entry - if (!log_entry->read(*stream)) { - stream->close(); - stream = std::nullopt; // If reading fails, reset log_entry - log_entry = std::nullopt; // Reset the log_entry as well - } else { - // Check if the key_sid is in ascending order - // TODO: Key order violation is detected here and the process is aborted. - // However, this check should be moved to an earlier point, and if the key order is invalid, - // a different processing method should be considered instead of aborting immediately. - - if (!previous_key_sid.empty() && log_entry->key_sid() < previous_key_sid) { - LOG(ERROR) << "Key order violation in " << stream_name << ": current key_sid (" << log_entry->key_sid() - << ") is smaller than the previous key_sid (" << previous_key_sid << ")"; - THROW_LIMESTONE_EXCEPTION("Key order violation detected in " + stream_name); - } - // Update the previous key_sid to the current one - previous_key_sid = log_entry->key_sid(); - } - } - } - } - - // Next function to read from the appropriate stream - bool next() { - // Only read from snapshot stream if snapshot_log_entry_ is empty, with key_sid check - if (!snapshot_log_entry_) { - validate_and_read_stream(snapshot_istrm_, "Snapshot", snapshot_log_entry_, previous_snapshot_key_sid); - } - - // Only read from compacted stream if compacted_log_entry_ is empty, with key_sid check - if (!compacted_log_entry_) { - validate_and_read_stream(compacted_istrm_, "Compacted", compacted_log_entry_, previous_compacted_key_sid); - } - - // Case 1: Both snapshot and compacted are empty, return false - if (!snapshot_log_entry_ && !compacted_log_entry_) { - DVLOG_LP(log_trace) << "Both snapshot and compacted streams are closed"; - return false; - } - - // Case 2: Either snapshot or compacted has a value, use the one that is not empty - if (snapshot_log_entry_ && !compacted_log_entry_) { - log_entry_ = std::move(snapshot_log_entry_.value()); - snapshot_log_entry_ = std::nullopt; - } else if (!snapshot_log_entry_ && compacted_log_entry_) { - log_entry_ = std::move(compacted_log_entry_.value()); - compacted_log_entry_ = std::nullopt; - } else { - // Case 3: Both snapshot and compacted have values - if (snapshot_log_entry_->key_sid() < compacted_log_entry_->key_sid()) { - log_entry_ = std::move(snapshot_log_entry_.value()); - snapshot_log_entry_ = std::nullopt; - } else if (snapshot_log_entry_->key_sid() > compacted_log_entry_->key_sid()) { - log_entry_ = std::move(compacted_log_entry_.value()); - compacted_log_entry_ = std::nullopt; - } else { - // If key_sid is equal, use snapshot_log_entry_, but reset both entries - log_entry_ = std::move(snapshot_log_entry_.value()); - snapshot_log_entry_ = std::nullopt; - compacted_log_entry_ = std::nullopt; - } - } - return true; - } - - [[nodiscard]] storage_id_type storage() const noexcept { - return log_entry_.storage(); - } - - void key(std::string& buf) const noexcept { - log_entry_.key(buf); - } - - void value(std::string& buf) const noexcept { - log_entry_.value(buf); - } - - [[nodiscard]] log_entry::entry_type type() const { - return log_entry_.type(); - } +namespace limestone::api { -}; cursor::cursor(const boost::filesystem::path& snapshot_file) - : log_entry_tracker_(std::make_unique(snapshot_file)) {} + : log_entry_tracker_(std::make_unique(snapshot_file)) {} cursor::cursor(const boost::filesystem::path& snapshot_file, const boost::filesystem::path& compacted_file) - : log_entry_tracker_(std::make_unique(snapshot_file, compacted_file)) {} - + : log_entry_tracker_(std::make_unique(snapshot_file, compacted_file)) {} cursor::~cursor() noexcept { // TODO: handle close failure - log_entry_tracker_->close(); + log_entry_tracker_->close(); } bool cursor::next() { // Keep calling next() until we find a noraml_entry, or no more entries while (log_entry_tracker_->next()) { - if (log_entry_tracker_->type() == limestone::api::log_entry::entry_type::normal_entry) { + if (log_entry_tracker_->type() == log_entry::entry_type::normal_entry) { return true; } } - return false; + return false; } storage_id_type cursor::storage() const noexcept { diff --git a/src/limestone/snapshot_tracker.cpp b/src/limestone/snapshot_tracker.cpp new file mode 100644 index 00000000..8e6e897c --- /dev/null +++ b/src/limestone/snapshot_tracker.cpp @@ -0,0 +1,138 @@ +/* + * Copyright 2022-2023 Project Tsurugi. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "snapshot_tracker.h" +#include +#include "limestone_exception_helper.h" + +namespace limestone::internal { + +snapshot_tracker::snapshot_tracker(const boost::filesystem::path& snapshot_file) + : compacted_istrm_(std::nullopt) { + open(snapshot_file, snapshot_istrm_); +} + +snapshot_tracker::snapshot_tracker(const boost::filesystem::path& snapshot_file, const boost::filesystem::path& compacted_file) { + open(snapshot_file, snapshot_istrm_); + open(compacted_file, compacted_istrm_); +} + +void snapshot_tracker::open(const boost::filesystem::path& file, std::optional& stream) { + stream.emplace(file, std::ios_base::in | std::ios_base::binary); + if (!stream->good()) { + LOG_AND_THROW_EXCEPTION("Failed to open file: " + file.string()); + } +} + +void snapshot_tracker::close() { + if (snapshot_istrm_) snapshot_istrm_->close(); + if (compacted_istrm_) compacted_istrm_->close(); +} + +void snapshot_tracker::validate_and_read_stream(std::optional& stream, const std::string& stream_name, + std::optional& log_entry, std::string& previous_key_sid) { + if (stream) { + if (!stream->good()) { + DVLOG_LP(log_trace) << stream_name << " stream is not good, closing it."; + stream->close(); + stream = std::nullopt; + } else if (stream->eof()) { + DVLOG_LP(log_trace) << stream_name << " stream reached EOF, closing it."; + stream->close(); + stream = std::nullopt; + } else if (!log_entry) { + log_entry.emplace(); // Construct a new log_entry + if (!log_entry->read(*stream)) { + stream->close(); // If reading fails, reset the stream + stream = std::nullopt; + log_entry = std::nullopt; // Reset the log_entry as well + } else { + // Check if the key_sid is in ascending order + // TODO: Key order violation is detected here and the process is aborted. + // However, this check should be moved to an earlier point, and if the key order is invalid, + // a different processing method should be considered instead of aborting immediately. + if (!previous_key_sid.empty() && log_entry->key_sid() < previous_key_sid) { + LOG(ERROR) << "Key order violation in " << stream_name << ": current key_sid (" << log_entry->key_sid() + << ") is smaller than the previous key_sid (" << previous_key_sid << ")"; + THROW_LIMESTONE_EXCEPTION("Key order violation detected in " + stream_name); + } + + // Update the previous key_sid to the current one + previous_key_sid = log_entry->key_sid(); + } + } + } +} + +bool snapshot_tracker::next() { + // Only read from snapshot stream if snapshot_log_entry_ is empty, with key_sid check + if (!snapshot_log_entry_) { + validate_and_read_stream(snapshot_istrm_, "Snapshot", snapshot_log_entry_, previous_snapshot_key_sid); + } + + // Only read from compacted stream if compacted_log_entry_ is empty, with key_sid check + if (!compacted_log_entry_) { + validate_and_read_stream(compacted_istrm_, "Compacted", compacted_log_entry_, previous_compacted_key_sid); + } + + // Case 1: Both snapshot and compacted are empty, return false + if (!snapshot_log_entry_ && !compacted_log_entry_) { + DVLOG_LP(log_trace) << "Both snapshot and compacted streams are closed"; + return false; + } + + // Case 2: Either snapshot or compacted has a value, use the one that is not empty + if (snapshot_log_entry_ && !compacted_log_entry_) { + log_entry_ = std::move(snapshot_log_entry_.value()); + snapshot_log_entry_ = std::nullopt; + } else if (!snapshot_log_entry_ && compacted_log_entry_) { + log_entry_ = std::move(compacted_log_entry_.value()); + compacted_log_entry_ = std::nullopt; + } else { + // Case 3: Both snapshot and compacted have values + if (snapshot_log_entry_->key_sid() < compacted_log_entry_->key_sid()) { + log_entry_ = std::move(snapshot_log_entry_.value()); + snapshot_log_entry_ = std::nullopt; + } else if (snapshot_log_entry_->key_sid() > compacted_log_entry_->key_sid()) { + log_entry_ = std::move(compacted_log_entry_.value()); + compacted_log_entry_ = std::nullopt; + } else { + // If key_sid is equal, use snapshot_log_entry_, but reset both entries + log_entry_ = std::move(snapshot_log_entry_.value()); + snapshot_log_entry_ = std::nullopt; + compacted_log_entry_ = std::nullopt; + } + } + return true; +} + +limestone::api::storage_id_type snapshot_tracker::storage() const noexcept { + return log_entry_.storage(); +} + +void snapshot_tracker::key(std::string& buf) const noexcept { + log_entry_.key(buf); +} + +void snapshot_tracker::value(std::string& buf) const noexcept { + log_entry_.value(buf); +} + +limestone::api::log_entry::entry_type snapshot_tracker::type() const { + return log_entry_.type(); +} + +} // namespace limestone::internal diff --git a/src/limestone/snapshot_tracker.h b/src/limestone/snapshot_tracker.h new file mode 100644 index 00000000..bd527dc1 --- /dev/null +++ b/src/limestone/snapshot_tracker.h @@ -0,0 +1,62 @@ +/* + * Copyright 2022-2023 Project Tsurugi. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include "log_entry.h" + +namespace limestone::internal { + +// snapshot_tracker handles the retrieval and comparison of log entries from snapshot and compacted streams. +// It ensures that the entries are read in the correct order and manages the state of both streams. +class snapshot_tracker { + +private: + limestone::api::log_entry log_entry_; + std::optional snapshot_log_entry_; + std::optional compacted_log_entry_; + std::optional snapshot_istrm_; + std::optional compacted_istrm_; + std::string previous_snapshot_key_sid; + std::string previous_compacted_key_sid; + +public: + explicit snapshot_tracker(const boost::filesystem::path& snapshot_file); + explicit snapshot_tracker(const boost::filesystem::path& snapshot_file, const boost::filesystem::path& compacted_file); + +protected: + void open(const boost::filesystem::path& file, std::optional& stream); + void close(); + + bool next(); + void validate_and_read_stream(std::optional& stream, const std::string& stream_name, + std::optional& log_entry, std::string& previous_key_sid); + + [[nodiscard]] limestone::api::storage_id_type storage() const noexcept; + void key(std::string& buf) const noexcept; + void value(std::string& buf) const noexcept; + [[nodiscard]] limestone::api::log_entry::entry_type type() const; + + // Making the cursor class a friend so that it can access protected members + friend class limestone::api::cursor; +}; + +} // namespace limestone::internal