Skip to content

Commit

Permalink
implement storage operations (add_storage, remove_storage, truncate_s…
Browse files Browse the repository at this point in the history
…torage);

recording as log entries in PWAL files, but these entries are not used for now.
  • Loading branch information
ban-nobuhiro committed Aug 20, 2024
1 parent 57bb214 commit 8521b37
Show file tree
Hide file tree
Showing 5 changed files with 168 additions and 22 deletions.
19 changes: 17 additions & 2 deletions src/limestone/datastore_snapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,27 @@ static std::pair<epoch_id_type, std::unique_ptr<sortdb_wrapper>> create_sortdb_f
epoch_id_type ld_epoch = logscan.last_durable_epoch_in_dir();

#if defined SORT_METHOD_PUT_ONLY
auto add_entry = [&sortdb](log_entry& e){insert_twisted_entry(sortdb.get(), e);};
const auto add_entry_to_point = insert_twisted_entry;
bool works_with_multi_thread = true;
#else
auto add_entry = [&sortdb](log_entry& e){insert_entry_or_update_to_max(sortdb.get(), e);};
const auto add_entry_to_point = insert_entry_or_update_to_max;
bool works_with_multi_thread = false;
#endif
auto add_entry = [&sortdb, &add_entry_to_point](log_entry& e){
switch (e.type()) {
case log_entry::entry_type::normal_entry:
case log_entry::entry_type::remove_entry:
add_entry_to_point(sortdb.get(), e);
break;
case log_entry::entry_type::clear_storage:
case log_entry::entry_type::remove_storage:
break; // TODO: implement
case log_entry::entry_type::add_storage:
break; // ignore
default:
assert(false);
}
};

