Skip to content

Commit

Permalink
Refactor: Separate snapshot_tracker class into standalone files
Browse files Browse the repository at this point in the history
  • Loading branch information
umegane committed Oct 10, 2024
1 parent 2f313b7 commit d514252
Show file tree
Hide file tree
Showing 4 changed files with 214 additions and 141 deletions.
9 changes: 7 additions & 2 deletions include/limestone/api/cursor.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,16 @@
#include <limestone/api/storage_id_type.h>
#include <limestone/api/large_object_view.h>


namespace limestone::internal {
class snapshot_tracker;
}

namespace limestone::api {

class log_entry;
class snapshot;
class snapshot_tracker;


/**
* @brief a cursor to scan entries on the snapshot
Expand Down Expand Up @@ -79,7 +84,7 @@ class cursor {
std::vector<large_object_view>& large_objects() noexcept;

private:
std::unique_ptr<snapshot_tracker> log_entry_tracker_;
std::unique_ptr<limestone::internal::snapshot_tracker> log_entry_tracker_;

std::vector<large_object_view> large_objects_{};

Expand Down
146 changes: 7 additions & 139 deletions src/limestone/cursor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,163 +20,31 @@
#include "logging_helper.h"
#include "limestone_exception_helper.h"
#include "log_entry.h"
#include <limestone/api/write_version_type.h>
#include "snapshot_tracker.h"

namespace limestone::api {

// snapshot_tracker handles the retrieval and comparison of log entries from snapshot and compacted streams.
// It ensures that the entries are read in the correct order and manages the state of both streams.
class snapshot_tracker {

private:
log_entry log_entry_;
std::optional<log_entry> snapshot_log_entry_;
std::optional<log_entry> compacted_log_entry_;
std::optional<boost::filesystem::ifstream> snapshot_istrm_;
std::optional<boost::filesystem::ifstream> compacted_istrm_;
std::string previous_snapshot_key_sid;
std::string previous_compacted_key_sid;

public:
explicit snapshot_tracker(const boost::filesystem::path& snapshot_file)
: compacted_istrm_(std::nullopt) {
open(snapshot_file, snapshot_istrm_);
}

explicit snapshot_tracker(const boost::filesystem::path& snapshot_file, const boost::filesystem::path& compacted_file) {
open(snapshot_file, snapshot_istrm_);
open(compacted_file, compacted_istrm_);
}

void open(const boost::filesystem::path& file, std::optional<boost::filesystem::ifstream>& stream) {
stream.emplace(file, std::ios_base::in | std::ios_base::binary);
if (!stream->good()) {
LOG_AND_THROW_EXCEPTION("Failed to open file: " + file.string());
}
}

void close() {
if (snapshot_istrm_) snapshot_istrm_->close();
if (compacted_istrm_) compacted_istrm_->close();
}

void validate_and_read_stream(std::optional<boost::filesystem::ifstream>& stream, const std::string& stream_name, std::optional<log_entry>& log_entry,
std::string& previous_key_sid) {
if (stream) {
if (!stream->good()) {
DVLOG_LP(log_trace) << stream_name << " stream is not good, closing it.";
stream->close();
stream = std::nullopt;
} else if (stream->eof()) {
DVLOG_LP(log_trace) << stream_name << " stream reached EOF, closing it.";
stream->close();
stream = std::nullopt;
} else if (!log_entry) {
log_entry.emplace(); // Construct a new log_entry
if (!log_entry->read(*stream)) {
stream->close();
stream = std::nullopt; // If reading fails, reset log_entry
log_entry = std::nullopt; // Reset the log_entry as well
} else {
// Check if the key_sid is in ascending order
// TODO: Key order violation is detected here and the process is aborted.
// However, this check should be moved to an earlier point, and if the key order is invalid,
// a different processing method should be considered instead of aborting immediately.

if (!previous_key_sid.empty() && log_entry->key_sid() < previous_key_sid) {
LOG(ERROR) << "Key order violation in " << stream_name << ": current key_sid (" << log_entry->key_sid()
<< ") is smaller than the previous key_sid (" << previous_key_sid << ")";
THROW_LIMESTONE_EXCEPTION("Key order violation detected in " + stream_name);
}

// Update the previous key_sid to the current one
previous_key_sid = log_entry->key_sid();
}
}
}
}

// Next function to read from the appropriate stream
bool next() {
// Only read from snapshot stream if snapshot_log_entry_ is empty, with key_sid check
if (!snapshot_log_entry_) {
validate_and_read_stream(snapshot_istrm_, "Snapshot", snapshot_log_entry_, previous_snapshot_key_sid);
}

// Only read from compacted stream if compacted_log_entry_ is empty, with key_sid check
if (!compacted_log_entry_) {
validate_and_read_stream(compacted_istrm_, "Compacted", compacted_log_entry_, previous_compacted_key_sid);
}

// Case 1: Both snapshot and compacted are empty, return false
if (!snapshot_log_entry_ && !compacted_log_entry_) {
DVLOG_LP(log_trace) << "Both snapshot and compacted streams are closed";
return false;
}

// Case 2: Either snapshot or compacted has a value, use the one that is not empty
if (snapshot_log_entry_ && !compacted_log_entry_) {
log_entry_ = std::move(snapshot_log_entry_.value());
snapshot_log_entry_ = std::nullopt;
} else if (!snapshot_log_entry_ && compacted_log_entry_) {
log_entry_ = std::move(compacted_log_entry_.value());
compacted_log_entry_ = std::nullopt;
} else {
// Case 3: Both snapshot and compacted have values
if (snapshot_log_entry_->key_sid() < compacted_log_entry_->key_sid()) {
log_entry_ = std::move(snapshot_log_entry_.value());
snapshot_log_entry_ = std::nullopt;
} else if (snapshot_log_entry_->key_sid() > compacted_log_entry_->key_sid()) {
log_entry_ = std::move(compacted_log_entry_.value());
compacted_log_entry_ = std::nullopt;
} else {
// If key_sid is equal, use snapshot_log_entry_, but reset both entries
log_entry_ = std::move(snapshot_log_entry_.value());
snapshot_log_entry_ = std::nullopt;
compacted_log_entry_ = std::nullopt;
}
}
return true;
}

[[nodiscard]] storage_id_type storage() const noexcept {
return log_entry_.storage();
}

void key(std::string& buf) const noexcept {
log_entry_.key(buf);
}

void value(std::string& buf) const noexcept {
log_entry_.value(buf);
}

[[nodiscard]] log_entry::entry_type type() const {
return log_entry_.type();
}
namespace limestone::api {

};

cursor::cursor(const boost::filesystem::path& snapshot_file)
: log_entry_tracker_(std::make_unique<snapshot_tracker>(snapshot_file)) {}
: log_entry_tracker_(std::make_unique<limestone::internal::snapshot_tracker>(snapshot_file)) {}

cursor::cursor(const boost::filesystem::path& snapshot_file, const boost::filesystem::path& compacted_file)
: log_entry_tracker_(std::make_unique<snapshot_tracker>(snapshot_file, compacted_file)) {}

: log_entry_tracker_(std::make_unique<limestone::internal::snapshot_tracker>(snapshot_file, compacted_file)) {}

cursor::~cursor() noexcept {
// TODO: handle close failure
log_entry_tracker_->close();
log_entry_tracker_->close();
}

bool cursor::next() {
// Keep calling next() until we find a normal_entry, or no more entries
while (log_entry_tracker_->next()) {
if (log_entry_tracker_->type() == limestone::api::log_entry::entry_type::normal_entry) {
if (log_entry_tracker_->type() == log_entry::entry_type::normal_entry) {
return true;
}
}
return false;
return false;
}

storage_id_type cursor::storage() const noexcept {
Expand Down
138 changes: 138 additions & 0 deletions src/limestone/snapshot_tracker.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
* Copyright 2022-2023 Project Tsurugi.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "snapshot_tracker.h"
#include <glog/logging.h>
#include "limestone_exception_helper.h"

namespace limestone::internal {

/**
 * @brief Construct a tracker that reads from a snapshot file only.
 *
 * The compacted stream is explicitly initialized to std::nullopt, so
 * next() will only ever see entries coming from the snapshot stream.
 *
 * @param snapshot_file path of the snapshot file, opened through open()
 */
snapshot_tracker::snapshot_tracker(const boost::filesystem::path& snapshot_file)
: compacted_istrm_(std::nullopt) {
// open() invokes LOG_AND_THROW_EXCEPTION when the stream is not good after opening.
open(snapshot_file, snapshot_istrm_);
}

/**
 * @brief Construct a tracker that reads from both a snapshot file and a compacted file.
 *
 * @param snapshot_file  path of the snapshot file, opened first
 * @param compacted_file path of the compacted file, opened second
 */
snapshot_tracker::snapshot_tracker(const boost::filesystem::path& snapshot_file, const boost::filesystem::path& compacted_file) {
// Each open() call invokes LOG_AND_THROW_EXCEPTION when its stream is not good after opening.
open(snapshot_file, snapshot_istrm_);
open(compacted_file, compacted_istrm_);
}

/**
 * @brief Open @p file for binary reading and store the resulting stream in @p stream.
 *
 * The stream is constructed in place inside the optional. If it is not in a
 * good state afterwards, the failure is reported through LOG_AND_THROW_EXCEPTION.
 *
 * @param file   path of the file to open
 * @param stream optional that receives the newly opened input stream
 */
void snapshot_tracker::open(const boost::filesystem::path& file, std::optional<boost::filesystem::ifstream>& stream) {
    const auto mode = std::ios_base::in | std::ios_base::binary;
    stream.emplace(file, mode);

    const bool opened = stream->good();
    if (!opened) {
        LOG_AND_THROW_EXCEPTION("Failed to open file: " + file.string());
    }
}

void snapshot_tracker::close() {
if (snapshot_istrm_) snapshot_istrm_->close();
if (compacted_istrm_) compacted_istrm_->close();
}

/**
 * @brief Refill @p log_entry from @p stream when the slot is empty.
 *
 * Behavior, in order of evaluation:
 *  - @p stream is std::nullopt: nothing happens.
 *  - @p stream has reached EOF, or is otherwise not good: the stream is closed
 *    and reset to std::nullopt; @p log_entry is not touched.
 *  - @p log_entry already holds an entry: nothing happens.
 *  - Otherwise a new entry is constructed and read from the stream. If the read
 *    reports failure, the stream is closed and reset and @p log_entry is reset
 *    to std::nullopt. If it succeeds, the key_sid ordering is checked and
 *    @p previous_key_sid is updated to the key_sid just read.
 *
 * A key_sid that is smaller than a non-empty @p previous_key_sid is logged as
 * an error and reported through THROW_LIMESTONE_EXCEPTION.
 *
 * @param stream           input stream to read from; released when exhausted or unusable
 * @param stream_name      label used in log and exception messages
 * @param log_entry        slot that receives the next entry
 * @param previous_key_sid key_sid of the last entry successfully read from this stream
 */
void snapshot_tracker::validate_and_read_stream(std::optional<boost::filesystem::ifstream>& stream, const std::string& stream_name,
                                                std::optional<limestone::api::log_entry>& log_entry, std::string& previous_key_sid) {
    if (!stream) {
        return;
    }

    // Close the file and drop the stream so that later calls become no-ops.
    const auto release_stream = [&stream]() {
        stream->close();
        stream = std::nullopt;
    };

    // eof() has to be tested before good(): good() is false whenever eofbit is set,
    // so testing good() first would make the EOF branch unreachable and every
    // end-of-file would be reported as "stream is not good".
    if (stream->eof()) {
        DVLOG_LP(log_trace) << stream_name << " stream reached EOF, closing it.";
        release_stream();
        return;
    }
    if (!stream->good()) {
        DVLOG_LP(log_trace) << stream_name << " stream is not good, closing it.";
        release_stream();
        return;
    }

    // An entry that has not been consumed yet must not be overwritten.
    if (log_entry) {
        return;
    }

    log_entry.emplace();  // Construct a new log_entry
    if (!log_entry->read(*stream)) {
        // If reading fails, release the stream and reset the log_entry as well
        release_stream();
        log_entry = std::nullopt;
        return;
    }

    // Check if the key_sid is in ascending order
    // TODO: Key order violation is detected here and the process is aborted.
    // However, this check should be moved to an earlier point, and if the key order is invalid,
    // a different processing method should be considered instead of aborting immediately.
    if (!previous_key_sid.empty() && log_entry->key_sid() < previous_key_sid) {
        LOG(ERROR) << "Key order violation in " << stream_name << ": current key_sid (" << log_entry->key_sid()
                   << ") is smaller than the previous key_sid (" << previous_key_sid << ")";
        THROW_LIMESTONE_EXCEPTION("Key order violation detected in " + stream_name);
    }

    // Update the previous key_sid to the current one
    previous_key_sid = log_entry->key_sid();
}

/**
 * @brief Advance to the next log entry, merging the snapshot and compacted streams.
 *
 * Each pending slot that is empty is refilled from its stream first. The entry
 * with the smaller key_sid then becomes the current entry. When both pending
 * entries have an equal key_sid, the snapshot entry is used and the compacted
 * entry is discarded.
 *
 * @return true when a current entry is available, false when neither stream
 *         produced an entry
 */
bool snapshot_tracker::next() {
    // Refill only the slots whose previous entry has already been consumed.
    if (!snapshot_log_entry_) {
        validate_and_read_stream(snapshot_istrm_, "Snapshot", snapshot_log_entry_, previous_snapshot_key_sid);
    }
    if (!compacted_log_entry_) {
        validate_and_read_stream(compacted_istrm_, "Compacted", compacted_log_entry_, previous_compacted_key_sid);
    }

    const bool has_snapshot = snapshot_log_entry_.has_value();
    const bool has_compacted = compacted_log_entry_.has_value();

    // Nothing pending on either side: iteration is finished.
    if (!has_snapshot && !has_compacted) {
        DVLOG_LP(log_trace) << "Both snapshot and compacted streams are closed";
        return false;
    }

    // Decide which pending entry wins, and whether the other one is a duplicate key.
    bool take_snapshot = has_snapshot;
    bool drop_compacted_duplicate = false;
    if (has_snapshot && has_compacted) {
        const auto& snapshot_key = snapshot_log_entry_->key_sid();
        const auto& compacted_key = compacted_log_entry_->key_sid();
        if (snapshot_key < compacted_key) {
            take_snapshot = true;
        } else if (snapshot_key > compacted_key) {
            take_snapshot = false;
        } else {
            // Equal key_sid: the snapshot entry is used and both slots are cleared.
            take_snapshot = true;
            drop_compacted_duplicate = true;
        }
    }

    if (take_snapshot) {
        log_entry_ = std::move(*snapshot_log_entry_);
        snapshot_log_entry_.reset();
        if (drop_compacted_duplicate) {
            compacted_log_entry_.reset();
        }
    } else {
        log_entry_ = std::move(*compacted_log_entry_);
        compacted_log_entry_.reset();
    }
    return true;
}

/**
 * @brief Storage ID of the current entry.
 * @return the value of log_entry::storage() for the entry selected by the last next() call
 */
limestone::api::storage_id_type snapshot_tracker::storage() const noexcept {
return log_entry_.storage();
}

/**
 * @brief Forward to log_entry::key() on the current entry.
 * @param buf buffer handed to log_entry::key() — presumably receives the key bytes; confirm against log_entry
 */
void snapshot_tracker::key(std::string& buf) const noexcept {
// NOTE(review): declared noexcept; confirm log_entry::key() cannot throw when filling buf.
log_entry_.key(buf);
}

/**
 * @brief Forward to log_entry::value() on the current entry.
 * @param buf buffer handed to log_entry::value() — presumably receives the value bytes; confirm against log_entry
 */
void snapshot_tracker::value(std::string& buf) const noexcept {
// NOTE(review): declared noexcept; confirm log_entry::value() cannot throw when filling buf.
log_entry_.value(buf);
}

/**
 * @brief Entry type of the current entry.
 * @return the value of log_entry::type() for the entry selected by the last next() call
 */
limestone::api::log_entry::entry_type snapshot_tracker::type() const {
return log_entry_.type();
}

} // namespace limestone::internal
62 changes: 62 additions & 0 deletions src/limestone/snapshot_tracker.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright 2022-2023 Project Tsurugi.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <optional>
#include <boost/filesystem.hpp>
#include <boost/filesystem/fstream.hpp>
#include <limestone/api/storage_id_type.h>
#include <limestone/api/cursor.h>
#include "log_entry.h"

