Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support altering the target table’s columns of the sink. #17203

Merged
merged 6 commits into from
Jul 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ci/scripts/run-e2e-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ if [[ $mode == "standalone" ]]; then
fi

if [[ $mode == "single-node" ]]; then
export RUST_MIN_STACK=4194304
source ci/scripts/single-node-utils.sh
fi

Expand Down
201 changes: 201 additions & 0 deletions e2e_test/sink/sink_into_table/alter_column.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
statement ok
SET RW_IMPLICIT_FLUSH TO true;

statement ok
create table t_simple_1 (v1 int);

statement ok
create table m_simple (v1 int primary key);

statement ok
create sink s_simple_1 into m_simple as select v1 from t_simple_1;

statement ok
insert into t_simple_1 values (1), (2), (3);

statement ok
flush;

query I rowsort
select * from m_simple;
----
1
2
3

statement ok
alter table m_simple add column v2 int;

statement ok
insert into t_simple_1 values (4);

statement ok
flush;

query II rowsort
select * from m_simple;
----
1 NULL
2 NULL
3 NULL
4 NULL

statement ok
create table t_simple_2 (v1 int, v2 int);

statement ok
create sink s_simple_2 into m_simple as select v1, v2 from t_simple_2;

statement ok
insert into t_simple_2 values (100, 101), (200, 201), (300, 301);

statement ok
flush;

query II rowsort
select * from m_simple;
----
1 NULL
100 101
2 NULL
200 201
3 NULL
300 301
4 NULL

statement error dropping columns in target table of sinks is not supported
alter table m_simple drop column v2;

statement ok
drop sink s_simple_1;

statement ok
drop sink s_simple_2;

statement ok
drop table t_simple_1;

statement ok
drop table t_simple_2;

statement ok
drop table m_simple;

# target table with row_id as primary key
statement ok
create table t_s1 (v1 int);

statement ok
insert into t_s1 values (1), (2), (3);

statement ok
create table t_row_id_as_primary_key (v1 int, v2 int default 1000);

statement ok
create sink s1 into t_row_id_as_primary_key as select v1 from t_s1 with (type = 'append-only', force_append_only = 'true');

statement ok
flush;

query II rowsort
select * from t_row_id_as_primary_key;
----
1 1000
2 1000
3 1000

statement ok
alter table t_row_id_as_primary_key add column v3 int;

query III rowsort
select * from t_row_id_as_primary_key;
----
1 1000 NULL
2 1000 NULL
3 1000 NULL

statement ok
create sink s11 into t_row_id_as_primary_key as select v1+1000 as v1, v1+2000 as v2, v1+3000 as v3 from t_s1 with (type = 'append-only', force_append_only = 'true');

statement ok
flush;

query III rowsort
select * from t_row_id_as_primary_key;
----
1 1000 NULL
1001 2001 3001
1002 2002 3002
1003 2003 3003
2 1000 NULL
3 1000 NULL

statement ok
drop sink s1;

statement ok
drop sink s11;

statement ok
drop table t_row_id_as_primary_key;

statement ok
drop table t_s1;

# target table with append only
statement ok
create table t_s2 (v1 int);

statement ok
insert into t_s2 values (1), (2), (3);

statement ok
create table t_append_only (v1 int, v2 int default 1000) append only;

statement ok
create sink s2 into t_append_only as select v1 from t_s2 with (type = 'append-only', force_append_only = 'true');

statement ok
flush;

query II rowsort
select * from t_append_only;
----
1 1000
2 1000
3 1000

statement ok
alter table t_append_only add column v3 int;

query III rowsort
select * from t_append_only;
----
1 1000 NULL
2 1000 NULL
3 1000 NULL

statement ok
create sink s21 into t_append_only as select v1+1000 as v1, v1+2000 as v2, v1+3000 as v3 from t_s2 with (type = 'append-only', force_append_only = 'true');

query III rowsort
select * from t_append_only;
----
1 1000 NULL
1001 2001 3001
1002 2002 3002
1003 2003 3003
2 1000 NULL
3 1000 NULL

