Skip to content

Commit

Permalink
feat: implement param parsing of SubmitMigrationTaskHandler
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Dec 27, 2023
1 parent 718447c commit b32f322
Show file tree
Hide file tree
Showing 3 changed files with 207 additions and 0 deletions.
1 change: 1 addition & 0 deletions src/meta-srv/src/handler/keep_lease_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
use crate::keys::{LeaseKey, LeaseValue};
use crate::metasrv::Context;

/// Keeps [Datanode] leases
pub struct KeepLeaseHandler;

#[async_trait::async_trait]
Expand Down
3 changes: 3 additions & 0 deletions src/meta-srv/src/service/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ mod health;
mod heartbeat;
mod leader;
mod meta;
// TODO(weny): removes it.
mod node_lease;
#[allow(dead_code)]
mod region_migration;
mod route;
mod util;

Expand Down
203 changes: 203 additions & 0 deletions src/meta-srv/src/service/admin/region_migration.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::num::ParseIntError;
use std::str::FromStr;

use common_meta::peer::Peer;
use common_meta::ClusterId;
use serde::Serialize;
use snafu::ResultExt;
use store_api::storage::RegionId;
use tonic::codegen::http;

use super::HttpHandler;
use crate::error::{self, Error, Result};

pub trait PeerLookup: Send + Sync {
fn peer(&self, peer_id: u64) -> Option<Peer>;
}

/// The handler of submitting migration task.
pub struct SubmitRegionMigrationTaskHandler {
// TODO(weny): waits for https://github.com/GreptimeTeam/greptimedb/pull/3014
}

#[derive(Debug, Clone, PartialEq, Eq)]
struct SubmitRegionMigrationTaskParams {
cluster_id: ClusterId,
region_id: RegionId,
from_peer_id: u64,
to_peer_id: u64,
}

#[derive(Debug, Serialize)]
struct SubmitRegionMigrationTaskResponse {
procedure_id: String,
}

fn parse_num_parameter_with_default<T, F>(
key: &str,
params: &HashMap<String, String>,
default_fn: F,
) -> Result<T>
where
F: Fn(&str) -> Result<T>,
T: FromStr<Err = ParseIntError>,
{
let parse_result = if let Some(id) = params.get(key) {
id.parse::<T>().context(error::ParseNumSnafu {
err_msg: format!("invalid {key}: {id}"),
})?
} else {
default_fn(key)?
};

Ok(parse_result)
}

impl TryFrom<&HashMap<String, String>> for SubmitRegionMigrationTaskParams {
type Error = Error;

fn try_from(params: &HashMap<String, String>) -> Result<Self> {
let cluster_id = parse_num_parameter_with_default("cluster_id", params, |_| Ok(0))?;

let region_id: u64 = parse_num_parameter_with_default("region_id", params, |key| {
error::MissingRequiredParameterSnafu { param: key }.fail()
})?;

let from_peer_id: u64 = parse_num_parameter_with_default("from_peer_id", params, |key| {
error::MissingRequiredParameterSnafu { param: key }.fail()
})?;

let to_peer_id: u64 = parse_num_parameter_with_default("to_peer_id", params, |key| {
error::MissingRequiredParameterSnafu { param: key }.fail()
})?;

Ok(SubmitRegionMigrationTaskParams {
cluster_id,
region_id: RegionId::from_u64(region_id),
from_peer_id,
to_peer_id,
})
}
}

impl SubmitRegionMigrationTaskHandler {
/// Submits a region migration task, returns the procedure id.
async fn handle_submit(
&self,
_task: SubmitRegionMigrationTaskParams,
) -> Result<SubmitRegionMigrationTaskResponse> {
// TODO(weny): waits for https://github.com/GreptimeTeam/greptimedb/pull/3014
todo!()
}
}

#[async_trait::async_trait]
impl HttpHandler for SubmitRegionMigrationTaskHandler {
async fn handle(
&self,
_: &str,
params: &HashMap<String, String>,
) -> Result<http::Response<String>> {
let params = SubmitRegionMigrationTaskParams::try_from(params)?;

let response = self.handle_submit(params).await?;

http::Response::builder()
.status(http::StatusCode::OK)
.body(serde_json::to_string(&response).unwrap())
.context(error::InvalidHttpBodySnafu)
}
}

#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::collections::HashMap;

use crate::error;

#[test]
fn test_parse_migration_task_params() {
use store_api::storage::RegionId;

use crate::service::admin::region_migration::SubmitRegionMigrationTaskParams;

let params = HashMap::from([
("cluster_id".to_string(), "10".to_string()),
(
"region_id".to_string(),
RegionId::new(1024, 1).as_u64().to_string(),
),
("from_peer_id".to_string(), "1".to_string()),
("to_peer_id".to_string(), "2".to_string()),
]);

let task_params = SubmitRegionMigrationTaskParams::try_from(&params).unwrap();

assert_eq!(
SubmitRegionMigrationTaskParams {
cluster_id: 10,
region_id: RegionId::new(1024, 1),
from_peer_id: 1,
to_peer_id: 2,
},
task_params
);

let params = HashMap::from([
(
"region_id".to_string(),
RegionId::new(1024, 1).as_u64().to_string(),
),
("from_peer_id".to_string(), "1".to_string()),
("to_peer_id".to_string(), "2".to_string()),
]);

let task_params = SubmitRegionMigrationTaskParams::try_from(&params).unwrap();

assert_eq!(
SubmitRegionMigrationTaskParams {
cluster_id: 0,
region_id: RegionId::new(1024, 1),
from_peer_id: 1,
to_peer_id: 2,
},
task_params
);

let required_fields = [
(
"region_id".to_string(),
RegionId::new(1024, 1).as_u64().to_string(),
),
("from_peer_id".to_string(), "1".to_string()),
("to_peer_id".to_string(), "2".to_string()),
];

for i in 0..required_fields.len() {
let params = required_fields[..i]
.iter()
.cloned()
.collect::<HashMap<_, _>>();

let err = SubmitRegionMigrationTaskParams::try_from(&params).unwrap_err();
assert_matches!(err, error::Error::MissingRequiredParameter { .. });
assert!(err.to_string().contains(&required_fields[i].0));
}
}
}

0 comments on commit b32f322

Please sign in to comment.