Skip to content

Commit

Permalink
fix test
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan committed Dec 10, 2024
1 parent 5f2a4f3 commit a52d3bd
Showing 1 changed file with 73 additions and 18 deletions.
91 changes: 73 additions & 18 deletions src/frontend/src/handler/alter_source_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,10 +171,9 @@ pub fn alter_definition_add_column(definition: &str, column: ColumnDef) -> Resul

#[cfg(test)]
pub mod tests {
use std::collections::HashMap;
use std::collections::BTreeMap;

use risingwave_common::catalog::{DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME};
use risingwave_common::types::DataType;

use crate::catalog::root_catalog::SchemaPath;
use crate::test_utils::LocalFrontend;
Expand Down Expand Up @@ -211,47 +210,103 @@ pub mod tests {
.await
.unwrap();

let get_source = || {
let get_source = |name: &str| {
let catalog_reader = session.env().catalog_reader().read_guard();
catalog_reader
.get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "s")
.get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, name)
.unwrap()
.0
.clone()
};

let source = get_source();
let columns: HashMap<_, _> = source
let source = get_source("s");

let sql = "alter source s_shared add column v2 varchar;";
frontend.run_sql(sql).await.unwrap();

let altered_source = get_source("s_shared");
let altered_columns: BTreeMap<_, _> = altered_source
.columns
.iter()
.map(|col| (col.name(), (col.data_type().clone(), col.column_id())))
.collect();

let sql = "alter source s_shared add column v2 varchar;";
assert_eq!("Feature is not yet implemented: alter shared source\nTracking issue: https://github.com/risingwavelabs/risingwave/issues/16003", &frontend.run_sql(sql).await.unwrap_err().to_string());
// Check the new column is added.
// Check the old columns and IDs are not changed.
expect_test::expect![[r#"
{
"_row_id": (
Serial,
#0,
),
"_rw_kafka_offset": (
Varchar,
#4,
),
"_rw_kafka_partition": (
Varchar,
#3,
),
"_rw_kafka_timestamp": (
Timestamptz,
#2,
),
"v1": (
Int32,
#1,
),
"v2": (
Varchar,
#5,
),
}
"#]]
.assert_debug_eq(&altered_columns);

// Check version
assert_eq!(source.version + 1, altered_source.version);

// Check definition
expect_test::expect!["CREATE SOURCE s_shared (v1 INT, v2 CHARACTER VARYING) WITH (connector = 'kafka', topic = 'abc', properties.bootstrap.server = 'localhost:29092') FORMAT PLAIN ENCODE JSON"].assert_eq(&altered_source.definition);

let sql = "alter source s add column v2 varchar;";
frontend.run_sql(sql).await.unwrap();

let altered_source = get_source();

let altered_columns: HashMap<_, _> = altered_source
let altered_source = get_source("s");
let altered_columns: BTreeMap<_, _> = altered_source
.columns
.iter()
.map(|col| (col.name(), (col.data_type().clone(), col.column_id())))
.collect();

// Check the new column.
assert_eq!(columns.len() + 1, altered_columns.len());
assert_eq!(altered_columns["v2"].0, DataType::Varchar);

// Check the new column is added.
// Check the old columns and IDs are not changed.
assert_eq!(columns["v1"], altered_columns["v1"]);
expect_test::expect![[r#"
{
"_row_id": (
Serial,
#0,
),
"_rw_kafka_timestamp": (
Timestamptz,
#2,
),
"v1": (
Int32,
#1,
),
"v2": (
Varchar,
#3,
),
}
"#]]
.assert_debug_eq(&altered_columns);

// Check version
assert_eq!(source.version + 1, altered_source.version);

// Check definition
let altered_sql = r#"CREATE SOURCE s (v1 INT, v2 CHARACTER VARYING) WITH (connector = 'kafka', topic = 'abc', properties.bootstrap.server = 'localhost:29092') FORMAT PLAIN ENCODE JSON"#;
assert_eq!(altered_sql, altered_source.definition);
expect_test::expect!["CREATE SOURCE s (v1 INT, v2 CHARACTER VARYING) WITH (connector = 'kafka', topic = 'abc', properties.bootstrap.server = 'localhost:29092') FORMAT PLAIN ENCODE JSON"].assert_eq(&altered_source.definition);
}
}

0 comments on commit a52d3bd

Please sign in to comment.