diff --git a/ci/scripts/e2e-source-test.sh b/ci/scripts/e2e-source-test.sh
index 6732f4c9e7281..847007f574368 100755
--- a/ci/scripts/e2e-source-test.sh
+++ b/ci/scripts/e2e-source-test.sh
@@ -133,6 +133,12 @@ cargo make ci-start ci-pubsub
 cargo run --bin prepare_ci_pubsub
 sqllogictest -p 4566 -d dev './e2e_test/source/basic/*.slt'
 sqllogictest -p 4566 -d dev './e2e_test/source/basic/old_row_format_syntax/*.slt'
+sqllogictest -p 4566 -d dev './e2e_test/source/basic/alter/kafka.slt'
+
+echo "--- e2e, kafka alter source"
+chmod +x ./scripts/source/prepare_data_after_alter.sh
+./scripts/source/prepare_data_after_alter.sh 2
+sqllogictest -p 4566 -d dev './e2e_test/source/basic/alter/kafka_after_new_data.slt'
 
 echo "--- Run CH-benCHmark"
 ./risedev slt -p 4566 -d dev './e2e_test/ch_benchmark/batch/ch_benchmark.slt'
diff --git a/e2e_test/source/basic/alter/kafka.slt b/e2e_test/source/basic/alter/kafka.slt
new file mode 100644
index 0000000000000..6e2b7b88d2727
--- /dev/null
+++ b/e2e_test/source/basic/alter/kafka.slt
@@ -0,0 +1,113 @@
+statement ok
+CREATE SOURCE s1 (v1 int) with (
+  connector = 'kafka',
+  topic = 'kafka_alter',
+  properties.bootstrap.server = 'message_queue:29092',
+  scan.startup.mode = 'earliest'
+) FORMAT PLAIN ENCODE JSON;
+
+statement ok
+CREATE SOURCE s2 (v2 varchar) with (
+  connector = 'kafka',
+  topic = 'kafka_alter',
+  properties.bootstrap.server = 'message_queue:29092',
+  scan.startup.mode = 'earliest'
+) FORMAT PLAIN ENCODE JSON;
+
+statement ok
+create materialized view mv1 as select * from s1;
+
+statement ok
+create materialized view mv2 as select * from s2;
+
+sleep 10s
+
+statement ok
+flush;
+
+query I
+select * from s1;
+----
+1
+
+query T
+select * from s2;
+----
+11
+
+# alter source
+statement ok
+alter source s1 add column v2 varchar;
+
+# alter source with null column
+statement ok
+alter source s2 add column v4 int;
+
+statement ok
+create materialized view mv3 as select * from s1;
+
+statement ok
+create materialized view mv4 as select * from s2;
+
+sleep 10s
+
+statement ok
+flush;
+
+query IT
+select * from s1
+----
+1 11
+
+query TI
+select * from s2
+----
+11 NULL
+
+query I
+select * from mv1
+----
+1
+
+query T
+select * from mv2
+----
+11
+
+query IT
+select * from mv3
+----
+1 11
+
+query TI
+select * from mv4
+----
+11 NULL
+
+# alter source again
+statement ok
+alter source s1 add column v3 int;
+
+statement ok
+create materialized view mv5 as select * from s1;
+
+sleep 10s
+
+statement ok
+flush;
+
+query ITI
+select * from s1
+----
+1 11 111
+
+query ITI
+select * from mv5
+----
+1 11 111
+
+# check definition after altering
+query TT
+show create source s1;
+----
+public.s1 CREATE SOURCE s1 (v1 INT, v2 CHARACTER VARYING, v3 INT) WITH (connector = 'kafka', topic = 'kafka_alter', properties.bootstrap.server = 'message_queue:29092', scan.startup.mode = 'earliest') FORMAT PLAIN ENCODE JSON
\ No newline at end of file
diff --git a/e2e_test/source/basic/alter/kafka_after_new_data.slt b/e2e_test/source/basic/alter/kafka_after_new_data.slt
new file mode 100644
index 0000000000000..2b0ab659766e9
--- /dev/null
+++ b/e2e_test/source/basic/alter/kafka_after_new_data.slt
@@ -0,0 +1,67 @@
+sleep 5s
+
+statement ok
+flush;
+
+query ITI rowsort
+select * from s1
+----
+1 11 111
+2 22 222
+
+query I rowsort
+select * from mv1
+----
+1
+2
+
+query IT rowsort
+select * from mv3
+----
+1 11
+2 22
+
+query TI rowsort
+select * from s2
+----
+11 NULL
+22 NULL
+
+query T rowsort
+select * from mv2
+----
+11
+22
+
+query TI rowsort
+select * from mv4
+----
+11 NULL
+22 NULL
+
+query ITI rowsort
+select * from mv5
+----
+1 11 111
+2 22 222
+
+statement ok
+drop materialized view mv1
+
+statement ok
+drop materialized view mv2
+
+statement ok
+drop materialized view mv3
+
+statement ok
+drop materialized view mv4
+
+statement ok
+drop materialized view mv5
+
+statement ok
+drop source s1
+
+statement ok
+drop source s2
\ No newline at end of file
diff --git a/e2e_test/source/basic/kafka.slt b/e2e_test/source/basic/kafka.slt
index 1d8bf2118f042..1d5801cfe4d23 100644
--- a/e2e_test/source/basic/kafka.slt
+++ b/e2e_test/source/basic/kafka.slt
@@ -645,6 +645,12 @@ select count(*) from s16
 ----
 0
 
+statement error Feature is not yet implemented: Alter source with schema registry
+alter source s18 add column v10 int;
+
+statement error Feature is not yet implemented: Alter source with schema registry
+alter source s17 add column v10 int;
+
 query III rowsort
 select * from s21;
 ----
diff --git a/proto/catalog.proto b/proto/catalog.proto
index b60f8f33a1c19..85eb266ca0527 100644
--- a/proto/catalog.proto
+++ b/proto/catalog.proto
@@ -78,6 +78,9 @@ message Source {
   optional uint64 initialized_at_epoch = 15;
   optional uint64 created_at_epoch = 16;
+
+  // Per-source catalog version, used by schema change.
+  uint64 version = 100;
 }
 
 enum SinkType {
diff --git a/proto/ddl_service.proto b/proto/ddl_service.proto
index 6094a699ed7b4..35ae7e0b01bb4 100644
--- a/proto/ddl_service.proto
+++ b/proto/ddl_service.proto
@@ -67,6 +67,15 @@ message DropSourceResponse {
   uint64 version = 2;
 }
 
+message AlterSourceRequest {
+  catalog.Source source = 1;
+}
+
+message AlterSourceResponse {
+  common.Status status = 1;
+  uint64 version = 2;
+}
+
 message CreateSinkRequest {
   catalog.Sink sink = 1;
   stream_plan.StreamFragmentGraph fragment_graph = 2;
@@ -316,6 +325,7 @@ service DdlService {
   rpc DropMaterializedView(DropMaterializedViewRequest) returns (DropMaterializedViewResponse);
   rpc CreateTable(CreateTableRequest) returns (CreateTableResponse);
   rpc AlterRelationName(AlterRelationNameRequest) returns (AlterRelationNameResponse);
+  rpc AlterSource(AlterSourceRequest) returns (AlterSourceResponse);
   rpc DropTable(DropTableRequest) returns (DropTableResponse);
   rpc RisectlListStateTables(RisectlListStateTablesRequest) returns (RisectlListStateTablesResponse);
   rpc CreateView(CreateViewRequest) returns (CreateViewResponse);
diff --git a/scripts/source/alter_data/kafka_alter.2 b/scripts/source/alter_data/kafka_alter.2
new file mode 100644
index 0000000000000..39676d079c758
--- /dev/null
+++ b/scripts/source/alter_data/kafka_alter.2
@@ -0,0 +1 @@
+{"v1": 2, "v2": "22", "v3": 222}
\ No newline at end of file
diff --git a/scripts/source/prepare_data_after_alter.sh b/scripts/source/prepare_data_after_alter.sh
new file mode 100644
index 0000000000000..3225ce5d128d5
--- /dev/null
+++ b/scripts/source/prepare_data_after_alter.sh
@@ -0,0 +1,18 @@
+#!/usr/bin/env bash
+
+# Exits as soon as any line fails.
+set -e
+
+KCAT_BIN="kcat"
+# The kcat binary is named "kafkacat" on Linux.
+if [ "$(uname)" == "Linux" ]
+then
+  KCAT_BIN="kafkacat"
+fi
+
+SCRIPT_PATH="$(cd "$(dirname "$0")" >/dev/null 2>&1 && pwd)"
+cd "$SCRIPT_PATH/.." || exit 1
+
+FILE="./source/alter_data/kafka_alter.$1"
+echo "Send data from $FILE"
+cat "$FILE" | ${KCAT_BIN} -P -b message_queue:29092 -t kafka_alter
\ No newline at end of file
diff --git a/scripts/source/test_data/kafka_alter.1 b/scripts/source/test_data/kafka_alter.1
new file mode 100644
index 0000000000000..3cf65f5d494cf
--- /dev/null
+++ b/scripts/source/test_data/kafka_alter.1
@@ -0,0 +1 @@
+{"v1": 1, "v2": "11", "v3": 111}
\ No newline at end of file
diff --git a/src/common/src/catalog/mod.rs b/src/common/src/catalog/mod.rs
index fab5e714a6ba9..070ad5d9f777e 100644
--- a/src/common/src/catalog/mod.rs
+++ b/src/common/src/catalog/mod.rs
@@ -41,6 +41,10 @@ pub type CatalogVersion = u64;
 pub type TableVersionId = u64;
 /// The default version ID for a new table.
 pub const INITIAL_TABLE_VERSION_ID: u64 = 0;
+/// The version number of the per-source catalog.
+pub type SourceVersionId = u64;
+/// The default version ID for a new source.
+pub const INITIAL_SOURCE_VERSION_ID: u64 = 0;
 
 pub const DEFAULT_DATABASE_NAME: &str = "dev";
 pub const DEFAULT_SCHEMA_NAME: &str = "public";
diff --git a/src/frontend/src/catalog/catalog_service.rs b/src/frontend/src/catalog/catalog_service.rs
index 1d6b6e4248db9..df3fce39004a1 100644
--- a/src/frontend/src/catalog/catalog_service.rs
+++ b/src/frontend/src/catalog/catalog_service.rs
@@ -16,7 +16,7 @@ use std::sync::Arc;
 
 use parking_lot::lock_api::ArcRwLockReadGuard;
 use parking_lot::{RawRwLock, RwLock};
-use risingwave_common::catalog::{CatalogVersion, FunctionId, IndexId, TableId};
+use risingwave_common::catalog::{CatalogVersion, FunctionId, IndexId};
 use risingwave_common::error::ErrorCode::InternalError;
 use risingwave_common::error::{Result, RwError};
 use risingwave_common::util::column_index_mapping::ColIndexMapping;
@@ -30,7 +30,7 @@ use risingwave_rpc_client::MetaClient;
 use tokio::sync::watch::Receiver;
 
 use super::root_catalog::Catalog;
-use super::DatabaseId;
+use super::{DatabaseId, TableId};
 use crate::user::UserId;
 
 pub type CatalogReadGuard = ArcRwLockReadGuard<RawRwLock, Catalog>;
@@ -86,6 +86,8 @@ pub trait CatalogWriter: Send + Sync {
         mapping: ColIndexMapping,
     ) -> Result<()>;
 
+    async fn alter_source_column(&self, source: PbSource) -> Result<()>;
+
     async fn create_index(
         &self,
         index: PbIndex,
@@ -220,6 +222,11 @@ impl CatalogWriter for CatalogWriterImpl {
         self.wait_version(version).await
     }
 
+    async fn alter_source_column(&self, source: PbSource) -> Result<()> {
+        let version = self.meta_client.alter_source_column(source).await?;
+        self.wait_version(version).await
+    }
+
     async fn replace_table(
         &self,
         table: PbTable,
diff --git a/src/frontend/src/catalog/root_catalog.rs b/src/frontend/src/catalog/root_catalog.rs
index 147d55eb92736..f8fa09efa43e3 100644
--- a/src/frontend/src/catalog/root_catalog.rs
+++ b/src/frontend/src/catalog/root_catalog.rs
@@ -403,6 +403,17 @@ impl Catalog {
             .ok_or_else(|| CatalogError::NotFound("schema_id", schema_id.to_string()))
     }
 
+    pub fn get_source_by_id(
+        &self,
+        db_id: &DatabaseId,
+        schema_id: &SchemaId,
+        source_id: &SourceId,
+    ) -> CatalogResult<&Arc<SourceCatalog>> {
+        self.get_schema_by_id(db_id, schema_id)?
+            .get_source_by_id(source_id)
+            .ok_or_else(|| CatalogError::NotFound("source_id", source_id.to_string()))
+    }
+
     /// Refer to [`SearchPath`].
     pub fn first_valid_schema(
         &self,
diff --git a/src/frontend/src/catalog/source_catalog.rs b/src/frontend/src/catalog/source_catalog.rs
index dfcbc3bd9cdd2..ec35cfb7bde28 100644
--- a/src/frontend/src/catalog/source_catalog.rs
+++ b/src/frontend/src/catalog/source_catalog.rs
@@ -14,12 +14,12 @@
 
 use std::collections::BTreeMap;
 
-use risingwave_common::catalog::ColumnCatalog;
+use risingwave_common::catalog::{ColumnCatalog, SourceVersionId};
 use risingwave_common::util::epoch::Epoch;
 use risingwave_pb::catalog::source::OptionalAssociatedTableId;
 use risingwave_pb::catalog::{PbSource, StreamSourceInfo, WatermarkDesc};
 
-use super::{ColumnId, ConnectionId, OwnedByUserCatalog, SourceId};
+use super::{ColumnId, ConnectionId, DatabaseId, OwnedByUserCatalog, SchemaId, SourceId};
 use crate::catalog::TableId;
 use crate::user::UserId;
 use crate::WithOptions;
@@ -43,6 +43,7 @@ pub struct SourceCatalog {
     pub connection_id: Option<ConnectionId>,
     pub created_at_epoch: Option<Epoch>,
     pub initialized_at_epoch: Option<Epoch>,
+    pub version: SourceVersionId,
 }
 
 impl SourceCatalog {
@@ -50,6 +51,35 @@ impl SourceCatalog {
     pub fn create_sql(&self) -> String {
         self.definition.clone()
     }
+
+    pub fn to_prost(&self, schema_id: SchemaId, database_id: DatabaseId) -> PbSource {
+        PbSource {
+            id: self.id,
+            schema_id,
+            database_id,
+            name: self.name.clone(),
+            row_id_index: self.row_id_index.map(|idx| idx as _),
+            columns: self.columns.iter().map(|c| c.to_protobuf()).collect(),
+            pk_column_ids: self.pk_col_ids.iter().map(Into::into).collect(),
+            properties: self.properties.clone().into_iter().collect(),
+            owner: self.owner,
+            info: Some(self.info.clone()),
+            watermark_descs: self.watermark_descs.clone(),
+            definition: self.definition.clone(),
+            connection_id: self.connection_id,
+            initialized_at_epoch: self.initialized_at_epoch.map(|x| x.0),
+            created_at_epoch: self.created_at_epoch.map(|x| x.0),
+            optional_associated_table_id: self
+                .associated_table_id
+                .map(|id| OptionalAssociatedTableId::AssociatedTableId(id.table_id)),
+            version: self.version,
+        }
+    }
+
+    /// Get a reference to the source catalog's version.
+    pub fn version(&self) -> SourceVersionId {
+        self.version
+    }
 }
 
 impl From<&PbSource> for SourceCatalog {
@@ -77,6 +107,7 @@ impl From<&PbSource> for SourceCatalog {
             .map(|id| match id {
                 OptionalAssociatedTableId::AssociatedTableId(id) => id,
             });
+        let version = prost.version;
 
         let connection_id = prost.connection_id;
 
@@ -96,6 +127,7 @@ impl From<&PbSource> for SourceCatalog {
             connection_id,
             created_at_epoch: prost.created_at_epoch.map(Epoch::from),
             initialized_at_epoch: prost.initialized_at_epoch.map(Epoch::from),
+            version,
         }
     }
 }
diff --git a/src/frontend/src/handler/alter_source_column.rs b/src/frontend/src/handler/alter_source_column.rs
new file mode 100644
index 0000000000000..be139fd6d6976
--- /dev/null
+++ b/src/frontend/src/handler/alter_source_column.rs
@@ -0,0 +1,201 @@
+// Copyright 2023 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use itertools::Itertools;
+use pgwire::pg_response::{PgResponse, StatementType};
+use risingwave_common::catalog::ColumnId;
+use risingwave_common::error::{ErrorCode, Result, RwError};
+use risingwave_connector::source::{SourceEncode, SourceStruct};
+use risingwave_source::source_desc::extract_source_struct;
+use risingwave_sqlparser::ast::{
+    AlterSourceOperation, ColumnDef, CreateSourceStatement, ObjectName, Statement,
+};
+use risingwave_sqlparser::parser::Parser;
+
+use super::create_table::bind_sql_columns;
+use super::{HandlerArgs, RwPgResponse};
+use crate::catalog::root_catalog::SchemaPath;
+use crate::Binder;
+
+// Note for future drop column:
+// 1. Dependencies of generated columns
+
+/// Handle `ALTER SOURCE [ADD] COLUMN` statements.
+pub async fn handle_alter_source_column(
+    handler_args: HandlerArgs,
+    source_name: ObjectName,
+    operation: AlterSourceOperation,
+) -> Result<RwPgResponse> {
+    // Get the original definition.
+    let session = handler_args.session;
+    let db_name = session.database();
+    let (schema_name, real_source_name) =
+        Binder::resolve_schema_qualified_name(db_name, source_name.clone())?;
+    let search_path = session.config().get_search_path();
+    let user_name = &session.auth_context().user_name;
+
+    let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
+
+    let (db_id, schema_id, mut catalog) = {
+        let reader = session.env().catalog_reader().read_guard();
+        let (source, schema_name) =
+            reader.get_source_by_name(db_name, schema_path, &real_source_name)?;
+        let db = reader.get_database_by_name(db_name)?;
+        let schema = db.get_schema_by_name(schema_name).unwrap();
+
+        session.check_privilege_for_drop_alter(schema_name, &**source)?;
+
+        (db.id(), schema.id(), (**source).clone())
+    };
+
+    // Currently, only sources without a schema registry can be altered.
+    let SourceStruct { encode, .. } = extract_source_struct(&catalog.info)?;
+    match encode {
+        SourceEncode::Avro | SourceEncode::Protobuf => {
+            return Err(RwError::from(ErrorCode::NotImplemented(
+                "Alter source with schema registry".into(),
+                None.into(),
+            )));
+        }
+        SourceEncode::Invalid | SourceEncode::Native => {
+            return Err(RwError::from(ErrorCode::NotSupported(
+                format!("Alter source with encode {:?}", encode),
+                "Alter source with encode JSON | BYTES | CSV".into(),
+            )));
+        }
+        _ => {}
+    }
+
+    let columns = &mut catalog.columns;
+    match operation {
+        AlterSourceOperation::AddColumn { column_def } => {
+            let new_column_name = column_def.name.real_value();
+            if columns
+                .iter()
+                .any(|c| c.column_desc.name == new_column_name)
+            {
+                Err(ErrorCode::InvalidInputSyntax(format!(
+                    "column \"{new_column_name}\" of source \"{source_name}\" already exists"
+                )))?
+            }
+            catalog.definition =
+                alter_definition_add_column(&catalog.definition, column_def.clone())?;
+            let mut bound_column = bind_sql_columns(&[column_def])?.remove(0);
+            bound_column.column_desc.column_id = columns
+                .iter()
+                .fold(ColumnId::new(i32::MIN), |a, b| a.max(b.column_id()))
+                .next();
+            columns.push(bound_column);
+        }
+        _ => unreachable!(),
+    }
+
+    // Update the version.
+    catalog.version += 1;
+
+    let catalog_writer = session.catalog_writer()?;
+    catalog_writer
+        .alter_source_column(catalog.to_prost(schema_id, db_id))
+        .await?;
+
+    Ok(PgResponse::empty_result(StatementType::ALTER_SOURCE))
+}
+
+/// `alter_definition_add_column` adds a new column to the definition of the relation.
+#[inline(always)]
+pub fn alter_definition_add_column(definition: &str, column: ColumnDef) -> Result<String> {
+    let ast = Parser::parse_sql(definition).expect("failed to parse relation definition");
+    let mut stmt = ast
+        .into_iter()
+        .exactly_one()
+        .expect("should contain only one statement");
+
+    match &mut stmt {
+        Statement::CreateSource {
+            stmt: CreateSourceStatement { columns, .. },
+        } => {
+            columns.push(column);
+        }
+        _ => unreachable!(),
+    }
+
+    Ok(stmt.to_string())
+}
+
+#[cfg(test)]
+pub mod tests {
+    use std::collections::HashMap;
+
+    use risingwave_common::catalog::{DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME};
+    use risingwave_common::types::DataType;
+
+    use crate::catalog::root_catalog::SchemaPath;
+    use crate::test_utils::LocalFrontend;
+
+    #[tokio::test]
+    async fn test_alter_source_column_handler() {
+        let frontend = LocalFrontend::new(Default::default()).await;
+        let session = frontend.session_ref();
+        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);
+
+        let sql = r#"create source s (v1 int) with (
+            connector = 'kafka',
+            topic = 'abc',
+            properties.bootstrap.server = 'localhost:29092',
+        ) FORMAT PLAIN ENCODE JSON;"#;
+
+        frontend.run_sql(sql).await.unwrap();
+
+        let get_source = || {
+            let catalog_reader = session.env().catalog_reader().read_guard();
+            catalog_reader
+                .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "s")
+                .unwrap()
+                .0
+                .clone()
+        };
+
+        let source = get_source();
+        let columns: HashMap<_, _> = source
+            .columns
+            .iter()
+            .map(|col| (col.name(), (col.data_type().clone(), col.column_id())))
+            .collect();
+
+        let sql = "alter source s add column v2 varchar;";
+        frontend.run_sql(sql).await.unwrap();
+
+        let altered_source = get_source();
+
+        let altered_columns: HashMap<_, _> = altered_source
+            .columns
+            .iter()
+            .map(|col| (col.name(), (col.data_type().clone(), col.column_id())))
+            .collect();
+
+        // Check the new column.
+        assert_eq!(columns.len() + 1, altered_columns.len());
+        assert_eq!(altered_columns["v2"].0, DataType::Varchar);
+
+        // Check that the old columns and their IDs are not changed.
+        assert_eq!(columns["v1"], altered_columns["v1"]);
+
+        // Check the version bump.
+        assert_eq!(source.version + 1, altered_source.version);
+
+        // Check the rewritten definition.
+        let altered_sql = r#"CREATE SOURCE s (v1 INT, v2 CHARACTER VARYING) WITH (connector = 'kafka', topic = 'abc', properties.bootstrap.server = 'localhost:29092') FORMAT PLAIN ENCODE JSON"#;
+        assert_eq!(altered_sql, altered_source.definition);
+    }
+}
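The ID choice in `handle_alter_source_column` above is worth spelling out: the added column receives the maximum existing column ID plus one, so existing columns are never renumbered by an `ALTER`. A minimal self-contained sketch of that rule, with plain `i32` standing in for the `ColumnId` newtype and a helper name of our own:

```rust
/// Illustrative sketch of the allocation rule: the new column gets
/// (max existing column ID) + 1, mirroring the
/// `fold(ColumnId::new(i32::MIN), ...).next()` chain in the diff above.
fn next_column_id(existing: &[i32]) -> i32 {
    existing.iter().copied().fold(i32::MIN, i32::max) + 1
}

fn main() {
    // Typical case: IDs 0..=2 exist, so the added column gets 3.
    assert_eq!(next_column_id(&[0, 1, 2]), 3);
    // Gaps are not refilled, so old data keeps decoding under stable IDs.
    assert_eq!(next_column_id(&[0, 2, 5]), 6);
}
```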
diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs
index 9de708175caee..3a1e72abcf902 100644
--- a/src/frontend/src/handler/create_source.rs
+++ b/src/frontend/src/handler/create_source.rs
@@ -21,7 +21,7 @@ use maplit::{convert_args, hashmap};
 use pgwire::pg_response::{PgResponse, StatementType};
 use risingwave_common::catalog::{
     is_column_ids_dedup, ColumnCatalog, ColumnDesc, TableId, DEFAULT_KEY_COLUMN_NAME,
-    KAFKA_TIMESTAMP_COLUMN_NAME,
+    INITIAL_SOURCE_VERSION_ID, KAFKA_TIMESTAMP_COLUMN_NAME,
 };
 use risingwave_common::error::ErrorCode::{self, InvalidInputSyntax, ProtocolError};
 use risingwave_common::error::{Result, RwError};
@@ -1065,6 +1065,7 @@ pub async fn handle_create_source(
         initialized_at_epoch: None,
         created_at_epoch: None,
         optional_associated_table_id: None,
+        version: INITIAL_SOURCE_VERSION_ID,
     };
 
     let catalog_writer = session.catalog_writer()?;
diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs
index 49f57febe47a8..ed429a3eec54e 100644
--- a/src/frontend/src/handler/create_table.rs
+++ b/src/frontend/src/handler/create_table.rs
@@ -19,8 +19,8 @@ use fixedbitset::FixedBitSet;
 use itertools::Itertools;
 use pgwire::pg_response::{PgResponse, StatementType};
 use risingwave_common::catalog::{
-    ColumnCatalog, ColumnDesc, TableId, TableVersionId, INITIAL_TABLE_VERSION_ID,
-    USER_COLUMN_ID_OFFSET,
+    ColumnCatalog, ColumnDesc, TableId, TableVersionId, INITIAL_SOURCE_VERSION_ID,
+    INITIAL_TABLE_VERSION_ID, USER_COLUMN_ID_OFFSET,
 };
 use risingwave_common::error::{ErrorCode, Result, RwError};
 use risingwave_pb::catalog::source::OptionalAssociatedTableId;
@@ -609,6 +609,7 @@ fn gen_table_plan_inner(
         optional_associated_table_id: Some(OptionalAssociatedTableId::AssociatedTableId(
             TableId::placeholder().table_id,
         )),
+        version: INITIAL_SOURCE_VERSION_ID,
     });
 
     let source_catalog = source.as_ref().map(|source| Rc::new((source).into()));
diff --git a/src/frontend/src/handler/mod.rs b/src/frontend/src/handler/mod.rs
index dc8ddd6d55cac..464d71427004d 100644
--- a/src/frontend/src/handler/mod.rs
+++ b/src/frontend/src/handler/mod.rs
@@ -33,6 +33,7 @@ use crate::session::SessionImpl;
 use crate::utils::WithOptions;
 
 mod alter_relation_rename;
+mod alter_source_column;
 mod alter_system;
 mod alter_table_column;
 pub mod alter_user;
@@ -494,6 +495,10 @@ pub async fn handle(
             name,
             operation: AlterSourceOperation::RenameSource { source_name },
         } => alter_relation_rename::handle_rename_source(handler_args, name, source_name).await,
+        Statement::AlterSource {
+            name,
+            operation: operation @ AlterSourceOperation::AddColumn { .. },
+        } => alter_source_column::handle_alter_source_column(handler_args, name, operation).await,
         Statement::AlterSystem { param, value } => {
            alter_system::handle_alter_system(handler_args, param, value).await
         }
diff --git a/src/frontend/src/handler/util.rs b/src/frontend/src/handler/util.rs
index 92704120b1ed9..05bab2ea5404a 100644
--- a/src/frontend/src/handler/util.rs
+++ b/src/frontend/src/handler/util.rs
@@ -256,7 +256,6 @@ pub fn get_connection_name(with_properties: &BTreeMap<String, String>) -> Option<String> {
         .get(CONNECTION_NAME_KEY)
         .map(|s| s.to_lowercase())
 }
-
 #[cfg(test)]
 mod tests {
     use bytes::BytesMut;
diff --git a/src/frontend/src/test_utils.rs b/src/frontend/src/test_utils.rs
index 0fc7462e22045..7519f6c281d8d 100644
--- a/src/frontend/src/test_utils.rs
+++ b/src/frontend/src/test_utils.rs
@@ -464,6 +464,11 @@ impl CatalogWriter for MockCatalogWriter {
         Ok(())
     }
 
+    async fn alter_source_column(&self, source: PbSource) -> Result<()> {
+        self.catalog.write().update_source(&source);
+        Ok(())
+    }
+
     async fn alter_view_name(&self, _view_id: u32, _view_name: &str) -> Result<()> {
         unreachable!()
     }
diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs
index 67f1d1cbd15aa..0aa241c3bd53f 100644
--- a/src/meta/src/manager/catalog/mod.rs
+++ b/src/meta/src/manager/catalog/mod.rs
@@ -1546,6 +1546,28 @@ where
             .await
     }
 
+    pub async fn alter_source_column(&self, source: Source) -> MetaResult<NotificationVersion> {
+        let source_id = source.get_id();
+        let core = &mut *self.core.lock().await;
+        let database_core = &mut core.database;
+        database_core.ensure_source_id(source_id)?;
+
+        let original_source = database_core.sources.get(&source_id).unwrap().clone();
+        if original_source.get_version() + 1 != source.get_version() {
+            bail!("source version is stale");
+        }
+
+        let mut sources = BTreeMapTransaction::new(&mut database_core.sources);
+        sources.insert(source_id, source.clone());
+        commit_meta!(self, sources)?;
+
+        let version = self
+            .notify_frontend_relation_info(Operation::Update, RelationInfo::Source(source))
+            .await;
+
+        Ok(version)
+    }
+
     pub async fn alter_index_name(
         &self,
         index_id: IndexId,
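The stale-version guard in `alter_source_column` above is a small piece of optimistic concurrency control: the frontend bumps the version it read, and meta commits only if that bump lands exactly on `stored + 1`. A self-contained sketch of the rule in isolation (the function name is ours, not the codebase's):

```rust
/// Sketch of the optimistic version check: an ALTER built against catalog
/// version `v` submits `v + 1`; if meta's stored version moved in the
/// meantime, the request is rejected and must be retried on a fresh catalog.
fn check_source_version(stored: u64, incoming: u64) -> Result<(), &'static str> {
    if stored + 1 != incoming {
        return Err("source version is stale");
    }
    Ok(())
}

fn main() {
    assert!(check_source_version(0, 1).is_ok()); // first ALTER after CREATE
    assert!(check_source_version(1, 1).is_err()); // lost a race to a concurrent ALTER
    assert!(check_source_version(1, 3).is_err()); // skipped a version
}
```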
diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs
index 2a7349fde67d9..b4ec2a1028049 100644
--- a/src/meta/src/rpc/ddl_controller.rs
+++ b/src/meta/src/rpc/ddl_controller.rs
@@ -97,6 +97,7 @@ pub enum DdlCommand {
     DropStreamingJob(StreamingJobId, DropMode),
     ReplaceTable(StreamingJob, StreamFragmentGraphProto, ColIndexMapping),
     AlterRelationName(Relation, String),
+    AlterSourceColumn(Source),
     CreateConnection(Connection),
     DropConnection(ConnectionId),
 }
@@ -264,6 +265,7 @@ where
                 DdlCommand::DropConnection(connection_id) => {
                     ctrl.drop_connection(connection_id).await
                 }
+                DdlCommand::AlterSourceColumn(source) => ctrl.alter_source_column(source).await,
             }
         }
         .in_current_span();
@@ -347,6 +349,11 @@ where
         Ok(version)
     }
 
+    // Maybe we can unify `alter_source_column` and `alter_source_name`.
+    async fn alter_source_column(&self, source: Source) -> MetaResult<NotificationVersion> {
+        self.catalog_manager.alter_source_column(source).await
+    }
+
     async fn create_function(&self, function: Function) -> MetaResult<NotificationVersion> {
         self.catalog_manager.create_function(&function).await
     }
diff --git a/src/meta/src/rpc/service/ddl_service.rs b/src/meta/src/rpc/service/ddl_service.rs
index 93b8b10282b6b..186597647a7ab 100644
--- a/src/meta/src/rpc/service/ddl_service.rs
+++ b/src/meta/src/rpc/service/ddl_service.rs
@@ -596,6 +596,21 @@ where
         }))
     }
 
+    async fn alter_source(
+        &self,
+        request: Request<AlterSourceRequest>,
+    ) -> Result<Response<AlterSourceResponse>, Status> {
+        let AlterSourceRequest { source } = request.into_inner();
+        let version = self
+            .ddl_controller
+            .run_command(DdlCommand::AlterSourceColumn(source.unwrap()))
+            .await?;
+        Ok(Response::new(AlterSourceResponse {
+            status: None,
+            version,
+        }))
+    }
+
     async fn get_ddl_progress(
         &self,
         _request: Request<GetDdlProgressRequest>,
diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs
index 3ece4deb6e24d..ddedcacd7c7f5 100644
--- a/src/rpc_client/src/meta_client.rs
+++ b/src/rpc_client/src/meta_client.rs
@@ -417,6 +417,15 @@ impl MetaClient {
         Ok(resp.version)
     }
 
+    // Only adding columns is supported for now.
+    pub async fn alter_source_column(&self, source: PbSource) -> Result<CatalogVersion> {
+        let request = AlterSourceRequest {
+            source: Some(source),
+        };
+        let resp = self.inner.alter_source(request).await?;
+        Ok(resp.version)
+    }
+
     pub async fn replace_table(
         &self,
         table: PbTable,
@@ -1662,6 +1671,7 @@ macro_rules! for_all_meta_rpc {
             ,{ ddl_client, drop_index, DropIndexRequest, DropIndexResponse }
             ,{ ddl_client, drop_function, DropFunctionRequest, DropFunctionResponse }
             ,{ ddl_client, replace_table_plan, ReplaceTablePlanRequest, ReplaceTablePlanResponse }
+            ,{ ddl_client, alter_source, AlterSourceRequest, AlterSourceResponse }
             ,{ ddl_client, risectl_list_state_tables, RisectlListStateTablesRequest, RisectlListStateTablesResponse }
             ,{ ddl_client, get_ddl_progress, GetDdlProgressRequest, GetDdlProgressResponse }
             ,{ ddl_client, create_connection, CreateConnectionRequest, CreateConnectionResponse }
diff --git a/src/sqlparser/src/ast/ddl.rs b/src/sqlparser/src/ast/ddl.rs
index 7b6bd3e911eb8..8cdc4ac2aa865 100644
--- a/src/sqlparser/src/ast/ddl.rs
+++ b/src/sqlparser/src/ast/ddl.rs
@@ -103,6 +103,7 @@ pub enum AlterSinkOperation {
 #[cfg_attr(feature = "visitor", derive(Visit, VisitMut))]
 pub enum AlterSourceOperation {
     RenameSource { source_name: ObjectName },
+    AddColumn { column_def: ColumnDef },
 }
 
 impl fmt::Display for AlterTableOperation {
@@ -197,6 +198,9 @@ impl fmt::Display for AlterSourceOperation {
             AlterSourceOperation::RenameSource { source_name } => {
                 write!(f, "RENAME TO {source_name}")
             }
+            AlterSourceOperation::AddColumn { column_def } => {
+                write!(f, "ADD COLUMN {column_def}")
+            }
         }
     }
 }
diff --git a/src/sqlparser/src/parser.rs b/src/sqlparser/src/parser.rs
index 99c16b203e4b0..66e3b06a8b141 100644
--- a/src/sqlparser/src/parser.rs
+++ b/src/sqlparser/src/parser.rs
@@ -2981,8 +2981,13 @@ impl Parser {
             } else {
                 return self.expected("TO after RENAME", self.peek_token());
             }
+        } else if self.parse_keyword(Keyword::ADD) {
+            let _ = self.parse_keyword(Keyword::COLUMN);
+            let _if_not_exists = self.parse_keywords(&[Keyword::IF, Keyword::NOT, Keyword::EXISTS]);
+            let column_def = self.parse_column_def()?;
+            AlterSourceOperation::AddColumn { column_def }
         } else {
-            return self.expected("RENAME after ALTER SOURCE", self.peek_token());
+            return self.expected("RENAME | ADD COLUMN after ALTER SOURCE", self.peek_token());
         };
 
         Ok(Statement::AlterSource {
diff --git a/src/sqlparser/tests/testdata/alter.yaml b/src/sqlparser/tests/testdata/alter.yaml
index 4dd0a20684923..ad8afff801418 100644
--- a/src/sqlparser/tests/testdata/alter.yaml
+++ b/src/sqlparser/tests/testdata/alter.yaml
@@ -7,3 +7,5 @@
   formatted_sql: ALTER SYSTEM SET a = 'abc'
 - input: ALTER SYSTEM SET a = DEFAULT
   formatted_sql: ALTER SYSTEM SET a = DEFAULT
+- input: ALTER SOURCE t ADD COLUMN id INT;
+  formatted_sql: ALTER SOURCE t ADD COLUMN id INT
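As a quick end-to-end check of the grammar added above, the new statement round-trips through the parser, with data types normalized by the `Display` impls; this is also why the altered definitions in the tests print `varchar` as `CHARACTER VARYING`. A sketch assuming the `risingwave_sqlparser` crate from this diff:

```rust
use risingwave_sqlparser::parser::Parser;

fn main() {
    let stmts = Parser::parse_sql("ALTER SOURCE t ADD COLUMN v2 varchar").unwrap();
    assert_eq!(stmts.len(), 1);
    // Re-serialization goes through the `Display` impls added in ddl.rs,
    // which normalize the column type.
    assert_eq!(
        stmts[0].to_string(),
        "ALTER SOURCE t ADD COLUMN v2 CHARACTER VARYING"
    );
}
```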