Skip to content

Commit

Permalink
Fix: Ensure proper deletion of entries in snapshot when drop or trunc…
Browse files Browse the repository at this point in the history
…ate table operations are present
  • Loading branch information
umegane committed Oct 18, 2024
1 parent 434d44c commit f8799b5
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 11 deletions.
80 changes: 70 additions & 10 deletions src/limestone/cursor_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
namespace limestone::internal {

using limestone::api::log_entry;
using limestone::api::write_version_type;

std::unique_ptr<cursor> cursor_impl::create_cursor(const boost::filesystem::path& snapshot_file,
const std::map<limestone::api::storage_id_type, limestone::api::write_version_type>& clear_storage) {
Expand Down Expand Up @@ -59,22 +60,34 @@ void cursor_impl::close() {
}

void cursor_impl::validate_and_read_stream(std::optional<boost::filesystem::ifstream>& stream, const std::string& stream_name,
std::optional<log_entry>& log_entry, std::string& previous_key_sid) {
if (stream) {
std::optional<log_entry>& log_entry, std::string& previous_key_sid,
std::function<bool(const limestone::api::log_entry&)> is_valid_entry) {

Check warning on line 64 in src/limestone/cursor_impl.cpp

View workflow job for this annotation

GitHub Actions / Clang-Tidy

performance-unnecessary-value-param

the parameter 'is_valid_entry' is copied for each invocation but only used as a const reference; consider making it a const reference
while (stream) {
// If the stream is not in good condition, close it and exit
if (!stream->good()) {
DVLOG_LP(log_trace) << stream_name << " stream is not good, closing it.";
stream->close();
stream = std::nullopt;
} else if (stream->eof()) {
return;
}

// If the stream has reached EOF, close it and exit
if (stream->eof()) {
DVLOG_LP(log_trace) << stream_name << " stream reached EOF, closing it.";
stream->close();
stream = std::nullopt;
} else if (!log_entry) {
return;
}

// If the entry is not yet read, read it
if (!log_entry) {
log_entry.emplace(); // Construct a new log_entry
if (!log_entry->read(*stream)) {
stream->close(); // If reading fails, reset the stream
// If reading fails, close the stream and reset the log_entry
stream->close();
stream = std::nullopt;
log_entry = std::nullopt; // Reset the log_entry as well
log_entry = std::nullopt;
return;
} else {

Check warning on line 91 in src/limestone/cursor_impl.cpp

View workflow job for this annotation

GitHub Actions / Clang-Tidy

llvm-else-after-return,readability-else-after-return

do not use 'else' after 'return'
// Check if the key_sid is in ascending order
// TODO: Key order violation is detected here and the process is aborted.
Expand All @@ -90,19 +103,65 @@ void cursor_impl::validate_and_read_stream(std::optional<boost::filesystem::ifst
previous_key_sid = log_entry->key_sid();
}
}

// Check the validity of the entry using the lambda function
if (is_valid_entry(log_entry.value())) {
// If a valid entry is found, return
return;
}

// If the entry is invalid, reset the log_entry and continue processing the stream
log_entry = std::nullopt;
}
}


bool cursor_impl::next() {

Check warning on line 119 in src/limestone/cursor_impl.cpp

View workflow job for this annotation

GitHub Actions / Clang-Tidy

readability-function-cognitive-complexity

function 'next' has cognitive complexity of 29 (threshold 25)
// Lambda for snapshot to allow both normal_entry and remove_entry
auto is_relevant_snapshot_entry = [this](const limestone::api::log_entry& entry) -> bool {
// Step 1: Check if the entry is either normal_entry or remove_entry
if (entry.type() != limestone::api::log_entry::entry_type::normal_entry &&
entry.type() != limestone::api::log_entry::entry_type::remove_entry) {
return false; // Skip this entry if it's not normal or remove entry
}

// Step 2: Get the storage ID from log_entry
auto storage_id = entry.storage(); // Assuming storage() returns the storage ID

// Step 3: Check if clear_storage_ contains the same storage ID
auto it = clear_storage_.find(storage_id);
if (it != clear_storage_.end()) {
// Step 4: Retrieve the write_version from log_entry (only if the storage ID is found)
write_version_type wv;
entry.write_version(wv);

// Step 5: Retrieve the write_version from clear_storage_ for the same storage ID
write_version_type range_ver = it->second;

// Step 6: Compare the versions (only if the entry is normal_entry or remove_entry)
if (wv < range_ver) {
return false; // Skip this entry as it is outdated
}
}

// If everything is valid, return true
return true;
};

// Lambda for compacted to allow only normal_entry
auto is_relevant_compacted_entry = [](const limestone::api::log_entry& entry) -> bool {
return entry.type() == limestone::api::log_entry::entry_type::normal_entry;
};

while (true) {
// Only read from snapshot stream if snapshot_log_entry_ is empty, with key_sid check
// Read from the snapshot stream if the snapshot_log_entry_ is empty
if (!snapshot_log_entry_) {
validate_and_read_stream(snapshot_istrm_, "Snapshot", snapshot_log_entry_, previous_snapshot_key_sid);
validate_and_read_stream(snapshot_istrm_, "Snapshot", snapshot_log_entry_, previous_snapshot_key_sid, is_relevant_snapshot_entry);
}

// Only read from compacted stream if compacted_log_entry_ is empty, with key_sid check
// Read from the compacted stream if the compacted_log_entry_ is empty
if (!compacted_log_entry_) {
validate_and_read_stream(compacted_istrm_, "Compacted", compacted_log_entry_, previous_compacted_key_sid);
validate_and_read_stream(compacted_istrm_, "Compacted", compacted_log_entry_, previous_compacted_key_sid, is_relevant_compacted_entry);
}

// Case 1: Both snapshot and compacted are empty, return false
Expand Down Expand Up @@ -141,6 +200,7 @@ bool cursor_impl::next() {
}
}


limestone::api::storage_id_type cursor_impl::storage() const noexcept {
return log_entry_.storage();
}
Expand Down
3 changes: 2 additions & 1 deletion src/limestone/cursor_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ class cursor_impl {

bool next();
void validate_and_read_stream(std::optional<boost::filesystem::ifstream>& stream, const std::string& stream_name,
std::optional<limestone::api::log_entry>& log_entry, std::string& previous_key_sid);
std::optional<limestone::api::log_entry>& log_entry, std::string& previous_key_sid,
std::function<bool(const limestone::api::log_entry&)> is_valid_entry);

[[nodiscard]] limestone::api::storage_id_type storage() const noexcept;
void key(std::string& buf) const noexcept;
Expand Down

0 comments on commit f8799b5

Please sign in to comment.