Skip to content

Commit

Permalink
feat: add table meta by id
Browse files Browse the repository at this point in the history
  • Loading branch information
fengjiachun committed Nov 1, 2023
1 parent 5f3bbdc commit b365ac6
Show file tree
Hide file tree
Showing 4 changed files with 156 additions and 83 deletions.
34 changes: 34 additions & 0 deletions src/common/meta/src/key/table_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;

use serde::{Deserialize, Serialize};
use table::engine::TableReference;
use table::metadata::{RawTableInfo, TableId};
Expand All @@ -21,6 +23,7 @@ use crate::error::Result;
use crate::key::{to_removed_key, TableMetaKey};
use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp, TxnOpResponse};
use crate::kv_backend::KvBackendRef;
use crate::rpc::store::BatchGetRequest;
use crate::table_name::TableName;

pub struct TableInfoKey {
Expand Down Expand Up @@ -233,6 +236,37 @@ impl TableInfoManager {
.map(|x| DeserializedValueWithBytes::from_inner_slice(&x.value))
.transpose()
}

pub async fn batch_get(
&self,
table_ids: &[TableId],
) -> Result<HashMap<TableId, TableInfoValue>> {
let lookup_table = table_ids
.iter()
.map(|id| (TableInfoKey::new(*id).as_raw_key(), id))
.collect::<HashMap<_, _>>();

let resp = self
.kv_backend
.batch_get(BatchGetRequest {
keys: lookup_table.keys().cloned().collect::<Vec<_>>(),
})
.await?;

let values = resp
.kvs
.iter()
.map(|kv| {
Ok((
// Safety: must exist.
**lookup_table.get(kv.key()).unwrap(),
TableInfoValue::try_from_raw_value(&kv.value)?,
))
})
.collect::<Result<HashMap<_, _>>>()?;

Ok(values)
}
}

#[cfg(test)]
Expand Down
120 changes: 63 additions & 57 deletions src/meta-srv/src/service/admin/meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@ use common_error::ext::BoxedError;
use common_meta::key::table_name::TableNameKey;
use common_meta::key::TableMetadataManagerRef;
use futures::TryStreamExt;
use snafu::{OptionExt, ResultExt};
use snafu::ResultExt;
use store_api::storage::{RegionId, TableId};
use tonic::codegen::http;

use crate::error;
use crate::error::{Result, TableMetadataManagerSnafu};
use crate::service::admin::HttpHandler;
use crate::error::{InvalidHttpBodySnafu, ParseNumSnafu, Result, TableMetadataManagerSnafu};
use crate::service::admin::{util, HttpHandler};

