Skip to content

Commit

Permalink
feat(bulk_load): add unhealthy partition check (#964)
Browse files Browse the repository at this point in the history
  • Loading branch information
hycdong authored Nov 15, 2021
1 parent 9546c46 commit cba16ed
Show file tree
Hide file tree
Showing 3 changed files with 191 additions and 34 deletions.
113 changes: 79 additions & 34 deletions src/meta/meta_bulk_load_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -283,41 +283,92 @@ void bulk_load_service::create_partition_bulk_load_dir(const std::string &app_na
}

// ThreadPool: THREAD_POOL_META_STATE
void bulk_load_service::partition_bulk_load(const std::string &app_name, const gpid &pid)
bool bulk_load_service::check_partition_status(
const std::string &app_name,
const gpid &pid,
bool always_unhealthy_check,
const std::function<void(const std::string &, const gpid &)> &retry_function,
/*out*/ partition_configuration &pconfig)
{
FAIL_POINT_INJECT_F("meta_bulk_load_partition_bulk_load", [](dsn::string_view) {});

std::shared_ptr<app_state> app = get_app(pid.get_app_id());
if (app == nullptr || app->status != app_status::AS_AVAILABLE) {
dwarn_f(
"app(name={}, id={}) is not existed, set bulk load failed", app_name, pid.get_app_id());
handle_app_unavailable(pid.get_app_id(), app_name);
return;
return false;
}

rpc_address primary_addr = app->partitions[pid.get_partition_index()].primary;
if (primary_addr.is_invalid()) {
pconfig = app->partitions[pid.get_partition_index()];
if (pconfig.primary.is_invalid()) {
dwarn_f("app({}) partition({}) primary is invalid, try it later", app_name, pid);
tasking::enqueue(LPC_META_STATE_NORMAL,
_meta_svc->tracker(),
std::bind(&bulk_load_service::partition_bulk_load, this, app_name, pid),
[this, retry_function, app_name, pid]() { retry_function(app_name, pid); },
0,
std::chrono::seconds(1));
return false;
}

if (pconfig.secondaries.size() < pconfig.max_replica_count - 1) {
bulk_load_status::type p_status;
{
zauto_read_lock l(_lock);
p_status = get_partition_bulk_load_status_unlocked(pid);
}
// Requests for a bulk load that is rolling back to downloading, pausing, canceled or
// failed should always be sent to the replica server, even if the partition is unhealthy
if (!always_unhealthy_check && (p_status == bulk_load_status::BLS_DOWNLOADING ||
p_status == bulk_load_status::BLS_PAUSING ||
p_status == bulk_load_status::BLS_CANCELED ||
p_status == bulk_load_status::BLS_FAILED)) {
return true;
}
dwarn_f("app({}) partition({}) is unhealthy, status({}), try it later",
app_name,
pid,
dsn::enum_to_string(p_status));
tasking::enqueue(LPC_META_STATE_NORMAL,
_meta_svc->tracker(),
[this, retry_function, app_name, pid]() { retry_function(app_name, pid); },
0,
std::chrono::seconds(1));
return false;
}
return true;
}

// ThreadPool: THREAD_POOL_META_STATE
void bulk_load_service::partition_bulk_load(const std::string &app_name, const gpid &pid)
{
FAIL_POINT_INJECT_F("meta_bulk_load_partition_bulk_load", [](dsn::string_view) {});

partition_configuration pconfig;
if (!check_partition_status(app_name,
pid,
false,
std::bind(&bulk_load_service::partition_bulk_load,
this,
std::placeholders::_1,
std::placeholders::_2),
pconfig)) {
return;
}

zauto_read_lock l(_lock);
const app_bulk_load_info &ainfo = _app_bulk_load_info[pid.get_app_id()];
rpc_address primary_addr = pconfig.primary;
auto req = make_unique<bulk_load_request>();
req->pid = pid;
req->app_name = app_name;
req->primary_addr = primary_addr;
req->remote_provider_name = ainfo.file_provider_type;
req->cluster_name = ainfo.cluster_name;
req->meta_bulk_load_status = get_partition_bulk_load_status_unlocked(pid);
req->ballot = app->partitions[pid.get_partition_index()].ballot;
req->query_bulk_load_metadata = is_partition_metadata_not_updated_unlocked(pid);
req->remote_root_path = ainfo.remote_root_path;
{
zauto_read_lock l(_lock);
const app_bulk_load_info &ainfo = _app_bulk_load_info[pid.get_app_id()];
req->pid = pid;
req->app_name = app_name;
req->primary_addr = primary_addr;
req->remote_provider_name = ainfo.file_provider_type;
req->cluster_name = ainfo.cluster_name;
req->meta_bulk_load_status = get_partition_bulk_load_status_unlocked(pid);
req->ballot = pconfig.ballot;
req->query_bulk_load_metadata = is_partition_metadata_not_updated_unlocked(pid);
req->remote_root_path = ainfo.remote_root_path;
}

ddebug_f("send bulk load request to node({}), app({}), partition({}), partition "
"status = {}, remote provider = {}, cluster_name = {}, remote_root_path = {}",
Expand Down Expand Up @@ -1046,22 +1097,15 @@ void bulk_load_service::partition_ingestion(const std::string &app_name, const g
return;
}

std::shared_ptr<app_state> app = get_app(pid.get_app_id());
if (app == nullptr || app->status != app_status::AS_AVAILABLE) {
dwarn_f(
"app(name={}, id={}) is not existed, set bulk load failed", app_name, pid.get_app_id());
handle_app_unavailable(pid.get_app_id(), app_name);
return;
}

rpc_address primary_addr = app->partitions[pid.get_partition_index()].primary;
if (primary_addr.is_invalid()) {
dwarn_f("app({}) partition({}) primary is invalid, try it later", app_name, pid);
tasking::enqueue(LPC_META_STATE_NORMAL,
_meta_svc->tracker(),
std::bind(&bulk_load_service::partition_ingestion, this, app_name, pid),
pid.thread_hash(),
std::chrono::seconds(1));
partition_configuration pconfig;
if (!check_partition_status(app_name,
pid,
true,
std::bind(&bulk_load_service::partition_ingestion,
this,
std::placeholders::_1,
std::placeholders::_2),
pconfig)) {
return;
}

Expand All @@ -1073,6 +1117,7 @@ void bulk_load_service::partition_ingestion(const std::string &app_name, const g
return;
}

rpc_address primary_addr = pconfig.primary;
tasking::enqueue(
LPC_BULK_LOAD_INGESTION,
_meta_svc->tracker(),
Expand Down
10 changes: 10 additions & 0 deletions src/meta/meta_bulk_load_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,16 @@ class bulk_load_service

void do_start_app_bulk_load(std::shared_ptr<app_state> app, start_bulk_load_rpc rpc);

// Called by `partition_bulk_load` and `partition_ingestion` to check the partition status
// before sending partition_bulk_load_request and partition_ingestion_request.
//
// - app_name / pid: the partition to check.
// - always_unhealthy_check: when false, a partition that lacks secondaries still passes
//   the check if its bulk load status is BLS_DOWNLOADING, BLS_PAUSING, BLS_CANCELED or
//   BLS_FAILED; when true, a partition that lacks secondaries never passes.
// - retry_function: scheduled to run one second later with (app_name, pid) when the
//   primary is invalid or the partition is treated as unhealthy.
// - pconfig: output, set to the partition's configuration once the app is found available.
//
// Returns true if the request can be sent. Returns false when a retry has been scheduled,
// or when the app is missing / not AS_AVAILABLE (then `handle_app_unavailable` is called
// and no retry is scheduled).
bool check_partition_status(
const std::string &app_name,
const gpid &pid,
bool always_unhealthy_check,
const std::function<void(const std::string &, const gpid &)> &retry_function,
/*out*/ partition_configuration &pconfig);

void partition_bulk_load(const std::string &app_name, const gpid &pid);

void on_partition_bulk_load_reply(error_code err,
Expand Down
102 changes: 102 additions & 0 deletions src/meta/test/meta_bulk_load_service_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,51 @@ class bulk_load_service_test : public meta_test_base
}
}

// Stand-in for the retry callback passed to bulk_load_service::check_partition_status in
// these tests: it only logs the app name and partition id it was invoked with.
void mock_partition_bulk_load(const std::string &app_name, const gpid &pid)
{
ddebug_f("mock function, app({}), pid({})", app_name, pid);
}

// Prepares APP_NAME for a check_partition_status case: replaces the app's partition list
// with a single partition (index 0) that has a primary and two secondaries with
// max_replica_count = 3, then hands `status` to mock_meta_bulk_load_context.
// Returns the gpid of that partition.
gpid before_check_partition_status(bulk_load_status::type status)
{
    const std::shared_ptr<app_state> target_app = find_app(APP_NAME);
    const gpid partition_id(target_app->app_id, 0);

    partition_configuration replica_set;
    replica_set.pid = partition_id;
    replica_set.max_replica_count = 3;
    replica_set.ballot = BALLOT;
    replica_set.primary = rpc_address("127.0.0.1", 10086);
    replica_set.secondaries.emplace_back(rpc_address("127.0.0.1", 10085));
    replica_set.secondaries.emplace_back(rpc_address("127.0.0.1", 10087));

    // Start every case from the same single-partition layout.
    target_app->partitions.clear();
    target_app->partitions.emplace_back(replica_set);

    mock_meta_bulk_load_context(target_app->app_id, target_app->partition_count, status);
    return partition_id;
}

bool check_partition_status(const std::string name,
bool mock_primary_invalid,
bool mock_lack_secondary,
gpid pid,
bool always_unhealthy_check)
{
std::shared_ptr<app_state> app = find_app(name);
if (mock_primary_invalid) {
app->partitions[pid.get_partition_index()].primary.set_invalid();
}
if (mock_lack_secondary) {
app->partitions[pid.get_partition_index()].secondaries.clear();
}
partition_configuration pconfig;
bool flag = bulk_svc().check_partition_status(
name,
pid,
always_unhealthy_check,
std::bind(&bulk_load_service_test::mock_partition_bulk_load, this, name, pid),
pconfig);
wait_all();
return flag;
}

void on_partition_bulk_load_reply(error_code err,
const bulk_load_request &request,
const bulk_load_response &response)
Expand Down Expand Up @@ -359,6 +404,63 @@ TEST_F(bulk_load_service_test, start_bulk_load_succeed)
fail::teardown();
}

/// check partition status unit tests
// check_partition_status must fail and the app's bulk load states must be reset when the
// app is no longer available (here: its status is forced to AS_DROPPED after bulk load
// has been started).
TEST_F(bulk_load_service_test, check_partition_status_app_wrong_test)
{
    std::string table_name = "dropped_table";
    create_app(table_name);
    fail::setup();
    fail::cfg("meta_check_bulk_load_request_params", "return()");
    fail::cfg("meta_bulk_load_partition_bulk_load", "return()");
    fail::cfg("meta_bulk_load_resend_request", "return()");
    auto resp = start_bulk_load(table_name);
    // Evaluate everything that needs the fail points first, then tear them down before
    // asserting, so a failed assertion cannot leave fail points enabled for later tests.
    bool passed = false;
    bool states_reset = false;
    if (resp.err == ERR_OK) {
        std::shared_ptr<app_state> app = find_app(table_name);
        app->status = app_status::AS_DROPPED;
        passed = check_partition_status(table_name, false, false, gpid(app->app_id, 0), false);
        states_reset = is_app_bulk_load_states_reset(app->app_id);
    }
    fail::teardown();

    ASSERT_EQ(resp.err, ERR_OK);
    ASSERT_FALSE(passed);
    ASSERT_TRUE(states_reset);
}

// Table-driven check of check_partition_status over bulk load statuses, with the primary
// invalidated or the secondaries removed, for both values of always_unhealthy_check.
TEST_F(bulk_load_service_test, check_partition_status_test)
{
    create_app(APP_NAME);

    struct check_case
    {
        bulk_load_status::type bulk_status;
        bool always_unhealthy_check;
        bool invalid_primary;
        bool no_secondary;
        bool expect_pass;
    };
    const check_case cases[] = {
        // mock primary invalid
        {bulk_load_status::BLS_DOWNLOADING, false, true, false, false},
        // mock secondary invalid with always_check=false
        {bulk_load_status::BLS_DOWNLOADING, false, false, true, true},
        {bulk_load_status::BLS_DOWNLOADED, false, false, true, false},
        {bulk_load_status::BLS_INGESTING, false, false, true, false},
        {bulk_load_status::BLS_SUCCEED, false, false, true, false},
        {bulk_load_status::BLS_PAUSING, false, false, true, true},
        {bulk_load_status::BLS_PAUSED, false, false, true, false},
        {bulk_load_status::BLS_CANCELED, false, false, true, true},
        {bulk_load_status::BLS_FAILED, false, false, true, true},
        {bulk_load_status::BLS_INVALID, false, false, true, false},
        // mock secondary invalid with always_check=true
        {bulk_load_status::BLS_INGESTING, true, false, true, false},
        // normal case
        {bulk_load_status::BLS_INGESTING, false, false, false, true},
    };

    for (const auto &c : cases) {
        // Rebuild the partition and the mocked bulk load context for every case.
        const gpid pid = before_check_partition_status(c.bulk_status);
        const bool actual = check_partition_status(
            APP_NAME, c.invalid_primary, c.no_secondary, pid, c.always_unhealthy_check);
        ASSERT_EQ(actual, c.expect_pass);
    }

    drop_app(APP_NAME);
}

/// control bulk load unit tests
TEST_F(bulk_load_service_test, control_bulk_load_test)
{
Expand Down

0 comments on commit cba16ed

Please sign in to comment.