statement ok
drop sink s21;

statement ok
drop sink s2;

statement ok
drop table t_append_only;

statement ok
drop table t_s2;
4 changes: 0 additions & 4 deletions e2e_test/sink/sink_into_table/basic.slt
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,6 @@ create table m_simple (v1 int primary key, v2 int);
statement ok
create sink s_simple_1 into m_simple as select v1, v2 from t_simple;

# we can't alter a table with incoming sinks
statement error Feature is not yet implemented: alter table with incoming sinks
alter table m_simple add column v3 int;

statement ok
drop sink s_simple_1;

Expand Down
3 changes: 3 additions & 0 deletions proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,9 @@ message Sink {
// Handle the sink relies on any sceret. The key is the propertity name and the value is the secret id and type.
// Used for connect options.
map<string, secret.SecretRef> secret_refs = 25;

// only for the sink whose target is a table. Columns of the target table when the sink is created. At this point all the default columns of the target table are all handled by the project operator in the sink plan.
repeated plan_common.ColumnCatalog original_target_columns = 26;
}

message Subscription {
Expand Down
1 change: 1 addition & 0 deletions src/connector/src/sink/catalog/desc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ impl SinkDesc {
created_at_cluster_version: None,
initialized_at_cluster_version: None,
create_type: self.create_type,
original_target_columns: vec![],
}
}

Expand Down
18 changes: 18 additions & 0 deletions src/connector/src/sink/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,9 @@ pub struct SinkCatalog {

/// The secret reference for the sink, mapping from property name to secret id.
pub secret_refs: BTreeMap<String, PbSecretRef>,

/// Only for the sink whose target is a table. Columns of the target table when the sink is created. At this point all the default columns of the target table are all handled by the project operator in the sink plan.
pub original_target_columns: Vec<ColumnCatalog>,
shanicky marked this conversation as resolved.
Show resolved Hide resolved
}

impl SinkCatalog {
Expand Down Expand Up @@ -406,6 +409,11 @@ impl SinkCatalog {
initialized_at_cluster_version: self.initialized_at_cluster_version.clone(),
create_type: self.create_type.to_proto() as i32,
secret_refs: self.secret_refs.clone(),
original_target_columns: self
.original_target_columns
.iter()
.map(|c| c.to_protobuf())
.collect_vec(),
}
}

Expand Down Expand Up @@ -442,6 +450,11 @@ impl SinkCatalog {
pub fn downstream_pk_indices(&self) -> Vec<usize> {
self.downstream_pk.clone()
}

pub fn unique_identity(&self) -> String {
// We need to align with meta here, so we've utilized the proto method.
self.to_proto().unique_identity()
}
}

impl From<PbSink> for SinkCatalog {
Expand Down Expand Up @@ -500,6 +513,11 @@ impl From<PbSink> for SinkCatalog {
created_at_cluster_version: pb.created_at_cluster_version,
create_type: CreateType::from_proto(create_type),
secret_refs: pb.secret_refs,
original_target_columns: pb
.original_target_columns
.into_iter()
.map(ColumnCatalog::from)
.collect_vec(),
}
}
}
Expand Down
18 changes: 18 additions & 0 deletions src/frontend/src/catalog/table_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,24 @@ impl TableCatalog {
}
}

pub fn default_column_exprs(columns: &[ColumnCatalog]) -> Vec<ExprImpl> {
columns
.iter()
.map(|c| {
if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
expr,
..
})) = c.column_desc.generated_or_default_column.as_ref()
{
ExprImpl::from_expr_proto(expr.as_ref().unwrap())
.expect("expr in default columns corrupted")
} else {
ExprImpl::literal_null(c.data_type().clone())
}
})
.collect()
}

pub fn default_columns(&self) -> impl Iterator<Item = (usize, ExprImpl)> + '_ {
self.columns.iter().enumerate().filter_map(|(i, c)| {
if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
Expand Down
Loading
Loading