Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integrate wip/log-0.7 into master (excluding draft documents) #70

Merged
merged 39 commits into from
Nov 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
accc0c6
Add an internal document
umegane Sep 30, 2024
1a48442
Refactor to isolate testable units for easier testing
umegane Sep 30, 2024
68480e0
Add flag to snapshot creation to skip entry removal
umegane Sep 30, 2024
aed365d
Make cursor class handle two streams
umegane Sep 30, 2024
b01068c
cleanup: clang-tidy
umegane Oct 2, 2024
8b8b8d6
Update an internal document
umegane Oct 3, 2024
0146118
Add test cases
umegane Oct 3, 2024
c5b067f
Refactor: Rename test file and class for compaction functionality
umegane Oct 4, 2024
605844e
Add test cases
umegane Oct 4, 2024
2f313b7
Merge branch 'master' into wip/log-0.7
umegane Oct 7, 2024
d514252
Refactor: Separate snapshot_tracker class into standalone files
umegane Oct 10, 2024
b44d6c5
Add test cases
umegane Oct 10, 2024
7654a92
Refactor: Apply Pimpl Idiom to cursor class and rename snapshot_track…
umegane Oct 17, 2024
f85c130
Refactor: Apply Pimpl Idiom to snapshot class to hide implementation …
umegane Oct 17, 2024
1bf2a15
Refactor: Move sorting_context class to separate files
umegane Oct 17, 2024
aa64958
feat: Enable cursor to access clear_storage from sorting_context
umegane Oct 17, 2024
434d44c
cleanup: clang-tidy
umegane Oct 17, 2024
f8799b5
Fix: Ensure proper deletion of entries in snapshot when drop or trunc…
umegane Oct 18, 2024
b1be94a
Add test cases
umegane Oct 18, 2024
544b6ca
cleanup: clang-tidy
umegane Oct 18, 2024
acc1f2e
Update an internal document
umegane Oct 21, 2024
061fdb4
Remove db_startup_sort_improvement.md before merge to master
umegane Oct 21, 2024
f6a9066
Merge branch 'master' into wip/log-0.7
umegane Oct 21, 2024
b989d13
Merge branch 'master' into wip/log-0.7
umegane Oct 31, 2024
946aac8
fix: Typo in variable name
umegane Nov 12, 2024
fac5877
Fix epoch ID handling logic for consistent order in multi-threading
umegane Nov 12, 2024
4f5ca5c
Fix: Prevent invalid Epoch ID due to underflow during Epoch update
umegane Nov 14, 2024
a7f204a
Clarify synchronization loop purpose with added comments
umegane Nov 14, 2024
1455eae
Fix: Handle missing Epoch file at startup
umegane Nov 14, 2024
fa6a93e
Add manifest file locking to prevent simultaneous DB instances or dbl…
umegane Nov 18, 2024
cfcab04
Merge branch 'master' into wip/log-0.7
umegane Nov 20, 2024
1ef6217
Correct inaccurate comments to align with the current implementation …
umegane Nov 21, 2024
a760fad
Add docs/internal/epock_file_handling.md
umegane Nov 25, 2024
003f01a
cleanup: clang-tidy
umegane Nov 25, 2024
4417c00
addressed the case where epoch file writing takes a long time
t-horikawa Nov 27, 2024
f8ad37d
apply https://github.com/project-tsurugi/tsurugi-issues/issues/1034#i…
t-horikawa Nov 28, 2024
77552d6
address a race described in https://github.com/project-tsurugi/tsurug…
t-horikawa Nov 28, 2024
c73f270
Merge pull request #68 from project-tsurugi/wip/log-0.7_i_1034
umegane Nov 28, 2024
02bbea4
Temporarily remove unfinished documents for master merge
umegane Nov 28, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions include/limestone/api/datastore.h
Original file line number Diff line number Diff line change
Expand Up @@ -249,8 +249,10 @@ class datastore {
protected: // for tests
auto& log_channels_for_tests() const noexcept { return log_channels_; }
auto epoch_id_informed_for_tests() const noexcept { return epoch_id_informed_.load(); }
auto epoch_id_recorded_for_tests() const noexcept { return epoch_id_recorded_.load(); }
auto epoch_id_recorded_for_tests() const noexcept { return epoch_id_to_be_recorded_.load(); }
auto epoch_id_switched_for_tests() const noexcept { return epoch_id_switched_.load(); }
auto& files_for_tests() const noexcept { return files_; }
void rotate_epoch_file_for_tests() { rotate_epoch_file(); }

private:
std::vector<std::unique_ptr<log_channel>> log_channels_;
Expand All @@ -261,7 +263,8 @@ class datastore {

std::atomic_uint64_t epoch_id_informed_{};

std::atomic_uint64_t epoch_id_recorded_{};
std::atomic_uint64_t epoch_id_to_be_recorded_{};
std::atomic_uint64_t epoch_id_record_finished_{};

std::unique_ptr<backup> backup_{};

Expand Down Expand Up @@ -302,6 +305,8 @@ class datastore {

std::mutex mtx_epoch_file_{};

std::mutex mtx_epoch_persistent_callback_{};

state state_{};

void add_file(const boost::filesystem::path& file) noexcept;
Expand All @@ -327,7 +332,6 @@ class datastore {
*/
void create_snapshot();

epoch_id_type last_durable_epoch_in_dir();

/**
* @brief requests the data store to rotate log files
Expand All @@ -342,6 +346,9 @@ class datastore {
int64_t current_unix_epoch_in_millis();

std::map<storage_id_type, write_version_type> clear_storage;

// File descriptor for file lock (flock) on the manifest file
int fd_for_flock_{-1};
};

} // namespace limestone::api
2 changes: 1 addition & 1 deletion include/limestone/api/log_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ class log_channel {

std::atomic_uint64_t finished_epoch_id_{0};

std::atomic<epoch_id_type> latest_ession_epoch_id_{0};
std::atomic<epoch_id_type> latest_session_epoch_id_{0};

std::mutex session_mutex_;

Expand Down
64 changes: 53 additions & 11 deletions src/limestone/datastore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,19 @@ datastore::datastore(configuration const& conf) : location_(conf.data_locations_
}
}
internal::check_and_migrate_logdir_format(location_);

// acquire lock for manifest file
fd_for_flock_ = internal::acquire_manifest_lock(location_);
if (fd_for_flock_ == -1) {
if (errno == EWOULDBLOCK) {
std::string err_msg = "Another process is using the log directory: " + location_.string() + ". Terminate the conflicting process and restart this process.";
LOG_AND_THROW_IO_EXCEPTION(err_msg, errno);
} else {
std::string err_msg = "Failed to acquire lock for manifest in directory: " + location_.string();
LOG_AND_THROW_IO_EXCEPTION(err_msg, errno);
}
}

add_file(compaction_catalog_path);
compaction_catalog_ = std::make_unique<compaction_catalog>(compaction_catalog::from_catalog_file(location_));

Expand Down Expand Up @@ -149,22 +162,30 @@ void datastore::switch_epoch(epoch_id_type new_epoch_id) {
}

epoch_id_switched_.store(neid);
update_min_epoch_id(true);
if (state_ != state::not_ready) {
update_min_epoch_id(true);
}
} catch (...) {
HANDLE_EXCEPTION_AND_ABORT();
}
}

void datastore::update_min_epoch_id(bool from_switch_epoch) { // NOLINT(readability-function-cognitive-complexity)
auto upper_limit = epoch_id_switched_.load() - 1;
auto upper_limit = epoch_id_switched_.load();
if (upper_limit == 0) {
return; // If epoch_id_switched_ is zero, it means no epoch has been switched, so updating epoch_id_to_be_recorded_ and epoch_id_informed_ is unnecessary.
}
upper_limit--;
epoch_id_type max_finished_epoch = 0;

for (const auto& e : log_channels_) {
auto working_epoch = static_cast<epoch_id_type>(e->current_epoch_id_.load());
if ((working_epoch - 1) < upper_limit) {
upper_limit = working_epoch - 1;
}
auto working_epoch = e->current_epoch_id_.load();
auto finished_epoch = e->finished_epoch_id_.load();
if (working_epoch > finished_epoch && working_epoch != UINT64_MAX) {
if ((working_epoch - 1) < upper_limit) {
upper_limit = working_epoch - 1;
}
}
if (max_finished_epoch < finished_epoch) {
max_finished_epoch = finished_epoch;
}
Expand All @@ -175,19 +196,21 @@ void datastore::update_min_epoch_id(bool from_switch_epoch) { // NOLINT(readabi
if (from_switch_epoch && (to_be_epoch > static_cast<std::uint64_t>(max_finished_epoch))) {
to_be_epoch = static_cast<std::uint64_t>(max_finished_epoch);
}
auto old_epoch_id = epoch_id_recorded_.load();
auto old_epoch_id = epoch_id_to_be_recorded_.load();
while (true) {
if (old_epoch_id >= to_be_epoch) {
break;
}
if (epoch_id_recorded_.compare_exchange_strong(old_epoch_id, to_be_epoch)) {
if (epoch_id_to_be_recorded_.compare_exchange_strong(old_epoch_id, to_be_epoch)) {
std::lock_guard<std::mutex> lock(mtx_epoch_file_);

if (to_be_epoch < epoch_id_to_be_recorded_.load()) {
break;
}
FILE* strm = fopen(epoch_file_path_.c_str(), "a"); // NOLINT(*-owning-memory)
if (!strm) {
LOG_AND_THROW_IO_EXCEPTION("fopen failed", errno);
}
log_entry::durable_epoch(strm, static_cast<epoch_id_type>(epoch_id_recorded_.load()));
log_entry::durable_epoch(strm, static_cast<epoch_id_type>(epoch_id_to_be_recorded_.load()));
if (fflush(strm) != 0) {
LOG_AND_THROW_IO_EXCEPTION("fflush failed", errno);
}
Expand All @@ -197,18 +220,29 @@ void datastore::update_min_epoch_id(bool from_switch_epoch) { // NOLINT(readabi
if (fclose(strm) != 0) { // NOLINT(*-owning-memory)
LOG_AND_THROW_IO_EXCEPTION("fclose failed", errno);
}
epoch_id_record_finished_.store(epoch_id_to_be_recorded_.load());
break;
}
}
if (to_be_epoch > epoch_id_record_finished_.load()) {
return;
}

// update informed_epoch_
to_be_epoch = upper_limit;
// In `informed_epoch_`, the update restriction based on the `from_switch_epoch` condition is intentionally omitted.
// Due to the interface specifications of Shirakami, it is necessary to advance the epoch even if the log channel
// is not updated. This behavior differs from `recorded_epoch_` and should be maintained as such.
old_epoch_id = epoch_id_informed_.load();
while (true) {
if (old_epoch_id >= to_be_epoch) {
break;
}
if (epoch_id_informed_.compare_exchange_strong(old_epoch_id, to_be_epoch)) {
std::lock_guard<std::mutex> lock(mtx_epoch_persistent_callback_);
if (to_be_epoch < epoch_id_informed_.load()) {
break;
}
if (persistent_callback_) {
persistent_callback_(to_be_epoch);
}
Expand Down Expand Up @@ -244,7 +278,15 @@ std::future<void> datastore::shutdown() noexcept {
VLOG(log_info) << "/:limestone:datastore:shutdown compaction task has been stopped.";
}

return std::async(std::launch::async, []{
if (fd_for_flock_ != -1) {
if (::close(fd_for_flock_) == -1) {
VLOG(log_error) << "Failed to close lock file descriptor: " << strerror(errno);
} else {
fd_for_flock_ = -1;
}
}

return std::async(std::launch::async, [] {
std::this_thread::sleep_for(std::chrono::microseconds(100000));
VLOG(log_info) << "/:limestone:datastore:shutdown end";
});
Expand Down
19 changes: 19 additions & 0 deletions src/limestone/datastore_format.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <sys/file.h>
#include <fcntl.h>
#include <unistd.h>

#include <boost/filesystem.hpp>
#include <nlohmann/json.hpp>
Expand Down Expand Up @@ -170,4 +173,20 @@ void check_and_migrate_logdir_format(const boost::filesystem::path& logdir) {
}
}

int acquire_manifest_lock(const boost::filesystem::path& logdir) {
boost::filesystem::path manifest_path = logdir / std::string(manifest_file_name);

int fd = ::open(manifest_path.string().c_str(), O_RDWR); // NOLINT(hicpp-vararg, cppcoreguidelines-pro-type-vararg)
if (fd == -1) {
return -1;
}

if (::flock(fd, LOCK_EX | LOCK_NB) == -1) {
::close(fd);
return -1;
}
VLOG_LP(log_info) << "acquired lock on manifest file: " << manifest_path.string();
return fd;
}

} // namespace limestone::internal
19 changes: 11 additions & 8 deletions src/limestone/dblog_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,18 +59,21 @@ epoch_id_type dblog_scan::last_durable_epoch_in_dir() {
auto& from_dir = dblogdir_;
// read main epoch file first
auto main_epoch_file = from_dir / std::string(epoch_file_name);

// If main epoch file does not exist, create an empty one
if (!boost::filesystem::exists(main_epoch_file)) {
// datastore operations (ctor and rotate) ensure that the main epoch file exists.
// so it may directory called from outside of datastore
LOG_AND_THROW_EXCEPTION("epoch file does not exist: " + main_epoch_file.string());
}
std::optional<epoch_id_type> ld_epoch = last_durable_epoch(main_epoch_file);
if (ld_epoch.has_value()) {
return *ld_epoch;
std::ofstream(main_epoch_file.string()).close(); // Create an empty file
} else {
// If the file exists, attempt to get the last durable epoch
std::optional<epoch_id_type> ld_epoch = last_durable_epoch(main_epoch_file);
if (ld_epoch.has_value()) {
return *ld_epoch;
}
}

// main epoch file is empty,
// main epoch file is empty or does not contain a valid epoch,
// read all rotated-epoch files
std::optional<epoch_id_type> ld_epoch;
for (const boost::filesystem::path& p : boost::filesystem::directory_iterator(from_dir)) {
if (p.filename().string().rfind(epoch_file_name, 0) == 0) { // starts_with(epoch_file_name)
// this is epoch file (main one or rotated)
Expand Down
8 changes: 8 additions & 0 deletions src/limestone/dblogutil/dblogutil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -289,11 +289,19 @@ int main(char *dir, subcommand mode) { // NOLINT
}
try {
check_and_migrate_logdir_format(p);
int lock_fd = acquire_manifest_lock(p);
if (lock_fd == -1) {
LOG(ERROR) << "Another process is using the log directory: " << p
<< ". Terminate the conflicting process and re-execute the command. "
<< "Error: " << strerror(errno);
log_and_exit(64);
}
dblog_scan ds(p);
ds.set_thread_num(FLAGS_thread_num);
if (mode == cmd_inspect) inspect(ds, opt_epoch);
if (mode == cmd_repair) repair(ds, opt_epoch);
if (mode == cmd_compaction) compaction(ds, opt_epoch);
close(lock_fd);
} catch (limestone_exception& e) {
LOG(ERROR) << e.what();
log_and_exit(64);
Expand Down
7 changes: 7 additions & 0 deletions src/limestone/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,15 @@ void setup_initial_logdir(const boost::filesystem::path& logdir);
*/
int is_supported_version(const boost::filesystem::path& manifest_path, std::string& errmsg);

// Validates the manifest file in the specified log directory and performs repair or migration if necessary.
void check_and_migrate_logdir_format(const boost::filesystem::path& logdir);

// Acquires an exclusive lock on the manifest file.
// Returns the file descriptor on success, or -1 on failure.
// Note: This function does not log errors or handle them internally.
// The caller must check the return value and use errno for error handling.
int acquire_manifest_lock(const boost::filesystem::path& logdir);

// from datastore_restore.cpp

status purge_dir(const boost::filesystem::path& dir);
Expand Down
8 changes: 4 additions & 4 deletions src/limestone/limestone_exception_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,16 +80,16 @@ inline void handle_exception_and_abort(std::string_view func_name) {
throw;
}
LOG_LP(FATAL) << "Fatal error in " << func_name << ": " << e.what();
std::abort(); // Safety measure: this should never be reached due to VLOG_LP(google::FATAL)
std::abort(); // Safety measure: this should never be reached due to LOG_LP(FATAL)
} catch (const std::runtime_error& e) {
LOG_LP(FATAL) << "Runtime error in " << func_name << ": " << e.what();
std::abort(); // Safety measure: this should never be reached due to VLOG_LP(google::FATAL)
std::abort(); // Safety measure: this should never be reached due to LOG_LP(FATAL)
} catch (const std::exception& e) {
LOG_LP(FATAL) << "Unexpected exception in " << func_name << ": " << e.what();
std::abort(); // Safety measure: this should never be reached due to VLOG_LP(google::FATAL)
std::abort(); // Safety measure: this should never be reached due to LOG_LP(FATAL)
} catch (...) {
LOG_LP(FATAL) << "Unknown exception in " << func_name;
std::abort(); // Safety measure: this should never be reached due to VLOG_LP(google::FATAL)
std::abort(); // Safety measure: this should never be reached due to LOG_LP(FATAL)
}
}

Expand Down
25 changes: 20 additions & 5 deletions src/limestone/log_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,26 @@ log_channel::log_channel(boost::filesystem::path location, std::size_t id, datas

void log_channel::begin_session() {
try {
// Synchronize `current_epoch_id_` with `epoch_id_switched_`.
// This loop is necessary to prevent inconsistencies in `current_epoch_id_`
// that could occur if `epoch_id_switched_` changes at a specific timing.
//
// Case where inconsistency occurs:
// 1. This thread (L) loads `epoch_id_switched_` and reads 10.
// 2. Another thread (S) immediately updates `epoch_id_switched_` to 11.
// 3. If the other thread (S) reads `current_epoch_id_` at this point,
// it expects `current_epoch_id_` to be consistent with the latest
// `epoch_id_switched_` value (11), but `current_epoch_id_` may still
// hold the outdated value, causing an inconsistency.
//
// This loop detects such inconsistencies and repeats until `current_epoch_id_`
// matches the latest value of `epoch_id_switched_`, ensuring consistency.
do {
current_epoch_id_.store(envelope_.epoch_id_switched_.load());
std::atomic_thread_fence(std::memory_order_acq_rel);
} while (current_epoch_id_.load() != envelope_.epoch_id_switched_.load());
latest_ession_epoch_id_.store(static_cast<epoch_id_type>(current_epoch_id_.load()));

latest_session_epoch_id_.store(static_cast<epoch_id_type>(current_epoch_id_.load()));

auto log_file = file_path();
strm_ = fopen(log_file.c_str(), "a"); // NOLINT(*-owning-memory)
Expand All @@ -60,7 +75,7 @@ void log_channel::begin_session() {
log_entry::begin_session(strm_, static_cast<epoch_id_type>(current_epoch_id_.load()));
{
std::lock_guard<std::mutex> lock(session_mutex_);
waiting_epoch_ids_.insert(latest_ession_epoch_id_);
waiting_epoch_ids_.insert(latest_session_epoch_id_);
}
} catch (...) {
HANDLE_EXCEPTION_AND_ABORT();
Expand All @@ -76,8 +91,8 @@ void log_channel::end_session() {
LOG_AND_THROW_IO_EXCEPTION("fsync failed", errno);
}
finished_epoch_id_.store(current_epoch_id_.load());
current_epoch_id_.store(UINT64_MAX);
envelope_.update_min_epoch_id();
current_epoch_id_.store(UINT64_MAX);

if (fclose(strm_) != 0) { // NOLINT(*-owning-memory)
LOG_AND_THROW_IO_EXCEPTION("fclose failed", errno);
Expand All @@ -86,7 +101,7 @@ void log_channel::end_session() {
// Remove current_epoch_id_ from waiting_epoch_ids_
{
std::lock_guard<std::mutex> lock(session_mutex_);
waiting_epoch_ids_.erase(latest_ession_epoch_id_.load());
waiting_epoch_ids_.erase(latest_session_epoch_id_.load());
// Notify waiting threads
session_cv_.notify_all();
}
Expand Down Expand Up @@ -178,7 +193,7 @@ rotation_result log_channel::do_rotate_file(epoch_id_type epoch) {
envelope_.subtract_file(location_ / file_);

// Create a rotation result with the current epoch ID
rotation_result result(new_name, latest_ession_epoch_id_);
rotation_result result(new_name, latest_session_epoch_id_);
return result;
}

Expand Down
Loading