From c2edaffa5c94c00a86ad74539ee4550e9217e680 Mon Sep 17 00:00:00 2001 From: JeremyHi Date: Mon, 15 Jan 2024 20:36:39 +0800 Subject: [PATCH] feat: let tables API return a stream (#3170) --- src/catalog/src/error.rs | 14 ++++++++--- src/catalog/src/kvbackend/manager.rs | 16 ++++++++----- src/common/meta/src/key/table_name.rs | 32 +++++++++++++++++--------- src/meta-srv/src/error.rs | 14 ++++++++--- src/meta-srv/src/service/admin/meta.rs | 10 +++++--- 5 files changed, 60 insertions(+), 26 deletions(-) diff --git a/src/catalog/src/error.rs b/src/catalog/src/error.rs index 5ec798864056..c9fd60e9ee4e 100644 --- a/src/catalog/src/error.rs +++ b/src/catalog/src/error.rs @@ -41,6 +41,14 @@ pub enum Error { source: BoxedError, }, + #[snafu(display("Failed to list {}.{}'s tables", catalog, schema))] + ListTables { + location: Location, + catalog: String, + schema: String, + source: BoxedError, + }, + #[snafu(display("Failed to re-compile script due to internal error"))] CompileScriptInternal { location: Location, @@ -270,9 +278,9 @@ impl ErrorExt for Error { StatusCode::InvalidArguments } - Error::ListCatalogs { source, .. } | Error::ListSchemas { source, .. } => { - source.status_code() - } + Error::ListCatalogs { source, .. } + | Error::ListSchemas { source, .. } + | Error::ListTables { source, .. } => source.status_code(), Error::OpenSystemCatalog { source, .. } | Error::CreateSystemCatalog { source, .. } diff --git a/src/catalog/src/kvbackend/manager.rs b/src/catalog/src/kvbackend/manager.rs index 5bed429be1c1..224c1c0121f6 100644 --- a/src/catalog/src/kvbackend/manager.rs +++ b/src/catalog/src/kvbackend/manager.rs @@ -35,8 +35,8 @@ use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME}; use table::TableRef; use crate::error::{ - self as catalog_err, ListCatalogsSnafu, ListSchemasSnafu, Result as CatalogResult, - TableMetadataManagerSnafu, + self as catalog_err, ListCatalogsSnafu, ListSchemasSnafu, ListTablesSnafu, + Result as CatalogResult, TableMetadataManagerSnafu, }; use crate::information_schema::InformationSchemaProvider; use crate::CatalogManager; @@ -135,18 +135,22 @@ impl CatalogManager for KvBackendCatalogManager { } async fn table_names(&self, catalog: &str, schema: &str) -> CatalogResult> { - let mut tables = self + let stream = self .table_metadata_manager .table_name_manager() .tables(catalog, schema) + .await; + let mut tables = stream + .try_collect::>() .await - .context(TableMetadataManagerSnafu)? + .map_err(BoxedError::new) + .context(ListTablesSnafu { catalog, schema })? .into_iter() .map(|(k, _)| k) - .collect::>(); + .collect::>(); tables.extend_from_slice(&self.system_catalog.table_names(schema)); - Ok(tables) + Ok(tables.into_iter().collect()) } async fn catalog_exists(&self, catalog: &str) -> CatalogResult { diff --git a/src/common/meta/src/key/table_name.rs b/src/common/meta/src/key/table_name.rs index 12d44dace180..e86561cbc986 100644 --- a/src/common/meta/src/key/table_name.rs +++ b/src/common/meta/src/key/table_name.rs @@ -14,6 +14,7 @@ use std::sync::Arc; +use futures_util::stream::BoxStream; use serde::{Deserialize, Serialize}; use snafu::OptionExt; use table::metadata::TableId; @@ -24,7 +25,9 @@ use crate::key::{to_removed_key, TableMetaKey}; use crate::kv_backend::memory::MemoryKvBackend; use crate::kv_backend::txn::{Txn, TxnOp}; use crate::kv_backend::KvBackendRef; +use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE}; use crate::rpc::store::RangeRequest; +use crate::rpc::KeyValue; use crate::table_name::TableName; #[derive(Debug, Clone, Copy)] @@ -79,6 +82,14 @@ impl TableMetaKey for TableNameKey<'_> { } } +/// Decodes `KeyValue` to ({table_name}, TableNameValue) +pub fn table_decoder(kv: KeyValue) -> Result<(String, TableNameValue)> { + let table_name = TableNameKey::strip_table_name(kv.key())?; + let table_name_value = TableNameValue::try_from_raw_value(&kv.value)?; + + Ok((table_name, table_name_value)) +} + impl<'a> From<&'a TableName> for TableNameKey<'a> { fn from(value: &'a TableName) -> Self { Self { @@ -218,19 +229,18 @@ impl TableNameManager { &self, catalog: &str, schema: &str, - ) -> Result> { + ) -> BoxStream<'static, Result<(String, TableNameValue)>> { let key = TableNameKey::prefix_to_table(catalog, schema).into_bytes(); let req = RangeRequest::new().with_prefix(key); - let resp = self.kv_backend.range(req).await?; - - let mut res = Vec::with_capacity(resp.kvs.len()); - for kv in resp.kvs { - res.push(( - TableNameKey::strip_table_name(kv.key())?, - TableNameValue::try_from_raw_value(&kv.value)?, - )) - } - Ok(res) + + let stream = PaginationStream::new( + self.kv_backend.clone(), + req, + DEFAULT_PAGE_SIZE, + Arc::new(table_decoder), + ); + + Box::pin(stream) } } diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index b5b13c0c7d1b..4b70d6479779 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -111,6 +111,14 @@ pub enum Error { source: BoxedError, }, + #[snafu(display("Failed to list {}.{}'s tables", catalog, schema))] + ListTables { + location: Location, + catalog: String, + schema: String, + source: BoxedError, + }, + #[snafu(display("Failed to join a future"))] Join { location: Location, @@ -732,9 +740,9 @@ impl ErrorExt for Error { Error::StartProcedureManager { source, .. } | Error::StopProcedureManager { source, .. } => source.status_code(), - Error::ListCatalogs { source, .. } | Error::ListSchemas { source, .. } => { - source.status_code() - } + Error::ListCatalogs { source, .. } + | Error::ListSchemas { source, .. } + | Error::ListTables { source, .. } => source.status_code(), Error::StartTelemetryTask { source, .. } => source.status_code(), Error::RegionFailoverCandidatesNotFound { .. } => StatusCode::RuntimeResourcesExhausted, diff --git a/src/meta-srv/src/service/admin/meta.rs b/src/meta-srv/src/service/admin/meta.rs index 969eec807efa..77eb38e7dd5b 100644 --- a/src/meta-srv/src/service/admin/meta.rs +++ b/src/meta-srv/src/service/admin/meta.rs @@ -115,15 +115,19 @@ impl HttpHandler for TablesHandler { let catalog = util::get_value(params, "catalog")?; let schema = util::get_value(params, "schema")?; - let tables = self + let stream = self .table_metadata_manager .table_name_manager() .tables(catalog, schema) + .await; + let tables = stream + .try_collect::>() .await - .context(TableMetadataManagerSnafu)? + .map_err(BoxedError::new) + .context(error::ListTablesSnafu { catalog, schema })? .into_iter() .map(|(k, _)| k) - .collect(); + .collect::>(); to_http_response(tables) }