if (!works_with_multi_thread && num_worker > 1) {
LOG(INFO) << "/:limestone:config:datastore this sort method does not work correctly with multi-thread, so force the number of recover process thread = 1";
Expand Down
18 changes: 9 additions & 9 deletions src/limestone/log_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,19 +96,19 @@ void log_channel::remove_entry(storage_id_type storage_id, std::string_view key,
write_version_ = write_version;
}

void log_channel::add_storage([[maybe_unused]] storage_id_type storage_id, [[maybe_unused]] write_version_type write_version) {
LOG_LP(ERROR) << "not implemented";
throw std::runtime_error("not implemented"); // FIXME
void log_channel::add_storage(storage_id_type storage_id, write_version_type write_version) {
log_entry::write_add_storage(strm_, storage_id, write_version);
write_version_ = write_version;
}

void log_channel::remove_storage([[maybe_unused]] storage_id_type storage_id, [[maybe_unused]] write_version_type write_version) {
LOG_LP(ERROR) << "not implemented";
throw std::runtime_error("not implemented"); // FIXME
void log_channel::remove_storage(storage_id_type storage_id, write_version_type write_version) {
log_entry::write_remove_storage(strm_, storage_id, write_version);
write_version_ = write_version;
}

void log_channel::truncate_storage([[maybe_unused]] storage_id_type storage_id, [[maybe_unused]] write_version_type write_version) {
LOG_LP(ERROR) << "not implemented";
throw std::runtime_error("not implemented"); // FIXME
void log_channel::truncate_storage(storage_id_type storage_id, write_version_type write_version) {
log_entry::write_clear_storage(strm_, storage_id, write_version);
write_version_ = write_version;
}

boost::filesystem::path log_channel::file_path() const noexcept {
Expand Down
61 changes: 61 additions & 0 deletions src/limestone/log_entry.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ class log_entry {
marker_durable = 4,
remove_entry = 5,
marker_invalidated_begin = 6,
clear_storage = 7,
add_storage = 8,
remove_storage = 9,
};
class read_error {
public:
Expand Down Expand Up @@ -129,6 +132,15 @@ class log_entry {
case entry_type::marker_invalidated_begin:
invalidated_begin(strm, epoch_id_);
break;
case entry_type::clear_storage:
write_clear_storage(strm, key_sid_, value_etc_);
break;
case entry_type::add_storage:
write_add_storage(strm, key_sid_, value_etc_);
break;
case entry_type::remove_storage:
write_remove_storage(strm, key_sid_, value_etc_);
break;
case entry_type::this_id_is_not_used:
break;
}
Expand Down Expand Up @@ -197,6 +209,43 @@ class log_entry {
write_bytes(strm, value_etc.data(), value_etc.length());
}

static inline void write_ope_storage_common(FILE* strm, entry_type type, storage_id_type storage_id, write_version_type write_version) {
write_uint8(strm, static_cast<std::uint8_t>(type));
write_uint64le(strm, static_cast<std::uint64_t>(storage_id));
write_uint64le(strm, static_cast<std::uint64_t>(write_version.epoch_number_));
write_uint64le(strm, static_cast<std::uint64_t>(write_version.minor_write_version_));
}

static inline void write_ope_storage_common(FILE* strm, entry_type type, std::string_view key_sid, std::string_view value_etc) {
write_uint8(strm, static_cast<std::uint8_t>(type));
write_bytes(strm, key_sid.data(), key_sid.length());
write_bytes(strm, value_etc.data(), value_etc.length());
}

static void write_clear_storage(FILE* strm, storage_id_type storage_id, write_version_type write_version) {
write_ope_storage_common(strm, entry_type::clear_storage, storage_id, write_version);
}

static void write_clear_storage(FILE* strm, std::string_view key_sid, std::string_view value_etc) {
write_ope_storage_common(strm, entry_type::clear_storage, key_sid, value_etc);
}

static void write_add_storage(FILE* strm, storage_id_type storage_id, write_version_type write_version) {
write_ope_storage_common(strm, entry_type::add_storage, storage_id, write_version);
}

static void write_add_storage(FILE* strm, std::string_view key_sid, std::string_view value_etc) {
write_ope_storage_common(strm, entry_type::add_storage, key_sid, value_etc);
}

static void write_remove_storage(FILE* strm, storage_id_type storage_id, write_version_type write_version) {
write_ope_storage_common(strm, entry_type::remove_storage, storage_id, write_version);
}

static void write_remove_storage(FILE* strm, std::string_view key_sid, std::string_view value_etc) {
write_ope_storage_common(strm, entry_type::remove_storage, key_sid, value_etc);
}

// for reader
bool read(std::istream& strm) {
read_error ec{};
Expand Down Expand Up @@ -247,6 +296,18 @@ class log_entry {
if (ec) return false;
break;
}
case entry_type::clear_storage:
case entry_type::add_storage:
case entry_type::remove_storage:
{
key_sid_.resize(sizeof(storage_id_type));
read_bytes(strm, key_sid_.data(), static_cast<std::streamsize>(key_sid_.length()), ec);
if (ec) return false;
value_etc_.resize(sizeof(epoch_id_type) + sizeof(std::uint64_t));
read_bytes(strm, value_etc_.data(), static_cast<std::streamsize>(value_etc_.length()), ec);
if (ec) return false;
break;
}
case entry_type::marker_begin:
case entry_type::marker_end:
case entry_type::marker_durable:
Expand Down
41 changes: 38 additions & 3 deletions src/limestone/parse_wal_file.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ void invalidate_epoch_snippet(boost::filesystem::fstream& strm, std::streampos f
// | (empty)
// log_entry = normal_entry
// | remove_entry
// | clear_storage
// | add_storage
// | remove_storage
// snippet_footer = (empty)

// parser rule (with error-handle)
Expand All @@ -72,8 +75,14 @@ void invalidate_epoch_snippet(boost::filesystem::fstream& strm, std::streampos f
// | (empty)
// log_entry = normal_entry { if (valid) process-entry }
// | remove_entry { if (valid) process-entry }
// | clear_storage { if (valid) process-entry }
// | add_storage { if (valid) process-entry }
// | remove_storage { if (valid) process-entry }
// | SHORT_normal_entry { if (valid) error-truncated } // TAIL
// | SHORT_remove_entry { if (valid) error-truncated } // TAIL
// | SHORT_clear_storage { if (valid) error-truncated } // TAIL
// | SHORT_add_storage { if (valid) error-truncated } // TAIL
// | SHORT_remove_storage { if (valid) error-truncated } // TAIL
// | UNKNOWN_TYPE_entry { if (valid) error-damaged-entry } // TAIL
// snippet_footer = (empty)

Expand All @@ -84,6 +93,9 @@ void invalidate_epoch_snippet(boost::filesystem::fstream& strm, std::streampos f
// remove_entry = 0x05 key_length storage_id key(key_length) writer_version_major writer_version_minor
// marker_durable = 0x04 epoch
// marker_end = 0x03 epoch
// clear_storage = 0x07 storage_id write_version_major write_version_minor
// add_storage = 0x08 storage_id write_version_major write_version_minor
// remove_storage = 0x09 storage_id write_version_major write_version_minor
// epoch = int64le
// key_length = int32le
// value_length = int32le
Expand All @@ -101,6 +113,9 @@ void invalidate_epoch_snippet(boost::filesystem::fstream& strm, std::streampos f
// = 0x05 byte(0-11)
// SHORT_marker_durable = 0x04 byte(0-7)
// SHORT_marker_end = 0x03 byte(0-7)
// SHORT_clear_storage = 0x07 byte(0-23)
// SHORT_add_storage = 0x08 byte(0-23)
// SHORT_remove_storage = 0x09 byte(0-23)
// UNKNOWN_TYPE_entry = 0x00 byte(0-)
// | 0x07-0xff byte(0-)
// // marker_durable and marker_end are not used in pWAL file
Expand All @@ -110,7 +125,9 @@ void invalidate_epoch_snippet(boost::filesystem::fstream& strm, std::streampos f
enum class token_type {
eof,
normal_entry = 1, marker_begin, marker_end, marker_durable, remove_entry, marker_invalidated_begin,
clear_storage, add_storage, remove_storage,
SHORT_normal_entry = 101, SHORT_marker_begin, SHORT_marker_end, SHORT_marker_durable, SHORT_remove_entry, SHORT_marker_inv_begin,
SHORT_clear_storage, SHORT_add_storage, SHORT_remove_storage,
UNKNOWN_TYPE_entry = 1001,
};

Expand All @@ -128,6 +145,9 @@ void invalidate_epoch_snippet(boost::filesystem::fstream& strm, std::streampos f
case log_entry::entry_type::marker_durable: value_ = token_type::marker_durable; break;
case log_entry::entry_type::remove_entry: value_ = token_type::remove_entry; break;
case log_entry::entry_type::marker_invalidated_begin: value_ = token_type::marker_invalidated_begin; break;
case log_entry::entry_type::clear_storage: value_ = token_type::clear_storage; break;
case log_entry::entry_type::add_storage: value_ = token_type::add_storage; break;
case log_entry::entry_type::remove_storage: value_ = token_type::remove_storage; break;
default: assert(false);
}
} else if (ec.value() == log_entry::read_error::short_entry) {
Expand All @@ -138,6 +158,9 @@ void invalidate_epoch_snippet(boost::filesystem::fstream& strm, std::streampos f
case log_entry::entry_type::marker_durable: value_ = token_type::SHORT_marker_durable; break;
case log_entry::entry_type::remove_entry: value_ = token_type::SHORT_remove_entry; break;
case log_entry::entry_type::marker_invalidated_begin: value_ = token_type::SHORT_marker_inv_begin; break;
case log_entry::entry_type::clear_storage: value_ = token_type::SHORT_clear_storage; break;
case log_entry::entry_type::add_storage: value_ = token_type::SHORT_add_storage; break;
case log_entry::entry_type::remove_storage: value_ = token_type::SHORT_remove_storage; break;
default: assert(false);
}
} else if (ec.value() == log_entry::read_error::unknown_type) {
Expand All @@ -164,11 +187,17 @@ void invalidate_epoch_snippet(boost::filesystem::fstream& strm, std::streampos f
// loop:
// normal_entry : { if (valid) process-entry } -> loop
// remove_entry : { if (valid) process-entry } -> loop
// clear_storage : { if (valid) process-entry } -> loop
// add_storage : { if (valid) process-entry } -> loop
// remove_storage : { if (valid) process-entry } -> loop
// eof : {} -> END
// marker_begin : { head_pos := ...; max-epoch := max(...); if (epoch <= ld) { valid := true } else { valid := false, error-nondurable } } -> loop
// marker_invalidated_begin : { head_pos := ...; max-epoch := max(...); valid := false } -> loop
// SHORT_normal_entry : { if (valid) error-truncated } -> END
// SHORT_remove_entry : { if (valid) error-truncated } -> END
// SHORT_clear_storage : { if (valid) error-truncated } -> END
// SHORT_add_storage : { if (valid) error-truncated } -> END
// SHORT_remove_storage : { if (valid) error-truncated } -> END
// SHORT_marker_begin : { head_pos := ...; error-truncated } -> END
// SHORT_marker_inv_begin : { head_pos := ... } -> END
// UNKNOWN_TYPE_entry : { if (valid) error-damaged-entry } -> END
Expand Down Expand Up @@ -216,7 +245,10 @@ epoch_id_type dblog_scan::scan_one_pwal_file( // NOLINT(readability-function-co
switch (tok.value()) {
case lex_token::token_type::normal_entry:
case lex_token::token_type::remove_entry:
// normal_entry | remove_entry : (not 1st) { if (valid) process-entry } -> loop
case lex_token::token_type::clear_storage:
case lex_token::token_type::add_storage:
case lex_token::token_type::remove_storage:
// normal_entry | remove_entry | clear_storage | add_storage | remove_storage : (not 1st) { if (valid) process-entry } -> loop
if (!first) {
if (valid) {
add_entry(e);
Expand Down Expand Up @@ -281,8 +313,11 @@ epoch_id_type dblog_scan::scan_one_pwal_file( // NOLINT(readability-function-co
break;
}
case lex_token::token_type::SHORT_normal_entry:
case lex_token::token_type::SHORT_remove_entry: {
// SHORT_normal_entry | SHORT_remove_entry : (not 1st) { if (valid) error-truncated } -> END
case lex_token::token_type::SHORT_remove_entry:
case lex_token::token_type::SHORT_clear_storage:
case lex_token::token_type::SHORT_add_storage:
case lex_token::token_type::SHORT_remove_storage: {
// SHORT_normal_entry | SHORT_remove_entry | SHORT_clear_storage | SHORT_add_storage | SHORT_remove_storage : (not 1st) { if (valid) error-truncated } -> END
if (first) {
err_unexpected();
pe = parse_error(parse_error::unexpected, fpos_before_read_entry);
Expand Down
51 changes: 43 additions & 8 deletions test/limestone/log/log_channel_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,18 @@ TEST_F(log_channel_test, number_and_backup) {
EXPECT_EQ(files.at(i++).string(), std::string(location) + "/pwal_0003");
}

static std::map<std::string, std::string> read_all_from_cursor(limestone::api::cursor* cursor) {
std::map<std::string, std::string> m;
while (cursor->next()) {
std::string key;
std::string value;
cursor->key(key);
cursor->value(value);
m[key] = value;
}
return m;
}

TEST_F(log_channel_test, remove) {
limestone::api::log_channel& channel = datastore_->create_channel(boost::filesystem::path(location));

Expand All @@ -115,17 +127,40 @@ TEST_F(log_channel_test, remove) {
auto cursor = ss->get_cursor();

// expect: datastore has {k1:v1, k3:v3}, not required to be sorted
std::map<std::string, std::string> m;
while (cursor->next()) {
std::string key;
std::string value;
cursor->key(key);
cursor->value(value);
m[key] = value;
}
auto m = read_all_from_cursor(cursor.get());
EXPECT_EQ(m.size(), 2);
EXPECT_EQ(m["k1"], "v1");
EXPECT_EQ(m["k3"], "v3");
}

TEST_F(log_channel_test, skip_storage_add_remove) {
// write log entry but not use at the moment...
// (purpose of this test: check not to abort as unimplemented)
limestone::api::log_channel& channel = datastore_->create_channel(boost::filesystem::path(location));

channel.begin_session();
channel.add_storage(42, {90, 4});
channel.add_entry(42, "k1", "v1", {100, 4});
channel.add_entry(42, "k2", "v2", {100, 4});
channel.end_session();

channel.begin_session();
channel.remove_entry(42, "k1", {110, 0});
channel.remove_entry(42, "k2", {110, 0});
channel.end_session();

channel.begin_session();
channel.truncate_storage(42, {120, 4});
channel.remove_storage(42, {120, 4});
channel.end_session();

datastore_->ready();
auto ss = datastore_->get_snapshot();
auto cursor = ss->get_cursor();

// expect: datastore has {k1:v1, k3:v3}, not required to be sorted
auto m = read_all_from_cursor(cursor.get());
EXPECT_EQ(m.size(), 0);
}

} // namespace limestone::testing

0 comments on commit 8521b37

Please sign in to comment.