Skip to content

Commit

Permalink
Merge branch 'main' of github.com:risingwavelabs/risingwave into bakj…
Browse files Browse the repository at this point in the history
…os/deno_udf
  • Loading branch information
bakjos committed Apr 10, 2024
2 parents 89b3d4c + f82821d commit 0c2360b
Show file tree
Hide file tree
Showing 29 changed files with 407 additions and 36 deletions.
91 changes: 91 additions & 0 deletions e2e_test/streaming/with_version_column.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@

statement ok
SET RW_IMPLICIT_FLUSH TO true;

statement error
create table t0 (v1 int, v2 int, v3 int, v4 int, primary key(v1)) on conflict do update if not null with version column(v5);

statement error
create table t0 (v1 int, v2 int, v3 int, v4 bool, primary key(v1)) on conflict do update if not null with version column(v4);

statement error
create table t0 (v1 int, v2 int, v3 int, v4 bool, primary key(v1)) on conflict do update if not null with version column v4;

statement error
create table t0 (v1 int, v2 int, v3 int, v4 int, primary key(v1)) on conflict ignore with version column(v4);

statement ok
create table t1 (v1 int, v2 int, v3 int, v4 int, primary key(v1)) on conflict do update if not null with version column(v4);

statement ok
insert into t1 values (1,null,2, 4), (2,3,null, 1);

statement ok
insert into t1 values (3,null,5,2), (3,6,null, 1);

statement ok
insert into t1 values (1,5,null,5), (2,null, 6, 1);

statement ok
create materialized view mv1 as select * from t1;


query III rowsort
select v1, v2, v3, v4 from mv1;
----
1 5 2 5
2 3 6 1
3 NULL 5 2


statement ok
update t1 set v2 = 2 where v1 > 1;

statement ok
flush;

query IIII rowsort
select v1, v2, v3, v4 from mv1;
----
1 5 2 5
2 2 6 1
3 2 5 2

statement ok
drop materialized view mv1;

statement ok
drop table t1;


statement ok
SET RW_IMPLICIT_FLUSH TO true;

statement ok
create table t2 (v1 int, v2 int, v3 int, v4 int, primary key(v1)) on conflict overwrite with version column(v4);

statement ok
insert into t2 values (1,null,2, 4), (2,3,null, 1);

statement ok
insert into t2 values (3,null,5,2), (3,6,null, 1);

statement ok
insert into t2 values (1,5,null,3), (2,null, 6, 1);

statement ok
create materialized view mv2 as select * from t2;


query III rowsort
select v1, v2, v3, v4 from mv2;
----
1 NULL 2 4
2 NULL 6 1
3 NULL 5 2

statement ok
drop materialized view mv2;

