Skip to content

Commit

Permalink
chore: expose associated source in rw_sources (#17150)
Browse files Browse the repository at this point in the history
  • Loading branch information
yezizp2012 authored Jun 11, 2024
1 parent 61bae13 commit 21babd5
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 39 deletions.
3 changes: 2 additions & 1 deletion e2e_test/ddl/alter_set_schema.slt
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,14 @@ CREATE SOURCE test_source (v INT) WITH (
statement ok
ALTER SOURCE test_source SET SCHEMA test_schema;

query TT
query TT rowsort
SELECT name AS sourcename, nspname AS schemaname
FROM rw_sources
JOIN pg_namespace ON pg_namespace.oid = rw_sources.schema_id
WHERE nspname = 'test_schema';
----
test_source test_schema
test_table test_schema

statement ok
CREATE SINK test_sink AS SELECT u FROM test_schema.test_table WITH (
Expand Down
2 changes: 1 addition & 1 deletion e2e_test/source/basic/ddl.slt
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ create table s (
query T
show sources
----

s

query T
show tables
Expand Down
2 changes: 1 addition & 1 deletion e2e_test/source/basic/old_row_format_syntax/ddl.slt
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ create table s (
query T
show sources
----

s

query T
show tables
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@
│ │ │ │ │ │ │ └─LogicalProject { exprs: [rw_system_tables.id, rw_system_tables.name, 'system table':Varchar, rw_system_tables.schema_id, rw_system_tables.owner, rw_system_tables.definition, rw_system_tables.acl] }
│ │ │ │ │ │ │ └─LogicalSysScan { table: rw_system_tables, columns: [rw_system_tables.id, rw_system_tables.name, rw_system_tables.schema_id, rw_system_tables.owner, rw_system_tables.definition, rw_system_tables.acl] }
│ │ │ │ │ │ └─LogicalProject { exprs: [rw_sources.id, rw_sources.name, 'source':Varchar, rw_sources.schema_id, rw_sources.owner, rw_sources.definition, rw_sources.acl] }
│ │ │ │ │ │ └─LogicalSysScan { table: rw_sources, columns: [rw_sources.id, rw_sources.name, rw_sources.schema_id, rw_sources.owner, rw_sources.connector, rw_sources.columns, rw_sources.format, rw_sources.row_encode, rw_sources.append_only, rw_sources.connection_id, rw_sources.definition, rw_sources.acl, rw_sources.initialized_at, rw_sources.created_at, rw_sources.initialized_at_cluster_version, rw_sources.created_at_cluster_version] }
│ │ │ │ │ │ └─LogicalSysScan { table: rw_sources, columns: [rw_sources.id, rw_sources.name, rw_sources.schema_id, rw_sources.owner, rw_sources.connector, rw_sources.columns, rw_sources.format, rw_sources.row_encode, rw_sources.append_only, rw_sources.associated_table_id, rw_sources.connection_id, rw_sources.definition, rw_sources.acl, rw_sources.initialized_at, rw_sources.created_at, rw_sources.initialized_at_cluster_version, rw_sources.created_at_cluster_version] }
│ │ │ │ │ └─LogicalProject { exprs: [rw_indexes.id, rw_indexes.name, 'index':Varchar, rw_indexes.schema_id, rw_indexes.owner, rw_indexes.definition, rw_indexes.acl] }
│ │ │ │ │ └─LogicalSysScan { table: rw_indexes, columns: [rw_indexes.id, rw_indexes.name, rw_indexes.primary_table_id, rw_indexes.key_columns, rw_indexes.include_columns, rw_indexes.schema_id, rw_indexes.owner, rw_indexes.definition, rw_indexes.acl, rw_indexes.initialized_at, rw_indexes.created_at, rw_indexes.initialized_at_cluster_version, rw_indexes.created_at_cluster_version] }
│ │ │ │ └─LogicalProject { exprs: [rw_sinks.id, rw_sinks.name, 'sink':Varchar, rw_sinks.schema_id, rw_sinks.owner, rw_sinks.definition, rw_sinks.acl] }
Expand Down
67 changes: 33 additions & 34 deletions src/frontend/src/catalog/system_catalog/rw_catalog/rw_sources.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ struct RwSource {
format: Option<String>,
row_encode: Option<String>,
append_only: bool,
associated_table_id: Option<i32>,
connection_id: Option<i32>,
definition: String,
acl: String,
Expand All @@ -51,40 +52,38 @@ fn read_rw_sources_info(reader: &SysCatalogReaderImpl) -> Result<Vec<RwSource>>

Ok(schemas
.flat_map(|schema| {
schema
.iter_source()
.filter(|s| s.associated_table_id.is_none())
.map(|source| RwSource {
id: source.id as i32,
name: source.name.clone(),
schema_id: schema.id() as i32,
owner: source.owner as i32,
connector: source
.with_properties
.get(UPSTREAM_SOURCE_KEY)
.cloned()
.unwrap_or("".to_string())
.to_uppercase(),
columns: source.columns.iter().map(|c| c.name().into()).collect(),
format: source
.info
.get_format()
.ok()
.map(|format| format.as_str_name().into()),
row_encode: source
.info
.get_row_encode()
.ok()
.map(|row_encode| row_encode.as_str_name().into()),
append_only: source.append_only,
connection_id: source.connection_id.map(|id| id as i32),
definition: source.create_sql(),
acl: get_acl_items(&Object::SourceId(source.id), false, &users, username_map),
initialized_at: source.initialized_at_epoch.map(|e| e.as_timestamptz()),
created_at: source.created_at_epoch.map(|e| e.as_timestamptz()),
initialized_at_cluster_version: source.initialized_at_cluster_version.clone(),
created_at_cluster_version: source.created_at_cluster_version.clone(),
})
schema.iter_source().map(|source| RwSource {
id: source.id as i32,
name: source.name.clone(),
schema_id: schema.id() as i32,
owner: source.owner as i32,
connector: source
.with_properties
.get(UPSTREAM_SOURCE_KEY)
.cloned()
.unwrap_or("".to_string())
.to_uppercase(),
columns: source.columns.iter().map(|c| c.name().into()).collect(),
format: source
.info
.get_format()
.ok()
.map(|format| format.as_str_name().into()),
row_encode: source
.info
.get_row_encode()
.ok()
.map(|row_encode| row_encode.as_str_name().into()),
append_only: source.append_only,
associated_table_id: source.associated_table_id.map(|id| id.table_id as i32),
connection_id: source.connection_id.map(|id| id as i32),
definition: source.create_sql(),
acl: get_acl_items(&Object::SourceId(source.id), false, &users, username_map),
initialized_at: source.initialized_at_epoch.map(|e| e.as_timestamptz()),
created_at: source.created_at_epoch.map(|e| e.as_timestamptz()),
initialized_at_cluster_version: source.initialized_at_cluster_version.clone(),
created_at_cluster_version: source.created_at_cluster_version.clone(),
})
})
.collect())
}
1 change: 0 additions & 1 deletion src/frontend/src/handler/show.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,6 @@ pub async fn handle_show_object(
.read_guard()
.get_schema_by_name(session.database(), &schema_or_default(&schema))?
.iter_source()
.filter(|t| t.associated_table_id.is_none())
.map(|t| t.name.clone())
.collect(),
ShowObject::Sink { schema } => catalog_reader
Expand Down

0 comments on commit 21babd5

Please sign in to comment.