Skip to content

Commit

Permalink
Fix compaction start condition to ensure all relevant PWAL sessions a…
Browse files Browse the repository at this point in the history
…re closed
  • Loading branch information
umegane committed Dec 12, 2024
1 parent 0abbb64 commit 20a1999
Show file tree
Hide file tree
Showing 11 changed files with 188 additions and 432 deletions.
19 changes: 17 additions & 2 deletions include/limestone/api/datastore.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ namespace limestone::api {
*/
class datastore {

Check warning on line 51 in include/limestone/api/datastore.h

View workflow job for this annotation

GitHub Actions / Clang-Tidy

cppcoreguidelines-virtual-class-destructor

destructor of 'datastore' is public and non-virtual
friend class log_channel;
friend class rotation_task;

enum class state : std::int64_t {
not_ready = 0,
Expand Down Expand Up @@ -253,7 +252,16 @@ class datastore {
auto epoch_id_switched_for_tests() const noexcept { return epoch_id_switched_.load(); }
auto& files_for_tests() const noexcept { return files_; }
void rotate_epoch_file_for_tests() { rotate_epoch_file(); }


// These virtual methods are hooks for testing thread synchronization.
// They allow derived classes to inject custom behavior or notifications
// at specific wait points during the execution of the datastore class.
// The default implementation does nothing, ensuring no impact on production code.
virtual void on_wait1() {}
virtual void on_wait2() {}
virtual void on_wait3() {}
virtual void on_wait4() {}

private:
std::vector<std::unique_ptr<log_channel>> log_channels_;

Expand Down Expand Up @@ -338,6 +346,13 @@ class datastore {
*/
rotation_result rotate_log_files();

// Mutex to protect rotate_log_files from concurrent access
std::mutex rotate_mutex;

// Mutex and condition variable for synchronizing epoch_id_informed_ updates.
std::mutex informed_mutex;
std::condition_variable cv_epoch_informed;

/**
* @brief rotate epoch file
*/
Expand Down
17 changes: 1 addition & 16 deletions include/limestone/api/log_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
#include <limestone/api/write_version_type.h>
#include <limestone/api/large_object_input.h>


namespace limestone::api {

class datastore;
Expand Down Expand Up @@ -166,12 +165,6 @@ class log_channel {
[[nodiscard]] boost::filesystem::path file_path() const noexcept;

private:
/**
* @brief Waits until the specified epoch's session is completed and the epoch ID is removed from waiting_epoch_ids_.
* @param epoch The epoch ID associated with the session to wait for.
*/
void wait_for_end_session(epoch_id_type epoch);

datastore& envelope_;

boost::filesystem::path location_;
Expand All @@ -190,17 +183,9 @@ class log_channel {

std::atomic_uint64_t finished_epoch_id_{0};

std::atomic<epoch_id_type> latest_session_epoch_id_{0};

std::mutex session_mutex_;

std::condition_variable session_cv_;

std::set<epoch_id_type> waiting_epoch_ids_{};

log_channel(boost::filesystem::path location, std::size_t id, datastore& envelope) noexcept;

rotation_result do_rotate_file(epoch_id_type epoch = 0);
std::string do_rotate_file(epoch_id_type epoch = 0);

friend class datastore;
friend class rotation_task;
Expand Down
71 changes: 48 additions & 23 deletions src/limestone/datastore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@

#include <limestone/api/datastore.h>
#include "internal.h"

#include "rotation_task.h"
#include "rotation_result.h"
#include "log_entry.h"
#include "online_compaction.h"
#include "compaction_catalog.h"
Expand Down Expand Up @@ -154,7 +153,6 @@ epoch_id_type datastore::last_epoch() const noexcept { return static_cast<epoch_
void datastore::switch_epoch(epoch_id_type new_epoch_id) {
try {
check_after_ready(static_cast<const char*>(__func__));
rotation_task_helper::attempt_task_execution_from_queue();
auto neid = static_cast<std::uint64_t>(new_epoch_id);
if (auto switched = epoch_id_switched_.load(); neid <= switched) {
LOG_LP(WARNING) << "switch to epoch_id_type of " << neid << " (<=" << switched << ") is curious";
Expand Down Expand Up @@ -238,12 +236,19 @@ void datastore::update_min_epoch_id(bool from_switch_epoch) { // NOLINT(readabi
break;
}
if (epoch_id_informed_.compare_exchange_strong(old_epoch_id, to_be_epoch)) {
std::lock_guard<std::mutex> lock(mtx_epoch_persistent_callback_);
if (to_be_epoch < epoch_id_informed_.load()) {
break;
{
std::lock_guard<std::mutex> lock(mtx_epoch_persistent_callback_);
if (to_be_epoch < epoch_id_informed_.load()) {
break;
}
if (persistent_callback_) {
persistent_callback_(to_be_epoch);
}
}
if (persistent_callback_) {
persistent_callback_(to_be_epoch);
{
// Notify waiting threads in rotate_log_files() about the update to epoch_id_informed_
std::lock_guard<std::mutex> lock(informed_mutex);
cv_epoch_informed.notify_all();
}
break;
}
Expand Down Expand Up @@ -304,8 +309,9 @@ backup& datastore::begin_backup() {
}

std::unique_ptr<backup_detail> datastore::begin_backup(backup_type btype) { // NOLINT(readability-function-cognitive-complexity)
try {
rotation_result result = rotate_log_files();
try {
rotate_epoch_file();
rotation_result result = rotate_log_files();

// LOG-0: all files are log file, so all files are selected in both standard/transaction mode.
(void) btype;
Expand Down Expand Up @@ -400,19 +406,35 @@ void datastore::recover([[maybe_unused]] const epoch_tag& tag) const noexcept {
}

rotation_result datastore::rotate_log_files() {
// Create and enqueue a rotation task.
// Rotation task is executed when switch_epoch() is called.
// Wait for the result of the rotation task.
auto task = rotation_task_helper::create_and_enqueue_task(*this);
rotation_result result = task->wait_for_result();

// Wait for all log channels to complete the session with the specified session ID.
auto epoch_id = result.get_epoch_id();
if (epoch_id.has_value()) {
for (auto& lc : log_channels_) {
lc->wait_for_end_session(epoch_id.value());
VLOG(50) << "start rotate_log_files()";
std::lock_guard<std::mutex> lock(rotate_mutex);
VLOG(50) << "start rotate_log_files() critical section";
auto epoch_id = epoch_id_switched_.load();
if (epoch_id == 0) {
LOG_AND_THROW_EXCEPTION("rotation requires epoch_id > 0, but got epoch_id = 0");
}
VLOG(50) << "epoch_id = " << epoch_id;
{
on_wait1();
// Wait until epoch_id_informed_ is less than rotated_epoch_id to ensure safe rotation.
std::unique_lock<std::mutex> ul(informed_mutex);
while (epoch_id_informed_.load() < epoch_id) {
cv_epoch_informed.wait(ul);
}
}
VLOG(50) << "end waiting for epoch_id_informed_ to catch up";
rotation_result result(epoch_id);
for (const auto& lc : log_channels_) {
boost::system::error_code error;
bool ret = boost::filesystem::exists(lc->file_path(), error);
if (!ret || error) {
continue; // skip if not exists
}
std::string rotated_file = lc->do_rotate_file();
result.add_rotated_file(rotated_file);
}
result.set_rotation_end_files(get_files());
VLOG(50) << "end rotate_log_files()";
return result;
}

Expand Down Expand Up @@ -520,6 +542,7 @@ void datastore::stop_online_compaction_worker() {
}

void datastore::compact_with_online() {
VLOG(50) << "start compact_with_online()";
check_after_ready(static_cast<const char*>(__func__));

// rotate first
Expand All @@ -539,6 +562,7 @@ void datastore::compact_with_online() {
(need_compaction_filenames.size() == 1 &&
need_compaction_filenames.find(compaction_catalog::get_compacted_filename()) != need_compaction_filenames.end())) {
LOG_LP(INFO) << "no files to compact";
VLOG(50) << "return compact_with_online() without compaction";
return;
}

Expand All @@ -563,7 +587,7 @@ void datastore::compact_with_online() {

// get a set of all files in the location_ directory
std::set<std::string> files_in_location = get_files_in_directory(location_);

// check if detached_pwals exist in location_
for (auto it = detached_pwals.begin(); it != detached_pwals.end();) {
if (files_in_location.find(*it) == files_in_location.end()) {
Expand All @@ -580,13 +604,14 @@ void datastore::compact_with_online() {
// update compaction catalog
compacted_file_info compacted_file_info{compacted_file.filename().string(), 1};
detached_pwals.erase(compacted_file.filename().string());
compaction_catalog_->update_catalog_file(result.get_epoch_id().value_or(0), {compacted_file_info}, detached_pwals);
compaction_catalog_->update_catalog_file(result.get_epoch_id(), {compacted_file_info}, detached_pwals);
add_file(compacted_file);

// remove pwal_0000.compacted.prev
remove_file_safely(location_ / compaction_catalog::get_compacted_backup_filename());

LOG_LP(INFO) << "compaction finished";
VLOG(50) << "end compact_with_online()";
}

} // namespace limestone::api
Expand Down
32 changes: 3 additions & 29 deletions src/limestone/log_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
#include <limestone/api/datastore.h>
#include "internal.h"
#include "log_entry.h"
#include "rotation_task.h"
#include "rotation_result.h"

namespace limestone::api {

Expand Down Expand Up @@ -60,8 +60,6 @@ void log_channel::begin_session() {
std::atomic_thread_fence(std::memory_order_acq_rel);
} while (current_epoch_id_.load() != envelope_.epoch_id_switched_.load());

latest_session_epoch_id_.store(static_cast<epoch_id_type>(current_epoch_id_.load()));

auto log_file = file_path();
strm_ = fopen(log_file.c_str(), "a"); // NOLINT(*-owning-memory)
if (!strm_) {
Expand All @@ -73,10 +71,6 @@ void log_channel::begin_session() {
registered_ = true;
}
log_entry::begin_session(strm_, static_cast<epoch_id_type>(current_epoch_id_.load()));
{
std::lock_guard<std::mutex> lock(session_mutex_);
waiting_epoch_ids_.insert(latest_session_epoch_id_);
}
} catch (...) {
HANDLE_EXCEPTION_AND_ABORT();
}
Expand All @@ -97,14 +91,6 @@ void log_channel::end_session() {
if (fclose(strm_) != 0) { // NOLINT(*-owning-memory)
LOG_AND_THROW_IO_EXCEPTION("fclose failed", errno);
}

// Remove current_epoch_id_ from waiting_epoch_ids_
{
std::lock_guard<std::mutex> lock(session_mutex_);
waiting_epoch_ids_.erase(latest_session_epoch_id_.load());
// Notify waiting threads
session_cv_.notify_all();
}
} catch (...) {
HANDLE_EXCEPTION_AND_ABORT();
}
Expand Down Expand Up @@ -174,7 +160,7 @@ boost::filesystem::path log_channel::file_path() const noexcept {

// DO rotate without condition check.
// use this after your check
rotation_result log_channel::do_rotate_file(epoch_id_type epoch) {
std::string log_channel::do_rotate_file(epoch_id_type epoch) {
std::stringstream ss;
ss << file_.string() << "."
<< std::setw(14) << std::setfill('0') << envelope_.current_unix_epoch_in_millis()
Expand All @@ -192,19 +178,7 @@ rotation_result log_channel::do_rotate_file(epoch_id_type epoch) {
registered_ = false;
envelope_.subtract_file(location_ / file_);

// Create a rotation result with the current epoch ID
rotation_result result(new_name, latest_session_epoch_id_);
return result;
}

void log_channel::wait_for_end_session(epoch_id_type epoch) {
std::unique_lock<std::mutex> lock(session_mutex_);

// Wait until the specified epoch_id is removed from waiting_epoch_ids_
session_cv_.wait(lock, [this, epoch]() {
// Ensure that no ID less than or equal to the specified epoch exists in waiting_epoch_ids_
return waiting_epoch_ids_.empty() || *waiting_epoch_ids_.begin() > epoch;
});
return new_name;
}

} // namespace limestone::api
39 changes: 39 additions & 0 deletions src/limestone/rotation_result.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright 2022-2024 Project Tsurugi.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <limestone/api/datastore.h>
#include "rotation_result.h"

namespace limestone::api {

epoch_id_type rotation_result::get_epoch_id() const {
return epoch_id_;
}

void rotation_result::set_rotation_end_files(const std::set<boost::filesystem::path>& files) {
rotation_end_files = files;
}

const std::set<boost::filesystem::path>& rotation_result::get_rotation_end_files() const {
return rotation_end_files;
}

void rotation_result::add_rotated_file(const std::string filename) {

Check warning on line 34 in src/limestone/rotation_result.cpp

View workflow job for this annotation

GitHub Actions / Clang-Tidy

performance-unnecessary-value-param

the const qualified parameter 'filename' is copied for each invocation; consider making it a reference
latest_rotated_files_.insert(filename);

}

} // namespace limestone::api
Loading

0 comments on commit 20a1999

Please sign in to comment.