Skip to content

Commit

Permalink
Add features for Epoch file management and cleanup
Browse files Browse the repository at this point in the history
1. Remove rotated Epoch files at startup

* All rotated Epoch files are removed at the end of the startup process.
* To prevent losing last_durable_epoch in case of a crash after file removal,
  the value of last_durable_epoch is written to the current Epoch file
  before removal.

2. Implement Epoch file refresh to limit file growth

* The refresh process is triggered when the number of entries exceeds
  a defined constant max_entries_in_epoch_file.

  Refresh steps:
    a. Create a temporary file and write necessary epochs.
    b. Atomically replace the original Epoch file with the temporary file
       using rename.
    c. Remove the temporary file after the swap.

* At startup, leftover temporary files from incomplete refreshes are also
  removed to ensure consistency.

These changes prevent excessive accumulation of rotated Epoch files and control
the size of the Epoch file while ensuring data integrity even in crash scenarios.
  • Loading branch information
umegane committed Dec 13, 2024
1 parent cdd79d5 commit 146193a
Show file tree
Hide file tree
Showing 8 changed files with 362 additions and 58 deletions.
6 changes: 6 additions & 0 deletions include/limestone/api/datastore.h
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,8 @@ class datastore {

boost::filesystem::path epoch_file_path_{};

boost::filesystem::path tmp_epoch_file_path_{};

tag_repository tag_repository_{};

std::atomic_uint64_t log_channel_id_{};
Expand Down Expand Up @@ -364,6 +366,10 @@ class datastore {

// File descriptor for file lock (flock) on the manifest file
int fd_for_flock_{-1};

void write_epoch_to_file(epoch_id_type epoch_id);

int epoch_write_counter = 0;
};

} // namespace limestone::api
80 changes: 60 additions & 20 deletions src/limestone/datastore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,8 @@ datastore::datastore(configuration const& conf) : location_(conf.data_locations_
add_file(compaction_catalog_path);
compaction_catalog_ = std::make_unique<compaction_catalog>(compaction_catalog::from_catalog_file(location_));

// XXX: prusik era
// TODO: read rotated epoch files if main epoch file does not exist
epoch_file_path_ = location_ / boost::filesystem::path(std::string(limestone::internal::epoch_file_name));
epoch_file_path_ = location_ / std::string(limestone::internal::epoch_file_name);
tmp_epoch_file_path_ = location_ / std::string(limestone::internal::tmp_epoch_file_name);
const bool result = boost::filesystem::exists(epoch_file_path_, error);
if (!result || error) {
FILE* strm = fopen(epoch_file_path_.c_str(), "a"); // NOLINT(*-owning-memory)
Expand All @@ -97,6 +96,14 @@ datastore::datastore(configuration const& conf) : location_(conf.data_locations_
add_file(epoch_file_path_);
}

const bool exists = boost::filesystem::exists(tmp_epoch_file_path_, error);
if (exists) {
const bool result_remove = boost::filesystem::remove(tmp_epoch_file_path_, error);
if (!result_remove || error) {
LOG_AND_THROW_IO_EXCEPTION("fail to remove temporary epoch file, path: " + tmp_epoch_file_path_.string(), errno);
}
}

recover_max_parallelism_ = conf.recover_max_parallelism_;
LOG(INFO) << "/:limestone:config:datastore setting the number of recover process thread = " << recover_max_parallelism_;

Expand All @@ -118,10 +125,57 @@ void datastore::recover() const noexcept {
check_before_ready(static_cast<const char*>(__func__));
}

enum class file_write_mode {
append,
overwrite
};

static void write_epoch_to_file_internal(const std::string& file_path, epoch_id_type epoch_id, file_write_mode mode) {
const char* fopen_mode = (mode == file_write_mode::append) ? "a" : "w";
FILE* strm = fopen(file_path.c_str(), fopen_mode);

Check warning on line 135 in src/limestone/datastore.cpp

View workflow job for this annotation

GitHub Actions / Clang-Tidy

cppcoreguidelines-owning-memory

initializing non-owner 'FILE *' (aka '_IO_FILE *') with a newly created 'gsl::owner<>'
if (!strm) {
LOG_AND_THROW_IO_EXCEPTION("fopen failed for file: " + file_path, errno);
}
log_entry::durable_epoch(strm, epoch_id);
if (fflush(strm) != 0) {
LOG_AND_THROW_IO_EXCEPTION("fflush failed for file: " + file_path, errno);
}
if (fsync(fileno(strm)) != 0) {
LOG_AND_THROW_IO_EXCEPTION("fsync failed for file: " + file_path, errno);
}
if (fclose(strm) != 0) {

Check warning on line 146 in src/limestone/datastore.cpp

View workflow job for this annotation

GitHub Actions / Clang-Tidy

cppcoreguidelines-owning-memory

calling legacy resource function without passing a 'gsl::owner<>'
LOG_AND_THROW_IO_EXCEPTION("fclose failed for file: " + file_path, errno);
}
}

void datastore::write_epoch_to_file(epoch_id_type epoch_id) {
if (++epoch_write_counter >= max_entries_in_epoch_file) {
write_epoch_to_file_internal(tmp_epoch_file_path_.string(), epoch_id, file_write_mode::overwrite);

boost::system::error_code ec;
if (::rename(tmp_epoch_file_path_.c_str(), epoch_file_path_.c_str()) != 0) {
LOG_AND_THROW_IO_EXCEPTION("Failed to rename temp file: " + tmp_epoch_file_path_.string() + " to " + epoch_file_path_.string(), errno);
}
boost::filesystem::remove(tmp_epoch_file_path_, ec);
if (ec) {
LOG_AND_THROW_IO_EXCEPTION("Failed to remove temp file: " + tmp_epoch_file_path_.string(), ec);
}
epoch_write_counter = 0;
} else {
write_epoch_to_file_internal(epoch_file_path_.string(), epoch_id, file_write_mode::append);
}
}



void datastore::ready() {
try {
create_snapshot();
online_compaction_worker_future_ = std::async(std::launch::async, &datastore::online_compaction_worker, this);
if (epoch_id_switched_.load() != 0) {
write_epoch_to_file(epoch_id_informed_.load());
}
cleanup_rotated_epoch_files(location_);
state_ = state::ready;
} catch (...) {
HANDLE_EXCEPTION_AND_ABORT();
Expand Down Expand Up @@ -151,6 +205,7 @@ log_channel& datastore::create_channel(const boost::filesystem::path& location)
epoch_id_type datastore::last_epoch() const noexcept { return static_cast<epoch_id_type>(epoch_id_informed_.load()); }

void datastore::switch_epoch(epoch_id_type new_epoch_id) {
VLOG(30) << "switch_epoch: " << new_epoch_id;
try {
check_after_ready(static_cast<const char*>(__func__));
auto neid = static_cast<std::uint64_t>(new_epoch_id);
Expand Down Expand Up @@ -200,23 +255,7 @@ void datastore::update_min_epoch_id(bool from_switch_epoch) { // NOLINT(readabi
}
if (epoch_id_to_be_recorded_.compare_exchange_strong(old_epoch_id, to_be_epoch)) {
std::lock_guard<std::mutex> lock(mtx_epoch_file_);
if (to_be_epoch < epoch_id_to_be_recorded_.load()) {
break;
}
FILE* strm = fopen(epoch_file_path_.c_str(), "a"); // NOLINT(*-owning-memory)
if (!strm) {
LOG_AND_THROW_IO_EXCEPTION("fopen failed", errno);
}
log_entry::durable_epoch(strm, static_cast<epoch_id_type>(epoch_id_to_be_recorded_.load()));
if (fflush(strm) != 0) {
LOG_AND_THROW_IO_EXCEPTION("fflush failed", errno);
}
if (fsync(fileno(strm)) != 0) {
LOG_AND_THROW_IO_EXCEPTION("fsync failed", errno);
}
if (fclose(strm) != 0) { // NOLINT(*-owning-memory)
LOG_AND_THROW_IO_EXCEPTION("fclose failed", errno);
}
write_epoch_to_file(static_cast<epoch_id_type>(to_be_epoch));
epoch_id_record_finished_.store(epoch_id_to_be_recorded_.load());
break;
}
Expand Down Expand Up @@ -255,6 +294,7 @@ void datastore::update_min_epoch_id(bool from_switch_epoch) { // NOLINT(readabi
}
}


void datastore::add_persistent_callback(std::function<void(epoch_id_type)> callback) noexcept {
check_before_ready(static_cast<const char*>(__func__));
persistent_callback_ = std::move(callback);
Expand Down
34 changes: 34 additions & 0 deletions src/limestone/datastore_snapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,41 @@ std::set<std::string> assemble_snapshot_input_filenames(
return assemble_snapshot_input_filenames(compaction_catalog, location, file_ops);
}

std::set<boost::filesystem::path> filter_epoch_files(const boost::filesystem::path& directory) {
std::set<boost::filesystem::path> epoch_files;
for (const auto& entry : boost::filesystem::directory_iterator(directory)) {
if (entry.path().filename().string().rfind(epoch_file_name, 0) == 0) {
epoch_files.insert(entry.path());
}
}
return epoch_files;
}

void cleanup_rotated_epoch_files(const boost::filesystem::path& directory) {
// Retrieve all epoch files in the directory
std::set<boost::filesystem::path> epoch_files = filter_epoch_files(directory);

// Define the main epoch file path
boost::filesystem::path main_epoch_file = directory / std::string(epoch_file_name);

// Check if the main epoch file exists among the filtered files
auto main_file_it = epoch_files.find(main_epoch_file);
if (main_file_it == epoch_files.end()) {
LOG_AND_THROW_EXCEPTION("Epoch file does not exist: " + main_epoch_file.string());
}

// Remove the main epoch file from the set of epoch files
epoch_files.erase(main_file_it);

// Iterate through the remaining epoch files and remove them
for (const auto& file : epoch_files) {
boost::system::error_code ec;
boost::filesystem::remove(file, ec);
if (ec) {
LOG_AND_THROW_IO_EXCEPTION("Failed to remove file: " + file.string() + ". Error: ", ec);
}
}
}

} // namespace limestone::internal

Expand Down
24 changes: 13 additions & 11 deletions src/limestone/dblog_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/

#include <iomanip>
#include <set>
#include <boost/filesystem.hpp>

#include <glog/logging.h>
Expand Down Expand Up @@ -55,6 +56,8 @@ std::optional<epoch_id_type> last_durable_epoch(const boost::filesystem::path& f
return rv;
}



epoch_id_type dblog_scan::last_durable_epoch_in_dir() {
auto& from_dir = dblogdir_;
// read main epoch file first
Expand All @@ -74,22 +77,21 @@ epoch_id_type dblog_scan::last_durable_epoch_in_dir() {
// main epoch file is empty or does not contain a valid epoch,
// read all rotated-epoch files
std::optional<epoch_id_type> ld_epoch;
for (const boost::filesystem::path& p : boost::filesystem::directory_iterator(from_dir)) {
if (p.filename().string().rfind(epoch_file_name, 0) == 0) { // starts_with(epoch_file_name)
// this is epoch file (main one or rotated)
std::optional<epoch_id_type> epoch = last_durable_epoch(p);
if (!epoch.has_value()) {
continue; // file is empty
}
// ld_epoch = max(ld_epoch, epoch)
if (!ld_epoch.has_value() || *ld_epoch < *epoch) {
ld_epoch = epoch;
}
auto epoch_files = filter_epoch_files(from_dir);
for (const boost::filesystem::path& p : epoch_files) {
std::optional<epoch_id_type> epoch = last_durable_epoch(p);
if (!epoch.has_value()) {
continue; // file is empty
}
if (!ld_epoch.has_value() || *ld_epoch < *epoch) {
ld_epoch = epoch;
}
}
return ld_epoch.value_or(0); // 0 = minimum epoch
}



static bool log_error_and_throw(log_entry::read_error& e) {
LOG_AND_THROW_EXCEPTION("this pwal file is broken: " + e.message());
return false;
Expand Down
10 changes: 9 additions & 1 deletion src/limestone/dblog_scan.h
Original file line number Diff line number Diff line change
Expand Up @@ -194,10 +194,18 @@ class dblog_scan {
*/
void rescan_directory_paths();

/**
* @brief Cleans up unnecessary epoch files in the log directory.
*
* This method identifies and removes unnecessary epoch files in the log directory.
* Only the main epoch file is retained, and all rotated epoch files are deleted.
* If the main epoch file does not exist among the identified files, an exception is thrown.
*/
void cleanup_rotated_epoch_files();

private:
boost::filesystem::path dblogdir_;
std::list<boost::filesystem::path> path_list_;

int thread_num_{1};
bool fail_fast_{false};

Expand Down
29 changes: 29 additions & 0 deletions src/limestone/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <boost/filesystem.hpp>

#include <limestone/api/datastore.h>
#include "file_operations.h"

namespace limestone::internal {
using namespace limestone::api;
Expand All @@ -31,12 +32,25 @@ using namespace limestone::api;
*/
static constexpr const std::string_view epoch_file_name = "epoch";

static constexpr const std::string_view tmp_epoch_file_name = ".epoch.tmp";


// moved from log_channel.h
/**
* @brief prefix of pwal file name
*/
static constexpr const std::string_view log_channel_prefix = "pwal_";


/**
* @brief The maximum number of entries allowed in an epoch file.
*
* This constant defines the upper limit for the number of entries that can be stored
* in a single epoch file. It is used to ensure that the file does not grow too large,
* which could impact performance and manageability.
*/
static constexpr const int max_entries_in_epoch_file = 100;

// from dblog_scan.cpp

// return max epoch in file.
Expand Down Expand Up @@ -75,6 +89,21 @@ void create_compact_pwal(
int num_worker,
const std::set<std::string>& file_names = std::set<std::string>());

std::set<boost::filesystem::path> filter_epoch_files(
const boost::filesystem::path& directory);

std::set<std::string> assemble_snapshot_input_filenames(
const std::unique_ptr<compaction_catalog>& compaction_catalog,
const boost::filesystem::path& location,
file_operations& file_ops);

std::set<std::string> assemble_snapshot_input_filenames(
const std::unique_ptr<compaction_catalog>& compaction_catalog,
const boost::filesystem::path& location);

void cleanup_rotated_epoch_files(
const boost::filesystem::path& directory);

// filepath.cpp

void remove_trailing_dir_separators(boost::filesystem::path& p);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,7 @@
#include "compaction_catalog.h"
#include "limestone/api/epoch_id_type.h"
#include "limestone/api/limestone_exception.h"

namespace limestone::internal {
// Forward declare the target function for testing
std::set<std::string> assemble_snapshot_input_filenames(
const std::unique_ptr<compaction_catalog>& compaction_catalog,
const boost::filesystem::path& location);
std::set<std::string> assemble_snapshot_input_filenames(
const std::unique_ptr<compaction_catalog>& compaction_catalog,
const boost::filesystem::path& location,
file_operations& file_ops);
}
#include "internal.h"

namespace limestone::testing {

Expand Down
Loading

0 comments on commit 146193a

Please sign in to comment.