From cba16ed58927c21e73c689e1aa062ce6a92da7e5 Mon Sep 17 00:00:00 2001
From: HeYuchen
Date: Mon, 15 Nov 2021 11:13:18 +0800
Subject: [PATCH] feat(bulk_load): add unhealthy partition check (#964)

---
 src/meta/meta_bulk_load_service.cpp           | 113 ++++++++++++------
 src/meta/meta_bulk_load_service.h             |  16 +++
 src/meta/test/meta_bulk_load_service_test.cpp | 103 ++++++++++++++++
 3 files changed, 198 insertions(+), 34 deletions(-)

diff --git a/src/meta/meta_bulk_load_service.cpp b/src/meta/meta_bulk_load_service.cpp
index 8fb1e60868..a2b842d4ab 100644
--- a/src/meta/meta_bulk_load_service.cpp
+++ b/src/meta/meta_bulk_load_service.cpp
@@ -283,41 +283,92 @@ void bulk_load_service::create_partition_bulk_load_dir(const std::string &app_na
 }
 
 // ThreadPool: THREAD_POOL_META_STATE
-void bulk_load_service::partition_bulk_load(const std::string &app_name, const gpid &pid)
+bool bulk_load_service::check_partition_status(
+    const std::string &app_name,
+    const gpid &pid,
+    bool always_unhealthy_check,
+    const std::function<void(const std::string &, const gpid &)> &retry_function,
+    /*out*/ partition_configuration &pconfig)
 {
-    FAIL_POINT_INJECT_F("meta_bulk_load_partition_bulk_load", [](dsn::string_view) {});
-
     std::shared_ptr<app_state> app = get_app(pid.get_app_id());
     if (app == nullptr || app->status != app_status::AS_AVAILABLE) {
         dwarn_f(
             "app(name={}, id={}) is not existed, set bulk load failed", app_name, pid.get_app_id());
         handle_app_unavailable(pid.get_app_id(), app_name);
-        return;
+        return false;
     }
 
-    rpc_address primary_addr = app->partitions[pid.get_partition_index()].primary;
-    if (primary_addr.is_invalid()) {
+    pconfig = app->partitions[pid.get_partition_index()];
+    if (pconfig.primary.is_invalid()) {
         dwarn_f("app({}) partition({}) primary is invalid, try it later", app_name, pid);
         tasking::enqueue(LPC_META_STATE_NORMAL,
                          _meta_svc->tracker(),
-                         std::bind(&bulk_load_service::partition_bulk_load, this, app_name, pid),
+                         [retry_function, app_name, pid]() { retry_function(app_name, pid); },
+                         0,
+                         std::chrono::seconds(1));
+        return false;
+    }
+
+    if (pconfig.secondaries.size() < pconfig.max_replica_count - 1) {
+        bulk_load_status::type p_status;
+        {
+            zauto_read_lock l(_lock);
+            p_status = get_partition_bulk_load_status_unlocked(pid);
+        }
+        // requests for rollback to downloading, pausing, canceled or failed bulk load should
+        // always be sent to the replica server, even if the partition is unhealthy
+        if (!always_unhealthy_check && (p_status == bulk_load_status::BLS_DOWNLOADING ||
+                                        p_status == bulk_load_status::BLS_PAUSING ||
+                                        p_status == bulk_load_status::BLS_CANCELED ||
+                                        p_status == bulk_load_status::BLS_FAILED)) {
+            return true;
+        }
+        dwarn_f("app({}) partition({}) is unhealthy, status({}), try it later",
+                app_name,
+                pid,
+                dsn::enum_to_string(p_status));
+        tasking::enqueue(LPC_META_STATE_NORMAL,
+                         _meta_svc->tracker(),
+                         [retry_function, app_name, pid]() { retry_function(app_name, pid); },
                          0,
                          std::chrono::seconds(1));
+        return false;
+    }
+    return true;
+}
+
+// ThreadPool: THREAD_POOL_META_STATE
+void bulk_load_service::partition_bulk_load(const std::string &app_name, const gpid &pid)
+{
+    FAIL_POINT_INJECT_F("meta_bulk_load_partition_bulk_load", [](dsn::string_view) {});
+
+    partition_configuration pconfig;
+    if (!check_partition_status(app_name,
+                                pid,
+                                false,
+                                std::bind(&bulk_load_service::partition_bulk_load,
+                                          this,
+                                          std::placeholders::_1,
+                                          std::placeholders::_2),
+                                pconfig)) {
         return;
     }
 
-    zauto_read_lock l(_lock);
-    const app_bulk_load_info &ainfo = _app_bulk_load_info[pid.get_app_id()];
+    rpc_address primary_addr = pconfig.primary;
     auto req = make_unique<bulk_load_request>();
-    req->pid = pid;
-    req->app_name = app_name;
-    req->primary_addr = primary_addr;
-    req->remote_provider_name = ainfo.file_provider_type;
-    req->cluster_name = ainfo.cluster_name;
-    req->meta_bulk_load_status = get_partition_bulk_load_status_unlocked(pid);
-    req->ballot = app->partitions[pid.get_partition_index()].ballot;
-    req->query_bulk_load_metadata = is_partition_metadata_not_updated_unlocked(pid);
-    req->remote_root_path = ainfo.remote_root_path;
+    {
+        zauto_read_lock l(_lock);
+        const app_bulk_load_info &ainfo = _app_bulk_load_info[pid.get_app_id()];
+        req->pid = pid;
+        req->app_name = app_name;
+        req->primary_addr = primary_addr;
+        req->remote_provider_name = ainfo.file_provider_type;
+        req->cluster_name = ainfo.cluster_name;
+        req->meta_bulk_load_status = get_partition_bulk_load_status_unlocked(pid);
+        req->ballot = pconfig.ballot;
+        req->query_bulk_load_metadata = is_partition_metadata_not_updated_unlocked(pid);
+        req->remote_root_path = ainfo.remote_root_path;
+    }
 
     ddebug_f("send bulk load request to node({}), app({}), partition({}), partition "
              "status = {}, remote provider = {}, cluster_name = {}, remote_root_path = {}",
@@ -1046,22 +1097,15 @@ void bulk_load_service::partition_ingestion(const std::string &app_name, const g
         return;
     }
 
-    std::shared_ptr<app_state> app = get_app(pid.get_app_id());
-    if (app == nullptr || app->status != app_status::AS_AVAILABLE) {
-        dwarn_f(
-            "app(name={}, id={}) is not existed, set bulk load failed", app_name, pid.get_app_id());
-        handle_app_unavailable(pid.get_app_id(), app_name);
-        return;
-    }
-
-    rpc_address primary_addr = app->partitions[pid.get_partition_index()].primary;
-    if (primary_addr.is_invalid()) {
-        dwarn_f("app({}) partition({}) primary is invalid, try it later", app_name, pid);
-        tasking::enqueue(LPC_META_STATE_NORMAL,
-                         _meta_svc->tracker(),
-                         std::bind(&bulk_load_service::partition_ingestion, this, app_name, pid),
-                         pid.thread_hash(),
-                         std::chrono::seconds(1));
+    partition_configuration pconfig;
+    if (!check_partition_status(app_name,
+                                pid,
+                                true,
+                                std::bind(&bulk_load_service::partition_ingestion,
+                                          this,
+                                          std::placeholders::_1,
+                                          std::placeholders::_2),
+                                pconfig)) {
         return;
     }
 
@@ -1073,6 +1117,7 @@ void bulk_load_service::partition_ingestion(const std::string &app_name, const g
         return;
     }
 
+    rpc_address primary_addr = pconfig.primary;
     tasking::enqueue(
         LPC_BULK_LOAD_INGESTION,
         _meta_svc->tracker(),
diff --git a/src/meta/meta_bulk_load_service.h b/src/meta/meta_bulk_load_service.h
index 0b889bb12f..1a7f70101d 100644
--- a/src/meta/meta_bulk_load_service.h
+++ b/src/meta/meta_bulk_load_service.h
@@ -140,6 +140,22 @@ class bulk_load_service
 
     void do_start_app_bulk_load(std::shared_ptr<app_state> app, start_bulk_load_rpc rpc);
 
+    // Called by `partition_bulk_load` and `partition_ingestion`.
+    // Check partition status before sending partition_bulk_load_request and
+    // partition_ingestion_request.
+    // Return true if the request can be sent now. Return false if the app is unavailable, or if
+    // the primary is invalid or the partition lacks secondaries; in the latter two cases
+    // `retry_function` is enqueued to run again with a 1 second delay.
+    // If `always_unhealthy_check` is false, missing secondaries are tolerated when the
+    // partition bulk load status is downloading, pausing, canceled or failed.
+    // `pconfig` receives the partition configuration (left untouched if the app is unavailable).
+    bool check_partition_status(
+        const std::string &app_name,
+        const gpid &pid,
+        bool always_unhealthy_check,
+        const std::function<void(const std::string &, const gpid &)> &retry_function,
+        /*out*/ partition_configuration &pconfig);
+
     void partition_bulk_load(const std::string &app_name, const gpid &pid);
 
     void on_partition_bulk_load_reply(error_code err,
diff --git a/src/meta/test/meta_bulk_load_service_test.cpp b/src/meta/test/meta_bulk_load_service_test.cpp
index 9d1bc8e81d..e8bce1d102 100644
--- a/src/meta/test/meta_bulk_load_service_test.cpp
+++ b/src/meta/test/meta_bulk_load_service_test.cpp
@@ -101,6 +101,51 @@ class bulk_load_service_test : public meta_test_base
         }
     }
 
+    void mock_partition_bulk_load(const std::string &app_name, const gpid &pid)
+    {
+        ddebug_f("mock function, app({}), pid({})", app_name, pid);
+    }
+
+    gpid before_check_partition_status(bulk_load_status::type status)
+    {
+        std::shared_ptr<app_state> app = find_app(APP_NAME);
+        partition_configuration config;
+        config.pid = gpid(app->app_id, 0);
+        config.max_replica_count = 3;
+        config.ballot = BALLOT;
+        config.primary = rpc_address("127.0.0.1", 10086);
+        config.secondaries.emplace_back(rpc_address("127.0.0.1", 10085));
+        config.secondaries.emplace_back(rpc_address("127.0.0.1", 10087));
+        app->partitions.clear();
+        app->partitions.emplace_back(config);
+        mock_meta_bulk_load_context(app->app_id, app->partition_count, status);
+        return config.pid;
+    }
+
+    bool check_partition_status(const std::string &name,
+                                bool mock_primary_invalid,
+                                bool mock_lack_secondary,
+                                gpid pid,
+                                bool always_unhealthy_check)
+    {
+        std::shared_ptr<app_state> app = find_app(name);
+        if (mock_primary_invalid) {
+            app->partitions[pid.get_partition_index()].primary.set_invalid();
+        }
+        if (mock_lack_secondary) {
+            app->partitions[pid.get_partition_index()].secondaries.clear();
+        }
+        partition_configuration pconfig;
+        bool flag = bulk_svc().check_partition_status(
+            name,
+            pid,
+            always_unhealthy_check,
+            std::bind(&bulk_load_service_test::mock_partition_bulk_load, this, name, pid),
+            pconfig);
+        wait_all();
+        return flag;
+    }
+
     void on_partition_bulk_load_reply(error_code err,
                                       const bulk_load_request &request,
                                       const bulk_load_response &response)
@@ -359,6 +404,64 @@ TEST_F(bulk_load_service_test, start_bulk_load_succeed)
     fail::teardown();
 }
 
+/// check partition status unit tests
+TEST_F(bulk_load_service_test, check_partition_status_app_wrong_test)
+{
+    std::string table_name = "dropped_table";
+    create_app(table_name);
+    fail::setup();
+    fail::cfg("meta_check_bulk_load_request_params", "return()");
+    fail::cfg("meta_bulk_load_partition_bulk_load", "return()");
+    fail::cfg("meta_bulk_load_resend_request", "return()");
+    auto resp = start_bulk_load(table_name);
+    ASSERT_EQ(resp.err, ERR_OK);
+    std::shared_ptr<app_state> app = find_app(table_name);
+    app->status = app_status::AS_DROPPED;
+    ASSERT_FALSE(check_partition_status(table_name, false, false, gpid(app->app_id, 0), false));
+    ASSERT_TRUE(is_app_bulk_load_states_reset(app->app_id));
+    fail::teardown();
+}
+
+TEST_F(bulk_load_service_test, check_partition_status_test)
+{
+    create_app(APP_NAME);
+    struct status_test
+    {
+        bulk_load_status::type status;
+        bool always_check;
+        bool mock_primary_invalid;
+        bool mock_lack_secondary;
+        bool expected_val;
+    } tests[] = {
+        // mock primary invalid
+        {bulk_load_status::BLS_DOWNLOADING, false, true, false, false},
+        // mock secondary invalid with always_check=false
+        {bulk_load_status::BLS_DOWNLOADING, false, false, true, true},
+        {bulk_load_status::BLS_DOWNLOADED, false, false, true, false},
+        {bulk_load_status::BLS_INGESTING, false, false, true, false},
+        {bulk_load_status::BLS_SUCCEED, false, false, true, false},
+        {bulk_load_status::BLS_PAUSING, false, false, true, true},
+        {bulk_load_status::BLS_PAUSED, false, false, true, false},
+        {bulk_load_status::BLS_CANCELED, false, false, true, true},
+        {bulk_load_status::BLS_FAILED, false, false, true, true},
+        {bulk_load_status::BLS_INVALID, false, false, true, false},
+        // mock secondary invalid with always_check=true
+        {bulk_load_status::BLS_INGESTING, true, false, true, false},
+        // normal case
+        {bulk_load_status::BLS_INGESTING, false, false, false, true},
+    };
+    for (const auto &test : tests) {
+        auto pid = before_check_partition_status(test.status);
+        ASSERT_EQ(check_partition_status(APP_NAME,
+                                         test.mock_primary_invalid,
+                                         test.mock_lack_secondary,
+                                         pid,
+                                         test.always_check),
+                  test.expected_val);
+    }
+    drop_app(APP_NAME);
+}
+
 /// control bulk load unit tests
 TEST_F(bulk_load_service_test, control_bulk_load_test)
 {