From 8521b37e902cd93f49097f06dd861aae7b0a6d23 Mon Sep 17 00:00:00 2001 From: Nobuhiro Ban Date: Tue, 20 Aug 2024 18:04:38 +0900 Subject: [PATCH] implement storage operations (add_storage, remove_storage, truncate_storage); recording as log entries in PWAL files, but these entries are not used for now. --- src/limestone/datastore_snapshot.cpp | 19 +++++++- src/limestone/log_channel.cpp | 18 ++++---- src/limestone/log_entry.h | 61 +++++++++++++++++++++++++ src/limestone/parse_wal_file.cpp | 41 +++++++++++++++-- test/limestone/log/log_channel_test.cpp | 51 +++++++++++++++++---- 5 files changed, 168 insertions(+), 22 deletions(-) diff --git a/src/limestone/datastore_snapshot.cpp b/src/limestone/datastore_snapshot.cpp index 3c8f321b..d441f00b 100644 --- a/src/limestone/datastore_snapshot.cpp +++ b/src/limestone/datastore_snapshot.cpp @@ -96,12 +96,27 @@ static std::pair> create_sortdb_f epoch_id_type ld_epoch = logscan.last_durable_epoch_in_dir(); #if defined SORT_METHOD_PUT_ONLY - auto add_entry = [&sortdb](log_entry& e){insert_twisted_entry(sortdb.get(), e);}; + const auto add_entry_to_point = insert_twisted_entry; bool works_with_multi_thread = true; #else - auto add_entry = [&sortdb](log_entry& e){insert_entry_or_update_to_max(sortdb.get(), e);}; + const auto add_entry_to_point = insert_entry_or_update_to_max; bool works_with_multi_thread = false; #endif + auto add_entry = [&sortdb, &add_entry_to_point](log_entry& e){ + switch (e.type()) { + case log_entry::entry_type::normal_entry: + case log_entry::entry_type::remove_entry: + add_entry_to_point(sortdb.get(), e); + break; + case log_entry::entry_type::clear_storage: + case log_entry::entry_type::remove_storage: + break; // TODO: implement + case log_entry::entry_type::add_storage: + break; // ignore + default: + assert(false); + } + }; if (!works_with_multi_thread && num_worker > 1) { LOG(INFO) << "/:limestone:config:datastore this sort method does not work correctly with multi-thread, so force the number of 
recover process thread = 1"; diff --git a/src/limestone/log_channel.cpp b/src/limestone/log_channel.cpp index 1a25cd14..630a5e43 100644 --- a/src/limestone/log_channel.cpp +++ b/src/limestone/log_channel.cpp @@ -96,19 +96,19 @@ void log_channel::remove_entry(storage_id_type storage_id, std::string_view key, write_version_ = write_version; } -void log_channel::add_storage([[maybe_unused]] storage_id_type storage_id, [[maybe_unused]] write_version_type write_version) { - LOG_LP(ERROR) << "not implemented"; - throw std::runtime_error("not implemented"); // FIXME +void log_channel::add_storage(storage_id_type storage_id, write_version_type write_version) { + log_entry::write_add_storage(strm_, storage_id, write_version); + write_version_ = write_version; } -void log_channel::remove_storage([[maybe_unused]] storage_id_type storage_id, [[maybe_unused]] write_version_type write_version) { - LOG_LP(ERROR) << "not implemented"; - throw std::runtime_error("not implemented"); // FIXME +void log_channel::remove_storage(storage_id_type storage_id, write_version_type write_version) { + log_entry::write_remove_storage(strm_, storage_id, write_version); + write_version_ = write_version; } -void log_channel::truncate_storage([[maybe_unused]] storage_id_type storage_id, [[maybe_unused]] write_version_type write_version) { - LOG_LP(ERROR) << "not implemented"; - throw std::runtime_error("not implemented"); // FIXME +void log_channel::truncate_storage(storage_id_type storage_id, write_version_type write_version) { + log_entry::write_clear_storage(strm_, storage_id, write_version); + write_version_ = write_version; } boost::filesystem::path log_channel::file_path() const noexcept { diff --git a/src/limestone/log_entry.h b/src/limestone/log_entry.h index 9304bf0f..dce7871e 100644 --- a/src/limestone/log_entry.h +++ b/src/limestone/log_entry.h @@ -44,6 +44,9 @@ class log_entry { marker_durable = 4, remove_entry = 5, marker_invalidated_begin = 6, + clear_storage = 7, + add_storage = 8, + 
remove_storage = 9, }; class read_error { public: @@ -129,6 +132,15 @@ class log_entry { case entry_type::marker_invalidated_begin: invalidated_begin(strm, epoch_id_); break; + case entry_type::clear_storage: + write_clear_storage(strm, key_sid_, value_etc_); + break; + case entry_type::add_storage: + write_add_storage(strm, key_sid_, value_etc_); + break; + case entry_type::remove_storage: + write_remove_storage(strm, key_sid_, value_etc_); + break; case entry_type::this_id_is_not_used: break; } @@ -197,6 +209,43 @@ class log_entry { write_bytes(strm, value_etc.data(), value_etc.length()); } + static inline void write_ope_storage_common(FILE* strm, entry_type type, storage_id_type storage_id, write_version_type write_version) { + write_uint8(strm, static_cast(type)); + write_uint64le(strm, static_cast(storage_id)); + write_uint64le(strm, static_cast(write_version.epoch_number_)); + write_uint64le(strm, static_cast(write_version.minor_write_version_)); + } + + static inline void write_ope_storage_common(FILE* strm, entry_type type, std::string_view key_sid, std::string_view value_etc) { + write_uint8(strm, static_cast(type)); + write_bytes(strm, key_sid.data(), key_sid.length()); + write_bytes(strm, value_etc.data(), value_etc.length()); + } + + static void write_clear_storage(FILE* strm, storage_id_type storage_id, write_version_type write_version) { + write_ope_storage_common(strm, entry_type::clear_storage, storage_id, write_version); + } + + static void write_clear_storage(FILE* strm, std::string_view key_sid, std::string_view value_etc) { + write_ope_storage_common(strm, entry_type::clear_storage, key_sid, value_etc); + } + + static void write_add_storage(FILE* strm, storage_id_type storage_id, write_version_type write_version) { + write_ope_storage_common(strm, entry_type::add_storage, storage_id, write_version); + } + + static void write_add_storage(FILE* strm, std::string_view key_sid, std::string_view value_etc) { + write_ope_storage_common(strm, 
entry_type::add_storage, key_sid, value_etc); + } + + static void write_remove_storage(FILE* strm, storage_id_type storage_id, write_version_type write_version) { + write_ope_storage_common(strm, entry_type::remove_storage, storage_id, write_version); + } + + static void write_remove_storage(FILE* strm, std::string_view key_sid, std::string_view value_etc) { + write_ope_storage_common(strm, entry_type::remove_storage, key_sid, value_etc); + } + // for reader bool read(std::istream& strm) { read_error ec{}; @@ -247,6 +296,18 @@ class log_entry { if (ec) return false; break; } + case entry_type::clear_storage: + case entry_type::add_storage: + case entry_type::remove_storage: + { + key_sid_.resize(sizeof(storage_id_type)); + read_bytes(strm, key_sid_.data(), static_cast(key_sid_.length()), ec); + if (ec) return false; + value_etc_.resize(sizeof(epoch_id_type) + sizeof(std::uint64_t)); + read_bytes(strm, value_etc_.data(), static_cast(value_etc_.length()), ec); + if (ec) return false; + break; + } case entry_type::marker_begin: case entry_type::marker_end: case entry_type::marker_durable: diff --git a/src/limestone/parse_wal_file.cpp b/src/limestone/parse_wal_file.cpp index 3925af18..9890c99e 100644 --- a/src/limestone/parse_wal_file.cpp +++ b/src/limestone/parse_wal_file.cpp @@ -55,6 +55,9 @@ void invalidate_epoch_snippet(boost::filesystem::fstream& strm, std::streampos f // | (empty) // log_entry = normal_entry // | remove_entry +// | clear_storage +// | add_storage +// | remove_storage // snippet_footer = (empty) // parser rule (with error-handle) @@ -72,8 +75,14 @@ void invalidate_epoch_snippet(boost::filesystem::fstream& strm, std::streampos f // | (empty) // log_entry = normal_entry { if (valid) process-entry } // | remove_entry { if (valid) process-entry } +// | clear_storage { if (valid) process-entry } +// | add_storage { if (valid) process-entry } +// | remove_storage { if (valid) process-entry } // | SHORT_normal_entry { if (valid) error-truncated } // TAIL 
// | SHORT_remove_entry { if (valid) error-truncated } // TAIL +// | SHORT_clear_storage { if (valid) error-truncated } // TAIL +// | SHORT_add_storage { if (valid) error-truncated } // TAIL +// | SHORT_remove_storage { if (valid) error-truncated } // TAIL // | UNKNOWN_TYPE_entry { if (valid) error-damaged-entry } // TAIL // snippet_footer = (empty) @@ -84,6 +93,9 @@ void invalidate_epoch_snippet(boost::filesystem::fstream& strm, std::streampos f // remove_entry = 0x05 key_length storage_id key(key_length) writer_version_major writer_version_minor // marker_durable = 0x04 epoch // marker_end = 0x03 epoch +// clear_storage = 0x07 storage_id write_version_major write_version_minor +// add_storage = 0x08 storage_id write_version_major write_version_minor +// remove_storage = 0x09 storage_id write_version_major write_version_minor // epoch = int64le // key_length = int32le // value_length = int32le @@ -101,6 +113,9 @@ void invalidate_epoch_snippet(boost::filesystem::fstream& strm, std::streampos f // = 0x05 byte(0-11) // SHORT_marker_durable = 0x04 byte(0-7) // SHORT_marker_end = 0x03 byte(0-7) +// SHORT_clear_storage = 0x07 byte(0-23) +// SHORT_add_storage = 0x08 byte(0-23) +// SHORT_remove_storage = 0x09 byte(0-23) // UNKNOWN_TYPE_entry = 0x00 byte(0-) // | 0x07-0xff byte(0-) // // marker_durable and marker_end are not used in pWAL file @@ -110,7 +125,9 @@ void invalidate_epoch_snippet(boost::filesystem::fstream& strm, std::streampos f enum class token_type { eof, normal_entry = 1, marker_begin, marker_end, marker_durable, remove_entry, marker_invalidated_begin, + clear_storage, add_storage, remove_storage, SHORT_normal_entry = 101, SHORT_marker_begin, SHORT_marker_end, SHORT_marker_durable, SHORT_remove_entry, SHORT_marker_inv_begin, + SHORT_clear_storage, SHORT_add_storage, SHORT_remove_storage, UNKNOWN_TYPE_entry = 1001, }; @@ -128,6 +145,9 @@ void invalidate_epoch_snippet(boost::filesystem::fstream& strm, std::streampos f case 
log_entry::entry_type::marker_durable: value_ = token_type::marker_durable; break; case log_entry::entry_type::remove_entry: value_ = token_type::remove_entry; break; case log_entry::entry_type::marker_invalidated_begin: value_ = token_type::marker_invalidated_begin; break; + case log_entry::entry_type::clear_storage: value_ = token_type::clear_storage; break; + case log_entry::entry_type::add_storage: value_ = token_type::add_storage; break; + case log_entry::entry_type::remove_storage: value_ = token_type::remove_storage; break; default: assert(false); } } else if (ec.value() == log_entry::read_error::short_entry) { @@ -138,6 +158,9 @@ void invalidate_epoch_snippet(boost::filesystem::fstream& strm, std::streampos f case log_entry::entry_type::marker_durable: value_ = token_type::SHORT_marker_durable; break; case log_entry::entry_type::remove_entry: value_ = token_type::SHORT_remove_entry; break; case log_entry::entry_type::marker_invalidated_begin: value_ = token_type::SHORT_marker_inv_begin; break; + case log_entry::entry_type::clear_storage: value_ = token_type::SHORT_clear_storage; break; + case log_entry::entry_type::add_storage: value_ = token_type::SHORT_add_storage; break; + case log_entry::entry_type::remove_storage: value_ = token_type::SHORT_remove_storage; break; default: assert(false); } } else if (ec.value() == log_entry::read_error::unknown_type) { @@ -164,11 +187,17 @@ void invalidate_epoch_snippet(boost::filesystem::fstream& strm, std::streampos f // loop: // normal_entry : { if (valid) process-entry } -> loop // remove_entry : { if (valid) process-entry } -> loop +// clear_storage : { if (valid) process-entry } -> loop +// add_storage : { if (valid) process-entry } -> loop +// remove_storage : { if (valid) process-entry } -> loop // eof : {} -> END // marker_begin : { head_pos := ...; max-epoch := max(...); if (epoch <= ld) { valid := true } else { valid := false, error-nondurable } } -> loop // marker_invalidated_begin : { head_pos := ...; 
max-epoch := max(...); valid := false } -> loop // SHORT_normal_entry : { if (valid) error-truncated } -> END // SHORT_remove_entry : { if (valid) error-truncated } -> END +// SHORT_clear_storage : { if (valid) error-truncated } -> END +// SHORT_add_storage : { if (valid) error-truncated } -> END +// SHORT_remove_storage : { if (valid) error-truncated } -> END // SHORT_marker_begin : { head_pos := ...; error-truncated } -> END // SHORT_marker_inv_begin : { head_pos := ... } -> END // UNKNOWN_TYPE_entry : { if (valid) error-damaged-entry } -> END @@ -216,7 +245,10 @@ epoch_id_type dblog_scan::scan_one_pwal_file( // NOLINT(readability-function-co switch (tok.value()) { case lex_token::token_type::normal_entry: case lex_token::token_type::remove_entry: -// normal_entry | remove_entry : (not 1st) { if (valid) process-entry } -> loop + case lex_token::token_type::clear_storage: + case lex_token::token_type::add_storage: + case lex_token::token_type::remove_storage: +// normal_entry | remove_entry | clear_storage | add_storage | remove_storage : (not 1st) { if (valid) process-entry } -> loop if (!first) { if (valid) { add_entry(e); @@ -281,8 +313,11 @@ epoch_id_type dblog_scan::scan_one_pwal_file( // NOLINT(readability-function-co break; } case lex_token::token_type::SHORT_normal_entry: - case lex_token::token_type::SHORT_remove_entry: { -// SHORT_normal_entry | SHORT_remove_entry : (not 1st) { if (valid) error-truncated } -> END + case lex_token::token_type::SHORT_remove_entry: + case lex_token::token_type::SHORT_clear_storage: + case lex_token::token_type::SHORT_add_storage: + case lex_token::token_type::SHORT_remove_storage: { +// SHORT_normal_entry | SHORT_remove_entry | SHORT_clear_storage | SHORT_add_storage | SHORT_remove_storage : (not 1st) { if (valid) error-truncated } -> END if (first) { err_unexpected(); pe = parse_error(parse_error::unexpected, fpos_before_read_entry); diff --git a/test/limestone/log/log_channel_test.cpp 
b/test/limestone/log/log_channel_test.cpp index 3483c5f6..9df6833a 100644 --- a/test/limestone/log/log_channel_test.cpp +++ b/test/limestone/log/log_channel_test.cpp @@ -97,6 +97,18 @@ TEST_F(log_channel_test, number_and_backup) { EXPECT_EQ(files.at(i++).string(), std::string(location) + "/pwal_0003"); } +static std::map read_all_from_cursor(limestone::api::cursor* cursor) { + std::map m; + while (cursor->next()) { + std::string key; + std::string value; + cursor->key(key); + cursor->value(value); + m[key] = value; + } + return m; +} + TEST_F(log_channel_test, remove) { limestone::api::log_channel& channel = datastore_->create_channel(boost::filesystem::path(location)); @@ -115,17 +127,40 @@ TEST_F(log_channel_test, remove) { auto cursor = ss->get_cursor(); // expect: datastore has {k1:v1, k3:v3}, not required to be sorted - std::map m; - while (cursor->next()) { - std::string key; - std::string value; - cursor->key(key); - cursor->value(value); - m[key] = value; - } + auto m = read_all_from_cursor(cursor.get()); EXPECT_EQ(m.size(), 2); EXPECT_EQ(m["k1"], "v1"); EXPECT_EQ(m["k3"], "v3"); } +TEST_F(log_channel_test, skip_storage_add_remove) { + // write log entry but not use at the moment... 
+ // (purpose of this test: check not to abort as unimplemented) + limestone::api::log_channel& channel = datastore_->create_channel(boost::filesystem::path(location)); + + channel.begin_session(); + channel.add_storage(42, {90, 4}); + channel.add_entry(42, "k1", "v1", {100, 4}); + channel.add_entry(42, "k2", "v2", {100, 4}); + channel.end_session(); + + channel.begin_session(); + channel.remove_entry(42, "k1", {110, 0}); + channel.remove_entry(42, "k2", {110, 0}); + channel.end_session(); + + channel.begin_session(); + channel.truncate_storage(42, {120, 4}); + channel.remove_storage(42, {120, 4}); + channel.end_session(); + + datastore_->ready(); + auto ss = datastore_->get_snapshot(); + auto cursor = ss->get_cursor(); + + // expect: datastore is empty (k1 and k2 were removed by remove_entry; storage operations are not applied to the snapshot yet) + auto m = read_all_from_cursor(cursor.get()); + EXPECT_EQ(m.size(), 0); +} + } // namespace limestone::testing