Skip to content

Commit

Permalink
Merge branch 'remove-storage'
Browse files Browse the repository at this point in the history
implement remove_storage and truncate_storage: handling entries at making snapshot
  • Loading branch information
ban-nobuhiro committed Aug 27, 2024
2 parents a0eb980 + 53d3ea4 commit cc4d032
Show file tree
Hide file tree
Showing 2 changed files with 135 additions and 16 deletions.
100 changes: 85 additions & 15 deletions src/limestone/datastore_snapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,46 @@ namespace limestone::internal {
constexpr std::size_t write_version_size = sizeof(epoch_id_type) + sizeof(std::uint64_t);
static_assert(write_version_size == 16);

class sorting_context {
public:
sorting_context(sorting_context&& obj) noexcept : sortdb(std::move(obj.sortdb)) {
std::unique_lock lk{obj.mtx_clear_storage};
clear_storage = std::move(obj.clear_storage); // NOLINT(*-prefer-member-initializer): need lock
}
sorting_context(const sorting_context&) = delete;
sorting_context& operator=(const sorting_context&) = delete;
sorting_context& operator=(sorting_context&&) = delete;
sorting_context() = default;
~sorting_context() = default;
explicit sorting_context(std::unique_ptr<sortdb_wrapper>&& s) noexcept : sortdb(std::move(s)) {
}

// point entries
private:
std::unique_ptr<sortdb_wrapper> sortdb;
public:
sortdb_wrapper* get_sortdb() { return sortdb.get(); }

// range delete entries
private:
std::mutex mtx_clear_storage;
std::map<storage_id_type, write_version_type> clear_storage;
public:
void clear_storage_update(const storage_id_type sid, const write_version_type wv) {
std::unique_lock lk{mtx_clear_storage};
if (auto [it, inserted] = clear_storage.emplace(sid, wv);
!inserted) {
it->second = std::max(it->second, wv);
}
}
std::optional<write_version_type> clear_storage_find(const storage_id_type sid) {
// no need to lock, for now
auto itr = clear_storage.find(sid);
if (itr == clear_storage.end()) return {};
return {itr->second};
}
};

[[maybe_unused]]
static void store_bswap64_value(void *dest, const void *src) {
auto* p64_dest = reinterpret_cast<std::uint64_t*>(dest); // NOLINT(*-reinterpret-cast)
Expand Down Expand Up @@ -85,14 +125,14 @@ static void insert_twisted_entry(sortdb_wrapper* sortdb, const log_entry& e) {
sortdb->put(db_key, db_value);
}

static std::pair<epoch_id_type, std::unique_ptr<sortdb_wrapper>> create_sortdb_from_wals(
static std::pair<epoch_id_type, sorting_context> create_sorted_from_wals(
const boost::filesystem::path& from_dir,
int num_worker,
const std::set<std::string>& file_names = std::set<std::string>()) {
#if defined SORT_METHOD_PUT_ONLY
auto sortdb = std::make_unique<sortdb_wrapper>(from_dir, comp_twisted_key);
sorting_context sctx{std::make_unique<sortdb_wrapper>(from_dir, comp_twisted_key)};
#else
auto sortdb = std::make_unique<sortdb_wrapper>(from_dir);
sorting_context sctx{std::make_unique<sortdb_wrapper>(from_dir)};
#endif
dblog_scan logscan = file_names.empty() ? dblog_scan{from_dir} : dblog_scan{from_dir, file_names};

Expand All @@ -105,15 +145,20 @@ static std::pair<epoch_id_type, std::unique_ptr<sortdb_wrapper>> create_sortdb_f
const auto add_entry_to_point = insert_entry_or_update_to_max;
bool works_with_multi_thread = false;
#endif
auto add_entry = [&sortdb, &add_entry_to_point](const log_entry& e){
auto add_entry = [&sctx, &add_entry_to_point](const log_entry& e){
switch (e.type()) {
case log_entry::entry_type::normal_entry:
case log_entry::entry_type::remove_entry:
add_entry_to_point(sortdb.get(), e);
add_entry_to_point(sctx.get_sortdb(), e);
break;
case log_entry::entry_type::clear_storage:
case log_entry::entry_type::remove_storage:
break; // TODO: implement
case log_entry::entry_type::remove_storage: { // remove_storage is treated as clear_storage
// clear_storage[st] = max(clear_storage[st], wv)
write_version_type wv;
e.write_version(wv);
sctx.clear_storage_update(e.storage(), wv);
return;
}
case log_entry::entry_type::add_storage:
break; // ignore
default:
Expand All @@ -128,7 +173,7 @@ static std::pair<epoch_id_type, std::unique_ptr<sortdb_wrapper>> create_sortdb_f
logscan.set_thread_num(num_worker);
try {
epoch_id_type max_appeared_epoch = logscan.scan_pwal_files_throws(ld_epoch, add_entry);
return {max_appeared_epoch, std::move(sortdb)};
return {max_appeared_epoch, std::move(sctx)};
} catch (std::runtime_error& e) {
VLOG_LP(log_info) << "failed to scan pwal files: " << e.what();
LOG(ERROR) << "/:limestone recover process failed. (cause: corruption detected in transaction log data directory), "
Expand All @@ -138,17 +183,31 @@ static std::pair<epoch_id_type, std::unique_ptr<sortdb_wrapper>> create_sortdb_f
}
}

static void sortdb_foreach(sortdb_wrapper *sortdb, std::function<void(const std::string_view key, const std::string_view value)> write_snapshot_entry) {
static void sortdb_foreach(sorting_context& sctx, std::function<void(const std::string_view key, const std::string_view value)> write_snapshot_entry) {
static_assert(sizeof(log_entry::entry_type) == 1);
#if defined SORT_METHOD_PUT_ONLY
sortdb->each([write_snapshot_entry, last_key = std::string{}](const std::string_view db_key, const std::string_view db_value) mutable {
sctx.get_sortdb()->each([&sctx, write_snapshot_entry, last_key = std::string{}](const std::string_view db_key, const std::string_view db_value) mutable {
// using the first entry in GROUP BY (original-)key
// NB: max versions comes first (by the custom-comparator)
std::string_view key(db_key.data() + write_version_size, db_key.size() - write_version_size);
if (key == last_key) { // same (original-)key with prev
return; // skip
}
last_key.assign(key);
storage_id_type st_bytes{};
memcpy(static_cast<void*>(&st_bytes), key.data(), sizeof(storage_id_type));
storage_id_type st = le64toh(st_bytes);
if (auto ret = sctx.clear_storage_find(st); ret) {
// check range delete
write_version_type range_ver = ret.value();
std::string wv(write_version_size, '\0');
store_bswap64_value(&wv[0], &db_key[0]);
store_bswap64_value(&wv[8], &db_key[8]);
write_version_type point_ver{wv};
if (point_ver < range_ver) {
return; // skip
}
}

auto entry_type = static_cast<log_entry::entry_type>(db_value[0]);
switch (entry_type) {
Expand All @@ -168,7 +227,18 @@ static void sortdb_foreach(sortdb_wrapper *sortdb, std::function<void(const std:
}
});
#else
sortdb->each([&write_snapshot_entry](const std::string_view db_key, const std::string_view db_value) {
sctx.get_sortdb()->each([&sctx, &write_snapshot_entry](const std::string_view db_key, const std::string_view db_value) {
storage_id_type st_bytes{};
memcpy(static_cast<void*>(&st_bytes), db_key.data(), sizeof(storage_id_type));
storage_id_type st = le64toh(st_bytes);
if (auto ret = sctx.clear_storage_find(st); ret) {
// check range delete
write_version_type range_ver = ret.value();
write_version_type point_ver{db_value.substr(1)};
if (point_ver < range_ver) {
return; // skip
}
}
auto entry_type = static_cast<log_entry::entry_type>(db_value[0]);
switch (entry_type) {
case log_entry::entry_type::normal_entry:
Expand All @@ -189,7 +259,7 @@ void create_compact_pwal(
const boost::filesystem::path& to_dir,
int num_worker,
const std::set<std::string>& file_names) {
auto [max_appeared_epoch, sortdb] = create_sortdb_from_wals(from_dir, num_worker, file_names);
auto [max_appeared_epoch, sctx] = create_sorted_from_wals(from_dir, num_worker, file_names);

boost::system::error_code error;
const bool result_check = boost::filesystem::exists(to_dir, error);
Expand Down Expand Up @@ -222,7 +292,7 @@ void create_compact_pwal(
log_entry::write(ostrm, key_stid, value_etc);
}
};
sortdb_foreach(sortdb.get(), write_snapshot_entry);
sortdb_foreach(sctx, write_snapshot_entry);
//log_entry::end_session(ostrm, epoch);
if (fclose(ostrm) != 0) { // NOLINT(*-owning-memory)
LOG_LP(ERROR) << "cannot close snapshot file (" << snapshot_file << "), errno = " << errno;
Expand All @@ -237,7 +307,7 @@ using namespace limestone::internal;

void datastore::create_snapshot(const std::set<std::string>& file_names) {
const auto& from_dir = location_;
auto [max_appeared_epoch, sortdb] = create_sortdb_from_wals(from_dir, recover_max_parallelism_, file_names);
auto [max_appeared_epoch, sctx] = create_sorted_from_wals(from_dir, recover_max_parallelism_, file_names);
epoch_id_switched_.store(max_appeared_epoch);
epoch_id_informed_.store(max_appeared_epoch);

Expand All @@ -261,7 +331,7 @@ void datastore::create_snapshot(const std::set<std::string>& file_names) {
}
setvbuf(ostrm, nullptr, _IOFBF, 128L * 1024L); // NOLINT, NB. glibc may ignore size when _IOFBF and buffer=NULL
auto write_snapshot_entry = [&ostrm](std::string_view key, std::string_view value){log_entry::write(ostrm, key, value);};
sortdb_foreach(sortdb.get(), write_snapshot_entry);
sortdb_foreach(sctx, write_snapshot_entry);
if (fclose(ostrm) != 0) { // NOLINT(*-owning-memory)
LOG_LP(ERROR) << "cannot close snapshot file (" << snapshot_file << "), errno = " << errno;
throw std::runtime_error("I/O error");
Expand Down
51 changes: 50 additions & 1 deletion test/limestone/log/log_channel_test.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022-2023 Project Tsurugi.
* Copyright 2022-2024 Project Tsurugi.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -167,4 +167,53 @@ TEST_F(log_channel_test, skip_storage_add_remove) {
EXPECT_EQ(m.size(), 0);
}

TEST_F(log_channel_test, remove_storage) {
limestone::api::log_channel& channel = datastore_->create_channel(boost::filesystem::path(location));

channel.begin_session();
channel.add_storage(42, {90, 4});
channel.add_entry(42, "42-100", "v1", {100, 4});
channel.add_entry(43, "43-100", "v2", {100, 4});
channel.end_session();

channel.begin_session();
channel.remove_storage(42, {110, 4});
channel.end_session();

datastore_->ready();
auto ss = datastore_->get_snapshot();
auto cursor = ss->get_cursor();

auto m = read_all_from_cursor(cursor.get());
EXPECT_EQ(m.size(), 1);
EXPECT_EQ(m["43-100"], "v2"); // in another storage
}

TEST_F(log_channel_test, truncate_storage) {
limestone::api::log_channel& channel = datastore_->create_channel(boost::filesystem::path(location));

channel.begin_session();
channel.add_storage(42, {90, 4});
channel.add_entry(42, "42-100", "v1", {100, 4});
channel.add_entry(43, "43-100", "v2", {100, 4});
channel.end_session();

channel.begin_session();
channel.truncate_storage(42, {110, 4});
channel.end_session();

channel.begin_session();
channel.add_entry(42, "42-120", "v3", {120, 4});
channel.end_session();

datastore_->ready();
auto ss = datastore_->get_snapshot();
auto cursor = ss->get_cursor();

auto m = read_all_from_cursor(cursor.get());
EXPECT_EQ(m.size(), 2);
EXPECT_EQ(m["43-100"], "v2"); // in another storage
EXPECT_EQ(m["42-120"], "v3"); // after truncate
}

} // namespace limestone::testing

0 comments on commit cc4d032

Please sign in to comment.