namespace limestone::internal {

// snapshot_tracker handles the retrieval and comparison of log entries from snapshot and compacted streams.
// It ensures that the entries are read in the correct order and manages the state of both streams.
class snapshot_tracker {

private:
limestone::api::log_entry log_entry_;
std::optional<limestone::api::log_entry> snapshot_log_entry_;
std::optional<limestone::api::log_entry> compacted_log_entry_;
std::optional<boost::filesystem::ifstream> snapshot_istrm_;
std::optional<boost::filesystem::ifstream> compacted_istrm_;
std::string previous_snapshot_key_sid;
std::string previous_compacted_key_sid;

public:
explicit snapshot_tracker(const boost::filesystem::path& snapshot_file);
explicit snapshot_tracker(const boost::filesystem::path& snapshot_file, const boost::filesystem::path& compacted_file);

protected:
void open(const boost::filesystem::path& file, std::optional<boost::filesystem::ifstream>& stream);
void close();

bool next();
void validate_and_read_stream(std::optional<boost::filesystem::ifstream>& stream, const std::string& stream_name,
std::optional<limestone::api::log_entry>& log_entry, std::string& previous_key_sid);

[[nodiscard]] limestone::api::storage_id_type storage() const noexcept;
void key(std::string& buf) const noexcept;
void value(std::string& buf) const noexcept;
[[nodiscard]] limestone::api::log_entry::entry_type type() const;

// Making the cursor class a friend so that it can access protected members
friend class limestone::api::cursor;
};

} // namespace limestone::internal

0 comments on commit d514252

Please sign in to comment.