From f8799b55f54407cbfbfef35db237cdbe96c2e3d5 Mon Sep 17 00:00:00 2001 From: Shinichi Umegane Date: Fri, 18 Oct 2024 11:07:36 +0900 Subject: [PATCH] Fix: Ensure proper deletion of entries in snapshot when drop or truncate table operations are present --- src/limestone/cursor_impl.cpp | 80 ++++++++++++++++++++++++++++++----- src/limestone/cursor_impl.h | 3 +- 2 files changed, 72 insertions(+), 11 deletions(-) diff --git a/src/limestone/cursor_impl.cpp b/src/limestone/cursor_impl.cpp index 03eee21..b331da8 100644 --- a/src/limestone/cursor_impl.cpp +++ b/src/limestone/cursor_impl.cpp @@ -21,6 +21,7 @@ namespace limestone::internal { using limestone::api::log_entry; +using limestone::api::write_version_type; std::unique_ptr cursor_impl::create_cursor(const boost::filesystem::path& snapshot_file, const std::map& clear_storage) { @@ -59,22 +60,34 @@ void cursor_impl::close() { } void cursor_impl::validate_and_read_stream(std::optional& stream, const std::string& stream_name, - std::optional& log_entry, std::string& previous_key_sid) { - if (stream) { + std::optional& log_entry, std::string& previous_key_sid, + std::function is_valid_entry) { + while (stream) { + // If the stream is not in good condition, close it and exit if (!stream->good()) { DVLOG_LP(log_trace) << stream_name << " stream is not good, closing it."; stream->close(); stream = std::nullopt; - } else if (stream->eof()) { + return; + } + + // If the stream has reached EOF, close it and exit + if (stream->eof()) { DVLOG_LP(log_trace) << stream_name << " stream reached EOF, closing it."; stream->close(); stream = std::nullopt; - } else if (!log_entry) { + return; + } + + // If the entry is not yet read, read it + if (!log_entry) { log_entry.emplace(); // Construct a new log_entry if (!log_entry->read(*stream)) { - stream->close(); // If reading fails, reset the stream + // If reading fails, close the stream and reset the log_entry + stream->close(); stream = std::nullopt; - log_entry = std::nullopt; // Reset the log_entry as well + log_entry = std::nullopt; + return; } else { // Check if the key_sid is in ascending order // TODO: Key order violation is detected here and the process is aborted. @@ -90,19 +103,65 @@ void cursor_impl::validate_and_read_stream(std::optionalkey_sid(); } } + + // Check the validity of the entry using the lambda function + if (is_valid_entry(log_entry.value())) { + // If a valid entry is found, return + return; + } + + // If the entry is invalid, reset the log_entry and continue processing the stream + log_entry = std::nullopt; } } + bool cursor_impl::next() { + // Lambda for snapshot to allow both normal_entry and remove_entry + auto is_relevant_snapshot_entry = [this](const limestone::api::log_entry& entry) -> bool { + // Step 1: Check if the entry is either normal_entry or remove_entry + if (entry.type() != limestone::api::log_entry::entry_type::normal_entry && + entry.type() != limestone::api::log_entry::entry_type::remove_entry) { + return false; // Skip this entry if it's not normal or remove entry + } + + // Step 2: Get the storage ID from log_entry + auto storage_id = entry.storage(); // Assuming storage() returns the storage ID + + // Step 3: Check if clear_storage_ contains the same storage ID + auto it = clear_storage_.find(storage_id); + if (it != clear_storage_.end()) { + // Step 4: Retrieve the write_version from log_entry (only if the storage ID is found) + write_version_type wv; + entry.write_version(wv); + + // Step 5: Retrieve the write_version from clear_storage_ for the same storage ID + write_version_type range_ver = it->second; + + // Step 6: Compare the versions (only if the entry is normal_entry or remove_entry) + if (wv < range_ver) { + return false; // Skip this entry as it is outdated + } + } + + // If everything is valid, return true + return true; + }; + + // Lambda for compacted to allow only normal_entry + auto is_relevant_compacted_entry = [](const limestone::api::log_entry& entry) -> bool { + return entry.type() == limestone::api::log_entry::entry_type::normal_entry; + }; + while (true) { - // Only read from snapshot stream if snapshot_log_entry_ is empty, with key_sid check + // Read from the snapshot stream if the snapshot_log_entry_ is empty if (!snapshot_log_entry_) { - validate_and_read_stream(snapshot_istrm_, "Snapshot", snapshot_log_entry_, previous_snapshot_key_sid); + validate_and_read_stream(snapshot_istrm_, "Snapshot", snapshot_log_entry_, previous_snapshot_key_sid, is_relevant_snapshot_entry); } - // Only read from compacted stream if compacted_log_entry_ is empty, with key_sid check + // Read from the compacted stream if the compacted_log_entry_ is empty if (!compacted_log_entry_) { - validate_and_read_stream(compacted_istrm_, "Compacted", compacted_log_entry_, previous_compacted_key_sid); + validate_and_read_stream(compacted_istrm_, "Compacted", compacted_log_entry_, previous_compacted_key_sid, is_relevant_compacted_entry); } // Case 1: Both snapshot and compacted are empty, return false @@ -141,6 +200,7 @@ bool cursor_impl::next() { } } + limestone::api::storage_id_type cursor_impl::storage() const noexcept { return log_entry_.storage(); } diff --git a/src/limestone/cursor_impl.h b/src/limestone/cursor_impl.h index ffadc56..621d756 100644 --- a/src/limestone/cursor_impl.h +++ b/src/limestone/cursor_impl.h @@ -54,7 +54,8 @@ class cursor_impl { bool next(); void validate_and_read_stream(std::optional& stream, const std::string& stream_name, - std::optional& log_entry, std::string& previous_key_sid); + std::optional& log_entry, std::string& previous_key_sid, + std::function is_valid_entry); [[nodiscard]] limestone::api::storage_id_type storage() const noexcept; void key(std::string& buf) const noexcept;