Skip to content

Commit

Permalink
feat: alter column for table with connector (#12164)
Browse files Browse the repository at this point in the history
  • Loading branch information
wugouzi authored and Li0k committed Sep 15, 2023
1 parent 8267947 commit 22b1ab9
Show file tree
Hide file tree
Showing 18 changed files with 258 additions and 67 deletions.
4 changes: 4 additions & 0 deletions ci/scripts/e2e-source-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,10 @@ chmod +x ./scripts/source/prepare_data_after_alter.sh
./scripts/source/prepare_data_after_alter.sh 2
sqllogictest -p 4566 -d dev './e2e_test/source/basic/alter/kafka_after_new_data.slt'

echo "--- e2e, kafka alter source again"
./scripts/source/prepare_data_after_alter.sh 3
sqllogictest -p 4566 -d dev './e2e_test/source/basic/alter/kafka_after_new_data_2.slt'

echo "--- Run CH-benCHmark"
./risedev slt -p 4566 -d dev './e2e_test/ch_benchmark/batch/ch_benchmark.slt'
./risedev slt -p 4566 -d dev './e2e_test/ch_benchmark/streaming/*.slt'
28 changes: 25 additions & 3 deletions e2e_test/source/basic/alter/kafka.slt
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,22 @@ CREATE SOURCE s2 (v2 varchar) with (
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;

statement ok
CREATE TABLE t (v1 int) with (
connector = 'kafka',
topic = 'kafka_alter',
properties.bootstrap.server = 'message_queue:29092',
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;


statement ok
create materialized view mv1 as select * from s1;

statement ok
create materialized view mv2 as select * from s2;

sleep 10s
sleep 5s

statement ok
flush;
Expand All @@ -35,6 +44,11 @@ select * from s2;
----
11

query I
select * from t;
----
1

# alter source
statement ok
alter source s1 add column v2 varchar;
Expand All @@ -49,7 +63,10 @@ create materialized view mv3 as select * from s1;
statement ok
create materialized view mv4 as select * from s2;

sleep 10s
statement ok
alter table t add column v2 varchar;

sleep 5s

statement ok
flush;
Expand Down Expand Up @@ -84,14 +101,19 @@ select * from mv4
----
11 NULL

query IT
select * from t
----
1 NULL

# alter source again
statement ok
alter source s1 add column v3 int;

statement ok
create materialized view mv5 as select * from s1;

sleep 10s
sleep 5s

statement ok
flush;
Expand Down
15 changes: 15 additions & 0 deletions e2e_test/source/basic/alter/kafka_after_new_data.slt
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,21 @@ select * from mv5
1 11 111
2 22 222

query IT rowsort
select * from t
----
1 NULL
2 22

statement ok
alter table t add column v3 int;

query IT rowsort
select * from t
----
1 NULL NULL
2 22 NULL

statement ok
drop materialized view mv1

Expand Down
14 changes: 14 additions & 0 deletions e2e_test/source/basic/alter/kafka_after_new_data_2.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
sleep 5s

statement ok
flush;

query IT rowsort
select * from t
----
1 NULL NULL
2 22 NULL
3 33 333

statement ok
drop table t;
2 changes: 2 additions & 0 deletions proto/ddl_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,8 @@ message ReplaceTablePlanRequest {
stream_plan.StreamFragmentGraph fragment_graph = 2;
// The mapping from the old columns to the new columns of the table.
catalog.ColIndexMapping table_col_index_mapping = 3;
// The catalog of the source associated with the table, if any.
catalog.Source source = 4;
}

message ReplaceTablePlanResponse {
Expand Down
1 change: 1 addition & 0 deletions scripts/source/alter_data/kafka_alter.3
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"v1": 3, "v2": "33", "v3": 333}
4 changes: 3 additions & 1 deletion src/frontend/src/catalog/catalog_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ pub trait CatalogWriter: Send + Sync {

async fn replace_table(
&self,
source: Option<PbSource>,
table: PbTable,
graph: StreamFragmentGraph,
mapping: ColIndexMapping,
Expand Down Expand Up @@ -229,13 +230,14 @@ impl CatalogWriter for CatalogWriterImpl {

async fn replace_table(
&self,
source: Option<PbSource>,
table: PbTable,
graph: StreamFragmentGraph,
mapping: ColIndexMapping,
) -> Result<()> {
let version = self
.meta_client
.replace_table(table, graph, mapping)
.replace_table(source, table, graph, mapping)
.await?;
self.wait_version(version).await
}
Expand Down
6 changes: 6 additions & 0 deletions src/frontend/src/handler/alter_source_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,12 @@ pub async fn handle_alter_source_column(
None.into(),
)));
}
SourceEncode::Json if catalog.info.use_schema_registry => {
return Err(RwError::from(ErrorCode::NotImplemented(
"Alter source with schema registry".into(),
None.into(),
)));
}
SourceEncode::Invalid | SourceEncode::Native => {
return Err(RwError::from(ErrorCode::NotSupported(
format!("Alter source with encode {:?}", encode),
Expand Down
89 changes: 65 additions & 24 deletions src/frontend/src/handler/alter_table_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,21 @@ use itertools::Itertools;
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::error::{ErrorCode, Result, RwError};
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_pb::catalog::table::OptionalAssociatedSourceId;
use risingwave_pb::catalog::Table;
use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
use risingwave_pb::stream_plan::StreamFragmentGraph;
use risingwave_sqlparser::ast::{AlterTableOperation, ColumnOption, ObjectName, Statement};
use risingwave_sqlparser::ast::{
AlterTableOperation, ColumnOption, Encode, ObjectName, SourceSchemaV2, Statement,
};
use risingwave_sqlparser::parser::Parser;

use super::create_source::get_json_schema_location;
use super::create_table::{gen_create_table_plan, ColumnIdGenerator};
use super::{HandlerArgs, RwPgResponse};
use crate::catalog::root_catalog::SchemaPath;
use crate::catalog::table_catalog::TableType;
use crate::handler::create_table::gen_create_table_plan_with_source;
use crate::{build_graph, Binder, OptimizerContext, TableCatalog};

/// Handle `ALTER TABLE [ADD|DROP] COLUMN` statements. The `operation` must be either `AddColumn` or
Expand All @@ -51,13 +56,6 @@ pub async fn handle_alter_table_column(
reader.get_table_by_name(db_name, schema_path, &real_table_name)?;

match table.table_type() {
// Do not allow altering a table with a connector. It should be done passively according
// to the messages from the connector.
TableType::Table if table.has_associated_source() => {
Err(ErrorCode::InvalidInputSyntax(format!(
"cannot alter table \"{table_name}\" because it has a connector"
)))?
}
TableType::Table => {}

_ => Err(ErrorCode::InvalidInputSyntax(format!(
Expand All @@ -82,9 +80,26 @@ pub async fn handle_alter_table_column(
.context("unable to parse original table definition")?
.try_into()
.unwrap();
let Statement::CreateTable { columns, .. } = &mut definition else {
let Statement::CreateTable {
columns,
source_schema,
..
} = &mut definition
else {
panic!("unexpected statement: {:?}", definition);
};
let source_schema = source_schema
.clone()
.map(|source_schema| source_schema.into_source_schema_v2());

if let Some(source_schema) = &source_schema {
if schema_has_schema_registry(source_schema) {
return Err(RwError::from(ErrorCode::NotImplemented(
"Alter table with source having schema registry".into(),
None.into(),
)));
}
}

match operation {
AlterTableOperation::AddColumn {
Expand Down Expand Up @@ -170,20 +185,32 @@ pub async fn handle_alter_table_column(
panic!("unexpected statement type: {:?}", definition);
};

let (graph, table) = {
let (graph, table, source) = {
let context = OptimizerContext::from_handler_args(handler_args);
let (plan, source, table) = gen_create_table_plan(
context,
table_name,
columns,
constraints,
col_id_gen,
source_watermarks,
append_only,
)?;

// We should already have rejected the case where the table has a connector.
assert!(source.is_none());
let (plan, source, table) = match source_schema {
Some(source_schema) => {
gen_create_table_plan_with_source(
context,
table_name,
columns,
constraints,
source_schema,
source_watermarks,
col_id_gen,
append_only,
)
.await?
}
None => gen_create_table_plan(
context,
table_name,
columns,
constraints,
col_id_gen,
source_watermarks,
append_only,
)?,
};

// TODO: avoid this backward conversion.
if TableCatalog::from(&table).pk_column_ids() != original_catalog.pk_column_ids() {
Expand All @@ -203,10 +230,13 @@ pub async fn handle_alter_table_column(
// Fill the original table ID.
let table = Table {
id: original_catalog.id().table_id(),
optional_associated_source_id: original_catalog
.associated_source_id()
.map(|source_id| OptionalAssociatedSourceId::AssociatedSourceId(source_id.into())),
..table
};

(graph, table)
(graph, table, source)
};

// Calculate the mapping from the original columns to the new columns.
Expand All @@ -226,12 +256,23 @@ pub async fn handle_alter_table_column(
let catalog_writer = session.catalog_writer()?;

catalog_writer
.replace_table(table, graph, col_index_mapping)
.replace_table(source, table, graph, col_index_mapping)
.await?;

Ok(PgResponse::empty_result(StatementType::ALTER_TABLE))
}

/// Returns `true` when `schema` is treated as relying on a schema registry.
///
/// * `Avro` and `Protobuf` row encodings always yield `true`.
/// * For `Json`, the row options are generated from the schema and passed to
///   [`get_json_schema_location`]; the result is `true` only if that call
///   succeeds and reports a location. An error from that call is treated the
///   same as "no location" and yields `false`.
/// * Every other encoding yields `false`.
///
/// # Panics
///
/// Panics if `schema.gen_options()` returns an error for a `Json` schema.
fn schema_has_schema_registry(schema: &SourceSchemaV2) -> bool {
    // Encodings that are unconditionally considered registry-backed.
    if matches!(schema.row_encode, Encode::Avro | Encode::Protobuf) {
        return true;
    }

    // Anything that is neither of the above nor JSON never counts.
    if !matches!(schema.row_encode, Encode::Json) {
        return false;
    }

    // JSON: decide based on whether a schema location can be extracted from the options.
    let mut row_options = schema.gen_options().unwrap();
    get_json_schema_location(&mut row_options)
        .ok()
        .flatten()
        .is_some()
}

#[cfg(test)]
mod tests {
use std::collections::HashMap;
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/handler/create_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ fn consume_string_from_options(
))))
}

fn get_json_schema_location(
pub fn get_json_schema_location(
row_options: &mut BTreeMap<String, String>,
) -> Result<Option<(AstString, bool)>> {
let schema_location = try_consume_string_from_options(row_options, "schema.location");
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/handler/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ pub fn get_connection_name(with_properties: &BTreeMap<String, String>) -> Option
.get(CONNECTION_NAME_KEY)
.map(|s| s.to_lowercase())
}

#[cfg(test)]
mod tests {
use bytes::BytesMut;
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ impl CatalogWriter for MockCatalogWriter {

async fn replace_table(
&self,
_source: Option<PbSource>,
table: PbTable,
_graph: StreamFragmentGraph,
_mapping: ColIndexMapping,
Expand Down
9 changes: 9 additions & 0 deletions src/meta/src/barrier/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ pub enum Command {
new_table_fragments: TableFragments,
merge_updates: Vec<MergeUpdate>,
dispatchers: HashMap<ActorId, Vec<Dispatcher>>,
init_split_assignment: SplitAssignment,
},

/// `SourceSplitAssignment` generates Plain(Mutation::Splits) for pushing initialized splits or
Expand Down Expand Up @@ -352,6 +353,7 @@ impl CommandContext {
old_table_fragments,
merge_updates,
dispatchers,
init_split_assignment,
..
} => {
let dropped_actors = old_table_fragments.actor_ids();
Expand All @@ -368,10 +370,16 @@ impl CommandContext {
})
.collect();

let actor_splits = init_split_assignment
.values()
.flat_map(build_actor_connector_splits)
.collect();

Some(Mutation::Update(UpdateMutation {
actor_new_dispatchers,
merge_update: merge_updates.clone(),
dropped_actors,
actor_splits,
..Default::default()
}))
}
Expand Down Expand Up @@ -761,6 +769,7 @@ impl CommandContext {
new_table_fragments,
merge_updates,
dispatchers,
..
} => {
let table_ids = HashSet::from_iter(std::iter::once(old_table_fragments.table_id()));

Expand Down
Loading

0 comments on commit 22b1ab9

Please sign in to comment.