From ba2c623d7a5f692ee333ec346daaae27f26bf5be Mon Sep 17 00:00:00 2001 From: Dan Wang Date: Thu, 28 Nov 2024 14:33:11 +0800 Subject: [PATCH 1/8] feat(duplication): collect last committed decree from replica to meta for duplication --- idl/duplication.thrift | 3 + src/meta/duplication/duplication_info.cpp | 57 +++++++++++++++---- src/meta/duplication/duplication_info.h | 7 ++- .../duplication/meta_duplication_service.cpp | 23 ++++---- src/meta/meta_state_service_utils.h | 15 +++++ src/meta/server_state.cpp | 8 +-- src/meta/test/duplication_info_test.cpp | 4 +- .../replica_duplicator_manager.cpp | 1 + 8 files changed, 88 insertions(+), 30 deletions(-) diff --git a/idl/duplication.thrift b/idl/duplication.thrift index 140361ca94..91aefd2781 100644 --- a/idl/duplication.thrift +++ b/idl/duplication.thrift @@ -172,6 +172,9 @@ struct duplication_confirm_entry 1:i32 dupid; 2:i64 confirmed_decree; 3:optional bool checkpoint_prepared = false; + + // + 4:optional i64 last_committed_decree; } // This is an internal RPC sent from replica server to meta. diff --git a/src/meta/duplication/duplication_info.cpp b/src/meta/duplication/duplication_info.cpp index 7f7bb62b69..d399978950 100644 --- a/src/meta/duplication/duplication_info.cpp +++ b/src/meta/duplication/duplication_info.cpp @@ -22,6 +22,16 @@ #include "runtime/api_layer1.h" #include "utils/fmt_logging.h" +DSN_DEFINE_uint64(replication, + dup_progress_min_update_period_ms, + 5000, + "The minimum period in milliseconds that progress of duplication is updated"); + +DSN_DEFINE_uint64(replication, + dup_progress_min_report_period_ms, + 5 * 60 * 1000, + "The minimum period in milliseconds that progress of duplication is reported"); + namespace dsn { namespace replication { @@ -133,19 +143,27 @@ bool duplication_info::alter_progress(int partition_index, return false; } + if (confirm_entry.__isset.last_committed_decree) { + p.last_committed_decree = confirm_entry.last_committed_decree; + } + p.checkpoint_prepared = confirm_entry.checkpoint_prepared; if (p.volatile_decree < confirm_entry.confirmed_decree) { p.volatile_decree = confirm_entry.confirmed_decree; } - if (p.volatile_decree != p.stored_decree) { - // progress update is not supposed to be too frequent. - if (dsn_now_ms() > p.last_progress_update_ms + PROGRESS_UPDATE_PERIOD_MS) { + + if (p.volatile_decree == p.stored_decree) { + return false; + } + + // Progress update is not supposed to be too frequent. + if (dsn_now_ms() < p.last_progress_update_ms + FLAGS_dup_progress_min_update_period_ms) { + return false; + } + p.is_altering = true; p.last_progress_update_ms = dsn_now_ms(); return true; - } - } - return false; } void duplication_info::persist_progress(int partition_index) @@ -163,13 +181,26 @@ void duplication_info::persist_status() zauto_write_lock l(_lock); if (!_is_altering) { - LOG_ERROR_PREFIX("callers never write a duplication that is not altering to meta store"); + LOG_ERROR_PREFIX("the status of this duplication is not being altered: status={}, " + "next_status={}, master_app_id={}, master_app_name={}, " + "follower_cluster_name={}, follower_app_name={}" + duplication_status_to_string(_status), + duplication_status_to_string(_next_status), + app_id, + app_name, + remote_cluster_name, + remote_app_name); return; } - LOG_INFO_PREFIX("change duplication status from {} to {} successfully [app_id: {}]", + + LOG_INFO_PREFIX("change duplication status from {} to {} successfully: master_app_id={}, " + "master_app_name={}, follower_cluster_name={}, follower_app_name={}", duplication_status_to_string(_status), duplication_status_to_string(_next_status), - app_id); + app_id, + app_name, + remote_cluster_name, + remote_app_name); _is_altering = false; _status = _next_status; @@ -197,11 +228,13 @@ blob duplication_info::to_json_blob() const void duplication_info::report_progress_if_time_up() { - // progress report is not supposed to be too frequent. - if (dsn_now_ms() > _last_progress_report_ms + PROGRESS_REPORT_PERIOD_MS) { + // Progress report is not supposed to be too frequent. + if (dsn_now_ms() < _last_progress_report_ms + FLAGS_dup_progress_min_report_period_ms) { + return; + } + _last_progress_report_ms = dsn_now_ms(); LOG_INFO("duplication report: {}", to_string()); - } } duplication_info_s_ptr duplication_info::decode_from_blob(dupid_t dup_id, diff --git a/src/meta/duplication/duplication_info.h b/src/meta/duplication/duplication_info.h index e1ddcacf38..2dccec4380 100644 --- a/src/meta/duplication/duplication_info.h +++ b/src/meta/duplication/duplication_info.h @@ -216,13 +216,14 @@ class duplication_info mutable zrwlock_nr _lock; - static constexpr int PROGRESS_UPDATE_PERIOD_MS = 5000; // 5s - static constexpr int PROGRESS_REPORT_PERIOD_MS = 1000 * 60 * 5; // 5min - struct partition_progress { int64_t volatile_decree{invalid_decree}; int64_t stored_decree{invalid_decree}; + + // + int64_t last_committed_decree{invalid_decree}; + bool is_altering{false}; uint64_t last_progress_update_ms{0}; bool is_inited{false}; diff --git a/src/meta/duplication/meta_duplication_service.cpp b/src/meta/duplication/meta_duplication_service.cpp index c557c890a2..a021658b0b 100644 --- a/src/meta/duplication/meta_duplication_service.cpp +++ b/src/meta/duplication/meta_duplication_service.cpp @@ -791,31 +791,34 @@ void meta_duplication_service::do_update_partition_confirmed( int32_t partition_idx, const duplication_confirm_entry &confirm_entry) { - if (dup->alter_progress(partition_idx, confirm_entry)) { - std::string path = get_partition_path(dup, std::to_string(partition_idx)); - blob value = blob::create_from_bytes(std::to_string(confirm_entry.confirmed_decree)); + if (!dup->alter_progress(partition_idx, confirm_entry)) { + return; + } - _meta_svc->get_meta_storage()->get_data(std::string(path), [=](const blob &data) mutable { - if (data.length() == 0) { + const auto &path = get_partition_path(dup, std::to_string(partition_idx)); + const auto &value = blob::create_from_bytes(std::to_string(confirm_entry.confirmed_decree)); + + _meta_svc->get_meta_storage()->get_data(path, [=](const blob &data) mutable { + if (data.empty()) { _meta_svc->get_meta_storage()->create_node( - std::string(path), std::move(value), [=]() mutable { + path, std::move(value), [=]() mutable { dup->persist_progress(partition_idx); rpc.response().dup_map[dup->app_id][dup->id].progress[partition_idx] = confirm_entry.confirmed_decree; }); - } else { + return; + } + _meta_svc->get_meta_storage()->set_data( - std::string(path), std::move(value), [=]() mutable { + path, std::move(value), [=]() mutable { dup->persist_progress(partition_idx); rpc.response().dup_map[dup->app_id][dup->id].progress[partition_idx] = confirm_entry.confirmed_decree; }); - } // duplication_sync_rpc will finally be replied when confirmed points // of all partitions are stored. }); - } } std::shared_ptr diff --git a/src/meta/meta_state_service_utils.h b/src/meta/meta_state_service_utils.h index 41099e898c..1676637be5 100644 --- a/src/meta/meta_state_service_utils.h +++ b/src/meta/meta_state_service_utils.h @@ -57,6 +57,11 @@ struct meta_storage void create_node(std::string &&node, blob &&value, std::function &&cb); + void create_node(const std::string &node, blob &&value, std::function &&cb) + { + create_node(std::string(node), std::move(value), std::move(cb)); + } + void delete_node_recursively(std::string &&node, std::function &&cb); void delete_node(std::string &&node, std::function &&cb); @@ -64,9 +69,19 @@ struct meta_storage /// Will fatal if node doesn't exists. void set_data(std::string &&node, blob &&value, std::function &&cb); + void set_data(const std::string &node, blob &&value, std::function &&cb); + { + set_data(std::string(node), std::move(value), std::move(cb)); + } + /// If node does not exist, cb will receive an empty blob. void get_data(std::string &&node, std::function &&cb); + void get_data(const std::string &node, std::function &&cb) + { + get_data(std::string(node), std::move(cb)); + } + /// \param cb: void (bool node_exists, const std::vector &children) /// `children` contains the name (not full path) of children nodes. /// `node_exists` indicates whether this node exists. diff --git a/src/meta/server_state.cpp b/src/meta/server_state.cpp index fe18206885..59a5e0c74a 100644 --- a/src/meta/server_state.cpp +++ b/src/meta/server_state.cpp @@ -1128,9 +1128,9 @@ void server_state::create_app(dsn::message_ex *msg) request.options.partition_count, request.options.replica_count, duplicating - ? fmt::format("{}.{}", - request.options.envs[duplication_constants::kEnvMasterClusterKey], - request.app_name) + ? fmt::format("master_cluster_name={}, master_app_name={}", + master_cluster->second, + gutil::FindWithDefault(request.options.envs, duplication_constants::kEnvMasterAppNameKey)) : "false"); auto option_match_check = [](const create_app_options &opt, const app_state &exist_app) { @@ -1162,7 +1162,7 @@ void server_state::create_app(dsn::message_ex *msg) zauto_write_lock l(_lock); auto app = get_app(request.app_name); - if (nullptr != app) { + if (!app) { configuration_create_app_response response; switch (app->status) { diff --git a/src/meta/test/duplication_info_test.cpp b/src/meta/test/duplication_info_test.cpp index 5cfa4e12be..7dfa02f214 100644 --- a/src/meta/test/duplication_info_test.cpp +++ b/src/meta/test/duplication_info_test.cpp @@ -31,6 +31,8 @@ #include "gtest/gtest.h" #include "runtime/app_model.h" +DSN_DECLARE_uint64(dup_progress_min_update_period_ms) + namespace dsn { namespace replication { @@ -95,7 +97,7 @@ class duplication_info_test : public testing::Test ASSERT_FALSE(dup._progress[1].is_altering); dup._progress[1].last_progress_update_ms -= - duplication_info::PROGRESS_UPDATE_PERIOD_MS + 100; + FLAGS_dup_progress_min_update_period_ms + 100; entry.confirmed_decree = 15; entry.checkpoint_prepared = true; diff --git a/src/replica/duplication/replica_duplicator_manager.cpp b/src/replica/duplication/replica_duplicator_manager.cpp index 2e1e61cc4d..724f164d1a 100644 --- a/src/replica/duplication/replica_duplicator_manager.cpp +++ b/src/replica/duplication/replica_duplicator_manager.cpp @@ -94,6 +94,7 @@ replica_duplicator_manager::get_duplication_confirms_to_update() const entry.dupid = dup->id(); entry.confirmed_decree = progress.last_decree; entry.__set_checkpoint_prepared(progress.checkpoint_has_prepared); + entry.__set_last_committed_decree(_replica->last_committed_decree()); updates.emplace_back(entry); } return updates; From 0c10d9659228d7986043eaf92dfbea7424755cf4 Mon Sep 17 00:00:00 2001 From: Dan Wang Date: Thu, 28 Nov 2024 15:21:58 +0800 Subject: [PATCH 2/8] format --- src/meta/duplication/duplication_info.cpp | 38 +++++++++---------- src/meta/duplication/duplication_info.h | 2 +- .../duplication/meta_duplication_service.cpp | 24 ++++++------ src/meta/meta_state_service_utils.h | 2 +- src/meta/server_state.cpp | 3 +- src/meta/test/duplication_info_test.cpp | 5 +-- 6 files changed, 38 insertions(+), 36 deletions(-) diff --git a/src/meta/duplication/duplication_info.cpp b/src/meta/duplication/duplication_info.cpp index d399978950..f85c6f4f74 100644 --- a/src/meta/duplication/duplication_info.cpp +++ b/src/meta/duplication/duplication_info.cpp @@ -144,7 +144,7 @@ bool duplication_info::alter_progress(int partition_index, } if (confirm_entry.__isset.last_committed_decree) { - p.last_committed_decree = confirm_entry.last_committed_decree; + p.last_committed_decree = confirm_entry.last_committed_decree; } p.checkpoint_prepared = confirm_entry.checkpoint_prepared; @@ -153,17 +153,17 @@ bool duplication_info::alter_progress(int partition_index, } if (p.volatile_decree == p.stored_decree) { - return false; + return false; } - // Progress update is not supposed to be too frequent. - if (dsn_now_ms() < p.last_progress_update_ms + FLAGS_dup_progress_min_update_period_ms) { - return false; - } + // Progress update is not supposed to be too frequent. + if (dsn_now_ms() < p.last_progress_update_ms + FLAGS_dup_progress_min_update_period_ms) { + return false; + } - p.is_altering = true; - p.last_progress_update_ms = dsn_now_ms(); - return true; + p.is_altering = true; + p.last_progress_update_ms = dsn_now_ms(); + return true; } void duplication_info::persist_progress(int partition_index) @@ -182,14 +182,14 @@ void duplication_info::persist_status() if (!_is_altering) { LOG_ERROR_PREFIX("the status of this duplication is not being altered: status={}, " - "next_status={}, master_app_id={}, master_app_name={}, " - "follower_cluster_name={}, follower_app_name={}" - duplication_status_to_string(_status), - duplication_status_to_string(_next_status), - app_id, - app_name, - remote_cluster_name, - remote_app_name); + "next_status={}, master_app_id={}, master_app_name={}, " + "follower_cluster_name={}, follower_app_name={}", + duplication_status_to_string(_status), + duplication_status_to_string(_next_status), + app_id, + app_name, + remote_cluster_name, + remote_app_name); return; } @@ -233,8 +233,8 @@ void duplication_info::report_progress_if_time_up() return; } - _last_progress_report_ms = dsn_now_ms(); - LOG_INFO("duplication report: {}", to_string()); + _last_progress_report_ms = dsn_now_ms(); + LOG_INFO("duplication report: {}", to_string()); } duplication_info_s_ptr duplication_info::decode_from_blob(dupid_t dup_id, diff --git a/src/meta/duplication/duplication_info.h b/src/meta/duplication/duplication_info.h index 2dccec4380..962d9f54f0 100644 --- a/src/meta/duplication/duplication_info.h +++ b/src/meta/duplication/duplication_info.h @@ -221,7 +221,7 @@ class duplication_info int64_t volatile_decree{invalid_decree}; int64_t stored_decree{invalid_decree}; - // + // int64_t last_committed_decree{invalid_decree}; bool is_altering{false}; diff --git a/src/meta/duplication/meta_duplication_service.cpp b/src/meta/duplication/meta_duplication_service.cpp index a021658b0b..f723ece5f1 100644 --- a/src/meta/duplication/meta_duplication_service.cpp +++ b/src/meta/duplication/meta_duplication_service.cpp @@ -795,26 +795,28 @@ void meta_duplication_service::do_update_partition_confirmed( return; } - const auto &path = get_partition_path(dup, std::to_string(partition_idx)); - const auto &value = blob::create_from_bytes(std::to_string(confirm_entry.confirmed_decree)); + const auto &path = get_partition_path(dup, std::to_string(partition_idx)); + + _meta_svc->get_meta_storage()->get_data( + path, [dup, rpc, partition_idx, confirm_entry, path, this](const blob &data) mutable { + auto value = blob::create_from_bytes(std::to_string(confirm_entry.confirmed_decree)); - _meta_svc->get_meta_storage()->get_data(path, [=](const blob &data) mutable { if (data.empty()) { _meta_svc->get_meta_storage()->create_node( - path, std::move(value), [=]() mutable { + path, std::move(value), [dup, rpc, partition_idx, confirm_entry]() mutable { dup->persist_progress(partition_idx); rpc.response().dup_map[dup->app_id][dup->id].progress[partition_idx] = confirm_entry.confirmed_decree; }); return; - } + } - _meta_svc->get_meta_storage()->set_data( - path, std::move(value), [=]() mutable { - dup->persist_progress(partition_idx); - rpc.response().dup_map[dup->app_id][dup->id].progress[partition_idx] = - confirm_entry.confirmed_decree; - }); + _meta_svc->get_meta_storage()->set_data( + path, std::move(value), [dup, rpc, partition_idx, confirm_entry]() mutable { + dup->persist_progress(partition_idx); + rpc.response().dup_map[dup->app_id][dup->id].progress[partition_idx] = + confirm_entry.confirmed_decree; + }); // duplication_sync_rpc will finally be replied when confirmed points // of all partitions are stored. diff --git a/src/meta/meta_state_service_utils.h b/src/meta/meta_state_service_utils.h index 1676637be5..a4760918a0 100644 --- a/src/meta/meta_state_service_utils.h +++ b/src/meta/meta_state_service_utils.h @@ -69,7 +69,7 @@ struct meta_storage /// Will fatal if node doesn't exists. void set_data(std::string &&node, blob &&value, std::function &&cb); - void set_data(const std::string &node, blob &&value, std::function &&cb); + void set_data(const std::string &node, blob &&value, std::function &&cb) { set_data(std::string(node), std::move(value), std::move(cb)); } diff --git a/src/meta/server_state.cpp b/src/meta/server_state.cpp index 59a5e0c74a..6a352473f1 100644 --- a/src/meta/server_state.cpp +++ b/src/meta/server_state.cpp @@ -1130,7 +1130,8 @@ void server_state::create_app(dsn::message_ex *msg) duplicating ? fmt::format("master_cluster_name={}, master_app_name={}", master_cluster->second, - gutil::FindWithDefault(request.options.envs, duplication_constants::kEnvMasterAppNameKey)) + gutil::FindWithDefault(request.options.envs, + duplication_constants::kEnvMasterAppNameKey)) : "false"); auto option_match_check = [](const create_app_options &opt, const app_state &exist_app) { diff --git a/src/meta/test/duplication_info_test.cpp b/src/meta/test/duplication_info_test.cpp index 7dfa02f214..84e029c368 100644 --- a/src/meta/test/duplication_info_test.cpp +++ b/src/meta/test/duplication_info_test.cpp @@ -31,7 +31,7 @@ #include "gtest/gtest.h" #include "runtime/app_model.h" -DSN_DECLARE_uint64(dup_progress_min_update_period_ms) +DSN_DECLARE_uint64(dup_progress_min_update_period_ms); namespace dsn { namespace replication { @@ -96,8 +96,7 @@ class duplication_info_test : public testing::Test ASSERT_FALSE(dup.alter_progress(1, entry)); ASSERT_FALSE(dup._progress[1].is_altering); - dup._progress[1].last_progress_update_ms -= - FLAGS_dup_progress_min_update_period_ms + 100; + dup._progress[1].last_progress_update_ms -= FLAGS_dup_progress_min_update_period_ms + 100; entry.confirmed_decree = 15; entry.checkpoint_prepared = true; From f2f7292928e48d02c3e46e26ffa9aa8f42a9a53c Mon Sep 17 00:00:00 2001 From: Dan Wang Date: Thu, 28 Nov 2024 15:41:16 +0800 Subject: [PATCH 3/8] fix clang-tidy --- src/meta/duplication/duplication_info.cpp | 8 +++----- src/meta/test/duplication_info_test.cpp | 6 ++---- 2 files changed, 5 insertions(+), 9 deletions(-) diff --git a/src/meta/duplication/duplication_info.cpp b/src/meta/duplication/duplication_info.cpp index f85c6f4f74..377d9326bb 100644 --- a/src/meta/duplication/duplication_info.cpp +++ b/src/meta/duplication/duplication_info.cpp @@ -29,11 +29,10 @@ DSN_DEFINE_uint64(replication, DSN_DEFINE_uint64(replication, dup_progress_min_report_period_ms, - 5 * 60 * 1000, + static_cast(5) * 60 * 1000, "The minimum period in milliseconds that progress of duplication is reported"); -namespace dsn { -namespace replication { +namespace dsn::replication { /*extern*/ void json_encode(dsn::json::JsonWriter &out, const duplication_status::type &s) { @@ -296,5 +295,4 @@ void duplication_info::append_if_valid_for_query( ent.__isset.progress = false; } -} // namespace replication -} // namespace dsn +} // namespace dsn::replication diff --git a/src/meta/test/duplication_info_test.cpp b/src/meta/test/duplication_info_test.cpp index 84e029c368..4e12bcb8c3 100644 --- a/src/meta/test/duplication_info_test.cpp +++ b/src/meta/test/duplication_info_test.cpp @@ -33,8 +33,7 @@ DSN_DECLARE_uint64(dup_progress_min_update_period_ms); -namespace dsn { -namespace replication { +namespace dsn::replication { class duplication_info_test : public testing::Test { @@ -359,5 +358,4 @@ TEST_F(duplication_info_test, is_valid) ASSERT_TRUE(dup.is_invalid_status()); } -} // namespace replication -} // namespace dsn +} // namespace dsn::replication From d0fc506dc2d4733bc75e058b43a3f52b2083c12f Mon Sep 17 00:00:00 2001 From: Dan Wang Date: Fri, 29 Nov 2024 15:26:39 +0800 Subject: [PATCH 4/8] add test for duplication info --- src/meta/duplication/duplication_info.cpp | 18 +++- src/meta/duplication/duplication_info.h | 7 +- .../duplication/meta_duplication_service.cpp | 5 +- src/meta/test/duplication_info_test.cpp | 91 ++++++++++++++++--- 4 files changed, 97 insertions(+), 24 deletions(-) diff --git a/src/meta/duplication/duplication_info.cpp b/src/meta/duplication/duplication_info.cpp index 377d9326bb..565be6992f 100644 --- a/src/meta/duplication/duplication_info.cpp +++ b/src/meta/duplication/duplication_info.cpp @@ -125,8 +125,13 @@ void duplication_info::init_progress(int partition_index, decree d) zauto_write_lock l(_lock); auto &p = _progress[partition_index]; + + p.last_committed_decree = invalid_decree; p.volatile_decree = p.stored_decree = d; + p.is_altering = false; + p.last_progress_update_ms = 0; p.is_inited = true; + p.checkpoint_prepared = false; } bool duplication_info::alter_progress(int partition_index, @@ -135,17 +140,22 @@ bool duplication_info::alter_progress(int partition_index, zauto_write_lock l(_lock); partition_progress &p = _progress[partition_index]; + + // last_committed_decree could be update at any time no matter whether progress is + // initialized or busy updating, since it is not persisted to remote meta storage. + // It is just collected from the primary replica of each partition. + if (confirm_entry.__isset.last_committed_decree) { + p.last_committed_decree = confirm_entry.last_committed_decree; + } + if (!p.is_inited) { return false; } + if (p.is_altering) { return false; } - if (confirm_entry.__isset.last_committed_decree) { - p.last_committed_decree = confirm_entry.last_committed_decree; - } - p.checkpoint_prepared = confirm_entry.checkpoint_prepared; if (p.volatile_decree < confirm_entry.confirmed_decree) { p.volatile_decree = confirm_entry.confirmed_decree; diff --git a/src/meta/duplication/duplication_info.h b/src/meta/duplication/duplication_info.h index 962d9f54f0..d1503a8682 100644 --- a/src/meta/duplication/duplication_info.h +++ b/src/meta/duplication/duplication_info.h @@ -218,12 +218,13 @@ class duplication_info struct partition_progress { + // last committed decree collected from the primary replica of each partition. + // Not persisted to remote meta storage. + int64_t last_committed_decree{invalid_decree}; + int64_t volatile_decree{invalid_decree}; int64_t stored_decree{invalid_decree}; - // - int64_t last_committed_decree{invalid_decree}; - bool is_altering{false}; uint64_t last_progress_update_ms{0}; bool is_inited{false}; diff --git a/src/meta/duplication/meta_duplication_service.cpp b/src/meta/duplication/meta_duplication_service.cpp index f723ece5f1..f63d8af359 100644 --- a/src/meta/duplication/meta_duplication_service.cpp +++ b/src/meta/duplication/meta_duplication_service.cpp @@ -913,7 +913,7 @@ void meta_duplication_service::do_restore_duplication_progress( std::move(partition_path), [dup, partition_idx](const blob &value) { // value is confirmed_decree encoded in string. - if (value.size() == 0) { + if (value.empty()) { // not found dup->init_progress(partition_idx, invalid_decree); return; @@ -958,10 +958,11 @@ void meta_duplication_service::do_restore_duplication(dupid_t dup_id, app->max_replica_count, store_path, json); - if (nullptr == dup) { + if (!dup) { LOG_ERROR("failed to decode json \"{}\" on path {}", json, store_path); return; // fail fast } + if (!dup->is_invalid_status()) { app->duplications[dup->id] = dup; refresh_duplicating_no_lock(app); diff --git a/src/meta/test/duplication_info_test.cpp b/src/meta/test/duplication_info_test.cpp index 4e12bcb8c3..25b17212ff 100644 --- a/src/meta/test/duplication_info_test.cpp +++ b/src/meta/test/duplication_info_test.cpp @@ -30,6 +30,7 @@ #include "gtest/gtest.h" #include "runtime/app_model.h" +#include "test_util/test_util.h" DSN_DECLARE_uint64(dup_progress_min_update_period_ms); @@ -49,9 +50,22 @@ class duplication_info_test : public testing::Test dup._status = status; } - static void test_alter_progress() + static void test_init_progress(duplication_info &dup, int partition_idx, decree expected_decree) { + dup.init_progress(partition_idx, expected_decree); + + const auto &progress = dup._progress[partition_idx]; + ASSERT_EQ(invalid_decree, progress.last_committed_decree); + ASSERT_EQ(expected_decree, progress.volatile_decree); + ASSERT_EQ(expected_decree, progress.stored_decree); + ASSERT_FALSE(progress.is_altering); + ASSERT_EQ(0, progress.last_progress_update_ms); + ASSERT_TRUE(progress.is_inited); + ASSERT_FALSE(progress.checkpoint_prepared); + } + static void test_alter_progress() + { duplication_info dup(1, 1, kTestAppName, @@ -62,45 +76,91 @@ class duplication_info_test : public testing::Test kTestRemoteAppName, std::vector(), kTestMetaStorePath); - duplication_confirm_entry entry; - ASSERT_FALSE(dup.alter_progress(0, entry)); - dup.init_progress(0, invalid_decree); + // Failed to alter progres for partition 0 since it has not been initialized. + ASSERT_FALSE(dup.alter_progress(0, duplication_confirm_entry())); + + // Initialize progress for partition 0. + test_init_progress(dup, 0, invalid_decree); + + // Alter progress with specified decrees for partition 0. + duplication_confirm_entry entry; + entry.__set_last_committed_decree(8); entry.confirmed_decree = 5; entry.checkpoint_prepared = true; ASSERT_TRUE(dup.alter_progress(0, entry)); - ASSERT_EQ(dup._progress[0].volatile_decree, 5); + + ASSERT_EQ(8, dup._progress[0].last_committed_decree); + ASSERT_EQ(5, dup._progress[0].volatile_decree); + ASSERT_EQ(invalid_decree, dup._progress[0].stored_decree); ASSERT_TRUE(dup._progress[0].is_altering); ASSERT_TRUE(dup._progress[0].checkpoint_prepared); - // busy updating + // Busy updating. + entry.__set_last_committed_decree(15); entry.confirmed_decree = 10; entry.checkpoint_prepared = false; ASSERT_FALSE(dup.alter_progress(0, entry)); - ASSERT_EQ(dup._progress[0].volatile_decree, 5); + + // last_committed_decree could be updated at any time. + ASSERT_EQ(15, dup._progress[0].last_committed_decree); + ASSERT_EQ(5, dup._progress[0].volatile_decree); + ASSERT_EQ(invalid_decree, dup._progress[0].stored_decree); ASSERT_TRUE(dup._progress[0].is_altering); ASSERT_TRUE(dup._progress[0].checkpoint_prepared); + // Persist progress for partition 0. dup.persist_progress(0); - ASSERT_EQ(dup._progress[0].stored_decree, 5); + + ASSERT_EQ(15, dup._progress[0].last_committed_decree); + ASSERT_EQ(5, dup._progress[0].volatile_decree); + ASSERT_EQ(5, dup._progress[0].stored_decree); ASSERT_FALSE(dup._progress[0].is_altering); ASSERT_TRUE(dup._progress[0].checkpoint_prepared); - // too frequent to update - dup.init_progress(1, invalid_decree); + // Initialize progress for partition 1. + test_init_progress(dup, 1, 5); + + // Alter progress for partition 1. ASSERT_TRUE(dup.alter_progress(1, entry)); + + ASSERT_EQ(15, dup._progress[1].last_committed_decree); + ASSERT_EQ(10, dup._progress[1].volatile_decree); + ASSERT_EQ(5, dup._progress[1].stored_decree); ASSERT_TRUE(dup._progress[1].is_altering); + ASSERT_FALSE(dup._progress[1].checkpoint_prepared); + + // Persist progress for partition 1. dup.persist_progress(1); + // It is too frequent to alter progress. + PRESERVE_FLAG(dup_progress_min_update_period_ms); + FLAGS_dup_progress_min_update_period_ms = 10000; + entry.__set_last_committed_decree(25); + entry.confirmed_decree = 15; + entry.checkpoint_prepared = true; ASSERT_FALSE(dup.alter_progress(1, entry)); + ASSERT_EQ(25, dup._progress[1].last_committed_decree); + // volatile_decree would be updated successfully even if it is too frequent. + ASSERT_EQ(15, dup._progress[1].volatile_decree); + ASSERT_EQ(10, dup._progress[1].stored_decree); ASSERT_FALSE(dup._progress[1].is_altering); + // checkpoint_prepared would be updated successfully even if it is too frequent. + ASSERT_TRUE(dup._progress[1].checkpoint_prepared); + // Reduce last update timestamp to make it infrequent. dup._progress[1].last_progress_update_ms -= FLAGS_dup_progress_min_update_period_ms + 100; + entry.__set_last_committed_decree(26); + entry.confirmed_decree = 25; - entry.confirmed_decree = 15; - entry.checkpoint_prepared = true; ASSERT_TRUE(dup.alter_progress(1, entry)); + ASSERT_EQ(26, dup._progress[1].last_committed_decree); + ASSERT_EQ(25, dup._progress[1].volatile_decree); + ASSERT_EQ(10, dup._progress[1].stored_decree); ASSERT_TRUE(dup._progress[1].is_altering); + ASSERT_TRUE(dup._progress[1].checkpoint_prepared); + + // Checkpoint are ready for both partition 0 and 1. ASSERT_TRUE(dup.all_checkpoint_has_prepared()); } @@ -128,8 +188,9 @@ class duplication_info_test : public testing::Test for (int i = 0; i < 4; i++) { dup.init_progress(i, invalid_decree); } + for (auto kv : dup_ent.progress) { - ASSERT_EQ(kv.second, invalid_decree); + ASSERT_EQ(invalid_decree, kv.second); } dup.start(); @@ -153,8 +214,8 @@ class duplication_info_test : public testing::Test dup.start(); dup.persist_status(); - ASSERT_EQ(dup._status, duplication_status::DS_PREPARE); - ASSERT_EQ(dup._next_status, duplication_status::DS_INIT); + ASSERT_EQ(duplication_status::DS_PREPARE, dup._status); + ASSERT_EQ(duplication_status::DS_INIT, dup._next_status); ASSERT_FALSE(dup.is_altering()); } From 5efc3db68b275d2c2095b6ca8bb25b1867ea55da Mon Sep 17 00:00:00 2001 From: Dan Wang Date: Fri, 29 Nov 2024 17:18:22 +0800 Subject: [PATCH 5/8] fix IWYU --- src/meta/duplication/duplication_info.cpp | 1 + src/meta/meta_state_service_utils.h | 5 ++++- src/meta/test/duplication_info_test.cpp | 1 + 3 files changed, 6 insertions(+), 1 deletion(-) diff --git a/src/meta/duplication/duplication_info.cpp b/src/meta/duplication/duplication_info.cpp index 565be6992f..027a1f97c9 100644 --- a/src/meta/duplication/duplication_info.cpp +++ b/src/meta/duplication/duplication_info.cpp @@ -20,6 +20,7 @@ #include "common/duplication_common.h" #include "meta/meta_data.h" #include "runtime/api_layer1.h" +#include "utils/flags.h" #include "utils/fmt_logging.h" DSN_DEFINE_uint64(replication, diff --git a/src/meta/meta_state_service_utils.h b/src/meta/meta_state_service_utils.h index a4760918a0..04b8086df7 100644 --- a/src/meta/meta_state_service_utils.h +++ b/src/meta/meta_state_service_utils.h @@ -20,11 +20,14 @@ #include #include #include +#include #include +#include "utils/blob.h" + namespace dsn { -class blob; class task_tracker; + namespace dist { class meta_state_service; } // namespace dist diff --git a/src/meta/test/duplication_info_test.cpp b/src/meta/test/duplication_info_test.cpp index 25b17212ff..72bbcdbb15 100644 --- a/src/meta/test/duplication_info_test.cpp +++ b/src/meta/test/duplication_info_test.cpp @@ -31,6 +31,7 @@ #include "gtest/gtest.h" #include "runtime/app_model.h" #include "test_util/test_util.h" +#include "utils/flags.h" DSN_DECLARE_uint64(dup_progress_min_update_period_ms); From f99da3c833af4a701a0a7fd81d8de29630a54ab7 Mon Sep 17 00:00:00 2001 From: Dan Wang Date: Fri, 29 Nov 2024 17:37:11 +0800 Subject: [PATCH 6/8] fix --- idl/duplication.thrift | 3 ++- src/meta/duplication/duplication_info.h | 2 +- src/meta/server_state.cpp | 2 +- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/idl/duplication.thrift b/idl/duplication.thrift index 91aefd2781..589cc5d085 100644 --- a/idl/duplication.thrift +++ b/idl/duplication.thrift @@ -173,7 +173,8 @@ struct duplication_confirm_entry 2:i64 confirmed_decree; 3:optional bool checkpoint_prepared = false; - // + // Last committed decree from the primary replica of each partition, collected by + // meta server and used to be compared with duplicating progress of follower table. 4:optional i64 last_committed_decree; } diff --git a/src/meta/duplication/duplication_info.h b/src/meta/duplication/duplication_info.h index d1503a8682..7563d3d411 100644 --- a/src/meta/duplication/duplication_info.h +++ b/src/meta/duplication/duplication_info.h @@ -218,7 +218,7 @@ class duplication_info struct partition_progress { - // last committed decree collected from the primary replica of each partition. + // Last committed decree collected from the primary replica of each partition. // Not persisted to remote meta storage. int64_t last_committed_decree{invalid_decree}; diff --git a/src/meta/server_state.cpp b/src/meta/server_state.cpp index 6a352473f1..8e68113f72 100644 --- a/src/meta/server_state.cpp +++ b/src/meta/server_state.cpp @@ -1163,7 +1163,7 @@ void server_state::create_app(dsn::message_ex *msg) zauto_write_lock l(_lock); auto app = get_app(request.app_name); - if (!app) { + if (app) { configuration_create_app_response response; switch (app->status) { From d4d665b960dfdeb89d425bd87811e159dd90ec2e Mon Sep 17 00:00:00 2001 From: Dan Wang Date: Mon, 2 Dec 2024 12:01:18 +0800 Subject: [PATCH 7/8] fix tests --- .../test/meta_duplication_service_test.cpp | 29 ++++++++++--------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/src/meta/test/meta_duplication_service_test.cpp b/src/meta/test/meta_duplication_service_test.cpp index a8a6668f9c..ec65a41fa0 100644 --- a/src/meta/test/meta_duplication_service_test.cpp +++ b/src/meta/test/meta_duplication_service_test.cpp @@ -677,11 +677,11 @@ TEST_F(meta_duplication_service_test, remove_dup) TEST_F(meta_duplication_service_test, duplication_sync) { const auto &server_nodes = ensure_enough_alive_nodes(3); - const std::string test_app = "test_app_0"; + const std::string test_app("test_app_0"); create_app(test_app); auto app = find_app(test_app); - // generate all primaries on node[0] + // Generate all primaries on node[0]. for (auto &pc : app->pcs) { pc.ballot = random32(1, 10000); SET_IP_AND_HOST_PORT_BY_DNS(pc, primary, server_nodes[0]); @@ -696,6 +696,7 @@ TEST_F(meta_duplication_service_test, duplication_sync) for (int i = 0; i < app->partition_count; i++) { dup->init_progress(i, invalid_decree); } + { std::map> confirm_list; @@ -712,20 +713,20 @@ TEST_F(meta_duplication_service_test, duplication_sync) confirm_list[gpid(app->app_id, 3)].push_back(ce); duplication_sync_response resp = duplication_sync(node, confirm_list); - ASSERT_EQ(resp.err, ERR_OK); - ASSERT_EQ(resp.dup_map.size(), 1); - ASSERT_EQ(resp.dup_map[app->app_id].size(), 1); - ASSERT_EQ(resp.dup_map[app->app_id][dupid].dupid, dupid); - ASSERT_EQ(resp.dup_map[app->app_id][dupid].status, duplication_status::DS_PREPARE); - ASSERT_EQ(resp.dup_map[app->app_id][dupid].create_ts, dup->create_timestamp_ms); - ASSERT_EQ(resp.dup_map[app->app_id][dupid].remote, dup->remote_cluster_name); - ASSERT_EQ(resp.dup_map[app->app_id][dupid].fail_mode, dup->fail_mode()); + ASSERT_EQ(ERR_OK, resp.err); + ASSERT_EQ(1, resp.dup_map.size()); + ASSERT_EQ(1, resp.dup_map[app->app_id].size()); + ASSERT_EQ(dupid, resp.dup_map[app->app_id][dupid].dupid); + ASSERT_EQ(duplication_status::DS_PREPARE, resp.dup_map[app->app_id][dupid].status); + ASSERT_EQ(dup->create_timestamp_ms, resp.dup_map[app->app_id][dupid].create_ts); + ASSERT_EQ(dup->remote_cluster_name, resp.dup_map[app->app_id][dupid].remote); + ASSERT_EQ(dup->fail_mode(), resp.dup_map[app->app_id][dupid].fail_mode); auto progress_map = resp.dup_map[app->app_id][dupid].progress; - ASSERT_EQ(progress_map.size(), 8); - ASSERT_EQ(progress_map[1], 5); - ASSERT_EQ(progress_map[2], 6); - ASSERT_EQ(progress_map[3], 7); + ASSERT_EQ(8, progress_map.size()); + ASSERT_EQ(5, progress_map[1]); + ASSERT_EQ(6, progress_map[2]); + ASSERT_EQ(7, progress_map[3]); // ensure no updated progresses will also be included in response for (int p = 4; p < 8; p++) { From 36f19a0a5f9177d6cc08107a1df9752704ba0ad1 Mon Sep 17 00:00:00 2001 From: Dan Wang Date: Mon, 2 Dec 2024 17:20:39 +0800 Subject: [PATCH 8/8] Update src/meta/duplication/duplication_info.cpp Co-authored-by: Yingchun Lai --- src/meta/duplication/duplication_info.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/meta/duplication/duplication_info.cpp b/src/meta/duplication/duplication_info.cpp index 027a1f97c9..8f609c7b20 100644 --- a/src/meta/duplication/duplication_info.cpp +++ b/src/meta/duplication/duplication_info.cpp @@ -30,7 +30,7 @@ DSN_DEFINE_uint64(replication, DSN_DEFINE_uint64(replication, dup_progress_min_report_period_ms, - static_cast(5) * 60 * 1000, + 5ULL * 60 * 1000, "The minimum period in milliseconds that progress of duplication is reported"); namespace dsn::replication {