From 21babd5e513e3d12bac67a494005d8e8d9b46c68 Mon Sep 17 00:00:00 2001 From: August Date: Tue, 11 Jun 2024 14:47:48 +0800 Subject: [PATCH] chore: expose associated source in rw_sources (#17150) --- e2e_test/ddl/alter_set_schema.slt | 3 +- e2e_test/source/basic/ddl.slt | 2 +- .../basic/old_row_format_syntax/ddl.slt | 2 +- .../tests/testdata/output/subquery.yaml | 2 +- .../system_catalog/rw_catalog/rw_sources.rs | 67 +++++++++---------- src/frontend/src/handler/show.rs | 1 - 6 files changed, 38 insertions(+), 39 deletions(-) diff --git a/e2e_test/ddl/alter_set_schema.slt b/e2e_test/ddl/alter_set_schema.slt index 9ea95e9cc09c2..74dcc5a77e64a 100644 --- a/e2e_test/ddl/alter_set_schema.slt +++ b/e2e_test/ddl/alter_set_schema.slt @@ -53,13 +53,14 @@ CREATE SOURCE test_source (v INT) WITH ( statement ok ALTER SOURCE test_source SET SCHEMA test_schema; -query TT +query TT rowsort SELECT name AS sourcename, nspname AS schemaname FROM rw_sources JOIN pg_namespace ON pg_namespace.oid = rw_sources.schema_id WHERE nspname = 'test_schema'; ---- test_source test_schema +test_table test_schema statement ok CREATE SINK test_sink AS SELECT u FROM test_schema.test_table WITH ( diff --git a/e2e_test/source/basic/ddl.slt b/e2e_test/source/basic/ddl.slt index 465e0f19344e9..a56d90934c149 100644 --- a/e2e_test/source/basic/ddl.slt +++ b/e2e_test/source/basic/ddl.slt @@ -188,7 +188,7 @@ create table s ( query T show sources ---- - +s query T show tables diff --git a/e2e_test/source/basic/old_row_format_syntax/ddl.slt b/e2e_test/source/basic/old_row_format_syntax/ddl.slt index d5c41d4ded878..b48249ca7e393 100644 --- a/e2e_test/source/basic/old_row_format_syntax/ddl.slt +++ b/e2e_test/source/basic/old_row_format_syntax/ddl.slt @@ -142,7 +142,7 @@ create table s ( query T show sources ---- - +s query T show tables diff --git a/src/frontend/planner_test/tests/testdata/output/subquery.yaml b/src/frontend/planner_test/tests/testdata/output/subquery.yaml index f9ae7cdee3f0d..e113a0aca4d1d 100644 --- a/src/frontend/planner_test/tests/testdata/output/subquery.yaml +++ b/src/frontend/planner_test/tests/testdata/output/subquery.yaml @@ -241,7 +241,7 @@ │ │ │ │ │ │ │ └─LogicalProject { exprs: [rw_system_tables.id, rw_system_tables.name, 'system table':Varchar, rw_system_tables.schema_id, rw_system_tables.owner, rw_system_tables.definition, rw_system_tables.acl] } │ │ │ │ │ │ │ └─LogicalSysScan { table: rw_system_tables, columns: [rw_system_tables.id, rw_system_tables.name, rw_system_tables.schema_id, rw_system_tables.owner, rw_system_tables.definition, rw_system_tables.acl] } │ │ │ │ │ │ └─LogicalProject { exprs: [rw_sources.id, rw_sources.name, 'source':Varchar, rw_sources.schema_id, rw_sources.owner, rw_sources.definition, rw_sources.acl] } - │ │ │ │ │ │ └─LogicalSysScan { table: rw_sources, columns: [rw_sources.id, rw_sources.name, rw_sources.schema_id, rw_sources.owner, rw_sources.connector, rw_sources.columns, rw_sources.format, rw_sources.row_encode, rw_sources.append_only, rw_sources.connection_id, rw_sources.definition, rw_sources.acl, rw_sources.initialized_at, rw_sources.created_at, rw_sources.initialized_at_cluster_version, rw_sources.created_at_cluster_version] } + │ │ │ │ │ │ └─LogicalSysScan { table: rw_sources, columns: [rw_sources.id, rw_sources.name, rw_sources.schema_id, rw_sources.owner, rw_sources.connector, rw_sources.columns, rw_sources.format, rw_sources.row_encode, rw_sources.append_only, rw_sources.associated_table_id, rw_sources.connection_id, rw_sources.definition, rw_sources.acl, rw_sources.initialized_at, rw_sources.created_at, rw_sources.initialized_at_cluster_version, rw_sources.created_at_cluster_version] } │ │ │ │ │ └─LogicalProject { exprs: [rw_indexes.id, rw_indexes.name, 'index':Varchar, rw_indexes.schema_id, rw_indexes.owner, rw_indexes.definition, rw_indexes.acl] } │ │ │ │ │ └─LogicalSysScan { table: rw_indexes, columns: [rw_indexes.id, rw_indexes.name, rw_indexes.primary_table_id, rw_indexes.key_columns, rw_indexes.include_columns, rw_indexes.schema_id, rw_indexes.owner, rw_indexes.definition, rw_indexes.acl, rw_indexes.initialized_at, rw_indexes.created_at, rw_indexes.initialized_at_cluster_version, rw_indexes.created_at_cluster_version] } │ │ │ │ └─LogicalProject { exprs: [rw_sinks.id, rw_sinks.name, 'sink':Varchar, rw_sinks.schema_id, rw_sinks.owner, rw_sinks.definition, rw_sinks.acl] } diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_sources.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_sources.rs index 5a05f69750caf..441ac99b48cda 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_sources.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_sources.rs @@ -32,6 +32,7 @@ struct RwSource { format: Option, row_encode: Option, append_only: bool, + associated_table_id: Option, connection_id: Option, definition: String, acl: String, @@ -51,40 +52,38 @@ fn read_rw_sources_info(reader: &SysCatalogReaderImpl) -> Result> Ok(schemas .flat_map(|schema| { - schema - .iter_source() - .filter(|s| s.associated_table_id.is_none()) - .map(|source| RwSource { - id: source.id as i32, - name: source.name.clone(), - schema_id: schema.id() as i32, - owner: source.owner as i32, - connector: source - .with_properties - .get(UPSTREAM_SOURCE_KEY) - .cloned() - .unwrap_or("".to_string()) - .to_uppercase(), - columns: source.columns.iter().map(|c| c.name().into()).collect(), - format: source - .info - .get_format() - .ok() - .map(|format| format.as_str_name().into()), - row_encode: source - .info - .get_row_encode() - .ok() - .map(|row_encode| row_encode.as_str_name().into()), - append_only: source.append_only, - connection_id: source.connection_id.map(|id| id as i32), - definition: source.create_sql(), - acl: get_acl_items(&Object::SourceId(source.id), false, &users, username_map), - initialized_at: source.initialized_at_epoch.map(|e| e.as_timestamptz()), - created_at: source.created_at_epoch.map(|e| e.as_timestamptz()), - initialized_at_cluster_version: source.initialized_at_cluster_version.clone(), - created_at_cluster_version: source.created_at_cluster_version.clone(), - }) + schema.iter_source().map(|source| RwSource { + id: source.id as i32, + name: source.name.clone(), + schema_id: schema.id() as i32, + owner: source.owner as i32, + connector: source + .with_properties + .get(UPSTREAM_SOURCE_KEY) + .cloned() + .unwrap_or("".to_string()) + .to_uppercase(), + columns: source.columns.iter().map(|c| c.name().into()).collect(), + format: source + .info + .get_format() + .ok() + .map(|format| format.as_str_name().into()), + row_encode: source + .info + .get_row_encode() + .ok() + .map(|row_encode| row_encode.as_str_name().into()), + append_only: source.append_only, + associated_table_id: source.associated_table_id.map(|id| id.table_id as i32), + connection_id: source.connection_id.map(|id| id as i32), + definition: source.create_sql(), + acl: get_acl_items(&Object::SourceId(source.id), false, &users, username_map), + initialized_at: source.initialized_at_epoch.map(|e| e.as_timestamptz()), + created_at: source.created_at_epoch.map(|e| e.as_timestamptz()), + initialized_at_cluster_version: source.initialized_at_cluster_version.clone(), + created_at_cluster_version: source.created_at_cluster_version.clone(), + }) }) .collect()) } diff --git a/src/frontend/src/handler/show.rs b/src/frontend/src/handler/show.rs index f2d5186b67962..b93a8032cbcd4 100644 --- a/src/frontend/src/handler/show.rs +++ b/src/frontend/src/handler/show.rs @@ -318,7 +318,6 @@ pub async fn handle_show_object( .read_guard() .get_schema_by_name(session.database(), &schema_or_default(&schema))? .iter_source() - .filter(|t| t.associated_table_id.is_none()) .map(|t| t.name.clone()) .collect(), ShowObject::Sink { schema } => catalog_reader