Skip to content

Commit

Permalink
implement truncate_storage: handling in making snapshot
Browse files Browse the repository at this point in the history
  • Loading branch information
ban-nobuhiro committed Aug 16, 2024
1 parent 2a83e9a commit c01a8fb
Showing 1 changed file with 55 additions and 18 deletions.
73 changes: 55 additions & 18 deletions src/limestone/datastore_snapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,20 @@ namespace limestone::internal {
constexpr std::size_t write_version_size = sizeof(epoch_id_type) + sizeof(std::uint64_t);
static_assert(write_version_size == 16);

struct sorting_context {

Check warning on line 38 in src/limestone/datastore_snapshot.cpp

View workflow job for this annotation

GitHub Actions / Clang-Tidy

cppcoreguidelines-special-member-functions,hicpp-special-member-functions

class 'sorting_context' defines a move constructor but does not define a destructor, a copy constructor, a copy assignment operator or a move assignment operator
std::unique_ptr<sortdb_wrapper> sortdb;

Check warning on line 39 in src/limestone/datastore_snapshot.cpp

View workflow job for this annotation

GitHub Actions / Clang-Tidy

misc-non-private-member-variables-in-classes

member variable 'sortdb' has public visibility
std::mutex mtx_remove_storage;

Check warning on line 40 in src/limestone/datastore_snapshot.cpp

View workflow job for this annotation

GitHub Actions / Clang-Tidy

misc-non-private-member-variables-in-classes

member variable 'mtx_remove_storage' has public visibility
std::map<storage_id_type, write_version_type> remove_storage;

Check warning on line 41 in src/limestone/datastore_snapshot.cpp

View workflow job for this annotation

GitHub Actions / Clang-Tidy

misc-non-private-member-variables-in-classes

member variable 'remove_storage' has public visibility
sorting_context(sorting_context&& obj) : sortdb(std::move(obj.sortdb)) {

Check warning on line 42 in src/limestone/datastore_snapshot.cpp

View workflow job for this annotation

GitHub Actions / Clang-Tidy

bugprone-exception-escape

an exception may be thrown in function 'sorting_context' which should not throw exceptions

Check warning on line 42 in src/limestone/datastore_snapshot.cpp

View workflow job for this annotation

GitHub Actions / Clang-Tidy

hicpp-noexcept-move,performance-noexcept-move-constructor

move constructors should be marked noexcept
if (!obj.mtx_remove_storage.try_lock()) {
throw std::runtime_error("programming error");
}
remove_storage = std::move(obj.remove_storage);
obj.mtx_remove_storage.unlock();
}
sorting_context() {};

Check warning on line 49 in src/limestone/datastore_snapshot.cpp

View workflow job for this annotation

GitHub Actions / Clang-Tidy

hicpp-use-equals-default,modernize-use-equals-default

use '= default' to define a trivial default constructor
};

[[maybe_unused]]
static void store_bswap64_value(void *dest, const void *src) {
auto* p64_dest = reinterpret_cast<std::uint64_t*>(dest); // NOLINT(*-reinterpret-cast)
Expand All @@ -53,11 +67,11 @@ static int comp_twisted_key(const std::string_view& a, const std::string_view& b
}

[[maybe_unused]]
static void insert_entry_or_update_to_max(sortdb_wrapper* sortdb, const log_entry& e) {
static void insert_entry_or_update_to_max(sorting_context& sctx, const log_entry& e) {
bool need_write = true;
// skip older entry than already inserted
std::string value;
if (sortdb->get(e.key_sid(), &value)) {
if (sctx.sortdb->get(e.key_sid(), &value)) {
write_version_type write_version;
e.write_version(write_version);
if (write_version < write_version_type(value.substr(1))) {
Expand All @@ -68,12 +82,12 @@ static void insert_entry_or_update_to_max(sortdb_wrapper* sortdb, const log_entr
std::string db_value;
db_value.append(1, static_cast<char>(e.type()));
db_value.append(e.value_etc());
sortdb->put(e.key_sid(), db_value);
sctx.sortdb->put(e.key_sid(), db_value);
}
}

[[maybe_unused]]
static void insert_twisted_entry(sortdb_wrapper* sortdb, const log_entry& e) {
static void insert_twisted_entry(sorting_context& sctx, const log_entry& e) {
// key_sid: storage_id[8] key[*], value_etc: epoch[8]LE minor_version[8]LE value[*], type: type[1]
// db_key: epoch[8]BE minor_version[8]BE storage_id[8] key[*], db_value: type[1] value[*]
std::string db_key(write_version_size + e.key_sid().size(), '\0');
Expand All @@ -82,24 +96,25 @@ static void insert_twisted_entry(sortdb_wrapper* sortdb, const log_entry& e) {
std::memcpy(&db_key[write_version_size], e.key_sid().data(), e.key_sid().size());
std::string db_value(1, static_cast<char>(e.type()));
db_value.append(e.value_etc().substr(write_version_size));
sortdb->put(db_key, db_value);
sctx.sortdb->put(db_key, db_value);
}

static std::pair<epoch_id_type, std::unique_ptr<sortdb_wrapper>> create_sortdb_from_wals(const boost::filesystem::path& from_dir, int num_worker) {
static std::pair<epoch_id_type, sorting_context> create_sorted_from_wals(const boost::filesystem::path& from_dir, int num_worker) {
sorting_context sctx;
#if defined SORT_METHOD_PUT_ONLY
auto sortdb = std::make_unique<sortdb_wrapper>(from_dir, comp_twisted_key);
sctx.sortdb = std::make_unique<sortdb_wrapper>(from_dir, comp_twisted_key);
#else
auto sortdb = std::make_unique<sortdb_wrapper>(from_dir);
sctx.sortdb = std::make_unique<sortdb_wrapper>(from_dir);
#endif
dblog_scan logscan{from_dir};

epoch_id_type ld_epoch = logscan.last_durable_epoch_in_dir();

#if defined SORT_METHOD_PUT_ONLY
auto add_entry = [&sortdb](const log_entry& e){insert_twisted_entry(sortdb.get(), e);};
auto add_entry = [&sctx](const log_entry& e){insert_twisted_entry(sctx, e);};
bool works_with_multi_thread = true;
#else
auto add_entry = [&sortdb](const log_entry& e){insert_entry_or_update_to_max(sortdb.get(), e);};
// insert_entry_or_update_to_max takes the sorting_context by reference, so pass sctx itself
// (sorting_context has no get() member).
auto add_entry = [&sctx](const log_entry& e){insert_entry_or_update_to_max(sctx, e);};
bool works_with_multi_thread = false;
#endif

Expand All @@ -110,7 +125,7 @@ static std::pair<epoch_id_type, std::unique_ptr<sortdb_wrapper>> create_sortdb_f
logscan.set_thread_num(num_worker);
try {
epoch_id_type max_appeared_epoch = logscan.scan_pwal_files_throws(ld_epoch, add_entry);
return {max_appeared_epoch, std::move(sortdb)};
return {max_appeared_epoch, std::move(sctx)};
} catch (std::runtime_error& e) {
VLOG_LP(log_info) << "failed to scan pwal files: " << e.what();
LOG(ERROR) << "/:limestone recover process failed. (cause: corruption detected in transaction log data directory), "
Expand All @@ -120,17 +135,28 @@ static std::pair<epoch_id_type, std::unique_ptr<sortdb_wrapper>> create_sortdb_f
}
}

static void sortdb_foreach(sortdb_wrapper *sortdb, std::function<void(const std::string_view key, const std::string_view value)> write_snapshot_entry) {
static void sortdb_foreach(sorting_context& sctx, std::function<void(const std::string_view key, const std::string_view value)> write_snapshot_entry) {
static_assert(sizeof(log_entry::entry_type) == 1);
#if defined SORT_METHOD_PUT_ONLY
sortdb->each([write_snapshot_entry, last_key = std::string{}](const std::string_view db_key, const std::string_view db_value) mutable {
sctx.sortdb->each([&sctx, write_snapshot_entry, last_key = std::string{}](const std::string_view db_key, const std::string_view db_value) mutable {
// using the first entry in GROUP BY (original-)key
// NB: max versions comes first (by the custom-comparator)
std::string_view key(db_key.data() + write_version_size, db_key.size() - write_version_size);
if (key == last_key) { // same (original-)key with prev
return; // skip
}
last_key.assign(key);
storage_id_type st_bytes{};
memcpy(static_cast<void*>(&st_bytes), key.data(), sizeof(storage_id_type));
storage_id_type st = le64toh(st_bytes);
if (auto itr = sctx.remove_storage.find(st); itr != sctx.remove_storage.end()) {
// check range delete
write_version_type range_ver = itr->second;
write_version_type point_ver{db_key};
if (point_ver < range_ver) {
return; // skip
}
}

auto entry_type = static_cast<log_entry::entry_type>(db_value[0]);
switch (entry_type) {
Expand All @@ -150,7 +176,18 @@ static void sortdb_foreach(sortdb_wrapper *sortdb, std::function<void(const std:
}
});
#else
sortdb->each([&write_snapshot_entry](const std::string_view db_key, const std::string_view db_value) {
// sctx must be captured as well: the callback consults sctx.remove_storage.
sctx.sortdb->each([&sctx, &write_snapshot_entry](const std::string_view db_key, const std::string_view db_value) {
    // In this mode db_key is the entry's key_sid (storage_id[8] key[*]),
    // so the storage id is the leading 8 bytes of db_key.
    storage_id_type st_bytes{};
    memcpy(static_cast<void*>(&st_bytes), db_key.data(), sizeof(storage_id_type));
    storage_id_type st = le64toh(st_bytes);
    if (auto itr = sctx.remove_storage.find(st); itr != sctx.remove_storage.end()) {
        // check range delete
        write_version_type range_ver = itr->second;
        // db_value is type[1] followed by value_etc; the write version starts
        // after the type byte (same layout insert_entry_or_update_to_max parses).
        write_version_type point_ver{db_value.substr(1)};
        if (point_ver < range_ver) {
            return;  // skip
        }
    }
auto entry_type = static_cast<log_entry::entry_type>(db_value[0]);
switch (entry_type) {
case log_entry::entry_type::normal_entry:
Expand All @@ -167,7 +204,7 @@ static void sortdb_foreach(sortdb_wrapper *sortdb, std::function<void(const std:
}

void create_comapct_pwal(const boost::filesystem::path& from_dir, const boost::filesystem::path& to_dir, int num_worker) {
auto [max_appeared_epoch, sortdb] = create_sortdb_from_wals(from_dir, num_worker);
auto [max_appeared_epoch, sctx] = create_sorted_from_wals(from_dir, num_worker);

boost::system::error_code error;
const bool result_check = boost::filesystem::exists(to_dir, error);
Expand Down Expand Up @@ -200,7 +237,7 @@ void create_comapct_pwal(const boost::filesystem::path& from_dir, const boost::f
log_entry::write(ostrm, key_stid, value_etc);
}
};
sortdb_foreach(sortdb.get(), write_snapshot_entry);
sortdb_foreach(sctx, write_snapshot_entry);
//log_entry::end_session(ostrm, epoch);
if (fclose(ostrm) != 0) { // NOLINT(*-owning-memory)
LOG_LP(ERROR) << "cannot close snapshot file (" << snapshot_file << "), errno = " << errno;
Expand All @@ -215,7 +252,7 @@ using namespace limestone::internal;

void datastore::create_snapshot() {
const auto& from_dir = location_;
auto [max_appeared_epoch, sortdb] = create_sortdb_from_wals(from_dir, recover_max_parallelism_);
auto [max_appeared_epoch, sctx] = create_sorted_from_wals(from_dir, recover_max_parallelism_);
epoch_id_switched_.store(max_appeared_epoch);
epoch_id_informed_.store(max_appeared_epoch);

Expand All @@ -239,7 +276,7 @@ void datastore::create_snapshot() {
}
setvbuf(ostrm, nullptr, _IOFBF, 128L * 1024L); // NOLINT, NB. glibc may ignore size when _IOFBF and buffer=NULL
auto write_snapshot_entry = [&ostrm](std::string_view key, std::string_view value){log_entry::write(ostrm, key, value);};
sortdb_foreach(sortdb.get(), write_snapshot_entry);
sortdb_foreach(sctx, write_snapshot_entry);
if (fclose(ostrm) != 0) { // NOLINT(*-owning-memory)
LOG_LP(ERROR) << "cannot close snapshot file (" << snapshot_file << "), errno = " << errno;
throw std::runtime_error("I/O error");
Expand Down

0 comments on commit c01a8fb

Please sign in to comment.