Skip to content

Commit

Permalink
fix(frontend): split chunks in system catalog (#16375)
Browse files Browse the repository at this point in the history
Signed-off-by: Runji Wang <[email protected]>
  • Loading branch information
wangrunji0408 authored Apr 18, 2024
1 parent e15e74d commit b37228e
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 28 deletions.
13 changes: 5 additions & 8 deletions src/batch/src/executor/sys_row_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,19 +101,16 @@ impl Executor for SysRowSeqScanExecutor {
}

fn execute(self: Box<Self>) -> BoxedDataChunkStream {
self.do_executor()
self.do_execute()
}
}

impl SysRowSeqScanExecutor {
#[try_stream(boxed, ok = DataChunk, error = BatchError)]
async fn do_executor(self: Box<Self>) {
let chunk = self
.sys_catalog_reader
.read_table(&self.table_id)
.await
.map_err(BatchError::SystemTable)?;
if chunk.cardinality() != 0 {
async fn do_execute(self: Box<Self>) {
#[for_await]
for chunk in self.sys_catalog_reader.read_table(self.table_id) {
let chunk = chunk.map_err(BatchError::SystemTable)?;
yield chunk.project(&self.column_indices);
}
}
Expand Down
6 changes: 3 additions & 3 deletions src/common/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ pub mod test_utils;

use std::sync::Arc;

use async_trait::async_trait;
pub use column::*;
pub use external_table::*;
use futures::stream::BoxStream;
pub use internal_table::*;
use parse_display::Display;
pub use physical_table::*;
Expand Down Expand Up @@ -148,9 +148,9 @@ pub fn cdc_table_name_column_desc() -> ColumnDesc {
}

/// The local system catalog reader in the frontend node.
#[async_trait]
pub trait SysCatalogReader: Sync + Send + 'static {
async fn read_table(&self, table_id: &TableId) -> Result<DataChunk, BoxedError>;
/// Reads the data of the system catalog table.
fn read_table(&self, table_id: TableId) -> BoxStream<'_, Result<DataChunk, BoxedError>>;
}

pub type SysCatalogReaderRef = Arc<dyn SysCatalogReader>;
Expand Down
24 changes: 16 additions & 8 deletions src/frontend/macro/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ fn gen_sys_table(attr: Attr, item_fn: ItemFn) -> Result<TokenStream2> {
let struct_type = strip_outer_type(ty, "Vec").ok_or_else(return_type_error)?;
let _await = item_fn.sig.asyncness.map(|_| quote!(.await));
let handle_error = return_result.then(|| quote!(?));
let chunk_size = 1024usize;

Ok(quote! {
#[linkme::distributed_slice(crate::catalog::system_catalog::SYS_CATALOGS_SLICE)]
Expand All @@ -121,19 +122,26 @@ fn gen_sys_table(attr: Attr, item_fn: ItemFn) -> Result<TokenStream2> {
assert!(#struct_type::PRIMARY_KEY.is_some(), "primary key is required for system table");
};

#[futures_async_stream::try_stream(boxed, ok = risingwave_common::array::DataChunk, error = risingwave_common::error::BoxedError)]
async fn function(reader: &crate::catalog::system_catalog::SysCatalogReaderImpl) {
let rows = #user_fn_name(reader) #_await #handle_error;
let mut builder = #struct_type::data_chunk_builder(#chunk_size);
for row in rows {
if let Some(chunk) = builder.append_one_row(row.into_owned_row()) {
yield chunk;
}
}
if let Some(chunk) = builder.consume_all() {
yield chunk;
}
}

crate::catalog::system_catalog::BuiltinCatalog::Table(crate::catalog::system_catalog::BuiltinTable {
name: #table_name,
schema: #schema_name,
columns: #struct_type::fields(),
pk: #struct_type::PRIMARY_KEY.unwrap(),
function: |reader| std::boxed::Box::pin(async {
let rows = #user_fn_name(reader) #_await #handle_error;
let mut builder = #struct_type::data_chunk_builder(rows.len() + 1);
for row in rows {
_ = builder.append_one_row(row.into_owned_row());
}
Ok(builder.finish())
}),
function,
})
}
})
Expand Down
14 changes: 5 additions & 9 deletions src/frontend/src/catalog/system_catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ pub mod rw_catalog;
use std::collections::HashMap;
use std::sync::{Arc, LazyLock};

use async_trait::async_trait;
use futures::future::BoxFuture;
use futures::stream::BoxStream;
use itertools::Itertools;
use parking_lot::RwLock;
use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeManagerRef;
Expand Down Expand Up @@ -142,7 +141,7 @@ pub struct BuiltinTable {
schema: &'static str,
columns: Vec<SystemCatalogColumnsDef<'static>>,
pk: &'static [usize],
function: for<'a> fn(&'a SysCatalogReaderImpl) -> BoxFuture<'a, Result<DataChunk, BoxedError>>,
function: for<'a> fn(&'a SysCatalogReaderImpl) -> BoxStream<'a, Result<DataChunk, BoxedError>>,
}

pub struct BuiltinView {
Expand Down Expand Up @@ -340,18 +339,15 @@ pub static SYS_CATALOGS: LazyLock<SystemCatalog> = LazyLock::new(|| {
#[linkme::distributed_slice]
pub static SYS_CATALOGS_SLICE: [fn() -> BuiltinCatalog];

#[async_trait]
impl SysCatalogReader for SysCatalogReaderImpl {
async fn read_table(&self, table_id: &TableId) -> Result<DataChunk, BoxedError> {
fn read_table(&self, table_id: TableId) -> BoxStream<'_, Result<DataChunk, BoxedError>> {
let table_name = SYS_CATALOGS
.catalogs
.get((table_id.table_id - SYS_CATALOG_START_ID as u32) as usize)
.unwrap();
match table_name {
BuiltinCatalog::Table(t) => (t.function)(self).await,
BuiltinCatalog::View(_) => {
panic!("read_table should not be called on a view")
}
BuiltinCatalog::Table(t) => (t.function)(self),
BuiltinCatalog::View(_) => panic!("read_table should not be called on a view"),
}
}
}
Expand Down

0 comments on commit b37228e

Please sign in to comment.