diff --git a/src/frontend/src/handler/alter_source_column.rs b/src/frontend/src/handler/alter_source_column.rs index 385fc34dba44a..ef91e011365e7 100644 --- a/src/frontend/src/handler/alter_source_column.rs +++ b/src/frontend/src/handler/alter_source_column.rs @@ -171,10 +171,9 @@ pub fn alter_definition_add_column(definition: &str, column: ColumnDef) -> Resul #[cfg(test)] pub mod tests { - use std::collections::HashMap; + use std::collections::BTreeMap; use risingwave_common::catalog::{DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME}; - use risingwave_common::types::DataType; use crate::catalog::root_catalog::SchemaPath; use crate::test_utils::LocalFrontend; @@ -211,47 +210,103 @@ pub mod tests { .await .unwrap(); - let get_source = || { + let get_source = |name: &str| { let catalog_reader = session.env().catalog_reader().read_guard(); catalog_reader - .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "s") + .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, name) .unwrap() .0 .clone() }; - let source = get_source(); - let columns: HashMap<_, _> = source + let source = get_source("s"); + + let sql = "alter source s_shared add column v2 varchar;"; + frontend.run_sql(sql).await.unwrap(); + + let altered_source = get_source("s_shared"); + let altered_columns: BTreeMap<_, _> = altered_source .columns .iter() .map(|col| (col.name(), (col.data_type().clone(), col.column_id()))) .collect(); - let sql = "alter source s_shared add column v2 varchar;"; - assert_eq!("Feature is not yet implemented: alter shared source\nTracking issue: https://github.com/risingwavelabs/risingwave/issues/16003", &frontend.run_sql(sql).await.unwrap_err().to_string()); + // Check the new column is added. + // Check the old columns and IDs are not changed. + expect_test::expect![[r#" + { + "_row_id": ( + Serial, + #0, + ), + "_rw_kafka_offset": ( + Varchar, + #4, + ), + "_rw_kafka_partition": ( + Varchar, + #3, + ), + "_rw_kafka_timestamp": ( + Timestamptz, + #2, + ), + "v1": ( + Int32, + #1, + ), + "v2": ( + Varchar, + #5, + ), + } + "#]] + .assert_debug_eq(&altered_columns); + + // Check version + assert_eq!(source.version + 1, altered_source.version); + + // Check definition + expect_test::expect!["CREATE SOURCE s_shared (v1 INT, v2 CHARACTER VARYING) WITH (connector = 'kafka', topic = 'abc', properties.bootstrap.server = 'localhost:29092') FORMAT PLAIN ENCODE JSON"].assert_eq(&altered_source.definition); + let sql = "alter source s add column v2 varchar;"; frontend.run_sql(sql).await.unwrap(); - let altered_source = get_source(); - - let altered_columns: HashMap<_, _> = altered_source + let altered_source = get_source("s"); + let altered_columns: BTreeMap<_, _> = altered_source .columns .iter() .map(|col| (col.name(), (col.data_type().clone(), col.column_id()))) .collect(); - // Check the new column. - assert_eq!(columns.len() + 1, altered_columns.len()); - assert_eq!(altered_columns["v2"].0, DataType::Varchar); - + // Check the new column is added. // Check the old columns and IDs are not changed. - assert_eq!(columns["v1"], altered_columns["v1"]); + expect_test::expect![[r#" + { + "_row_id": ( + Serial, + #0, + ), + "_rw_kafka_timestamp": ( + Timestamptz, + #2, + ), + "v1": ( + Int32, + #1, + ), + "v2": ( + Varchar, + #3, + ), + } + "#]] + .assert_debug_eq(&altered_columns); // Check version assert_eq!(source.version + 1, altered_source.version); // Check definition - let altered_sql = r#"CREATE SOURCE s (v1 INT, v2 CHARACTER VARYING) WITH (connector = 'kafka', topic = 'abc', properties.bootstrap.server = 'localhost:29092') FORMAT PLAIN ENCODE JSON"#; - assert_eq!(altered_sql, altered_source.definition); + expect_test::expect!["CREATE SOURCE s (v1 INT, v2 CHARACTER VARYING) WITH (connector = 'kafka', topic = 'abc', properties.bootstrap.server = 'localhost:29092') FORMAT PLAIN ENCODE JSON"].assert_eq(&altered_source.definition); } }