diff --git a/src/meta-srv/src/service/admin.rs b/src/meta-srv/src/service/admin.rs index 3cd1a229384d..f60461fa7c54 100644 --- a/src/meta-srv/src/service/admin.rs +++ b/src/meta-srv/src/service/admin.rs @@ -16,10 +16,7 @@ mod health; mod heartbeat; mod leader; mod maintenance; -mod meta; mod node_lease; -mod region_migration; -mod route; mod util; use std::collections::HashMap; @@ -50,34 +47,6 @@ pub fn make_admin_service(metasrv: Metasrv) -> Admin { .route("/heartbeat", handler.clone()) .route("/heartbeat/help", handler); - let router = router.route( - "/catalogs", - meta::CatalogsHandler { - table_metadata_manager: metasrv.table_metadata_manager().clone(), - }, - ); - - let handler = meta::SchemasHandler { - table_metadata_manager: metasrv.table_metadata_manager().clone(), - }; - let router = router - .route("/schemas", handler.clone()) - .route("/schemas/help", handler); - - let handler = meta::TablesHandler { - table_metadata_manager: metasrv.table_metadata_manager().clone(), - }; - let router = router - .route("/tables", handler.clone()) - .route("/tables/help", handler); - - let handler = meta::TableHandler { - table_metadata_manager: metasrv.table_metadata_manager().clone(), - }; - let router = router - .route("/table", handler.clone()) - .route("/table/help", handler); - let router = router.route( "/leader", leader::LeaderHandler { @@ -85,19 +54,6 @@ pub fn make_admin_service(metasrv: Metasrv) -> Admin { }, ); - let handler = route::RouteHandler { - table_metadata_manager: metasrv.table_metadata_manager().clone(), - }; - let router = router - .route("/route", handler.clone()) - .route("/route/help", handler); - - let handler = region_migration::SubmitRegionMigrationTaskHandler { - region_migration_manager: metasrv.region_migration_manager().clone(), - meta_peer_client: metasrv.meta_peer_client().clone(), - }; - let router = router.route("/region-migration", handler); - let router = router.route( "/maintenance", maintenance::MaintenanceHandler { diff --git a/src/meta-srv/src/service/admin/meta.rs b/src/meta-srv/src/service/admin/meta.rs deleted file mode 100644 index 2d3d09278f33..000000000000 --- a/src/meta-srv/src/service/admin/meta.rs +++ /dev/null @@ -1,239 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::collections::HashMap; - -use common_error::ext::BoxedError; -use common_meta::key::table_name::TableNameKey; -use common_meta::key::TableMetadataManagerRef; -use futures::TryStreamExt; -use snafu::ResultExt; -use store_api::storage::{RegionId, TableId}; -use tonic::codegen::http; - -use crate::error; -use crate::error::{ParseNumSnafu, Result, TableMetadataManagerSnafu}; -use crate::service::admin::{util, HttpHandler}; - -#[derive(Clone)] -pub struct CatalogsHandler { - pub table_metadata_manager: TableMetadataManagerRef, -} - -#[derive(Clone)] -pub struct SchemasHandler { - pub table_metadata_manager: TableMetadataManagerRef, -} - -#[derive(Clone)] -pub struct TablesHandler { - pub table_metadata_manager: TableMetadataManagerRef, -} - -#[derive(Clone)] -pub struct TableHandler { - pub table_metadata_manager: TableMetadataManagerRef, -} - -#[async_trait::async_trait] -impl HttpHandler for CatalogsHandler { - async fn handle( - &self, - _: &str, - _: http::Method, - _: &HashMap, - ) -> Result> { - let stream = self - .table_metadata_manager - .catalog_manager() - .catalog_names(); - - let keys = stream - .try_collect::>() - .await - .map_err(BoxedError::new) - .context(error::ListCatalogsSnafu)?; - - to_http_response(keys) - } -} - -#[async_trait::async_trait] -impl HttpHandler for SchemasHandler { - async fn handle( - &self, - path: &str, - _: http::Method, - params: &HashMap, - ) -> Result> { - if path.ends_with("/help") { - return util::to_text_response( - r#" - - GET /schemas?catalog=foo - "#, - ); - } - - let catalog = util::get_value(params, "catalog")?; - let stream = self - .table_metadata_manager - .schema_manager() - .schema_names(catalog); - - let keys = stream - .try_collect::>() - .await - .map_err(BoxedError::new) - .context(error::ListSchemasSnafu { catalog })?; - - to_http_response(keys) - } -} - -#[async_trait::async_trait] -impl HttpHandler for TablesHandler { - async fn handle( - &self, - path: &str, - _: http::Method, - params: &HashMap, - ) -> Result> { - if path.ends_with("/help") { - return util::to_text_response( - r#" - - GET /tables?catalog=foo&schema=bar - "#, - ); - } - - let catalog = util::get_value(params, "catalog")?; - let schema = util::get_value(params, "schema")?; - - let stream = self - .table_metadata_manager - .table_name_manager() - .tables(catalog, schema); - let tables = stream - .try_collect::>() - .await - .map_err(BoxedError::new) - .context(error::ListTablesSnafu { catalog, schema })? - .into_iter() - .map(|(k, _)| k) - .collect::>(); - - to_http_response(tables) - } -} - -#[async_trait::async_trait] -impl HttpHandler for TableHandler { - async fn handle( - &self, - path: &str, - _: http::Method, - params: &HashMap, - ) -> Result> { - if path.ends_with("/help") { - return util::to_text_response( - r#" - - GET /table?region_ids=1,2,3,4,5 - - GET /table?table_ids=1,2,3,4,5 - - GET /table?catalog=foo&schema=bar&table=baz - "#, - ); - } - - let table_ids = self.extract_table_ids(params).await?; - - let table_info_values = self - .table_metadata_manager - .table_info_manager() - .batch_get(&table_ids) - .await - .context(TableMetadataManagerSnafu)? - .into_iter() - .collect::>(); - - http::Response::builder() - .header("Content-Type", "application/json") - .status(http::StatusCode::OK) - // Safety: HashMap is definitely "serde-json"-able. - .body(serde_json::to_string(&table_info_values).unwrap()) - .context(error::InvalidHttpBodySnafu) - } -} - -impl TableHandler { - async fn extract_table_ids(&self, params: &HashMap) -> Result> { - if let Some(ids) = params.get("region_ids") { - let table_ids = ids - .split(',') - .map(|x| { - x.parse::() - .map(|y| RegionId::from_u64(y).table_id()) - .context(ParseNumSnafu { - err_msg: format!("invalid region id: {x}"), - }) - }) - .collect::>>()?; - - return Ok(table_ids); - } - - if let Some(ids) = params.get("table_ids") { - let table_ids = ids - .split(',') - .map(|x| { - x.parse::().context(ParseNumSnafu { - err_msg: format!("invalid table id: {x}"), - }) - }) - .collect::>>()?; - - return Ok(table_ids); - } - - let catalog = util::get_value(params, "catalog")?; - let schema = util::get_value(params, "schema")?; - let table = util::get_value(params, "table")?; - - let key = TableNameKey::new(catalog, schema, table); - - let table_id = self - .table_metadata_manager - .table_name_manager() - .get(key) - .await - .context(TableMetadataManagerSnafu)? - .map(|x| x.table_id()); - - if let Some(table_id) = table_id { - Ok(vec![table_id]) - } else { - Ok(vec![]) - } - } -} - -fn to_http_response(keys: Vec) -> Result> { - let body = serde_json::to_string(&keys).context(error::SerializeToJsonSnafu { - input: format!("{keys:?}"), - })?; - - http::Response::builder() - .status(http::StatusCode::OK) - .body(body) - .context(error::InvalidHttpBodySnafu) -} diff --git a/src/meta-srv/src/service/admin/region_migration.rs b/src/meta-srv/src/service/admin/region_migration.rs deleted file mode 100644 index e07bb7adb2bc..000000000000 --- a/src/meta-srv/src/service/admin/region_migration.rs +++ /dev/null @@ -1,278 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::collections::HashMap; -use std::num::ParseIntError; -use std::str::FromStr; -use std::time::Duration; - -use common_meta::peer::Peer; -use common_meta::{distributed_time_constants, ClusterId}; -use humantime::parse_duration; -use serde::Serialize; -use snafu::{ensure, OptionExt, ResultExt}; -use store_api::storage::RegionId; -use tonic::codegen::http; - -use super::HttpHandler; -use crate::cluster::MetaPeerClientRef; -use crate::error::{self, Error, Result}; -use crate::lease::lookup_alive_datanode_peer; -use crate::procedure::region_migration::manager::{ - RegionMigrationManagerRef, RegionMigrationProcedureTask, -}; - -/// The handler of submitting migration task. -pub struct SubmitRegionMigrationTaskHandler { - pub region_migration_manager: RegionMigrationManagerRef, - pub meta_peer_client: MetaPeerClientRef, -} - -#[derive(Debug, Clone, PartialEq, Eq)] -struct SubmitRegionMigrationTaskRequest { - cluster_id: ClusterId, - region_id: RegionId, - from_peer_id: u64, - to_peer_id: u64, - replay_timeout: Duration, -} - -#[derive(Debug, Serialize)] -struct SubmitRegionMigrationTaskResponse { - /// The `None` stands region has been migrated. - procedure_id: Option, -} - -fn parse_num_parameter_with_default( - key: &str, - params: &HashMap, - default_fn: F, -) -> Result -where - F: Fn(&str) -> Result, - T: FromStr, -{ - let parse_result = if let Some(id) = params.get(key) { - id.parse::().context(error::ParseNumSnafu { - err_msg: format!("invalid {key}: {id}"), - })? - } else { - default_fn(key)? - }; - - Ok(parse_result) -} - -const DEFAULT_REPLAY_TIMEOUT: Duration = Duration::from_millis(1000); - -impl TryFrom<&HashMap> for SubmitRegionMigrationTaskRequest { - type Error = Error; - - fn try_from(params: &HashMap) -> Result { - let cluster_id = parse_num_parameter_with_default("cluster_id", params, |_| Ok(0))?; - - let region_id: u64 = parse_num_parameter_with_default("region_id", params, |key| { - error::MissingRequiredParameterSnafu { param: key }.fail() - })?; - - let from_peer_id: u64 = parse_num_parameter_with_default("from_peer_id", params, |key| { - error::MissingRequiredParameterSnafu { param: key }.fail() - })?; - - let to_peer_id: u64 = parse_num_parameter_with_default("to_peer_id", params, |key| { - error::MissingRequiredParameterSnafu { param: key }.fail() - })?; - - let replay_timeout = if let Some(duration) = params.get("replay_timeout") { - parse_duration(duration).context(error::ParseDurationSnafu { duration })? - } else { - DEFAULT_REPLAY_TIMEOUT - }; - - Ok(SubmitRegionMigrationTaskRequest { - cluster_id, - region_id: RegionId::from_u64(region_id), - from_peer_id, - to_peer_id, - replay_timeout, - }) - } -} - -impl SubmitRegionMigrationTaskHandler { - fn is_leader(&self) -> bool { - self.meta_peer_client.is_leader() - } - - /// Checks the peer is available. - async fn lookup_peer(&self, cluster_id: ClusterId, peer_id: u64) -> Result> { - lookup_alive_datanode_peer( - cluster_id, - peer_id, - &self.meta_peer_client, - distributed_time_constants::DATANODE_LEASE_SECS, - ) - .await - } - - /// Submits a region migration task, returns the procedure id. - async fn handle_submit( - &self, - task: SubmitRegionMigrationTaskRequest, - ) -> Result { - ensure!( - self.is_leader(), - error::UnexpectedSnafu { - violated: "Trying to submit a region migration procedure to non-leader meta server" - } - ); - - let SubmitRegionMigrationTaskRequest { - cluster_id, - region_id, - from_peer_id, - to_peer_id, - replay_timeout, - } = task; - - let from_peer = self.lookup_peer(cluster_id, from_peer_id).await?.context( - error::PeerUnavailableSnafu { - peer_id: from_peer_id, - }, - )?; - let to_peer = self.lookup_peer(cluster_id, to_peer_id).await?.context( - error::PeerUnavailableSnafu { - peer_id: to_peer_id, - }, - )?; - let procedure_id = self - .region_migration_manager - .submit_procedure(RegionMigrationProcedureTask { - cluster_id, - region_id, - from_peer, - to_peer, - replay_timeout, - }) - .await?; - - Ok(SubmitRegionMigrationTaskResponse { - procedure_id: procedure_id.map(|id| id.to_string()), - }) - } -} - -#[async_trait::async_trait] -impl HttpHandler for SubmitRegionMigrationTaskHandler { - async fn handle( - &self, - _: &str, - _: http::Method, - params: &HashMap, - ) -> Result> { - let request = SubmitRegionMigrationTaskRequest::try_from(params)?; - - let response = self.handle_submit(request).await?; - - http::Response::builder() - .status(http::StatusCode::OK) - .body(serde_json::to_string(&response).with_context(|_| { - error::SerializeToJsonSnafu { - input: format!("{response:?}"), - } - })?) - .context(error::InvalidHttpBodySnafu) - } -} - -#[cfg(test)] -mod tests { - use std::assert_matches::assert_matches; - use std::collections::HashMap; - - use crate::error; - use crate::service::admin::region_migration::DEFAULT_REPLAY_TIMEOUT; - - #[test] - fn test_parse_migration_task_req() { - use store_api::storage::RegionId; - - use crate::service::admin::region_migration::SubmitRegionMigrationTaskRequest; - - let params = HashMap::from([ - ("cluster_id".to_string(), "10".to_string()), - ( - "region_id".to_string(), - RegionId::new(1024, 1).as_u64().to_string(), - ), - ("from_peer_id".to_string(), "1".to_string()), - ("to_peer_id".to_string(), "2".to_string()), - ]); - - let task_req = SubmitRegionMigrationTaskRequest::try_from(¶ms).unwrap(); - - assert_eq!( - SubmitRegionMigrationTaskRequest { - cluster_id: 10, - region_id: RegionId::new(1024, 1), - from_peer_id: 1, - to_peer_id: 2, - replay_timeout: DEFAULT_REPLAY_TIMEOUT - }, - task_req - ); - - let params = HashMap::from([ - ( - "region_id".to_string(), - RegionId::new(1024, 1).as_u64().to_string(), - ), - ("from_peer_id".to_string(), "1".to_string()), - ("to_peer_id".to_string(), "2".to_string()), - ]); - - let task_req = SubmitRegionMigrationTaskRequest::try_from(¶ms).unwrap(); - - assert_eq!( - SubmitRegionMigrationTaskRequest { - cluster_id: 0, - region_id: RegionId::new(1024, 1), - from_peer_id: 1, - to_peer_id: 2, - replay_timeout: DEFAULT_REPLAY_TIMEOUT - }, - task_req - ); - - let required_fields = [ - ( - "region_id".to_string(), - RegionId::new(1024, 1).as_u64().to_string(), - ), - ("from_peer_id".to_string(), "1".to_string()), - ("to_peer_id".to_string(), "2".to_string()), - ]; - - for i in 0..required_fields.len() { - let params = required_fields[..i] - .iter() - .cloned() - .collect::>(); - - let err = SubmitRegionMigrationTaskRequest::try_from(¶ms).unwrap_err(); - assert_matches!(err, error::Error::MissingRequiredParameter { .. }); - assert!(err.to_string().contains(&required_fields[i].0)); - } - } -} diff --git a/src/meta-srv/src/service/admin/route.rs b/src/meta-srv/src/service/admin/route.rs deleted file mode 100644 index cbc2dbe68746..000000000000 --- a/src/meta-srv/src/service/admin/route.rs +++ /dev/null @@ -1,110 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::collections::HashMap; - -use common_catalog::format_full_table_name; -use common_meta::key::table_name::TableNameKey; -use common_meta::key::TableMetadataManagerRef; -use snafu::{OptionExt, ResultExt}; -use store_api::storage::{RegionId, TableId}; -use tonic::codegen::http; - -use super::{util, HttpHandler}; -use crate::error; -use crate::error::{ - ParseNumSnafu, Result, TableMetadataManagerSnafu, TableNotFoundSnafu, TableRouteNotFoundSnafu, -}; - -#[derive(Clone)] -pub struct RouteHandler { - pub table_metadata_manager: TableMetadataManagerRef, -} - -#[async_trait::async_trait] -impl HttpHandler for RouteHandler { - async fn handle( - &self, - path: &str, - _: http::Method, - params: &HashMap, - ) -> Result> { - if path.ends_with("/help") { - return util::to_text_response( - r#" - - GET /table?region_id=123 - - GET /table?table_id=456 - - GET /table?catalog=foo&schema=bar&table=baz - "#, - ); - } - - let table_id = self.extract_table_id(params).await?; - - let table_route_value = self - .table_metadata_manager - .table_route_manager() - .table_route_storage() - .get(table_id) - .await - .context(TableMetadataManagerSnafu)? - .context(TableRouteNotFoundSnafu { table_id })?; - - http::Response::builder() - .status(http::StatusCode::OK) - .body(serde_json::to_string(&table_route_value).unwrap()) - .context(error::InvalidHttpBodySnafu) - } -} - -impl RouteHandler { - async fn extract_table_id(&self, params: &HashMap) -> Result { - if let Some(id) = params.get("region_id") { - let table_id = id - .parse::() - .map(|x| RegionId::from_u64(x).table_id()) - .context(ParseNumSnafu { - err_msg: format!("invalid region id: {id}"), - })?; - return Ok(table_id); - } - - if let Some(id) = params.get("table_id") { - let table_id = id.parse::().context(ParseNumSnafu { - err_msg: format!("invalid table id: {id}"), - })?; - - return Ok(table_id); - } - - let catalog = util::get_value(params, "catalog")?; - let schema = util::get_value(params, "schema")?; - let table = util::get_value(params, "table")?; - - let key = TableNameKey::new(catalog, schema, table); - - let table_id = self - .table_metadata_manager - .table_name_manager() - .get(key) - .await - .context(TableMetadataManagerSnafu)? - .map(|x| x.table_id()) - .context(TableNotFoundSnafu { - name: format_full_table_name(catalog, schema, table), - })?; - - Ok(table_id) - } -} diff --git a/src/meta-srv/src/service/admin/util.rs b/src/meta-srv/src/service/admin/util.rs index 828630a31425..0ea46f6702b3 100644 --- a/src/meta-srv/src/service/admin/util.rs +++ b/src/meta-srv/src/service/admin/util.rs @@ -31,12 +31,6 @@ pub fn extract_cluster_id(params: &HashMap) -> Result { }) } -pub fn get_value<'a>(params: &'a HashMap, key: &str) -> Result<&'a String> { - params - .get(key) - .context(error::MissingRequiredParameterSnafu { param: key }) -} - pub fn to_text_response(text: &str) -> Result> { http::Response::builder() .header("Content-Type", "text/plain")