Skip to content

Commit

Permalink
add unit test
Browse files Browse the repository at this point in the history
  • Loading branch information
Rossil2012 committed Feb 7, 2024
1 parent 3603a6f commit 767af6a
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 3 deletions.
8 changes: 6 additions & 2 deletions src/frontend/src/handler/alter_source_with_sr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ pub async fn handle_alter_source_with_sr(
Ok(RwPgResponse::empty_result(StatementType::ALTER_SOURCE))
}

/// Apply the new `format_encode_options` to the source definition.
/// Apply the new `format_encode_options` to the source/table definition.
pub fn alter_definition_format_encode(
definition: &str,
format_encode_options: Vec<SqlOption>,
Expand All @@ -244,6 +244,10 @@ pub fn alter_definition_format_encode(
match &mut stmt {
Statement::CreateSource {
stmt: CreateSourceStatement { source_schema, .. },
}
| Statement::CreateTable {
source_schema: Some(source_schema),
..
} => {
match source_schema {
CompatibleSourceSchema::V2(schema) => {
Expand All @@ -269,7 +273,7 @@ pub mod tests {
use crate::test_utils::{create_proto_file, LocalFrontend, PROTO_FILE_DATA};

#[tokio::test]
async fn test_alter_source_column_handler() {
async fn test_alter_source_with_sr_handler() {
let proto_file = create_proto_file(PROTO_FILE_DATA);
let sql = format!(
r#"CREATE SOURCE src
Expand Down
77 changes: 76 additions & 1 deletion src/frontend/src/handler/alter_table_with_sr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_sqlparser::ast::{ConnectorSchema, ObjectName, Statement};
use risingwave_sqlparser::parser::Parser;

use super::alter_source_with_sr::alter_definition_format_encode;
use super::alter_table_column::{fetch_table_catalog_for_alter, schema_has_schema_registry};
use super::{HandlerArgs, RwPgResponse};
use crate::error::{ErrorCode, Result, RwError};
Expand Down Expand Up @@ -60,7 +61,12 @@ pub async fn handle_alter_table_with_sr(
.into());
}

let [definition]: [_; 1] = Parser::parse_sql(&original_table.definition)
let definition = alter_definition_format_encode(
&original_table.definition,
connector_schema.row_options.clone(),
)?;

let [definition]: [_; 1] = Parser::parse_sql(&definition)
.context("unable to parse original table definition")?
.try_into()
.unwrap();
Expand Down Expand Up @@ -125,3 +131,72 @@ pub async fn handle_alter_table_with_sr(

Ok(RwPgResponse::empty_result(StatementType::ALTER_TABLE))
}

#[cfg(test)]
pub mod tests {
use risingwave_common::catalog::{DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME};

use crate::catalog::root_catalog::SchemaPath;
use crate::test_utils::{create_proto_file, LocalFrontend, PROTO_FILE_DATA};

#[tokio::test]
async fn test_alter_table_with_sr_handler() {
let proto_file = create_proto_file(PROTO_FILE_DATA);
let sql = format!(
r#"CREATE TABLE t
WITH (
connector = 'kafka',
topic = 'test-topic',
properties.bootstrap.server = 'localhost:29092'
)
FORMAT PLAIN ENCODE PROTOBUF (
message = '.test.TestRecord',
schema.location = 'file://{}'
)"#,
proto_file.path().to_str().unwrap()
);
let frontend = LocalFrontend::new(Default::default()).await;
let session = frontend.session_ref();
let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

frontend.run_sql(sql).await.unwrap();

let sql = format!(
r#"ALTER TABLE t FORMAT UPSERT ENCODE PROTOBUF (
message = '.test.TestRecord',
schema.location = 'file://{}'
)"#,
proto_file.path().to_str().unwrap()
);
assert!(frontend
.run_sql(sql)
.await
.unwrap_err()
.to_string()
.contains("the original definition is FORMAT Plain ENCODE Protobuf"));

let sql = format!(
r#"ALTER TABLE t FORMAT PLAIN ENCODE PROTOBUF (
message = '.test.TestRecordExt',
schema.location = 'file://{}'
)"#,
proto_file.path().to_str().unwrap()
);
frontend.run_sql(sql).await.unwrap();

let altered_table = session
.env()
.catalog_reader()
.read_guard()
.get_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
.unwrap()
.0
.clone();

let altered_sql = format!(
r#"CREATE TABLE t () WITH (connector = 'kafka', topic = 'test-topic', properties.bootstrap.server = 'localhost:29092') FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecordExt', schema.location = 'file://{}')"#,
proto_file.path().to_str().unwrap()
);
assert_eq!(altered_sql, altered_table.definition);
}
}

0 comments on commit 767af6a

Please sign in to comment.