diff --git a/idl/duplication.thrift b/idl/duplication.thrift index 140361ca94..589cc5d085 100644 --- a/idl/duplication.thrift +++ b/idl/duplication.thrift @@ -172,6 +172,10 @@ struct duplication_confirm_entry 1:i32 dupid; 2:i64 confirmed_decree; 3:optional bool checkpoint_prepared = false; + + // Last committed decree from the primary replica of each partition, collected by + // the meta server and compared with the duplication progress of the follower table. + 4:optional i64 last_committed_decree; } // This is an internal RPC sent from replica server to meta. diff --git a/src/meta/duplication/duplication_info.cpp b/src/meta/duplication/duplication_info.cpp index 7f7bb62b69..8f609c7b20 100644 --- a/src/meta/duplication/duplication_info.cpp +++ b/src/meta/duplication/duplication_info.cpp @@ -20,10 +20,20 @@ #include "common/duplication_common.h" #include "meta/meta_data.h" #include "runtime/api_layer1.h" +#include "utils/flags.h" #include "utils/fmt_logging.h" -namespace dsn { -namespace replication { +DSN_DEFINE_uint64(replication, + dup_progress_min_update_period_ms, + 5000, + "The minimum period in milliseconds that progress of duplication is updated"); + +DSN_DEFINE_uint64(replication, + dup_progress_min_report_period_ms, + 5ULL * 60 * 1000, + "The minimum period in milliseconds that progress of duplication is reported"); + +namespace dsn::replication { /*extern*/ void json_encode(dsn::json::JsonWriter &out, const duplication_status::type &s) { @@ -116,8 +126,13 @@ void duplication_info::init_progress(int partition_index, decree d) zauto_write_lock l(_lock); auto &p = _progress[partition_index]; + + p.last_committed_decree = invalid_decree; p.volatile_decree = p.stored_decree = d; + p.is_altering = false; + p.last_progress_update_ms = 0; p.is_inited = true; + p.checkpoint_prepared = false; } bool duplication_info::alter_progress(int partition_index, @@ -126,9 +141,18 @@ bool duplication_info::alter_progress(int partition_index, zauto_write_lock l(_lock); 
partition_progress &p = _progress[partition_index]; + + // last_committed_decree could be updated at any time no matter whether progress is + // initialized or busy updating, since it is not persisted to remote meta storage. + // It is just collected from the primary replica of each partition. + if (confirm_entry.__isset.last_committed_decree) { + p.last_committed_decree = confirm_entry.last_committed_decree; + } + if (!p.is_inited) { return false; } + if (p.is_altering) { return false; } @@ -137,15 +161,19 @@ bool duplication_info::alter_progress(int partition_index, if (p.volatile_decree < confirm_entry.confirmed_decree) { p.volatile_decree = confirm_entry.confirmed_decree; } - if (p.volatile_decree != p.stored_decree) { - // progress update is not supposed to be too frequent. - if (dsn_now_ms() > p.last_progress_update_ms + PROGRESS_UPDATE_PERIOD_MS) { - p.is_altering = true; - p.last_progress_update_ms = dsn_now_ms(); - return true; - } + + if (p.volatile_decree == p.stored_decree) { + return false; + } + + // Progress update is not supposed to be too frequent. 
+ if (dsn_now_ms() < p.last_progress_update_ms + FLAGS_dup_progress_min_update_period_ms) { + return false; } - return false; + + p.is_altering = true; + p.last_progress_update_ms = dsn_now_ms(); + return true; } void duplication_info::persist_progress(int partition_index) @@ -163,13 +191,26 @@ void duplication_info::persist_status() zauto_write_lock l(_lock); if (!_is_altering) { - LOG_ERROR_PREFIX("callers never write a duplication that is not altering to meta store"); + LOG_ERROR_PREFIX("the status of this duplication is not being altered: status={}, " + "next_status={}, master_app_id={}, master_app_name={}, " + "follower_cluster_name={}, follower_app_name={}", + duplication_status_to_string(_status), + duplication_status_to_string(_next_status), + app_id, + app_name, + remote_cluster_name, + remote_app_name); return; } - LOG_INFO_PREFIX("change duplication status from {} to {} successfully [app_id: {}]", + + LOG_INFO_PREFIX("change duplication status from {} to {} successfully: master_app_id={}, " + "master_app_name={}, follower_cluster_name={}, follower_app_name={}", duplication_status_to_string(_status), duplication_status_to_string(_next_status), - app_id); + app_id, + app_name, + remote_cluster_name, + remote_app_name); _is_altering = false; _status = _next_status; @@ -197,11 +238,13 @@ blob duplication_info::to_json_blob() const void duplication_info::report_progress_if_time_up() { - // progress report is not supposed to be too frequent. - if (dsn_now_ms() > _last_progress_report_ms + PROGRESS_REPORT_PERIOD_MS) { - _last_progress_report_ms = dsn_now_ms(); - LOG_INFO("duplication report: {}", to_string()); + // Progress report is not supposed to be too frequent. 
+ if (dsn_now_ms() < _last_progress_report_ms + FLAGS_dup_progress_min_report_period_ms) { + return; } + + _last_progress_report_ms = dsn_now_ms(); + LOG_INFO("duplication report: {}", to_string()); } duplication_info_s_ptr duplication_info::decode_from_blob(dupid_t dup_id, @@ -263,5 +306,4 @@ void duplication_info::append_if_valid_for_query( ent.__isset.progress = false; } -} // namespace replication -} // namespace dsn +} // namespace dsn::replication diff --git a/src/meta/duplication/duplication_info.h b/src/meta/duplication/duplication_info.h index e1ddcacf38..7563d3d411 100644 --- a/src/meta/duplication/duplication_info.h +++ b/src/meta/duplication/duplication_info.h @@ -216,13 +216,15 @@ class duplication_info mutable zrwlock_nr _lock; - static constexpr int PROGRESS_UPDATE_PERIOD_MS = 5000; // 5s - static constexpr int PROGRESS_REPORT_PERIOD_MS = 1000 * 60 * 5; // 5min - struct partition_progress { + // Last committed decree collected from the primary replica of each partition. + // Not persisted to remote meta storage. 
+ int64_t last_committed_decree{invalid_decree}; + int64_t volatile_decree{invalid_decree}; int64_t stored_decree{invalid_decree}; + bool is_altering{false}; uint64_t last_progress_update_ms{0}; bool is_inited{false}; diff --git a/src/meta/duplication/meta_duplication_service.cpp b/src/meta/duplication/meta_duplication_service.cpp index c557c890a2..f63d8af359 100644 --- a/src/meta/duplication/meta_duplication_service.cpp +++ b/src/meta/duplication/meta_duplication_service.cpp @@ -791,31 +791,36 @@ void meta_duplication_service::do_update_partition_confirmed( int32_t partition_idx, const duplication_confirm_entry &confirm_entry) { - if (dup->alter_progress(partition_idx, confirm_entry)) { - std::string path = get_partition_path(dup, std::to_string(partition_idx)); - blob value = blob::create_from_bytes(std::to_string(confirm_entry.confirmed_decree)); + if (!dup->alter_progress(partition_idx, confirm_entry)) { + return; + } + + const auto &path = get_partition_path(dup, std::to_string(partition_idx)); + + _meta_svc->get_meta_storage()->get_data( + path, [dup, rpc, partition_idx, confirm_entry, path, this](const blob &data) mutable { + auto value = blob::create_from_bytes(std::to_string(confirm_entry.confirmed_decree)); - _meta_svc->get_meta_storage()->get_data(std::string(path), [=](const blob &data) mutable { - if (data.length() == 0) { + if (data.empty()) { _meta_svc->get_meta_storage()->create_node( - std::string(path), std::move(value), [=]() mutable { - dup->persist_progress(partition_idx); - rpc.response().dup_map[dup->app_id][dup->id].progress[partition_idx] = - confirm_entry.confirmed_decree; - }); - } else { - _meta_svc->get_meta_storage()->set_data( - std::string(path), std::move(value), [=]() mutable { + path, std::move(value), [dup, rpc, partition_idx, confirm_entry]() mutable { dup->persist_progress(partition_idx); rpc.response().dup_map[dup->app_id][dup->id].progress[partition_idx] = confirm_entry.confirmed_decree; }); + return; } + 
_meta_svc->get_meta_storage()->set_data( + path, std::move(value), [dup, rpc, partition_idx, confirm_entry]() mutable { + dup->persist_progress(partition_idx); + rpc.response().dup_map[dup->app_id][dup->id].progress[partition_idx] = + confirm_entry.confirmed_decree; + }); + // duplication_sync_rpc will finally be replied when confirmed points // of all partitions are stored. }); - } } std::shared_ptr @@ -908,7 +913,7 @@ void meta_duplication_service::do_restore_duplication_progress( std::move(partition_path), [dup, partition_idx](const blob &value) { // value is confirmed_decree encoded in string. - if (value.size() == 0) { + if (value.empty()) { // not found dup->init_progress(partition_idx, invalid_decree); return; @@ -953,10 +958,11 @@ void meta_duplication_service::do_restore_duplication(dupid_t dup_id, app->max_replica_count, store_path, json); - if (nullptr == dup) { + if (!dup) { LOG_ERROR("failed to decode json \"{}\" on path {}", json, store_path); return; // fail fast } + if (!dup->is_invalid_status()) { app->duplications[dup->id] = dup; refresh_duplicating_no_lock(app); diff --git a/src/meta/meta_state_service_utils.h b/src/meta/meta_state_service_utils.h index 41099e898c..04b8086df7 100644 --- a/src/meta/meta_state_service_utils.h +++ b/src/meta/meta_state_service_utils.h @@ -20,11 +20,14 @@ #include #include #include +#include #include +#include "utils/blob.h" + namespace dsn { -class blob; class task_tracker; + namespace dist { class meta_state_service; } // namespace dist @@ -57,6 +60,11 @@ struct meta_storage void create_node(std::string &&node, blob &&value, std::function &&cb); + void create_node(const std::string &node, blob &&value, std::function &&cb) + { + create_node(std::string(node), std::move(value), std::move(cb)); + } + void delete_node_recursively(std::string &&node, std::function &&cb); void delete_node(std::string &&node, std::function &&cb); @@ -64,9 +72,19 @@ struct meta_storage /// Will fatal if node doesn't exists. 
void set_data(std::string &&node, blob &&value, std::function &&cb); + void set_data(const std::string &node, blob &&value, std::function &&cb) + { + set_data(std::string(node), std::move(value), std::move(cb)); + } + /// If node does not exist, cb will receive an empty blob. void get_data(std::string &&node, std::function &&cb); + void get_data(const std::string &node, std::function &&cb) + { + get_data(std::string(node), std::move(cb)); + } + /// \param cb: void (bool node_exists, const std::vector &children) /// `children` contains the name (not full path) of children nodes. /// `node_exists` indicates whether this node exists. diff --git a/src/meta/server_state.cpp b/src/meta/server_state.cpp index fe18206885..8e68113f72 100644 --- a/src/meta/server_state.cpp +++ b/src/meta/server_state.cpp @@ -1128,9 +1128,10 @@ void server_state::create_app(dsn::message_ex *msg) request.options.partition_count, request.options.replica_count, duplicating - ? fmt::format("{}.{}", - request.options.envs[duplication_constants::kEnvMasterClusterKey], - request.app_name) + ? 
fmt::format("master_cluster_name={}, master_app_name={}", + master_cluster->second, + gutil::FindWithDefault(request.options.envs, + duplication_constants::kEnvMasterAppNameKey)) : "false"); auto option_match_check = [](const create_app_options &opt, const app_state &exist_app) { @@ -1162,7 +1163,7 @@ void server_state::create_app(dsn::message_ex *msg) zauto_write_lock l(_lock); auto app = get_app(request.app_name); - if (nullptr != app) { + if (app) { configuration_create_app_response response; switch (app->status) { diff --git a/src/meta/test/duplication_info_test.cpp b/src/meta/test/duplication_info_test.cpp index 5cfa4e12be..72bbcdbb15 100644 --- a/src/meta/test/duplication_info_test.cpp +++ b/src/meta/test/duplication_info_test.cpp @@ -30,9 +30,12 @@ #include "gtest/gtest.h" #include "runtime/app_model.h" +#include "test_util/test_util.h" +#include "utils/flags.h" -namespace dsn { -namespace replication { +DSN_DECLARE_uint64(dup_progress_min_update_period_ms); + +namespace dsn::replication { class duplication_info_test : public testing::Test { @@ -48,9 +51,22 @@ class duplication_info_test : public testing::Test dup._status = status; } - static void test_alter_progress() + static void test_init_progress(duplication_info &dup, int partition_idx, decree expected_decree) { + dup.init_progress(partition_idx, expected_decree); + + const auto &progress = dup._progress[partition_idx]; + ASSERT_EQ(invalid_decree, progress.last_committed_decree); + ASSERT_EQ(expected_decree, progress.volatile_decree); + ASSERT_EQ(expected_decree, progress.stored_decree); + ASSERT_FALSE(progress.is_altering); + ASSERT_EQ(0, progress.last_progress_update_ms); + ASSERT_TRUE(progress.is_inited); + ASSERT_FALSE(progress.checkpoint_prepared); + } + static void test_alter_progress() + { duplication_info dup(1, 1, kTestAppName, @@ -61,46 +77,91 @@ class duplication_info_test : public testing::Test kTestRemoteAppName, std::vector(), kTestMetaStorePath); - duplication_confirm_entry entry; - 
ASSERT_FALSE(dup.alter_progress(0, entry)); - dup.init_progress(0, invalid_decree); + // Failed to alter progress for partition 0 since it has not been initialized. + ASSERT_FALSE(dup.alter_progress(0, duplication_confirm_entry())); + + // Initialize progress for partition 0. + test_init_progress(dup, 0, invalid_decree); + + // Alter progress with specified decrees for partition 0. + duplication_confirm_entry entry; + entry.__set_last_committed_decree(8); entry.confirmed_decree = 5; entry.checkpoint_prepared = true; ASSERT_TRUE(dup.alter_progress(0, entry)); - ASSERT_EQ(dup._progress[0].volatile_decree, 5); + + ASSERT_EQ(8, dup._progress[0].last_committed_decree); + ASSERT_EQ(5, dup._progress[0].volatile_decree); + ASSERT_EQ(invalid_decree, dup._progress[0].stored_decree); ASSERT_TRUE(dup._progress[0].is_altering); ASSERT_TRUE(dup._progress[0].checkpoint_prepared); - // busy updating + // Busy updating. + entry.__set_last_committed_decree(15); entry.confirmed_decree = 10; entry.checkpoint_prepared = false; ASSERT_FALSE(dup.alter_progress(0, entry)); - ASSERT_EQ(dup._progress[0].volatile_decree, 5); + + // last_committed_decree could be updated at any time. + ASSERT_EQ(15, dup._progress[0].last_committed_decree); + ASSERT_EQ(5, dup._progress[0].volatile_decree); + ASSERT_EQ(invalid_decree, dup._progress[0].stored_decree); ASSERT_TRUE(dup._progress[0].is_altering); ASSERT_TRUE(dup._progress[0].checkpoint_prepared); + // Persist progress for partition 0. dup.persist_progress(0); - ASSERT_EQ(dup._progress[0].stored_decree, 5); + + ASSERT_EQ(15, dup._progress[0].last_committed_decree); + ASSERT_EQ(5, dup._progress[0].volatile_decree); + ASSERT_EQ(5, dup._progress[0].stored_decree); ASSERT_FALSE(dup._progress[0].is_altering); ASSERT_TRUE(dup._progress[0].checkpoint_prepared); - // too frequent to update - dup.init_progress(1, invalid_decree); + // Initialize progress for partition 1. + test_init_progress(dup, 1, 5); + + // Alter progress for partition 1. 
ASSERT_TRUE(dup.alter_progress(1, entry)); + + ASSERT_EQ(15, dup._progress[1].last_committed_decree); + ASSERT_EQ(10, dup._progress[1].volatile_decree); + ASSERT_EQ(5, dup._progress[1].stored_decree); ASSERT_TRUE(dup._progress[1].is_altering); + ASSERT_FALSE(dup._progress[1].checkpoint_prepared); + + // Persist progress for partition 1. dup.persist_progress(1); + // It is too frequent to alter progress. + PRESERVE_FLAG(dup_progress_min_update_period_ms); + FLAGS_dup_progress_min_update_period_ms = 10000; + entry.__set_last_committed_decree(25); + entry.confirmed_decree = 15; + entry.checkpoint_prepared = true; ASSERT_FALSE(dup.alter_progress(1, entry)); + ASSERT_EQ(25, dup._progress[1].last_committed_decree); + // volatile_decree would be updated successfully even if it is too frequent. + ASSERT_EQ(15, dup._progress[1].volatile_decree); + ASSERT_EQ(10, dup._progress[1].stored_decree); ASSERT_FALSE(dup._progress[1].is_altering); + // checkpoint_prepared would be updated successfully even if it is too frequent. + ASSERT_TRUE(dup._progress[1].checkpoint_prepared); - dup._progress[1].last_progress_update_ms -= - duplication_info::PROGRESS_UPDATE_PERIOD_MS + 100; + // Reduce last update timestamp to make it infrequent. + dup._progress[1].last_progress_update_ms -= FLAGS_dup_progress_min_update_period_ms + 100; + entry.__set_last_committed_decree(26); + entry.confirmed_decree = 25; - entry.confirmed_decree = 15; - entry.checkpoint_prepared = true; ASSERT_TRUE(dup.alter_progress(1, entry)); + ASSERT_EQ(26, dup._progress[1].last_committed_decree); + ASSERT_EQ(25, dup._progress[1].volatile_decree); + ASSERT_EQ(10, dup._progress[1].stored_decree); ASSERT_TRUE(dup._progress[1].is_altering); + ASSERT_TRUE(dup._progress[1].checkpoint_prepared); + + // Checkpoints are ready for both partitions 0 and 1. 
ASSERT_TRUE(dup.all_checkpoint_has_prepared()); } @@ -128,8 +189,9 @@ class duplication_info_test : public testing::Test for (int i = 0; i < 4; i++) { dup.init_progress(i, invalid_decree); } + for (auto kv : dup_ent.progress) { - ASSERT_EQ(kv.second, invalid_decree); + ASSERT_EQ(invalid_decree, kv.second); } dup.start(); @@ -153,8 +215,8 @@ class duplication_info_test : public testing::Test dup.start(); dup.persist_status(); - ASSERT_EQ(dup._status, duplication_status::DS_PREPARE); - ASSERT_EQ(dup._next_status, duplication_status::DS_INIT); + ASSERT_EQ(duplication_status::DS_PREPARE, dup._status); + ASSERT_EQ(duplication_status::DS_INIT, dup._next_status); ASSERT_FALSE(dup.is_altering()); } @@ -358,5 +420,4 @@ TEST_F(duplication_info_test, is_valid) ASSERT_TRUE(dup.is_invalid_status()); } -} // namespace replication -} // namespace dsn +} // namespace dsn::replication diff --git a/src/meta/test/meta_duplication_service_test.cpp b/src/meta/test/meta_duplication_service_test.cpp index a8a6668f9c..ec65a41fa0 100644 --- a/src/meta/test/meta_duplication_service_test.cpp +++ b/src/meta/test/meta_duplication_service_test.cpp @@ -677,11 +677,11 @@ TEST_F(meta_duplication_service_test, remove_dup) TEST_F(meta_duplication_service_test, duplication_sync) { const auto &server_nodes = ensure_enough_alive_nodes(3); - const std::string test_app = "test_app_0"; + const std::string test_app("test_app_0"); create_app(test_app); auto app = find_app(test_app); - // generate all primaries on node[0] + // Generate all primaries on node[0]. 
for (auto &pc : app->pcs) { pc.ballot = random32(1, 10000); SET_IP_AND_HOST_PORT_BY_DNS(pc, primary, server_nodes[0]); @@ -696,6 +696,7 @@ TEST_F(meta_duplication_service_test, duplication_sync) for (int i = 0; i < app->partition_count; i++) { dup->init_progress(i, invalid_decree); } + { std::map> confirm_list; @@ -712,20 +713,20 @@ TEST_F(meta_duplication_service_test, duplication_sync) confirm_list[gpid(app->app_id, 3)].push_back(ce); duplication_sync_response resp = duplication_sync(node, confirm_list); - ASSERT_EQ(resp.err, ERR_OK); - ASSERT_EQ(resp.dup_map.size(), 1); - ASSERT_EQ(resp.dup_map[app->app_id].size(), 1); - ASSERT_EQ(resp.dup_map[app->app_id][dupid].dupid, dupid); - ASSERT_EQ(resp.dup_map[app->app_id][dupid].status, duplication_status::DS_PREPARE); - ASSERT_EQ(resp.dup_map[app->app_id][dupid].create_ts, dup->create_timestamp_ms); - ASSERT_EQ(resp.dup_map[app->app_id][dupid].remote, dup->remote_cluster_name); - ASSERT_EQ(resp.dup_map[app->app_id][dupid].fail_mode, dup->fail_mode()); + ASSERT_EQ(ERR_OK, resp.err); + ASSERT_EQ(1, resp.dup_map.size()); + ASSERT_EQ(1, resp.dup_map[app->app_id].size()); + ASSERT_EQ(dupid, resp.dup_map[app->app_id][dupid].dupid); + ASSERT_EQ(duplication_status::DS_PREPARE, resp.dup_map[app->app_id][dupid].status); + ASSERT_EQ(dup->create_timestamp_ms, resp.dup_map[app->app_id][dupid].create_ts); + ASSERT_EQ(dup->remote_cluster_name, resp.dup_map[app->app_id][dupid].remote); + ASSERT_EQ(dup->fail_mode(), resp.dup_map[app->app_id][dupid].fail_mode); auto progress_map = resp.dup_map[app->app_id][dupid].progress; - ASSERT_EQ(progress_map.size(), 8); - ASSERT_EQ(progress_map[1], 5); - ASSERT_EQ(progress_map[2], 6); - ASSERT_EQ(progress_map[3], 7); + ASSERT_EQ(8, progress_map.size()); + ASSERT_EQ(5, progress_map[1]); + ASSERT_EQ(6, progress_map[2]); + ASSERT_EQ(7, progress_map[3]); // ensure no updated progresses will also be included in response for (int p = 4; p < 8; p++) { diff --git 
a/src/replica/duplication/replica_duplicator_manager.cpp b/src/replica/duplication/replica_duplicator_manager.cpp index 2e1e61cc4d..724f164d1a 100644 --- a/src/replica/duplication/replica_duplicator_manager.cpp +++ b/src/replica/duplication/replica_duplicator_manager.cpp @@ -94,6 +94,7 @@ replica_duplicator_manager::get_duplication_confirms_to_update() const entry.dupid = dup->id(); entry.confirmed_decree = progress.last_decree; entry.__set_checkpoint_prepared(progress.checkpoint_has_prepared); + entry.__set_last_committed_decree(_replica->last_committed_decree()); updates.emplace_back(entry); } return updates;