Skip to content

Commit

Permalink
Init commit
Browse files Browse the repository at this point in the history
Add debug logs, refactor stream/sink merge handling

Refactor Rust DDL: Cleanup imports, immutable cols, sink.rs trim

Refactor imports and simplify functions in alter_table_column and create_sink.

Updated `basic.slt` and `ddl_controller.rs` to fix tests and DDL ops.

Refactor table catalog methods & improve sink handling for table definition replacement.

Optimized column handling and cleaned up code in `alter_table_column.rs`.

Cleanup: Removed debug prints, refactored code in Rust files for database system readiness.

Enhanced sink handling and column changes in catalog and DDL operations.

Added sink_into_table_column migration

Refactored sink creation & updated DDL field init

Add end-to-end test for sink-table routines

Refactor stream_project.rs, add debug in dashboard mod.rs

Remove debug prints; add ID info to project node

Cleanups: Remove duplicate imports, reformat closures, and refactor multiline parameters for clarity

Add `original_target_columns` to Sink proto, SinkCatalog struct, and update `replace_table_with_definition`

Remove unused import in alter_table_column.rs; optimize column cloning with clone_from in notification_service.rs

Refactor table sink updates: Removed old logic in `notif_service.rs`, added `table_sink_catalog_update` in `catalog/mod.rs`.

Add `init` method to `CatalogController` to update table sinks on instantiation

Extended up method in MigrationTrait, added SelectStatement use

Refactor create_sink.rs by dropping PbField and reorg imports

Refactor `start_service` to instantiate `Arc<CatalogController>` inline.

Signed-off-by: Shanicky Chen <[email protected]>
  • Loading branch information
Shanicky Chen authored and shanicky committed Jul 23, 2024
1 parent c5607a3 commit c48c9c7
Show file tree
Hide file tree
Showing 19 changed files with 797 additions and 85 deletions.
201 changes: 201 additions & 0 deletions e2e_test/sink/sink_into_table/alter_column.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
statement ok
SET RW_IMPLICIT_FLUSH TO true;

statement ok
create table t_simple_1 (v1 int);

statement ok
create table m_simple (v1 int primary key);

statement ok
create sink s_simple_1 into m_simple as select v1 from t_simple_1;

statement ok
insert into t_simple_1 values (1), (2), (3);

statement ok
flush;

query I rowsort
select * from m_simple;
----
1
2
3

statement ok
alter table m_simple add column v2 int;

statement ok
insert into t_simple_1 values (4);

statement ok
flush;

query II rowsort
select * from m_simple;
----
1 NULL
2 NULL
3 NULL
4 NULL

statement ok
create table t_simple_2 (v1 int, v2 int);

statement ok
create sink s_simple_2 into m_simple as select v1, v2 from t_simple_2;

statement ok
insert into t_simple_2 values (100, 101), (200, 201), (300, 301);

statement ok
flush;

query II rowsort
select * from m_simple;
----
1 NULL
100 101
2 NULL
200 201
3 NULL
300 301
4 NULL

statement error dropping columns in target table of sinks is not supported
alter table m_simple drop column v2;

statement ok
drop sink s_simple_1;

statement ok
drop sink s_simple_2;

statement ok
drop table t_simple_1;

statement ok
drop table t_simple_2;

statement ok
drop table m_simple;

# target table with row_id as primary key
statement ok
create table t_s1 (v1 int);

statement ok
insert into t_s1 values (1), (2), (3);

statement ok
create table t_row_id_as_primary_key (v1 int, v2 int default 1000);

statement ok
create sink s1 into t_row_id_as_primary_key as select v1 from t_s1 with (type = 'append-only', force_append_only = 'true');

statement ok
flush;

query II rowsort
select * from t_row_id_as_primary_key;
----
1 1000
2 1000
3 1000

statement ok
alter table t_row_id_as_primary_key add column v3 int;

query III rowsort
select * from t_row_id_as_primary_key;
----
1 1000 NULL
2 1000 NULL
3 1000 NULL

statement ok
create sink s11 into t_row_id_as_primary_key as select v1+1000 as v1, v1+2000 as v2, v1+3000 as v3 from t_s1 with (type = 'append-only', force_append_only = 'true');

statement ok
flush;

query III rowsort
select * from t_row_id_as_primary_key;
----
1 1000 NULL
1001 2001 3001
1002 2002 3002
1003 2003 3003
2 1000 NULL
3 1000 NULL

statement ok
drop sink s1;

statement ok
drop sink s11;

statement ok
drop table t_row_id_as_primary_key;