statement ok
drop table t2;
3 changes: 3 additions & 0 deletions proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,9 @@ message Table {
// TTL of the record in the table, to ensure the consistency with other tables in the streaming plan, it only applies to append-only tables.
optional uint32 retention_seconds = 37;

// This field specifies the index of the column set in the "with version column" within all the columns. It is used for filtering during "on conflict" operations.
optional uint32 version_column_index = 38;

// Per-table catalog version, used by schema change. `None` for internal tables and tests.
// Not to be confused with the global catalog version for notification service.
TableVersion version = 100;
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/planner_test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,7 @@ impl TestCase {
source_watermarks,
append_only,
on_conflict,
with_version_column,
cdc_table_info,
include_column_options,
wildcard_idx,
Expand All @@ -449,6 +450,7 @@ impl TestCase {
source_watermarks,
append_only,
on_conflict,
with_version_column,
cdc_table_info,
include_column_options,
)
Expand Down
7 changes: 7 additions & 0 deletions src/frontend/src/catalog/table_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ pub struct TableCatalog {
/// `No Check`.
pub conflict_behavior: ConflictBehavior,

pub version_column_index: Option<usize>,

pub read_prefix_len_hint: usize,

/// Per-table catalog version, used by schema change. `None` for internal tables and tests.
Expand Down Expand Up @@ -406,6 +408,7 @@ impl TableCatalog {
watermark_indices: self.watermark_columns.ones().map(|x| x as _).collect_vec(),
dist_key_in_pk: self.dist_key_in_pk.iter().map(|x| *x as _).collect(),
handle_pk_conflict_behavior: self.conflict_behavior.to_protobuf().into(),
version_column_index: self.version_column_index.map(|value| value as u32),
cardinality: Some(self.cardinality.to_protobuf()),
initialized_at_epoch: self.initialized_at_epoch.map(|epoch| epoch.0),
created_at_epoch: self.created_at_epoch.map(|epoch| epoch.0),
Expand Down Expand Up @@ -487,6 +490,7 @@ impl From<PbTable> for TableCatalog {
let mut col_index: HashMap<i32, usize> = HashMap::new();

let conflict_behavior = ConflictBehavior::from_protobuf(&tb_conflict_behavior);
let version_column_index = tb.version_column_index.map(|value| value as usize);
let columns: Vec<ColumnCatalog> = tb.columns.into_iter().map(ColumnCatalog::from).collect();
for (idx, catalog) in columns.clone().into_iter().enumerate() {
let col_name = catalog.name();
Expand Down Expand Up @@ -526,6 +530,7 @@ impl From<PbTable> for TableCatalog {
value_indices: tb.value_indices.iter().map(|x| *x as _).collect(),
definition: tb.definition,
conflict_behavior,
version_column_index,
read_prefix_len_hint: tb.read_prefix_len_hint as usize,
version: tb.version.map(TableVersion::from_prost),
watermark_columns,
Expand Down Expand Up @@ -640,6 +645,7 @@ mod tests {
incoming_sinks: vec![],
created_at_cluster_version: None,
initialized_at_cluster_version: None,
version_column_index: None,
}
.into();

Expand Down Expand Up @@ -700,6 +706,7 @@ mod tests {
created_at_cluster_version: None,
initialized_at_cluster_version: None,
dependent_relations: vec![],
version_column_index: None,
}
);
assert_eq!(table, TableCatalog::from(table.to_prost(0, 0)));
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/handler/alter_table_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ pub async fn replace_table_with_definition(
source_watermarks,
append_only,
on_conflict,
with_version_column,
wildcard_idx,
..
} = definition
Expand All @@ -70,6 +71,7 @@ pub async fn replace_table_with_definition(
source_watermarks,
append_only,
on_conflict,
with_version_column,
)
.await?;

Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/handler/create_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -612,6 +612,7 @@ pub(crate) async fn reparse_table_for_sink(
source_watermarks,
append_only,
on_conflict,
with_version_column,
..
} = definition
else {
Expand All @@ -631,6 +632,7 @@ pub(crate) async fn reparse_table_for_sink(
source_watermarks,
append_only,
on_conflict,
with_version_column,
)
.await?;

Expand Down
19 changes: 19 additions & 0 deletions src/frontend/src/handler/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,7 @@ pub(crate) async fn gen_create_table_plan_with_source(
mut col_id_gen: ColumnIdGenerator,
append_only: bool,
on_conflict: Option<OnConflict>,
with_version_column: Option<String>,
include_column_options: IncludeOption,
) -> Result<(PlanRef, Option<PbSource>, PbTable)> {
if append_only
Expand Down Expand Up @@ -554,6 +555,7 @@ pub(crate) async fn gen_create_table_plan_with_source(
watermark_descs,
append_only,
on_conflict,
with_version_column,
Some(col_id_gen.into_version()),
)
}
Expand All @@ -569,6 +571,7 @@ pub(crate) fn gen_create_table_plan(
source_watermarks: Vec<SourceWatermark>,
append_only: bool,
on_conflict: Option<OnConflict>,
with_version_column: Option<String>,
) -> Result<(PlanRef, Option<PbSource>, PbTable)> {
let definition = context.normalized_sql().to_owned();
let mut columns = bind_sql_columns(&column_defs)?;
Expand All @@ -587,6 +590,7 @@ pub(crate) fn gen_create_table_plan(
source_watermarks,
append_only,
on_conflict,
with_version_column,
Some(col_id_gen.into_version()),
)
}
Expand All @@ -603,6 +607,7 @@ pub(crate) fn gen_create_table_plan_without_bind(
source_watermarks: Vec<SourceWatermark>,
append_only: bool,
on_conflict: Option<OnConflict>,
with_version_column: Option<String>,
version: Option<TableVersion>,
) -> Result<(PlanRef, Option<PbSource>, PbTable)> {
ensure_table_constraints_supported(&constraints)?;
Expand Down Expand Up @@ -636,6 +641,7 @@ pub(crate) fn gen_create_table_plan_without_bind(
watermark_descs,
append_only,
on_conflict,
with_version_column,
version,
)
}
Expand All @@ -653,6 +659,7 @@ fn gen_table_plan_inner(
watermark_descs: Vec<WatermarkDesc>,
append_only: bool,
on_conflict: Option<OnConflict>,
with_version_column: Option<String>,
version: Option<TableVersion>, /* TODO: this should always be `Some` if we support `ALTER
* TABLE` for `CREATE TABLE AS`. */
) -> Result<(PlanRef, Option<PbSource>, PbTable)> {
Expand Down Expand Up @@ -749,6 +756,7 @@ fn gen_table_plan_inner(
row_id_index,
append_only,
on_conflict,
with_version_column,
watermark_descs,
version,
is_external_source,
Expand All @@ -771,6 +779,7 @@ pub(crate) fn gen_create_table_plan_for_cdc_source(
constraints: Vec<TableConstraint>,
mut col_id_gen: ColumnIdGenerator,
on_conflict: Option<OnConflict>,
with_version_column: Option<String>,
) -> Result<(PlanRef, PbTable)> {
let session = context.session_ctx().clone();
let db_name = session.database();
Expand Down Expand Up @@ -873,6 +882,7 @@ pub(crate) fn gen_create_table_plan_for_cdc_source(
None,
append_only,
on_conflict,
with_version_column,
vec![],
Some(col_id_gen.into_version()),
true,
Expand Down Expand Up @@ -940,6 +950,7 @@ pub(super) async fn handle_create_table_plan(
source_watermarks: Vec<SourceWatermark>,
append_only: bool,
on_conflict: Option<OnConflict>,
with_version_column: Option<String>,
include_column_options: IncludeOption,
) -> Result<(PlanRef, Option<PbSource>, PbTable, TableJobType)> {
let source_schema = check_create_table_with_source(
Expand All @@ -962,6 +973,7 @@ pub(super) async fn handle_create_table_plan(
col_id_gen,
append_only,
on_conflict,
with_version_column,
include_column_options,
)
.await?,
Expand All @@ -977,6 +989,7 @@ pub(super) async fn handle_create_table_plan(
source_watermarks,
append_only,
on_conflict,
with_version_column,
)?,
TableJobType::General,
),
Expand All @@ -991,6 +1004,7 @@ pub(super) async fn handle_create_table_plan(
constraints,
col_id_gen,
on_conflict,
with_version_column,
)?;

((plan, None, table), TableJobType::SharedCdcSource)
Expand All @@ -1017,6 +1031,7 @@ pub async fn handle_create_table(
source_watermarks: Vec<SourceWatermark>,
append_only: bool,
on_conflict: Option<OnConflict>,
with_version_column: Option<String>,
cdc_table_info: Option<CdcTableInfo>,
include_column_options: IncludeOption,
) -> Result<RwPgResponse> {
Expand Down Expand Up @@ -1049,6 +1064,7 @@ pub async fn handle_create_table(
source_watermarks,
append_only,
on_conflict,
with_version_column,
include_column_options,
)
.await?;
Expand Down Expand Up @@ -1112,6 +1128,7 @@ pub async fn generate_stream_graph_for_table(
source_watermarks: Vec<SourceWatermark>,
append_only: bool,
on_conflict: Option<OnConflict>,
with_version_column: Option<String>,
) -> Result<(StreamFragmentGraph, Table, Option<PbSource>)> {
use risingwave_pb::catalog::table::OptionalAssociatedSourceId;

Expand All @@ -1129,6 +1146,7 @@ pub async fn generate_stream_graph_for_table(
col_id_gen,
append_only,
on_conflict,
with_version_column,
vec![],
)
.await?
Expand All @@ -1142,6 +1160,7 @@ pub async fn generate_stream_graph_for_table(
source_watermarks,
append_only,
on_conflict,
with_version_column,
)?,
};

Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/handler/create_table_as.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ pub async fn handle_create_as(
column_defs: Vec<ColumnDef>,
append_only: bool,
on_conflict: Option<OnConflict>,
with_version_column: Option<String>,
) -> Result<RwPgResponse> {
if column_defs.iter().any(|column| column.data_type.is_some()) {
return Err(ErrorCode::InvalidInputSyntax(
Expand Down Expand Up @@ -106,6 +107,7 @@ pub async fn handle_create_as(
vec![], // No watermark should be defined in for `CREATE TABLE AS`
append_only,
on_conflict,
with_version_column,
Some(col_id_gen.into_version()),
)?;
let mut graph = build_graph(plan)?;
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/handler/explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ async fn do_handle_explain(
source_watermarks,
append_only,
on_conflict,
with_version_column,
cdc_table_info,
include_column_options,
wildcard_idx,
Expand All @@ -82,6 +83,7 @@ async fn do_handle_explain(
source_watermarks,
append_only,
on_conflict,
with_version_column,
include_column_options,
)
.await?;
Expand Down
Loading

0 comments on commit 0c2360b

Please sign in to comment.