From feea58002d5ae010b65de283f11a8046862312db Mon Sep 17 00:00:00 2001 From: Kexiang Wang Date: Thu, 1 Feb 2024 20:37:24 -0500 Subject: [PATCH] feat(catalog): add source columns to rw_columns (#14939) --- .../system_catalog/rw_catalog/rw_columns.rs | 37 +++++++++++++++++-- 1 file changed, 34 insertions(+), 3 deletions(-) diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_columns.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_columns.rs index c6369d6fae81..b2675de86923 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_columns.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_columns.rs @@ -47,6 +47,7 @@ impl SysCatalogReaderImpl { Ok(schemas .flat_map(|schema| { + // view columns let view_rows = schema.iter_view().flat_map(|view| { view.columns.iter().enumerate().map(|(index, column)| { OwnedRow::new(vec![ @@ -64,6 +65,7 @@ impl SysCatalogReaderImpl { }) }); + // sink columns let sink_rows = schema .iter_sink() .flat_map(|sink| { @@ -87,7 +89,8 @@ impl SysCatalogReaderImpl { }) .chain(view_rows); - let rows = schema + // pg_catalog columns + let catalog_rows = schema .iter_system_tables() .flat_map(|table| { table @@ -111,7 +114,8 @@ impl SysCatalogReaderImpl { }) .chain(sink_rows); - schema + // table columns + let table_rows = schema .iter_valid_table() .flat_map(|table| { table @@ -137,7 +141,34 @@ impl SysCatalogReaderImpl { ]) }) }) - .chain(rows) + .chain(catalog_rows); + + // source columns + schema + .iter_source() + .flat_map(|source| { + source + .columns + .iter() + .enumerate() + .map(move |(index, column)| { + OwnedRow::new(vec![ + Some(ScalarImpl::Int32(source.id as i32)), + Some(ScalarImpl::Utf8(column.name().into())), + Some(ScalarImpl::Int32(index as i32 + 1)), + Some(ScalarImpl::Bool(column.is_hidden)), + Some(ScalarImpl::Bool( + source.pk_col_ids.contains(&column.column_id()), + )), + Some(ScalarImpl::Bool(false)), + Some(ScalarImpl::Utf8(column.data_type().to_string().into())), + Some(ScalarImpl::Int32(column.data_type().to_oid())), + Some(ScalarImpl::Int16(column.data_type().type_len())), + Some(ScalarImpl::Utf8(column.data_type().pg_name().into())), + ]) + }) + }) + .chain(table_rows) }) .collect_vec()) }