From 63e828d3a6d4ccb5f3f39be9a49662a39ba2a0d1 Mon Sep 17 00:00:00 2001 From: Shinichi Umegane Date: Fri, 4 Oct 2024 19:26:10 +0900 Subject: [PATCH] Add test cases --- docs/internal/db_startup_sort_improvement.md | 8 + src/limestone/datastore_snapshot.cpp | 73 ++-- test/limestone/compaction/compaction_test.cpp | 316 +++++++++++++++++- 3 files changed, 367 insertions(+), 30 deletions(-) diff --git a/docs/internal/db_startup_sort_improvement.md b/docs/internal/db_startup_sort_improvement.md index aa2d8758..07620a7a 100644 --- a/docs/internal/db_startup_sort_improvement.md +++ b/docs/internal/db_startup_sort_improvement.md @@ -82,6 +82,14 @@ * assemble_snapshot_input_filenamesのテストがないので作成する。 * drop table, truncate table時に作成されるログエントリに対する処理が実装されてていない。 * 現在の実装を確認し、それに合わせてロジックを組み込む必要がある。 + * pwal_0000.compacted => すでに存在するコンパクション済みファイル, snapshot => 起動時に作成するスナップショットファイル、pwal_0000.compactedのエントリを含まない。 + * pwal_0000.compacted にtruncate, drop tableのエントリが存在する場合 + * pwal_0000.compacted作成時に、truncate, drop tableの結果消えるエントリは、すでに削除済みなので、cursortアクセスで特に問題ない。 + * snapshot作成時に、truncate, drop tableのエントリが存在する場合 + * snapshot作成時に、消す必要があるエントリを削除しているので、snapshotに対するcursorのアクセスは特に問題ない。 + * pwal_0000.compacted にcurosrがアクセスする場合、truncate, drop tableにより削除すべエントリが存在する可能性があるので、それを発見したときにスキップする必要がある。 + * snapshot作成時に、削除したストレージのストレージIDのセットを作成しておき、cursorでpwal_0000.compactedアクセスじに、当該エントリのIDが見つかったらスキップする処理を追加する。 + * ## テストケースの作成 diff --git a/src/limestone/datastore_snapshot.cpp b/src/limestone/datastore_snapshot.cpp index 96011ef9..7a215444 100644 --- a/src/limestone/datastore_snapshot.cpp +++ b/src/limestone/datastore_snapshot.cpp @@ -186,11 +186,29 @@ static std::pair create_sorted_from_wals( } } -static void sortdb_foreach(sorting_context& sctx, std::function write_snapshot_entry, - bool skip_remove_entry) { + +[[maybe_unused]] +static write_version_type extract_write_version(const std::string_view& db_key) { + std::string wv(write_version_size, '\0'); + store_bswap64_value(&wv[0], &db_key[0]); + store_bswap64_value(&wv[8], &db_key[8]); + return write_version_type{wv}; +} + +[[maybe_unused]] +static std::string create_value_from_db_key_and_value(const std::string_view& db_key, const std::string_view& db_value) { + std::string value(write_version_size + db_value.size() - 1, '\0'); + store_bswap64_value(&value[0], &db_key[0]); + store_bswap64_value(&value[8], &db_key[8]); + std::memcpy(&value[write_version_size], &db_value[1], db_value.size() - 1); + return value; +} + +static void sortdb_foreach(sorting_context& sctx, std::function write_snapshot_entry, + std::function write_snapshot_remove_entry) { static_assert(sizeof(log_entry::entry_type) == 1); #if defined SORT_METHOD_PUT_ONLY - sctx.get_sortdb()->each([&sctx, write_snapshot_entry, skip_remove_entry, last_key = std::string{}](const std::string_view db_key, const std::string_view db_value) mutable { + sctx.get_sortdb()->each([&sctx, write_snapshot_entry, write_snapshot_remove_entry, last_key = std::string{}](const std::string_view db_key, const std::string_view db_value) mutable { // using the first entry in GROUP BY (original-)key // NB: max versions comes first (by the custom-comparator) std::string_view key(db_key.data() + write_version_size, db_key.size() - write_version_size); @@ -201,40 +219,31 @@ static void sortdb_foreach(sorting_context& sctx, std::function(&st_bytes), key.data(), sizeof(storage_id_type)); storage_id_type st = le64toh(st_bytes); + if (auto ret = sctx.clear_storage_find(st); ret) { // check range delete write_version_type range_ver = ret.value(); - std::string wv(write_version_size, '\0'); - store_bswap64_value(&wv[0], &db_key[0]); - store_bswap64_value(&wv[8], &db_key[8]); - write_version_type point_ver{wv}; - if (point_ver < range_ver) { + if (extract_write_version(db_key) < range_ver) { return; // skip } } auto entry_type = static_cast(db_value[0]); switch (entry_type) { - case log_entry::entry_type::normal_entry: { - std::string value(write_version_size + db_value.size() - 1, '\0'); - store_bswap64_value(&value[0], &db_key[0]); - store_bswap64_value(&value[8], &db_key[8]); - std::memcpy(&value[write_version_size], &db_value[1], db_value.size() - 1); - write_snapshot_entry(key, value); + case log_entry::entry_type::normal_entry: + write_snapshot_entry(key, create_value_from_db_key_and_value(db_key, db_value)); break; - } - case log_entry::entry_type::remove_entry: - if (!skip_remove_entry) { - write_snapshot_entry(key, "remove_entry"); - } + case log_entry::entry_type::remove_entry: { + write_snapshot_remove_entry(key, create_value_from_db_key_and_value(db_key, db_value)); break; + } default: LOG(ERROR) << "never reach " << static_cast(entry_type); std::abort(); } }); #else - sctx.get_sortdb()->each([&sctx, &write_snapshot_entry, skip_remove_entry](const std::string_view db_key, const std::string_view db_value) { + sctx.get_sortdb()->each([&sctx, &write_snapshot_entry, write_snapshot_remove_entry](const std::string_view db_key, const std::string_view db_value) { storage_id_type st_bytes{}; memcpy(static_cast(&st_bytes), db_key.data(), sizeof(storage_id_type)); storage_id_type st = le64toh(st_bytes); @@ -251,10 +260,8 @@ static void sortdb_foreach(sorting_context& sctx, std::function(entry_type); @@ -300,7 +307,7 @@ void create_compact_pwal( log_entry::write(ostrm, key_stid, value_etc); } }; - sortdb_foreach(sctx, write_snapshot_entry, true); + sortdb_foreach(sctx, write_snapshot_entry, [](std::string_view, std::string_view) {}); //log_entry::end_session(ostrm, epoch); if (fclose(ostrm) != 0) { // NOLINT(*-owning-memory) LOG_AND_THROW_IO_EXCEPTION("cannot close snapshot file (" + snapshot_file.string() + ")", errno); @@ -374,11 +381,19 @@ void datastore::create_snapshot() { if (!ostrm) { LOG_AND_THROW_IO_EXCEPTION("cannot create snapshot file", errno); } + log_entry::begin_session(ostrm, 0); setvbuf(ostrm, nullptr, _IOFBF, 128L * 1024L); // NOLINT, NB. glibc may ignore size when _IOFBF and buffer=NULL - auto write_snapshot_entry = [&ostrm](std::string_view key, std::string_view value){log_entry::write(ostrm, key, value);}; - - bool skip_remove_entry = compaction_catalog_->get_compacted_files().empty(); - sortdb_foreach(sctx, write_snapshot_entry, skip_remove_entry); + auto write_snapshot_entry = [&ostrm](std::string_view key_sid, std::string_view value_etc) { log_entry::write(ostrm, key_sid, value_etc); }; + + std::function write_snapshot_remove_entry; + if (compaction_catalog_->get_compacted_files().empty()) { + write_snapshot_remove_entry = [](std::string_view, std::string_view) {}; + } else { + write_snapshot_remove_entry = [&ostrm](std::string_view key, std::string_view value_etc) { + log_entry::write_remove(ostrm, key, value_etc); + }; + } + sortdb_foreach(sctx, write_snapshot_entry, write_snapshot_remove_entry); if (fclose(ostrm) != 0) { // NOLINT(*-owning-memory) LOG_AND_THROW_IO_EXCEPTION("cannot close snapshot file (" + snapshot_file.string() + ")", errno); } diff --git a/test/limestone/compaction/compaction_test.cpp b/test/limestone/compaction/compaction_test.cpp index 7335de5f..c5c36aef 100644 --- a/test/limestone/compaction/compaction_test.cpp +++ b/test/limestone/compaction/compaction_test.cpp @@ -149,7 +149,105 @@ class compaction_test : public ::testing::Test { } return kv_list; } - + + void print_log_entry(const log_entry& entry) { + std::string key; + storage_id_type storage_id = entry.storage(); + log_entry::entry_type type = entry.type(); + + if (type == log_entry::entry_type::normal_entry || type == log_entry::entry_type::remove_entry) { + entry.key(key); + } + + switch (type) { + case log_entry::entry_type::normal_entry: { + std::string value; + entry.value(value); + std::cout << "Entry Type: normal_entry, Storage ID: " << storage_id + << ", Key: " << key << ", Value: " << value + << ", Write Version: Epoch: " << log_entry::write_version_epoch_number(entry.value_etc()) + << ", Minor: " << log_entry::write_version_minor_write_version(entry.value_etc()) + << std::endl; + break; + } + case log_entry::entry_type::remove_entry: { + std::cout << "Entry Type: remove_entry, Storage ID: " << storage_id << ", Key: " << key + << ", Write Version: Epoch: " << log_entry::write_version_epoch_number(entry.value_etc()) + << ", Minor: " << log_entry::write_version_minor_write_version(entry.value_etc()) + << std::endl; + break; + } + case log_entry::entry_type::clear_storage: + case log_entry::entry_type::add_storage: + case log_entry::entry_type::remove_storage: { + write_version_type write_version; + entry.write_version(write_version); + std::cout << "Entry Type: " << static_cast(type) << ", Storage ID: " << storage_id + << ", Write Version: Epoch: " << log_entry::write_version_epoch_number(entry.value_etc()) + << ", Minor: " << log_entry::write_version_minor_write_version(entry.value_etc()) + << std::endl; + break; + } + case log_entry::entry_type::marker_begin: + std::cout << "Entry Type: marker_begin, Epoch ID: " << entry.epoch_id() << std::endl; + break; + case log_entry::entry_type::marker_end: + std::cout << "Entry Type: marker_end, Epoch ID: " << entry.epoch_id() << std::endl; + break; + case log_entry::entry_type::marker_durable: + std::cout << "Entry Type: marker_durable, Epoch ID: " << entry.epoch_id() << std::endl; + break; + case log_entry::entry_type::marker_invalidated_begin: + std::cout << "Entry Type: marker_invalidated_begin, Epoch ID: " << entry.epoch_id() << std::endl; + break; + default: + std::cout << "Entry Type: unknown" << std::endl; + break; + } + } + + + + + /** + * @brief Reads a specified log file (PWAL, compacted_file, snapshot) and returns a list of log entries. + * @param log_file The path to the log file to be scanned. + * @return A vector of log_entry objects read from the log file. + */ + std::vector read_log_file(const std::string& log_file, const boost::filesystem::path& log_dir) { + boost::filesystem::path log_path = log_dir / log_file; + + std::vector log_entries; + limestone::internal::dblog_scan::parse_error pe; + + // Define a lambda function to capture and store log entries + auto add_entry = [&](log_entry& e) { log_entries.push_back(e); }; + + // Error reporting function, returning bool as expected by error_report_func_t + auto report_error = [](log_entry::read_error& error) -> bool { + std::cerr << "Error during log file scan: " << error.message() << std::endl; + return false; // Return false to indicate an error occurred + }; + + // Initialize a dblog_scan instance with the log directory + limestone::internal::dblog_scan scanner(log_dir); + + // Scan the specified log file + epoch_id_type max_epoch = scanner.scan_one_pwal_file(log_path, UINT64_MAX, add_entry, report_error, pe); + + if (pe.value() != limestone::internal::dblog_scan::parse_error::ok) { + std::cerr << "Parse error occurred while reading the log file: " << log_path.string() << std::endl; + } + + // Iterate over the log entries and print relevant information + std::cout << std::endl << "Log entries read from " << log_path.string() << ":" << std::endl; + for (const auto& entry : log_entries) { + print_log_entry(entry); + } + + return log_entries; + } + ::testing::AssertionResult ContainsPrefix(const char* files_expr, const char* prefix_expr, const char* expected_count_expr, const std::set& files, const std::string& prefix, int expected_count) { int match_count = 0; @@ -224,6 +322,67 @@ class compaction_test : public ::testing::Test { return pwal_file_names; } + + ::testing::AssertionResult AssertLogEntry(const log_entry& entry, const std::optional& expected_storage_id, + const std::optional& expected_key, const std::optional& expected_value, + const std::optional& expected_epoch_number, const std::optional& expected_minor_version, + log_entry::entry_type expected_type) { + // Check the entry type + if (entry.type() != expected_type) { + return ::testing::AssertionFailure() + << "Expected entry type: " << static_cast(expected_type) + << ", but got: " << static_cast(entry.type()); + } + + // Check the storage ID if it exists + if (expected_storage_id.has_value()) { + if (entry.storage() != expected_storage_id.value()) { + return ::testing::AssertionFailure() + << "Expected storage ID: " << expected_storage_id.value() + << ", but got: " << entry.storage(); + } + } + + // Check the key if it exists + if (expected_key.has_value()) { + std::string actual_key; + entry.key(actual_key); + if (actual_key != expected_key.value()) { + return ::testing::AssertionFailure() + << "Expected key: " << expected_key.value() + << ", but got: " << actual_key; + } + } + + // Check the value if it exists + if (expected_value.has_value()) { + std::string actual_value; + entry.value(actual_value); + if (actual_value != expected_value.value()) { + return ::testing::AssertionFailure() + << "Expected value: " << expected_value.value() + << ", but got: " << actual_value; + } + } + + // Check the write version if it exists + if (expected_epoch_number.has_value() && expected_minor_version.has_value()) { + epoch_id_type actual_epoch_number = log_entry::write_version_epoch_number(entry.value_etc()); + std::uint64_t actual_minor_version = log_entry::write_version_minor_write_version(entry.value_etc()); + + if (actual_epoch_number != expected_epoch_number.value() || + actual_minor_version != expected_minor_version.value()) { + return ::testing::AssertionFailure() + << "Expected write version (epoch_number: " << expected_epoch_number.value() + << ", minor_write_version: " << expected_minor_version.value() + << "), but got (epoch_number: " << actual_epoch_number + << ", minor_write_version: " << actual_minor_version << ")"; + } + } + + // If all checks pass, return success + return ::testing::AssertionSuccess(); + } }; TEST_F(compaction_test, no_pwals) { @@ -826,6 +985,161 @@ TEST_F(compaction_test, scenario02) { ASSERT_PRED_FORMAT3(ContainsPrefix, pwals, "pwal_0002.", 1); } +// This test case verifies the correct behavior of `remove_entry`. +TEST_F(compaction_test, scenario03) { + FLAGS_v = 50; // set VLOG level to 50 + + // 1. Create multiple PWALs using two different storage IDs + gen_datastore(); + datastore_->switch_epoch(1); + + // Storage ID 1: key1 added, then removed + lc0_->begin_session(); + lc0_->add_entry(1, "key1", "value1", {1, 0}); + lc0_->remove_entry(1, "key1", {1, 1}); // use remove_entry + lc0_->end_session(); + + // Storage ID 2: key2 added, no removal + lc1_->begin_session(); + lc1_->add_entry(2, "key2", "value2", {1, 0}); + lc1_->end_session(); + + // Storage ID 1: key3 removed first, then added + lc2_->begin_session(); + lc2_->remove_entry(1, "key3", {1, 0}); + lc2_->add_entry(1, "key3", "value3", {1, 3}); + lc2_->end_session(); + + // Storeage ID 1: key4 deleted witout adding + lc0_->begin_session(); + lc0_->remove_entry(1, "key4", {1, 0}); + lc0_->end_session(); + + datastore_->switch_epoch(2); + + // Check the created PWAL files + auto pwals = extract_pwal_files_from_datastore(); + EXPECT_EQ(pwals.size(), 3); + ASSERT_PRED_FORMAT2(ContainsString, pwals, "pwal_0000"); + ASSERT_PRED_FORMAT2(ContainsString, pwals, "pwal_0001"); + ASSERT_PRED_FORMAT2(ContainsString, pwals, "pwal_0002"); + + auto log_entries = read_log_file("pwal_0000", location); + ASSERT_EQ(log_entries.size(), 3); // Ensure that there are log entries + EXPECT_TRUE(AssertLogEntry(log_entries[0], 1, "key1", "value1", 1, 0, log_entry::entry_type::normal_entry)); + EXPECT_TRUE(AssertLogEntry(log_entries[1], 1, "key1", std::nullopt, 1, 1, log_entry::entry_type::remove_entry)); + EXPECT_TRUE(AssertLogEntry(log_entries[2], 1, "key4", std::nullopt, 1, 0, log_entry::entry_type::remove_entry)); + log_entries = read_log_file("pwal_0001", location); + ASSERT_EQ(log_entries.size(), 1); // Ensure that there are log entries + EXPECT_TRUE(AssertLogEntry(log_entries[0], 2, "key2", "value2", 1, 0, log_entry::entry_type::normal_entry)); + log_entries = read_log_file("pwal_0002", location); + ASSERT_EQ(log_entries.size(), 2); // Ensure that there are log entries + EXPECT_TRUE(AssertLogEntry(log_entries[0], 1, "key3", std::nullopt, 1, 0,log_entry ::entry_type::remove_entry)); + EXPECT_TRUE(AssertLogEntry(log_entries[1], 1, "key3", "value3", 1, 3, log_entry::entry_type::normal_entry)); + + // 2. Execute compaction + run_compact_with_epoch_switch(2); + + // Check the catalog and PWALs after compaction + compaction_catalog catalog = compaction_catalog::from_catalog_file(location); + EXPECT_EQ(catalog.get_max_epoch_id(), 1); + EXPECT_EQ(catalog.get_compacted_files().size(), 1); + EXPECT_EQ(catalog.get_detached_pwals().size(), 3); + + pwals = extract_pwal_files_from_datastore(); + EXPECT_EQ(pwals.size(), 4); // Includes the compacted file + ASSERT_PRED_FORMAT2(ContainsString, pwals, "pwal_0000.compacted"); + + log_entries = read_log_file("pwal_0000.compacted", location); + ASSERT_EQ(log_entries.size(), 2); // Ensure that there are log entries + EXPECT_TRUE(AssertLogEntry(log_entries[0], 1, "key3", "value3", 0, 0, log_entry::entry_type::normal_entry)); // write version changed to 0 + EXPECT_TRUE(AssertLogEntry(log_entries[1], 2, "key2", "value2", 0, 0, log_entry::entry_type::normal_entry)); // write version changed to 0 + + // 3. Add/Update PWALs (include remove_entry again) + + // Storage ID 1: key1 added, then removed + lc0_->begin_session(); + lc0_->add_entry(1, "key11", "value1", {2, 0}); + lc0_->remove_entry(1, "key11", {2, 1}); // use remove_entry + lc0_->end_session(); + + // Storage ID 2: key2 added, no removal + lc1_->begin_session(); + lc1_->add_entry(2, "key21", "value2", {2, 0}); + lc1_->end_session(); + + // Storage ID 1: key3 removed first, then added + lc2_->begin_session(); + lc2_->remove_entry(1, "key31", {2, 0}); + lc2_->add_entry(1, "key31", "value3", {2, 3}); + lc2_->end_session(); + + // Storeage ID 1: key4 deleted witout adding + lc0_->begin_session(); + lc0_->remove_entry(1, "key41", {2, 0}); + lc0_->end_session(); + + datastore_->switch_epoch(3); + pwals = extract_pwal_files_from_datastore(); + + // Check the created PWAL files + pwals = extract_pwal_files_from_datastore(); + EXPECT_EQ(pwals.size(), 7); // 3 new pwals and 3 rotaed pwals and 1 compacted file + ASSERT_PRED_FORMAT2(ContainsString, pwals, "pwal_0000"); + ASSERT_PRED_FORMAT2(ContainsString, pwals, "pwal_0001"); + ASSERT_PRED_FORMAT2(ContainsString, pwals, "pwal_0002"); + + log_entries = read_log_file("pwal_0000", location); + ASSERT_EQ(log_entries.size(), 3); // Ensure that there are log entries + EXPECT_TRUE(AssertLogEntry(log_entries[0], 1, "key11", "value1", 2, 0, log_entry::entry_type::normal_entry)); + EXPECT_TRUE(AssertLogEntry(log_entries[1], 1, "key11", std::nullopt, 2, 1, log_entry::entry_type::remove_entry)); + EXPECT_TRUE(AssertLogEntry(log_entries[2], 1, "key41", std::nullopt, 2, 0, log_entry::entry_type::remove_entry)); + log_entries = read_log_file("pwal_0001", location); + ASSERT_EQ(log_entries.size(), 1); // Ensure that there are log entries + EXPECT_TRUE(AssertLogEntry(log_entries[0], 2, "key21", "value2", 2, 0, log_entry::entry_type::normal_entry)); + log_entries = read_log_file("pwal_0002", location); + ASSERT_EQ(log_entries.size(), 2); // Ensure that there are log entries + EXPECT_TRUE(AssertLogEntry(log_entries[0], 1, "key31", std::nullopt, 2, 0, log_entry::entry_type::remove_entry)); + EXPECT_TRUE(AssertLogEntry(log_entries[1], 1, "key31", "value3", 2, 3, log_entry::entry_type::normal_entry)); + + // 4. Restart the datastore + datastore_->shutdown(); + datastore_ = nullptr; + gen_datastore(); // Regenerate datastore after restart + + // 5. check the compacted file and snapshot creating at the boot time + log_entries = read_log_file("pwal_0000.compacted", location); + ASSERT_EQ(log_entries.size(), 2); // Ensure that there are log entries + EXPECT_TRUE(AssertLogEntry(log_entries[0], 1, "key3", "value3", 0, 0, log_entry::entry_type::normal_entry)); // write version changed to 0 + EXPECT_TRUE(AssertLogEntry(log_entries[1], 2, "key2", "value2", 0, 0, log_entry::entry_type::normal_entry)); // write version changed to 0 + + log_entries = read_log_file("data/snapshot", location); + ASSERT_EQ(log_entries.size(), 4); // Ensure that there are log entries + EXPECT_TRUE(AssertLogEntry(log_entries[0], 1, "key11", std::nullopt, 2, 1, log_entry::entry_type::remove_entry)); + EXPECT_TRUE(AssertLogEntry(log_entries[1], 1, "key31", "value3", 2, 3, log_entry::entry_type::normal_entry)); + EXPECT_TRUE(AssertLogEntry(log_entries[2], 1, "key41", std::nullopt, 2, 0, log_entry::entry_type::remove_entry)); + EXPECT_TRUE(AssertLogEntry(log_entries[3], 2, "key21", "value2", 2, 0, log_entry::entry_type::normal_entry)); + + // 5. Verify the snapshot contents after restart + std::vector> kv_list = restart_datastore_and_read_snapshot(); + + // key1 should exist with its updated value, key2 and key3 should be removed + ASSERT_EQ(kv_list.size(), 4); + EXPECT_EQ(kv_list[0].first, "key3"); + EXPECT_EQ(kv_list[0].second, "value3"); + EXPECT_EQ(kv_list[1].first, "key31"); + EXPECT_EQ(kv_list[1].second, "value3"); + EXPECT_EQ(kv_list[2].first, "key2"); + EXPECT_EQ(kv_list[2].second, "value2"); + EXPECT_EQ(kv_list[3].first, "key21"); + EXPECT_EQ(kv_list[3].second, "value2"); +} + + + + + + // This test is disabled because it is environment-dependent and may not work properly in CI environments. TEST_F(compaction_test, DISABLED_fail_compact_with_io_error) { gen_datastore();