From 479ffe5a0fc479528f3508ac6cb1f18824842c0b Mon Sep 17 00:00:00 2001 From: JeremyHi Date: Thu, 2 Nov 2023 10:45:12 +0800 Subject: [PATCH] feat: query table meta by ids (#2675) * feat: add table meta by id * feat: add help for http api * chore: by comment * feat: display for LeaderChangeMessage --- src/common/meta/src/key/table_info.rs | 34 +++++ src/meta-srv/src/bootstrap.rs | 3 + src/meta-srv/src/election.rs | 22 +++ src/meta-srv/src/metasrv.rs | 5 +- src/meta-srv/src/service/admin.rs | 64 ++++---- src/meta-srv/src/service/admin/heartbeat.rs | 14 +- src/meta-srv/src/service/admin/meta.rs | 156 ++++++++++++-------- src/meta-srv/src/service/admin/route.rs | 84 ++++++++--- src/meta-srv/src/service/admin/util.rs | 21 ++- 9 files changed, 279 insertions(+), 124 deletions(-) diff --git a/src/common/meta/src/key/table_info.rs b/src/common/meta/src/key/table_info.rs index 3c5c982a1e6b..0d8bcbf4f347 100644 --- a/src/common/meta/src/key/table_info.rs +++ b/src/common/meta/src/key/table_info.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; + use serde::{Deserialize, Serialize}; use table::engine::TableReference; use table::metadata::{RawTableInfo, TableId}; @@ -21,6 +23,7 @@ use crate::error::Result; use crate::key::{to_removed_key, TableMetaKey}; use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp, TxnOpResponse}; use crate::kv_backend::KvBackendRef; +use crate::rpc::store::BatchGetRequest; use crate::table_name::TableName; pub struct TableInfoKey { @@ -233,6 +236,37 @@ impl TableInfoManager { .map(|x| DeserializedValueWithBytes::from_inner_slice(&x.value)) .transpose() } + + pub async fn batch_get( + &self, + table_ids: &[TableId], + ) -> Result> { + let lookup_table = table_ids + .iter() + .map(|id| (TableInfoKey::new(*id).as_raw_key(), id)) + .collect::>(); + + let resp = self + .kv_backend + .batch_get(BatchGetRequest { + keys: lookup_table.keys().cloned().collect::>(), + }) + .await?; + + let values = resp + .kvs + .iter() + .map(|kv| { + Ok(( + // Safety: must exist. + **lookup_table.get(kv.key()).unwrap(), + TableInfoValue::try_from_raw_value(&kv.value)?, + )) + }) + .collect::>>()?; + + Ok(values) + } } #[cfg(test)] diff --git a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs index cb2f15af2d50..5b3470d8b94c 100644 --- a/src/meta-srv/src/bootstrap.rs +++ b/src/meta-srv/src/bootstrap.rs @@ -20,6 +20,7 @@ use api::v1::meta::heartbeat_server::HeartbeatServer; use api::v1::meta::lock_server::LockServer; use api::v1::meta::store_server::StoreServer; use common_base::Plugins; +use common_telemetry::info; use etcd_client::Client; use servers::configurator::ConfiguratorRef; use servers::http::{HttpServer, HttpServerBuilder}; @@ -134,6 +135,8 @@ pub async fn bootstrap_meta_srv_with_router( .await .context(error::TcpBindSnafu { addr: bind_addr })?; + info!("gRPC server is bound to: {bind_addr}"); + let incoming = TcpIncoming::from_listener(listener, true, None).context(error::TcpIncomingSnafu)?; diff --git a/src/meta-srv/src/election.rs b/src/meta-srv/src/election.rs index 5c2bc503e4a7..cdd434068c90 100644 --- a/src/meta-srv/src/election.rs +++ b/src/meta-srv/src/election.rs @@ -14,6 +14,7 @@ pub mod etcd; +use std::fmt; use std::sync::Arc; use etcd_client::LeaderKey; @@ -29,6 +30,27 @@ pub enum LeaderChangeMessage { StepDown(Arc), } +impl fmt::Display for LeaderChangeMessage { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let leader_key = match self { + LeaderChangeMessage::Elected(leader_key) => { + write!(f, "Elected(")?; + leader_key + } + LeaderChangeMessage::StepDown(leader_key) => { + write!(f, "StepDown(")?; + leader_key + } + }; + write!(f, "LeaderKey {{ ")?; + write!(f, "name: {}", String::from_utf8_lossy(leader_key.name()))?; + write!(f, ", key: {}", String::from_utf8_lossy(leader_key.key()))?; + write!(f, ", rev: {}", leader_key.rev())?; + write!(f, ", lease: {}", leader_key.lease())?; + write!(f, " }})") + } +} + #[async_trait::async_trait] pub trait Election: Send + Sync { type Leader; diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index e67a6c4e9232..a4d354badea3 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -261,10 +261,7 @@ impl MetaSrv { Ok(msg) => { in_memory.reset(); leader_cached_kv_store.reset(); - info!( - "Leader's cache has bean cleared on leader change: {:?}", - msg - ); + info!("Leader's cache has bean cleared on leader change: {msg}"); match msg { LeaderChangeMessage::Elected(_) => { state_handler.on_become_leader().await; diff --git a/src/meta-srv/src/service/admin.rs b/src/meta-srv/src/service/admin.rs index 53184355303d..c7eac3af4231 100644 --- a/src/meta-srv/src/service/admin.rs +++ b/src/meta-srv/src/service/admin.rs @@ -42,12 +42,12 @@ pub fn make_admin_service(meta_srv: MetaSrv) -> Admin { }, ); - let router = router.route( - "/heartbeat", - heartbeat::HeartBeatHandler { - meta_peer_client: meta_srv.meta_peer_client().clone(), - }, - ); + let handler = heartbeat::HeartBeatHandler { + meta_peer_client: meta_srv.meta_peer_client().clone(), + }; + let router = router + .route("/heartbeat", handler.clone()) + .route("/heartbeat/help", handler); let router = router.route( "/catalogs", @@ -56,26 +56,26 @@ pub fn make_admin_service(meta_srv: MetaSrv) -> Admin { }, ); - let router = router.route( - "/schemas", - meta::SchemasHandler { - table_metadata_manager: meta_srv.table_metadata_manager().clone(), - }, - ); - - let router = router.route( - "/tables", - meta::TablesHandler { - table_metadata_manager: meta_srv.table_metadata_manager().clone(), - }, - ); - - let router = router.route( - "/table", - meta::TableHandler { - table_metadata_manager: meta_srv.table_metadata_manager().clone(), - }, - ); + let handler = meta::SchemasHandler { + table_metadata_manager: meta_srv.table_metadata_manager().clone(), + }; + let router = router + .route("/schemas", handler.clone()) + .route("/schemas/help", handler); + + let handler = meta::TablesHandler { + table_metadata_manager: meta_srv.table_metadata_manager().clone(), + }; + let router = router + .route("/tables", handler.clone()) + .route("/tables/help", handler); + + let handler = meta::TableHandler { + table_metadata_manager: meta_srv.table_metadata_manager().clone(), + }; + let router = router + .route("/table", handler.clone()) + .route("/table/help", handler); let router = router.route( "/leader", @@ -84,12 +84,12 @@ pub fn make_admin_service(meta_srv: MetaSrv) -> Admin { }, ); - let router = router.route( - "/route", - route::RouteHandler { - table_metadata_manager: meta_srv.table_metadata_manager().clone(), - }, - ); + let handler = route::RouteHandler { + table_metadata_manager: meta_srv.table_metadata_manager().clone(), + }; + let router = router + .route("/route", handler.clone()) + .route("/route/help", handler); let router = router.route( "/inactive-regions/view", diff --git a/src/meta-srv/src/service/admin/heartbeat.rs b/src/meta-srv/src/service/admin/heartbeat.rs index f881ec1616bc..e17fa9cbb4dc 100644 --- a/src/meta-srv/src/service/admin/heartbeat.rs +++ b/src/meta-srv/src/service/admin/heartbeat.rs @@ -21,8 +21,9 @@ use tonic::codegen::http; use crate::cluster::MetaPeerClientRef; use crate::error::{self, Result}; use crate::keys::StatValue; -use crate::service::admin::HttpHandler; +use crate::service::admin::{util, HttpHandler}; +#[derive(Clone)] pub struct HeartBeatHandler { pub meta_peer_client: MetaPeerClientRef, } @@ -31,9 +32,18 @@ pub struct HeartBeatHandler { impl HttpHandler for HeartBeatHandler { async fn handle( &self, - _: &str, + path: &str, params: &HashMap, ) -> Result> { + if path.ends_with("/help") { + return util::to_text_response( + r#" + - GET /heartbeat + - GET /heartbeat?addr=127.0.0.1:3001 + "#, + ); + } + let stat_kvs = self.meta_peer_client.get_all_dn_stat_kvs().await?; let mut stat_vals: Vec = stat_kvs.into_values().collect(); diff --git a/src/meta-srv/src/service/admin/meta.rs b/src/meta-srv/src/service/admin/meta.rs index d199edd388ec..905924b1c7b1 100644 --- a/src/meta-srv/src/service/admin/meta.rs +++ b/src/meta-srv/src/service/admin/meta.rs @@ -18,25 +18,30 @@ use common_error::ext::BoxedError; use common_meta::key::table_name::TableNameKey; use common_meta::key::TableMetadataManagerRef; use futures::TryStreamExt; -use snafu::{OptionExt, ResultExt}; +use snafu::ResultExt; +use store_api::storage::{RegionId, TableId}; use tonic::codegen::http; use crate::error; -use crate::error::{Result, TableMetadataManagerSnafu}; -use crate::service::admin::HttpHandler; +use crate::error::{ParseNumSnafu, Result, TableMetadataManagerSnafu}; +use crate::service::admin::{util, HttpHandler}; +#[derive(Clone)] pub struct CatalogsHandler { pub table_metadata_manager: TableMetadataManagerRef, } +#[derive(Clone)] pub struct SchemasHandler { pub table_metadata_manager: TableMetadataManagerRef, } +#[derive(Clone)] pub struct TablesHandler { pub table_metadata_manager: TableMetadataManagerRef, } +#[derive(Clone)] pub struct TableHandler { pub table_metadata_manager: TableMetadataManagerRef, } @@ -64,14 +69,18 @@ impl HttpHandler for CatalogsHandler { impl HttpHandler for SchemasHandler { async fn handle( &self, - _: &str, + path: &str, params: &HashMap, ) -> Result> { - let catalog = params - .get("catalog_name") - .context(error::MissingRequiredParameterSnafu { - param: "catalog_name", - })?; + if path.ends_with("/help") { + return util::to_text_response( + r#" + - GET /schemas?catalog=foo + "#, + ); + } + + let catalog = util::get_value(params, "catalog")?; let stream = self .table_metadata_manager .schema_manager() @@ -92,20 +101,19 @@ impl HttpHandler for SchemasHandler { impl HttpHandler for TablesHandler { async fn handle( &self, - _: &str, + path: &str, params: &HashMap, ) -> Result> { - let catalog = params - .get("catalog_name") - .context(error::MissingRequiredParameterSnafu { - param: "catalog_name", - })?; - - let schema = params - .get("schema_name") - .context(error::MissingRequiredParameterSnafu { - param: "schema_name", - })?; + if path.ends_with("/help") { + return util::to_text_response( + r#" + - GET /tables?catalog=foo&schema=bar + "#, + ); + } + + let catalog = util::get_value(params, "catalog")?; + let schema = util::get_value(params, "schema")?; let tables = self .table_metadata_manager @@ -125,22 +133,74 @@ impl HttpHandler for TablesHandler { impl HttpHandler for TableHandler { async fn handle( &self, - _: &str, + path: &str, params: &HashMap, ) -> Result> { - let catalog = params - .get("catalog") - .context(error::MissingRequiredParameterSnafu { param: "catalog" })?; - let schema = params - .get("schema") - .context(error::MissingRequiredParameterSnafu { param: "schema" })?; - let table = params - .get("table") - .context(error::MissingRequiredParameterSnafu { param: "table" })?; + if path.ends_with("/help") { + return util::to_text_response( + r#" + - GET /table?region_ids=1,2,3,4,5 + - GET /table?table_ids=1,2,3,4,5 + - GET /table?catalog=foo&schema=bar&table=baz + "#, + ); + } - let key = TableNameKey::new(catalog, schema, table); + let table_ids = self.extract_table_ids(params).await?; + + let table_info_values = self + .table_metadata_manager + .table_info_manager() + .batch_get(&table_ids) + .await + .context(TableMetadataManagerSnafu)? + .into_iter() + .map(|(k, v)| (format!("{k}"), format!("{v:?}"))) + .collect::>(); + + http::Response::builder() + .status(http::StatusCode::OK) + // Safety: HashMap is definitely "serde-json"-able. + .body(serde_json::to_string(&table_info_values).unwrap()) + .context(error::InvalidHttpBodySnafu) + } +} + +impl TableHandler { + async fn extract_table_ids(&self, params: &HashMap) -> Result> { + if let Some(ids) = params.get("region_ids") { + let table_ids = ids + .split(',') + .map(|x| { + x.parse::() + .map(|y| RegionId::from_u64(y).table_id()) + .context(ParseNumSnafu { + err_msg: format!("invalid region id: {x}"), + }) + }) + .collect::>>()?; + + return Ok(table_ids); + } - let mut result = HashMap::with_capacity(2); + if let Some(ids) = params.get("table_ids") { + let table_ids = ids + .split(',') + .map(|x| { + x.parse::().context(ParseNumSnafu { + err_msg: format!("invalid table id: {x}"), + }) + }) + .collect::>>()?; + + return Ok(table_ids); + } + + let catalog = util::get_value(params, "catalog")?; + let schema = util::get_value(params, "schema")?; + let table = util::get_value(params, "table")?; + + let key = TableNameKey::new(catalog, schema, table); let table_id = self .table_metadata_manager @@ -151,34 +211,10 @@ impl HttpHandler for TableHandler { .map(|x| x.table_id()); if let Some(table_id) = table_id { - let table_info_value = self - .table_metadata_manager - .table_info_manager() - .get(table_id) - .await - .context(TableMetadataManagerSnafu)? - .map(|x| format!("{x:?}")) - .unwrap_or_else(|| "Not Found".to_string()); - result.insert("table_info_value", table_info_value); + Ok(vec![table_id]) + } else { + Ok(vec![]) } - - if let Some(table_id) = table_id { - let table_region_value = self - .table_metadata_manager - .table_route_manager() - .get(table_id) - .await - .context(TableMetadataManagerSnafu)? - .map(|x| format!("{x:?}")) - .unwrap_or_else(|| "Not Found".to_string()); - result.insert("table_route_value", table_region_value); - } - - http::Response::builder() - .status(http::StatusCode::OK) - // Safety: HashMap is definitely "serde-json"-able. - .body(serde_json::to_string(&result).unwrap()) - .context(error::InvalidHttpBodySnafu) } } diff --git a/src/meta-srv/src/service/admin/route.rs b/src/meta-srv/src/service/admin/route.rs index ad8c0fdaf55c..217133ad3d88 100644 --- a/src/meta-srv/src/service/admin/route.rs +++ b/src/meta-srv/src/service/admin/route.rs @@ -14,15 +14,20 @@ use std::collections::HashMap; +use common_catalog::format_full_table_name; use common_meta::key::table_name::TableNameKey; use common_meta::key::TableMetadataManagerRef; use snafu::{OptionExt, ResultExt}; +use store_api::storage::{RegionId, TableId}; use tonic::codegen::http; -use super::HttpHandler; +use super::{util, HttpHandler}; use crate::error; -use crate::error::{Result, TableNotFoundSnafu, TableRouteNotFoundSnafu}; +use crate::error::{ + ParseNumSnafu, Result, TableMetadataManagerSnafu, TableNotFoundSnafu, TableRouteNotFoundSnafu, +}; +#[derive(Clone)] pub struct RouteHandler { pub table_metadata_manager: TableMetadataManagerRef, } @@ -31,40 +36,73 @@ pub struct RouteHandler { impl HttpHandler for RouteHandler { async fn handle( &self, - _path: &str, + path: &str, params: &HashMap, ) -> Result> { - let catalog = params - .get("catalog") - .context(error::MissingRequiredParameterSnafu { param: "catalog" })?; - let schema = params - .get("schema") - .context(error::MissingRequiredParameterSnafu { param: "schema" })?; - let table = params - .get("table") - .context(error::MissingRequiredParameterSnafu { param: "table" })?; + if path.ends_with("/help") { + return util::to_text_response( + r#" + - GET /table?region_id=123 + - GET /table?table_id=456 + - GET /table?catalog=foo&schema=bar&table=baz + "#, + ); + } - let key = TableNameKey::new(catalog, schema, table); - - let table_id = self - .table_metadata_manager - .table_name_manager() - .get(key) - .await - .context(error::TableMetadataManagerSnafu)? - .map(|x| x.table_id()) - .context(TableNotFoundSnafu { name: table })?; + let table_id = self.extract_table_id(params).await?; let table_route_value = self .table_metadata_manager .table_route_manager() .get(table_id) .await - .context(error::TableMetadataManagerSnafu)? + .context(TableMetadataManagerSnafu)? .context(TableRouteNotFoundSnafu { table_id })?; + http::Response::builder() .status(http::StatusCode::OK) .body(serde_json::to_string(&table_route_value).unwrap()) .context(error::InvalidHttpBodySnafu) } } + +impl RouteHandler { + async fn extract_table_id(&self, params: &HashMap) -> Result { + if let Some(id) = params.get("region_id") { + let table_id = id + .parse::() + .map(|x| RegionId::from_u64(x).table_id()) + .context(ParseNumSnafu { + err_msg: format!("invalid region id: {id}"), + })?; + return Ok(table_id); + } + + if let Some(id) = params.get("table_id") { + let table_id = id.parse::().context(ParseNumSnafu { + err_msg: format!("invalid table id: {id}"), + })?; + + return Ok(table_id); + } + + let catalog = util::get_value(params, "catalog")?; + let schema = util::get_value(params, "schema")?; + let table = util::get_value(params, "table")?; + + let key = TableNameKey::new(catalog, schema, table); + + let table_id = self + .table_metadata_manager + .table_name_manager() + .get(key) + .await + .context(TableMetadataManagerSnafu)? + .map(|x| x.table_id()) + .context(TableNotFoundSnafu { + name: format_full_table_name(catalog, schema, table), + })?; + + Ok(table_id) + } +} diff --git a/src/meta-srv/src/service/admin/util.rs b/src/meta-srv/src/service/admin/util.rs index a26bee0a8d10..828630a31425 100644 --- a/src/meta-srv/src/service/admin/util.rs +++ b/src/meta-srv/src/service/admin/util.rs @@ -15,17 +15,32 @@ use std::collections::HashMap; use snafu::{OptionExt, ResultExt}; +use tonic::codegen::http; -use crate::error::{self, Result}; +use crate::error::{self, MissingRequiredParameterSnafu, ParseNumSnafu, Result}; pub fn extract_cluster_id(params: &HashMap) -> Result { params .get("cluster_id") .map(|id| id.parse::()) - .context(error::MissingRequiredParameterSnafu { + .context(MissingRequiredParameterSnafu { param: "cluster_id", })? - .context(error::ParseNumSnafu { + .context(ParseNumSnafu { err_msg: "`cluster_id` is not a valid number", }) } + +pub fn get_value<'a>(params: &'a HashMap, key: &str) -> Result<&'a String> { + params + .get(key) + .context(error::MissingRequiredParameterSnafu { param: key }) +} + +pub fn to_text_response(text: &str) -> Result> { + http::Response::builder() + .header("Content-Type", "text/plain") + .status(http::StatusCode::OK) + .body(text.to_string()) + .context(error::InvalidHttpBodySnafu) +}