Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(duplication): collect last committed decrees from primary replicas to meta server of the master cluster for duplication #2159

Merged
merged 8 commits into from
Dec 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions idl/duplication.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,10 @@ struct duplication_confirm_entry
1:i32 dupid;
2:i64 confirmed_decree;
3:optional bool checkpoint_prepared = false;

// Last committed decree from the primary replica of each partition, collected by
// the meta server and compared with the duplication progress of the follower table.
4:optional i64 last_committed_decree;
}

// This is an internal RPC sent from replica server to meta.
Expand Down
80 changes: 61 additions & 19 deletions src/meta/duplication/duplication_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,20 @@
#include "common/duplication_common.h"
#include "meta/meta_data.h"
#include "runtime/api_layer1.h"
#include "utils/flags.h"
#include "utils/fmt_logging.h"

namespace dsn {
namespace replication {
DSN_DEFINE_uint64(replication,
dup_progress_min_update_period_ms,
5000,
"The minimum period in milliseconds that progress of duplication is updated");

DSN_DEFINE_uint64(replication,
dup_progress_min_report_period_ms,
5ULL * 60 * 1000,
"The minimum period in milliseconds that progress of duplication is reported");

namespace dsn::replication {

/*extern*/ void json_encode(dsn::json::JsonWriter &out, const duplication_status::type &s)
{
Expand Down Expand Up @@ -116,8 +126,13 @@ void duplication_info::init_progress(int partition_index, decree d)
zauto_write_lock l(_lock);

auto &p = _progress[partition_index];

p.last_committed_decree = invalid_decree;
p.volatile_decree = p.stored_decree = d;
p.is_altering = false;
p.last_progress_update_ms = 0;
p.is_inited = true;
p.checkpoint_prepared = false;
}

bool duplication_info::alter_progress(int partition_index,
Expand All @@ -126,9 +141,18 @@ bool duplication_info::alter_progress(int partition_index,
zauto_write_lock l(_lock);

partition_progress &p = _progress[partition_index];

// last_committed_decree can be updated at any time, no matter whether progress is
// initialized or busy updating, since it is not persisted to remote meta storage.
// It is just collected from the primary replica of each partition.
if (confirm_entry.__isset.last_committed_decree) {
p.last_committed_decree = confirm_entry.last_committed_decree;
}

if (!p.is_inited) {
return false;
}

if (p.is_altering) {
return false;
}
Expand All @@ -137,15 +161,19 @@ bool duplication_info::alter_progress(int partition_index,
if (p.volatile_decree < confirm_entry.confirmed_decree) {
p.volatile_decree = confirm_entry.confirmed_decree;
}
if (p.volatile_decree != p.stored_decree) {
// progress update is not supposed to be too frequent.
if (dsn_now_ms() > p.last_progress_update_ms + PROGRESS_UPDATE_PERIOD_MS) {
p.is_altering = true;
p.last_progress_update_ms = dsn_now_ms();
return true;
}

if (p.volatile_decree == p.stored_decree) {
return false;
}

// Progress update is not supposed to be too frequent.
if (dsn_now_ms() < p.last_progress_update_ms + FLAGS_dup_progress_min_update_period_ms) {
return false;
}
return false;

p.is_altering = true;
p.last_progress_update_ms = dsn_now_ms();
return true;
}

void duplication_info::persist_progress(int partition_index)
Expand All @@ -163,13 +191,26 @@ void duplication_info::persist_status()
zauto_write_lock l(_lock);

if (!_is_altering) {
LOG_ERROR_PREFIX("callers never write a duplication that is not altering to meta store");
LOG_ERROR_PREFIX("the status of this duplication is not being altered: status={}, "
"next_status={}, master_app_id={}, master_app_name={}, "
"follower_cluster_name={}, follower_app_name={}",
duplication_status_to_string(_status),
duplication_status_to_string(_next_status),
app_id,
app_name,
remote_cluster_name,
remote_app_name);
return;
}
LOG_INFO_PREFIX("change duplication status from {} to {} successfully [app_id: {}]",

LOG_INFO_PREFIX("change duplication status from {} to {} successfully: master_app_id={}, "
"master_app_name={}, follower_cluster_name={}, follower_app_name={}",
duplication_status_to_string(_status),
duplication_status_to_string(_next_status),
app_id);
app_id,
app_name,
remote_cluster_name,
remote_app_name);

_is_altering = false;
_status = _next_status;
Expand Down Expand Up @@ -197,11 +238,13 @@ blob duplication_info::to_json_blob() const

void duplication_info::report_progress_if_time_up()
{
// progress report is not supposed to be too frequent.
if (dsn_now_ms() > _last_progress_report_ms + PROGRESS_REPORT_PERIOD_MS) {
_last_progress_report_ms = dsn_now_ms();
LOG_INFO("duplication report: {}", to_string());
// Progress report is not supposed to be too frequent.
if (dsn_now_ms() < _last_progress_report_ms + FLAGS_dup_progress_min_report_period_ms) {
return;
}

_last_progress_report_ms = dsn_now_ms();
LOG_INFO("duplication report: {}", to_string());
}

duplication_info_s_ptr duplication_info::decode_from_blob(dupid_t dup_id,
Expand Down Expand Up @@ -263,5 +306,4 @@ void duplication_info::append_if_valid_for_query(
ent.__isset.progress = false;
}

} // namespace replication
} // namespace dsn
} // namespace dsn::replication
8 changes: 5 additions & 3 deletions src/meta/duplication/duplication_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -216,13 +216,15 @@ class duplication_info

mutable zrwlock_nr _lock;

static constexpr int PROGRESS_UPDATE_PERIOD_MS = 5000; // 5s
static constexpr int PROGRESS_REPORT_PERIOD_MS = 1000 * 60 * 5; // 5min

struct partition_progress
{
// Last committed decree collected from the primary replica of each partition.
// Not persisted to remote meta storage.
int64_t last_committed_decree{invalid_decree};

int64_t volatile_decree{invalid_decree};
int64_t stored_decree{invalid_decree};

bool is_altering{false};
uint64_t last_progress_update_ms{0};
bool is_inited{false};
Expand Down
38 changes: 22 additions & 16 deletions src/meta/duplication/meta_duplication_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -791,31 +791,36 @@ void meta_duplication_service::do_update_partition_confirmed(
int32_t partition_idx,
const duplication_confirm_entry &confirm_entry)
{
if (dup->alter_progress(partition_idx, confirm_entry)) {
std::string path = get_partition_path(dup, std::to_string(partition_idx));
blob value = blob::create_from_bytes(std::to_string(confirm_entry.confirmed_decree));
if (!dup->alter_progress(partition_idx, confirm_entry)) {
return;
}

const auto &path = get_partition_path(dup, std::to_string(partition_idx));

_meta_svc->get_meta_storage()->get_data(
path, [dup, rpc, partition_idx, confirm_entry, path, this](const blob &data) mutable {
auto value = blob::create_from_bytes(std::to_string(confirm_entry.confirmed_decree));

_meta_svc->get_meta_storage()->get_data(std::string(path), [=](const blob &data) mutable {
if (data.length() == 0) {
if (data.empty()) {
_meta_svc->get_meta_storage()->create_node(
std::string(path), std::move(value), [=]() mutable {
dup->persist_progress(partition_idx);
rpc.response().dup_map[dup->app_id][dup->id].progress[partition_idx] =
confirm_entry.confirmed_decree;
});
} else {
_meta_svc->get_meta_storage()->set_data(
std::string(path), std::move(value), [=]() mutable {
path, std::move(value), [dup, rpc, partition_idx, confirm_entry]() mutable {
dup->persist_progress(partition_idx);
rpc.response().dup_map[dup->app_id][dup->id].progress[partition_idx] =
confirm_entry.confirmed_decree;
});
return;
}

_meta_svc->get_meta_storage()->set_data(
path, std::move(value), [dup, rpc, partition_idx, confirm_entry]() mutable {
dup->persist_progress(partition_idx);
rpc.response().dup_map[dup->app_id][dup->id].progress[partition_idx] =
confirm_entry.confirmed_decree;
});

// duplication_sync_rpc will finally be replied when confirmed points
// of all partitions are stored.
});
}
}

std::shared_ptr<duplication_info>
Expand Down Expand Up @@ -908,7 +913,7 @@ void meta_duplication_service::do_restore_duplication_progress(
std::move(partition_path), [dup, partition_idx](const blob &value) {
// value is confirmed_decree encoded in string.

if (value.size() == 0) {
if (value.empty()) {
// not found
dup->init_progress(partition_idx, invalid_decree);
return;
Expand Down Expand Up @@ -953,10 +958,11 @@ void meta_duplication_service::do_restore_duplication(dupid_t dup_id,
app->max_replica_count,
store_path,
json);
if (nullptr == dup) {
if (!dup) {
LOG_ERROR("failed to decode json \"{}\" on path {}", json, store_path);
return; // fail fast
}

if (!dup->is_invalid_status()) {
app->duplications[dup->id] = dup;
refresh_duplicating_no_lock(app);
Expand Down
20 changes: 19 additions & 1 deletion src/meta/meta_state_service_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@
#include <functional>
#include <queue>
#include <string>
#include <utility>
#include <vector>

#include "utils/blob.h"

namespace dsn {
class blob;
class task_tracker;

namespace dist {
class meta_state_service;
} // namespace dist
Expand Down Expand Up @@ -57,16 +60,31 @@ struct meta_storage

void create_node(std::string &&node, blob &&value, std::function<void()> &&cb);

/// Convenience overload for callers that hold the node path as an lvalue:
/// makes a temporary copy of `node` and forwards it, together with the moved
/// `value` and `cb`, to the create_node() overload taking `std::string &&`.
void create_node(const std::string &node, blob &&value, std::function<void()> &&cb)
{
// std::string(node) is a temporary, so this call selects the rvalue overload.
create_node(std::string(node), std::move(value), std::move(cb));
}

void delete_node_recursively(std::string &&node, std::function<void()> &&cb);

void delete_node(std::string &&node, std::function<void()> &&cb);

/// Will fatal if node doesn't exist.
void set_data(std::string &&node, blob &&value, std::function<void()> &&cb);

/// Convenience overload for callers that hold the node path as an lvalue:
/// makes a temporary copy of `node` and forwards it, together with the moved
/// `value` and `cb`, to the set_data() overload taking `std::string &&`.
void set_data(const std::string &node, blob &&value, std::function<void()> &&cb)
{
// std::string(node) is a temporary, so this call selects the rvalue overload.
set_data(std::string(node), std::move(value), std::move(cb));
}

/// If node does not exist, cb will receive an empty blob.
void get_data(std::string &&node, std::function<void(const blob &)> &&cb);

/// Convenience overload for callers that hold the node path as an lvalue:
/// makes a temporary copy of `node` and forwards it, together with the moved
/// `cb`, to the get_data() overload taking `std::string &&`.
void get_data(const std::string &node, std::function<void(const blob &)> &&cb)
{
// std::string(node) is a temporary, so this call selects the rvalue overload.
get_data(std::string(node), std::move(cb));
}

/// \param cb: void (bool node_exists, const std::vector<std::string> &children)
/// `children` contains the name (not full path) of children nodes.
/// `node_exists` indicates whether this node exists.
Expand Down
9 changes: 5 additions & 4 deletions src/meta/server_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1128,9 +1128,10 @@ void server_state::create_app(dsn::message_ex *msg)
request.options.partition_count,
request.options.replica_count,
duplicating
? fmt::format("{}.{}",
request.options.envs[duplication_constants::kEnvMasterClusterKey],
request.app_name)
? fmt::format("master_cluster_name={}, master_app_name={}",
master_cluster->second,
gutil::FindWithDefault(request.options.envs,
duplication_constants::kEnvMasterAppNameKey))
: "false");

auto option_match_check = [](const create_app_options &opt, const app_state &exist_app) {
Expand Down Expand Up @@ -1162,7 +1163,7 @@ void server_state::create_app(dsn::message_ex *msg)
zauto_write_lock l(_lock);

auto app = get_app(request.app_name);
if (nullptr != app) {
if (app) {
configuration_create_app_response response;

switch (app->status) {
Expand Down
Loading
Loading