From 840e94630d01818696538dbd2f28441702c1ccc9 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Wed, 27 Dec 2023 19:08:54 +0900 Subject: [PATCH] feat: implement param parsing of `SubmitRegionMigrationTaskHandler` (#3015) * feat: implement param parsing of `SubmitMigrationTaskHandler` * chore: apply suggestions from CR * refactor: change `SubmitRegionMigrationTaskParams` to `SubmitRegionMigrationTaskRequest` --- .../src/handler/keep_lease_handler.rs | 1 + src/meta-srv/src/service/admin.rs | 3 + .../src/service/admin/region_migration.rs | 207 ++++++++++++++++++ 3 files changed, 211 insertions(+) create mode 100644 src/meta-srv/src/service/admin/region_migration.rs diff --git a/src/meta-srv/src/handler/keep_lease_handler.rs b/src/meta-srv/src/handler/keep_lease_handler.rs index a3b332f00f2a..64dfeff698f2 100644 --- a/src/meta-srv/src/handler/keep_lease_handler.rs +++ b/src/meta-srv/src/handler/keep_lease_handler.rs @@ -22,6 +22,7 @@ use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler}; use crate::keys::{LeaseKey, LeaseValue}; use crate::metasrv::Context; +/// Keeps [Datanode] leases pub struct KeepLeaseHandler; #[async_trait::async_trait] diff --git a/src/meta-srv/src/service/admin.rs b/src/meta-srv/src/service/admin.rs index a5867e376924..64965571b270 100644 --- a/src/meta-srv/src/service/admin.rs +++ b/src/meta-srv/src/service/admin.rs @@ -16,7 +16,10 @@ mod health; mod heartbeat; mod leader; mod meta; +// TODO(weny): removes it. mod node_lease; +#[allow(dead_code)] +mod region_migration; mod route; mod util; diff --git a/src/meta-srv/src/service/admin/region_migration.rs b/src/meta-srv/src/service/admin/region_migration.rs new file mode 100644 index 000000000000..0cc1457f5019 --- /dev/null +++ b/src/meta-srv/src/service/admin/region_migration.rs @@ -0,0 +1,207 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::num::ParseIntError; +use std::str::FromStr; + +use common_meta::peer::Peer; +use common_meta::ClusterId; +use serde::Serialize; +use snafu::ResultExt; +use store_api::storage::RegionId; +use tonic::codegen::http; + +use super::HttpHandler; +use crate::error::{self, Error, Result}; + +pub trait PeerLookup: Send + Sync { + fn peer(&self, peer_id: u64) -> Option; +} + +/// The handler of submitting migration task. +pub struct SubmitRegionMigrationTaskHandler { + // TODO(weny): waits for https://github.com/GreptimeTeam/greptimedb/pull/3014 +} + +#[derive(Debug, Clone, PartialEq, Eq)] +struct SubmitRegionMigrationTaskRequest { + cluster_id: ClusterId, + region_id: RegionId, + from_peer_id: u64, + to_peer_id: u64, +} + +#[derive(Debug, Serialize)] +struct SubmitRegionMigrationTaskResponse { + procedure_id: String, +} + +fn parse_num_parameter_with_default( + key: &str, + params: &HashMap, + default_fn: F, +) -> Result +where + F: Fn(&str) -> Result, + T: FromStr, +{ + let parse_result = if let Some(id) = params.get(key) { + id.parse::().context(error::ParseNumSnafu { + err_msg: format!("invalid {key}: {id}"), + })? + } else { + default_fn(key)? + }; + + Ok(parse_result) +} + +impl TryFrom<&HashMap> for SubmitRegionMigrationTaskRequest { + type Error = Error; + + fn try_from(params: &HashMap) -> Result { + let cluster_id = parse_num_parameter_with_default("cluster_id", params, |_| Ok(0))?; + + let region_id: u64 = parse_num_parameter_with_default("region_id", params, |key| { + error::MissingRequiredParameterSnafu { param: key }.fail() + })?; + + let from_peer_id: u64 = parse_num_parameter_with_default("from_peer_id", params, |key| { + error::MissingRequiredParameterSnafu { param: key }.fail() + })?; + + let to_peer_id: u64 = parse_num_parameter_with_default("to_peer_id", params, |key| { + error::MissingRequiredParameterSnafu { param: key }.fail() + })?; + + Ok(SubmitRegionMigrationTaskRequest { + cluster_id, + region_id: RegionId::from_u64(region_id), + from_peer_id, + to_peer_id, + }) + } +} + +impl SubmitRegionMigrationTaskHandler { + /// Submits a region migration task, returns the procedure id. + async fn handle_submit( + &self, + _task: SubmitRegionMigrationTaskRequest, + ) -> Result { + // TODO(weny): waits for https://github.com/GreptimeTeam/greptimedb/pull/3014 + todo!() + } +} + +#[async_trait::async_trait] +impl HttpHandler for SubmitRegionMigrationTaskHandler { + async fn handle( + &self, + _: &str, + params: &HashMap, + ) -> Result> { + let request = SubmitRegionMigrationTaskRequest::try_from(params)?; + + let response = self.handle_submit(request).await?; + + http::Response::builder() + .status(http::StatusCode::OK) + .body(serde_json::to_string(&response).with_context(|_| { + error::SerializeToJsonSnafu { + input: format!("{response:?}"), + } + })?) + .context(error::InvalidHttpBodySnafu) + } +} + +#[cfg(test)] +mod tests { + use std::assert_matches::assert_matches; + use std::collections::HashMap; + + use crate::error; + + #[test] + fn test_parse_migration_task_req() { + use store_api::storage::RegionId; + + use crate::service::admin::region_migration::SubmitRegionMigrationTaskRequest; + + let params = HashMap::from([ + ("cluster_id".to_string(), "10".to_string()), + ( + "region_id".to_string(), + RegionId::new(1024, 1).as_u64().to_string(), + ), + ("from_peer_id".to_string(), "1".to_string()), + ("to_peer_id".to_string(), "2".to_string()), + ]); + + let task_req = SubmitRegionMigrationTaskRequest::try_from(¶ms).unwrap(); + + assert_eq!( + SubmitRegionMigrationTaskRequest { + cluster_id: 10, + region_id: RegionId::new(1024, 1), + from_peer_id: 1, + to_peer_id: 2, + }, + task_req + ); + + let params = HashMap::from([ + ( + "region_id".to_string(), + RegionId::new(1024, 1).as_u64().to_string(), + ), + ("from_peer_id".to_string(), "1".to_string()), + ("to_peer_id".to_string(), "2".to_string()), + ]); + + let task_req = SubmitRegionMigrationTaskRequest::try_from(¶ms).unwrap(); + + assert_eq!( + SubmitRegionMigrationTaskRequest { + cluster_id: 0, + region_id: RegionId::new(1024, 1), + from_peer_id: 1, + to_peer_id: 2, + }, + task_req + ); + + let required_fields = [ + ( + "region_id".to_string(), + RegionId::new(1024, 1).as_u64().to_string(), + ), + ("from_peer_id".to_string(), "1".to_string()), + ("to_peer_id".to_string(), "2".to_string()), + ]; + + for i in 0..required_fields.len() { + let params = required_fields[..i] + .iter() + .cloned() + .collect::>(); + + let err = SubmitRegionMigrationTaskRequest::try_from(¶ms).unwrap_err(); + assert_matches!(err, error::Error::MissingRequiredParameter { .. }); + assert!(err.to_string().contains(&required_fields[i].0)); + } + } +}