pub struct CatalogsHandler {
pub table_metadata_manager: TableMetadataManagerRef,
Expand Down Expand Up @@ -67,11 +68,7 @@ impl HttpHandler for SchemasHandler {
_: &str,
params: &HashMap<String, String>,
) -> Result<http::Response<String>> {
let catalog = params
.get("catalog_name")
.context(error::MissingRequiredParameterSnafu {
param: "catalog_name",
})?;
let catalog = util::get_value(params, "catalog")?;
let stream = self
.table_metadata_manager
.schema_manager()
Expand All @@ -95,17 +92,8 @@ impl HttpHandler for TablesHandler {
_: &str,
params: &HashMap<String, String>,
) -> Result<http::Response<String>> {
let catalog = params
.get("catalog_name")
.context(error::MissingRequiredParameterSnafu {
param: "catalog_name",
})?;

let schema = params
.get("schema_name")
.context(error::MissingRequiredParameterSnafu {
param: "schema_name",
})?;
let catalog = util::get_value(params, "catalog")?;
let schema = util::get_value(params, "schema")?;

let tables = self
.table_metadata_manager
Expand All @@ -128,19 +116,61 @@ impl HttpHandler for TableHandler {
_: &str,
params: &HashMap<String, String>,
) -> Result<http::Response<String>> {
let catalog = params
.get("catalog")
.context(error::MissingRequiredParameterSnafu { param: "catalog" })?;
let schema = params
.get("schema")
.context(error::MissingRequiredParameterSnafu { param: "schema" })?;
let table = params
.get("table")
.context(error::MissingRequiredParameterSnafu { param: "table" })?;
let table_ids = self.extract_table_ids(params).await?;

let key = TableNameKey::new(catalog, schema, table);
let table_info_values = self
.table_metadata_manager
.table_info_manager()
.batch_get(&table_ids)
.await
.context(TableMetadataManagerSnafu)?
.into_iter()
.map(|(k, v)| (format!("{k}"), format!("{v:?}")))
.collect::<HashMap<_, _>>();

http::Response::builder()
.status(http::StatusCode::OK)
// Safety: HashMap<String, String> is definitely "serde-json"-able.
.body(serde_json::to_string(&table_info_values).unwrap())
.context(InvalidHttpBodySnafu)
}
}

impl TableHandler {
async fn extract_table_ids(&self, params: &HashMap<String, String>) -> Result<Vec<TableId>> {
if let Some(ids) = params.get("region_ids") {
let table_ids = ids
.split(',')
.map(|x| {
x.parse::<u64>()
.map(|y| RegionId::from_u64(y).table_id())
.context(ParseNumSnafu {
err_msg: format!("invalid region id: {x}"),
})
})
.collect::<Result<Vec<_>>>()?;

return Ok(table_ids);
}

if let Some(ids) = params.get("table_ids") {
let table_ids = ids
.split(',')
.map(|x| {
x.parse::<u32>().context(ParseNumSnafu {
err_msg: format!("invalid table id: {x}"),
})
})
.collect::<Result<Vec<_>>>()?;

return Ok(table_ids);
}

let mut result = HashMap::with_capacity(2);
let catalog = util::get_value(params, "catalog")?;
let schema = util::get_value(params, "schema")?;
let table = util::get_value(params, "table")?;

let key = TableNameKey::new(catalog, schema, table);

let table_id = self
.table_metadata_manager
Expand All @@ -151,34 +181,10 @@ impl HttpHandler for TableHandler {
.map(|x| x.table_id());

if let Some(table_id) = table_id {
let table_info_value = self
.table_metadata_manager
.table_info_manager()
.get(table_id)
.await
.context(TableMetadataManagerSnafu)?
.map(|x| format!("{x:?}"))
.unwrap_or_else(|| "Not Found".to_string());
result.insert("table_info_value", table_info_value);
}

if let Some(table_id) = table_id {
let table_region_value = self
.table_metadata_manager
.table_route_manager()
.get(table_id)
.await
.context(TableMetadataManagerSnafu)?
.map(|x| format!("{x:?}"))
.unwrap_or_else(|| "Not Found".to_string());
result.insert("table_route_value", table_region_value);
Ok(vec![table_id])
} else {
Ok(vec![])
}

http::Response::builder()
.status(http::StatusCode::OK)
// Safety: HashMap<String, String> is definitely "serde-json"-able.
.body(serde_json::to_string(&result).unwrap())
.context(error::InvalidHttpBodySnafu)
}
}

Expand Down
73 changes: 50 additions & 23 deletions src/meta-srv/src/service/admin/route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,18 @@

use std::collections::HashMap;

use common_catalog::format_full_table_name;
use common_meta::key::table_name::TableNameKey;
use common_meta::key::TableMetadataManagerRef;
use snafu::{OptionExt, ResultExt};
use store_api::storage::{RegionId, TableId};
use tonic::codegen::http;

use super::HttpHandler;
use super::{util, HttpHandler};
use crate::error;
use crate::error::{Result, TableNotFoundSnafu, TableRouteNotFoundSnafu};
use crate::error::{
ParseNumSnafu, Result, TableMetadataManagerSnafu, TableNotFoundSnafu, TableRouteNotFoundSnafu,
};

pub struct RouteHandler {
pub table_metadata_manager: TableMetadataManagerRef,
Expand All @@ -34,37 +38,60 @@ impl HttpHandler for RouteHandler {
_path: &str,
params: &HashMap<String, String>,
) -> Result<http::Response<String>> {
let catalog = params
.get("catalog")
.context(error::MissingRequiredParameterSnafu { param: "catalog" })?;
let schema = params
.get("schema")
.context(error::MissingRequiredParameterSnafu { param: "schema" })?;
let table = params
.get("table")
.context(error::MissingRequiredParameterSnafu { param: "table" })?;

let key = TableNameKey::new(catalog, schema, table);

let table_id = self
.table_metadata_manager
.table_name_manager()
.get(key)
.await
.context(error::TableMetadataManagerSnafu)?
.map(|x| x.table_id())
.context(TableNotFoundSnafu { name: table })?;
let table_id = self.extract_table_id(params).await?;

let table_route_value = self
.table_metadata_manager
.table_route_manager()
.get(table_id)
.await
.context(error::TableMetadataManagerSnafu)?
.context(TableMetadataManagerSnafu)?
.context(TableRouteNotFoundSnafu { table_id })?;

http::Response::builder()
.status(http::StatusCode::OK)
.body(serde_json::to_string(&table_route_value).unwrap())
.context(error::InvalidHttpBodySnafu)
}
}

impl RouteHandler {
async fn extract_table_id(&self, params: &HashMap<String, String>) -> Result<TableId> {
if let Some(id) = params.get("region_id") {
let table_id = id
.parse::<u64>()
.map(|x| RegionId::from_u64(x).table_id())
.context(ParseNumSnafu {
err_msg: format!("invalid region id: {id}"),
})?;
return Ok(table_id);
}

if let Some(id) = params.get("table_id") {
let table_id = id.parse::<u32>().context(ParseNumSnafu {
err_msg: format!("invalid table id: {id}"),
})?;

return Ok(table_id);
}

let catalog = util::get_value(params, "catalog")?;
let schema = util::get_value(params, "schema")?;
let table = util::get_value(params, "table")?;

let key = TableNameKey::new(catalog, schema, table);

let table_id = self
.table_metadata_manager
.table_name_manager()
.get(key)
.await
.context(TableMetadataManagerSnafu)?
.map(|x| x.table_id())
.context(TableNotFoundSnafu {
name: format_full_table_name(catalog, schema, table),
})?;

Ok(table_id)
}
}
12 changes: 9 additions & 3 deletions src/meta-srv/src/service/admin/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,22 @@ use std::collections::HashMap;

use snafu::{OptionExt, ResultExt};

use crate::error::{self, Result};
use crate::error::{self, MissingRequiredParameterSnafu, ParseNumSnafu, Result};

pub fn extract_cluster_id(params: &HashMap<String, String>) -> Result<u64> {
params
.get("cluster_id")
.map(|id| id.parse::<u64>())
.context(error::MissingRequiredParameterSnafu {
.context(MissingRequiredParameterSnafu {
param: "cluster_id",
})?
.context(error::ParseNumSnafu {
.context(ParseNumSnafu {
err_msg: "`cluster_id` is not a valid number",
})
}

pub fn get_value<'a>(params: &'a HashMap<String, String>, key: &str) -> Result<&'a String> {
params
.get(key)
.context(error::MissingRequiredParameterSnafu { param: key })
}

0 comments on commit b365ac6

Please sign in to comment.