statement ok
drop table t_s1;

# target table with append only
statement ok
create table t_s2 (v1 int);

statement ok
insert into t_s2 values (1), (2), (3);

statement ok
create table t_append_only (v1 int, v2 int default 1000) append only;

statement ok
create sink s2 into t_append_only as select v1 from t_s2 with (type = 'append-only', force_append_only = 'true');

statement ok
flush;

query II rowsort
select * from t_append_only;
----
1 1000
2 1000
3 1000

statement ok
alter table t_append_only add column v3 int;

query III rowsort
select * from t_append_only;
----
1 1000 NULL
2 1000 NULL
3 1000 NULL

statement ok
create sink s21 into t_append_only as select v1+1000 as v1, v1+2000 as v2, v1+3000 as v3 from t_s2 with (type = 'append-only', force_append_only = 'true');

query III rowsort
select * from t_append_only;
----
1 1000 NULL
1001 2001 3001
1002 2002 3002
1003 2003 3003
2 1000 NULL
3 1000 NULL

statement ok
drop sink s21;

statement ok
drop sink s2;

statement ok
drop table t_append_only;

statement ok
drop table t_s2;
4 changes: 0 additions & 4 deletions e2e_test/sink/sink_into_table/basic.slt
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,6 @@ create table m_simple (v1 int primary key, v2 int);
statement ok
create sink s_simple_1 into m_simple as select v1, v2 from t_simple;

# we can't alter a table with incoming sinks
statement error Feature is not yet implemented: alter table with incoming sinks
alter table m_simple add column v3 int;

statement ok
drop sink s_simple_1;

Expand Down
3 changes: 3 additions & 0 deletions proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,9 @@ message Sink {
// Handle the sink relies on any sceret. The key is the propertity name and the value is the secret id and type.
// Used for connect options.
map<string, secret.SecretRef> secret_refs = 25;

// only for the sink whose target is a table. Columns of the target table when the sink is created. At this point all the default columns of the target table are all handled by the project operator in the sink plan.
repeated plan_common.ColumnCatalog original_target_columns = 26;
}

message Subscription {
Expand Down
1 change: 1 addition & 0 deletions src/connector/src/sink/catalog/desc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ impl SinkDesc {
created_at_cluster_version: None,
initialized_at_cluster_version: None,
create_type: self.create_type,
original_target_columns: vec![],
}
}

Expand Down
13 changes: 13 additions & 0 deletions src/connector/src/sink/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,9 @@ pub struct SinkCatalog {

/// The secret reference for the sink, mapping from property name to secret id.
pub secret_refs: BTreeMap<String, PbSecretRef>,

/// Only for the sink whose target is a table. Columns of the target table when the sink is created. At this point all the default columns of the target table are all handled by the project operator in the sink plan.
pub original_target_columns: Vec<ColumnCatalog>,
}

impl SinkCatalog {
Expand Down Expand Up @@ -392,6 +395,11 @@ impl SinkCatalog {
initialized_at_cluster_version: self.initialized_at_cluster_version.clone(),
create_type: self.create_type.to_proto() as i32,
secret_refs: self.secret_refs.clone(),
original_target_columns: self
.original_target_columns
.iter()
.map(|c| c.to_protobuf())
.collect_vec(),
}
}

Expand Down Expand Up @@ -486,6 +494,11 @@ impl From<PbSink> for SinkCatalog {
created_at_cluster_version: pb.created_at_cluster_version,
create_type: CreateType::from_proto(create_type),
secret_refs: pb.secret_refs,
original_target_columns: pb
.original_target_columns
.into_iter()
.map(ColumnCatalog::from)
.collect_vec(),
}
}
}
Expand Down
18 changes: 18 additions & 0 deletions src/frontend/src/catalog/table_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,24 @@ impl TableCatalog {
}
}

pub fn default_column_exprs(columns: &[ColumnCatalog]) -> Vec<ExprImpl> {
columns
.iter()
.map(|c| {
if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
expr,
..
})) = c.column_desc.generated_or_default_column.as_ref()
{
ExprImpl::from_expr_proto(expr.as_ref().unwrap())
.expect("expr in default columns corrupted")
} else {
ExprImpl::literal_null(c.data_type().clone())
}
})
.collect()
}

pub fn default_columns(&self) -> impl Iterator<Item = (usize, ExprImpl)> + '_ {
self.columns.iter().enumerate().filter_map(|(i, c)| {
if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
Expand Down
Loading

0 comments on commit c48c9c7

Please sign in to comment.