From 613fb51132a4772d5f7ce1a321ad60d0092bb081 Mon Sep 17 00:00:00 2001 From: Dylan Date: Mon, 24 Jun 2024 17:30:46 +0800 Subject: [PATCH 01/11] fix(optimizer): fix apply topn transpose rule (#17386) --- .../batch/subquery/lateral_subquery.slt.part | 29 +++++++++++++++++++ .../testdata/input/lateral_subquery.yaml | 15 ++++++++++ .../testdata/output/lateral_subquery.yaml | 28 ++++++++++++++++++ .../rule/apply_topn_transpose_rule.rs | 4 ++- 4 files changed, 75 insertions(+), 1 deletion(-) diff --git a/e2e_test/batch/subquery/lateral_subquery.slt.part b/e2e_test/batch/subquery/lateral_subquery.slt.part index 04e93a3d397ce..98077487828e4 100644 --- a/e2e_test/batch/subquery/lateral_subquery.slt.part +++ b/e2e_test/batch/subquery/lateral_subquery.slt.part @@ -82,3 +82,32 @@ drop table all_sales; statement ok drop table salesperson; +statement ok +CREATE TABLE r(ts TIMESTAMPTZ, src_id int, dev_id int); + +statement ok +INSERT INTO r VALUES +('2024-06-20T19:00:22Z'::TIMESTAMPTZ, 2, 2), +('2024-06-20T19:00:22Z'::TIMESTAMPTZ, 1, 3), +('2024-06-20T19:00:23Z'::TIMESTAMPTZ, 1, 2), +('2024-06-20T19:00:24Z'::TIMESTAMPTZ, 2, 1), +('2024-06-20T19:00:24Z'::TIMESTAMPTZ, 1, 2), +('2024-06-20T19:00:25Z'::TIMESTAMPTZ, 2, 1); + +query TII rowsort +SELECT e.ts AS e_ts, d.* +FROM ( + SELECT '2024-06-20T19:01:00Z'::TIMESTAMPTZ ts, 1::INT AS src_id) e +JOIN LATERAL +( + SELECT DISTINCT ON(src_id, dev_id) * + FROM r + WHERE r.src_id = e.src_id AND r.ts <= e.ts + ORDER BY src_id, dev_id, ts DESC +)d on true; +---- +2024-06-20 19:01:00+00:00 2024-06-20 19:00:22+00:00 1 3 +2024-06-20 19:01:00+00:00 2024-06-20 19:00:24+00:00 1 2 + +statement ok +DROP TABLE r CASCADE; diff --git a/src/frontend/planner_test/tests/testdata/input/lateral_subquery.yaml b/src/frontend/planner_test/tests/testdata/input/lateral_subquery.yaml index 869bbdf6d7136..8b9126f18d641 100644 --- a/src/frontend/planner_test/tests/testdata/input/lateral_subquery.yaml +++ b/src/frontend/planner_test/tests/testdata/input/lateral_subquery.yaml @@ -119,3 +119,18 @@ ) AS b ON TRUE; expected_outputs: - stream_plan +- name: https://github.com/risingwavelabs/risingwave/issues/17382 + sql: | + CREATE TABLE r(ts TIMESTAMPTZ, src_id int, dev_id int); + SELECT e.ts AS e_ts, d.* + FROM ( + SELECT '2024-06-20T19:01:00Z'::TIMESTAMPTZ ts, 1::INT AS src_id) e + JOIN LATERAL + ( + SELECT DISTINCT ON(src_id, dev_id) * + FROM r + WHERE r.src_id = e.src_id AND r.ts <= e.ts + ORDER BY src_id, dev_id, ts DESC + )d on true; + expected_outputs: + - batch_plan diff --git a/src/frontend/planner_test/tests/testdata/output/lateral_subquery.yaml b/src/frontend/planner_test/tests/testdata/output/lateral_subquery.yaml index 39639b3ebb647..815890d6a73b8 100644 --- a/src/frontend/planner_test/tests/testdata/output/lateral_subquery.yaml +++ b/src/frontend/planner_test/tests/testdata/output/lateral_subquery.yaml @@ -242,3 +242,31 @@ └─StreamExchange { dist: HashShard(t1.c1, $expr3) } └─StreamProject { exprs: [t1.c1, t1.c2, t1.c3, AtTimeZone(t1.c4, 'UTC':Varchar) as $expr2, t1.c4::Date as $expr3, t1._row_id] } └─StreamTableScan { table: t1, columns: [t1.c1, t1.c2, t1.c3, t1.c4, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) } +- name: https://github.com/risingwavelabs/risingwave/issues/17382 + sql: | + CREATE TABLE r(ts TIMESTAMPTZ, src_id int, dev_id int); + SELECT e.ts AS e_ts, d.* + FROM ( + SELECT '2024-06-20T19:01:00Z'::TIMESTAMPTZ ts, 1::INT AS src_id) e + JOIN LATERAL + ( + SELECT DISTINCT ON(src_id, 
dev_id) * + FROM r + WHERE r.src_id = e.src_id AND r.ts <= e.ts + ORDER BY src_id, dev_id, ts DESC + )d on true; + batch_plan: |- + BatchExchange { order: [], dist: Single } + └─BatchHashJoin { type: Inner, predicate: 1:Int32 IS NOT DISTINCT FROM 1:Int32 AND '2024-06-20 19:01:00+00:00':Timestamptz IS NOT DISTINCT FROM '2024-06-20 19:01:00+00:00':Timestamptz, output: ['2024-06-20 19:01:00+00:00':Timestamptz, r.ts, r.src_id, r.dev_id] } + ├─BatchExchange { order: [], dist: HashShard(1:Int32, '2024-06-20 19:01:00+00:00':Timestamptz) } + │ └─BatchValues { rows: [['2024-06-20 19:01:00+00:00':Timestamptz, 1:Int32]] } + └─BatchExchange { order: [], dist: HashShard(1:Int32, '2024-06-20 19:01:00+00:00':Timestamptz) } + └─BatchGroupTopN { order: [r.src_id ASC, r.dev_id ASC, r.ts DESC], limit: 1, offset: 0, group_key: [1:Int32, '2024-06-20 19:01:00+00:00':Timestamptz, r.src_id, r.dev_id] } + └─BatchExchange { order: [], dist: HashShard(1:Int32, '2024-06-20 19:01:00+00:00':Timestamptz, r.src_id, r.dev_id) } + └─BatchHashJoin { type: Inner, predicate: 1:Int32 = r.src_id AND (r.ts <= '2024-06-20 19:01:00+00:00':Timestamptz), output: all } + ├─BatchExchange { order: [], dist: HashShard(1:Int32) } + │ └─BatchHashAgg { group_key: [1:Int32, '2024-06-20 19:01:00+00:00':Timestamptz], aggs: [] } + │ └─BatchExchange { order: [], dist: HashShard(1:Int32, '2024-06-20 19:01:00+00:00':Timestamptz) } + │ └─BatchValues { rows: [[1:Int32, '2024-06-20 19:01:00+00:00':Timestamptz]] } + └─BatchExchange { order: [], dist: HashShard(r.src_id) } + └─BatchScan { table: r, columns: [r.ts, r.src_id, r.dev_id], distribution: SomeShard } diff --git a/src/frontend/src/optimizer/rule/apply_topn_transpose_rule.rs b/src/frontend/src/optimizer/rule/apply_topn_transpose_rule.rs index 9a6884b33e9a2..dbdb180b1358c 100644 --- a/src/frontend/src/optimizer/rule/apply_topn_transpose_rule.rs +++ b/src/frontend/src/optimizer/rule/apply_topn_transpose_rule.rs @@ -49,7 +49,8 @@ impl Rule for ApplyTopNTransposeRule { apply.clone().decompose(); assert_eq!(join_type, JoinType::Inner); let topn: &LogicalTopN = right.as_logical_top_n()?; - let (topn_input, limit, offset, with_ties, mut order, group_key) = topn.clone().decompose(); + let (topn_input, limit, offset, with_ties, mut order, mut group_key) = + topn.clone().decompose(); let apply_left_len = left.schema().len(); @@ -73,6 +74,7 @@ impl Rule for ApplyTopNTransposeRule { .column_orders .iter_mut() .for_each(|ord| ord.column_index += apply_left_len); + group_key.iter_mut().for_each(|idx| *idx += apply_left_len); let new_group_key = (0..apply_left_len).chain(group_key).collect_vec(); LogicalTopN::new(new_apply, limit, offset, with_ties, order, new_group_key) }; From cdbd982d55613636672fe2f5524d819a9e992571 Mon Sep 17 00:00:00 2001 From: Noel Kwan <47273164+kwannoel@users.noreply.github.com> Date: Mon, 24 Jun 2024 19:00:42 +0800 Subject: [PATCH 02/11] refactor(meta): move `finish_streaming_job` into catalog manager (#17414) --- src/meta/src/manager/catalog/mod.rs | 61 +++++++++++++++++++++ src/meta/src/rpc/ddl_controller.rs | 83 ++--------------------------- 2 files changed, 64 insertions(+), 80 deletions(-) diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs index 3aeab64bef128..9ba9333cee8e1 100644 --- a/src/meta/src/manager/catalog/mod.rs +++ b/src/meta/src/manager/catalog/mod.rs @@ -1073,6 +1073,67 @@ impl CatalogManager { Ok(()) } + /// `finish_stream_job` finishes a stream job and clean some states. 
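+    ///
+    /// This marks the job as created, runs the per-job-type `finish_*_procedure` on the
+    /// catalog, and finally unmarks the internal tables that were registered as creating.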
+ pub async fn finish_stream_job( + &self, + mut stream_job: StreamingJob, + internal_tables: Vec, + ) -> MetaResult { + // 1. finish procedure. + let mut creating_internal_table_ids = internal_tables.iter().map(|t| t.id).collect_vec(); + + // Update the corresponding 'created_at' field. + stream_job.mark_created(); + + let version = match stream_job { + StreamingJob::MaterializedView(table) => { + creating_internal_table_ids.push(table.id); + self.finish_create_table_procedure(internal_tables, table) + .await? + } + StreamingJob::Sink(sink, target_table) => { + let sink_id = sink.id; + + let mut version = self + .finish_create_sink_procedure(internal_tables, sink) + .await?; + + if let Some((table, source)) = target_table { + version = self + .finish_replace_table_procedure(&source, &table, None, Some(sink_id), None) + .await?; + } + + version + } + StreamingJob::Table(source, table, ..) => { + creating_internal_table_ids.push(table.id); + if let Some(source) = source { + self.finish_create_table_procedure_with_source(source, table, internal_tables) + .await? + } else { + self.finish_create_table_procedure(internal_tables, table) + .await? + } + } + StreamingJob::Index(index, table) => { + creating_internal_table_ids.push(table.id); + self.finish_create_index_procedure(internal_tables, index, table) + .await? + } + StreamingJob::Source(source) => { + self.finish_create_source_procedure(source, internal_tables) + .await? + } + }; + + // 2. unmark creating tables. + self.unmark_creating_tables(&creating_internal_table_ids, false) + .await; + + Ok(version) + } + /// This is used for both `CREATE TABLE` and `CREATE MATERIALIZED VIEW`. pub async fn finish_create_table_procedure( &self, diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index e1c6ebb0f32a8..cc5d7dd8971ad 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -1317,8 +1317,9 @@ impl DdlController { }; tracing::debug!(id = job_id, "finishing stream job"); - let version = self - .finish_stream_job(mgr, stream_job, internal_tables) + let version = mgr + .catalog_manager + .finish_stream_job(stream_job, internal_tables) .await?; tracing::debug!(id = job_id, "finished stream job"); @@ -1757,84 +1758,6 @@ impl DdlController { Ok(()) } - /// `finish_stream_job` finishes a stream job and clean some states. - async fn finish_stream_job( - &self, - mgr: &MetadataManagerV1, - mut stream_job: StreamingJob, - internal_tables: Vec
, - ) -> MetaResult { - // 1. finish procedure. - let mut creating_internal_table_ids = internal_tables.iter().map(|t| t.id).collect_vec(); - - // Update the corresponding 'created_at' field. - stream_job.mark_created(); - - let version = match stream_job { - StreamingJob::MaterializedView(table) => { - creating_internal_table_ids.push(table.id); - mgr.catalog_manager - .finish_create_table_procedure(internal_tables, table) - .await? - } - StreamingJob::Sink(sink, target_table) => { - let sink_id = sink.id; - - let mut version = mgr - .catalog_manager - .finish_create_sink_procedure(internal_tables, sink) - .await?; - - if let Some((table, source)) = target_table { - let streaming_job = - StreamingJob::Table(source, table, TableJobType::Unspecified); - - version = self - .finish_replace_table( - mgr.catalog_manager.clone(), - &streaming_job, - None, - Some(sink_id), - None, - ) - .await?; - } - - version - } - StreamingJob::Table(source, table, ..) => { - creating_internal_table_ids.push(table.id); - if let Some(source) = source { - mgr.catalog_manager - .finish_create_table_procedure_with_source(source, table, internal_tables) - .await? - } else { - mgr.catalog_manager - .finish_create_table_procedure(internal_tables, table) - .await? - } - } - StreamingJob::Index(index, table) => { - creating_internal_table_ids.push(table.id); - mgr.catalog_manager - .finish_create_index_procedure(internal_tables, index, table) - .await? - } - StreamingJob::Source(source) => { - mgr.catalog_manager - .finish_create_source_procedure(source, internal_tables) - .await? - } - }; - - // 2. unmark creating tables. - mgr.catalog_manager - .unmark_creating_tables(&creating_internal_table_ids, false) - .await; - - Ok(version) - } - async fn drop_table_inner( &self, source_id: Option, From da9bd03524e0e5635ad06dfb4b0aa6110ba3dee8 Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Mon, 24 Jun 2024 22:35:30 +0800 Subject: [PATCH 03/11] feat(expr): streaming `generate_series` ends with `now()` (#17371) Signed-off-by: Richard Chien --- ci/scripts/e2e-test-parallel.sh | 8 +- e2e_test/streaming/now.slt | 48 +++ proto/stream_plan.proto | 12 + src/common/src/array/stream_chunk_builder.rs | 5 + src/common/src/util/epoch.rs | 7 +- .../input/generate_series_with_now.yaml | 30 ++ .../tests/testdata/output/expr.yaml | 6 +- .../output/generate_series_with_now.yaml | 35 ++ src/frontend/src/binder/expr/function.rs | 8 +- src/frontend/src/expr/mod.rs | 2 +- src/frontend/src/expr/type_inference/func.rs | 54 ++- .../src/optimizer/logical_optimization.rs | 11 + .../src/optimizer/plan_node/convert.rs | 5 - .../src/optimizer/plan_node/generic/mod.rs | 2 + .../src/optimizer/plan_node/generic/now.rs | 106 +++++ .../src/optimizer/plan_node/logical_now.rs | 33 +- .../plan_node/logical_project_set.rs | 12 +- .../src/optimizer/plan_node/stream_now.rs | 47 +- .../plan_visitor/cardinality_visitor.rs | 8 +- src/frontend/src/optimizer/rule/mod.rs | 2 + .../stream/filter_with_now_to_join_rule.rs | 4 +- .../stream/generate_series_with_now_rule.rs | 86 ++++ src/frontend/src/optimizer/rule/stream/mod.rs | 1 + src/stream/src/executor/mod.rs | 2 +- src/stream/src/executor/now.rs | 402 +++++++++++++++--- src/stream/src/from_proto/now.rs | 40 +- src/stream/src/lib.rs | 1 + 27 files changed, 838 insertions(+), 139 deletions(-) create mode 100644 e2e_test/streaming/now.slt create mode 100644 src/frontend/planner_test/tests/testdata/input/generate_series_with_now.yaml create mode 100644 
src/frontend/planner_test/tests/testdata/output/generate_series_with_now.yaml create mode 100644 src/frontend/src/optimizer/plan_node/generic/now.rs create mode 100644 src/frontend/src/optimizer/rule/stream/generate_series_with_now_rule.rs diff --git a/ci/scripts/e2e-test-parallel.sh b/ci/scripts/e2e-test-parallel.sh index 5f16a4c817871..c366ef07fc209 100755 --- a/ci/scripts/e2e-test-parallel.sh +++ b/ci/scripts/e2e-test-parallel.sh @@ -38,21 +38,21 @@ RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=i echo "--- e2e, ci-3streaming-2serving-3fe, streaming" RUST_LOG=$RUST_LOG \ risedev ci-start ci-3streaming-2serving-3fe -sqllogictest "${host_args[@]}" -d dev './e2e_test/streaming/**/*.slt' -j 16 --junit "parallel-streaming-${profile}" +sqllogictest "${host_args[@]}" -d dev './e2e_test/streaming/**/*.slt' -j 16 --junit "parallel-streaming-${profile}" --label "parallel" kill_cluster echo "--- e2e, ci-3streaming-2serving-3fe, batch" RUST_LOG=$RUST_LOG \ risedev ci-start ci-3streaming-2serving-3fe -sqllogictest "${host_args[@]}" -d dev './e2e_test/ddl/**/*.slt' --junit "parallel-batch-ddl-${profile}" -sqllogictest "${host_args[@]}" -d dev './e2e_test/visibility_mode/*.slt' -j 16 --junit "parallel-batch-${profile}" +sqllogictest "${host_args[@]}" -d dev './e2e_test/ddl/**/*.slt' --junit "parallel-batch-ddl-${profile}" --label "parallel" +sqllogictest "${host_args[@]}" -d dev './e2e_test/visibility_mode/*.slt' -j 16 --junit "parallel-batch-${profile}" --label "parallel" kill_cluster echo "--- e2e, ci-3streaming-2serving-3fe, generated" RUST_LOG=$RUST_LOG \ risedev ci-start ci-3streaming-2serving-3fe -sqllogictest "${host_args[@]}" -d dev './e2e_test/generated/**/*.slt' -j 16 --junit "parallel-generated-${profile}" +sqllogictest "${host_args[@]}" -d dev './e2e_test/generated/**/*.slt' -j 16 --junit "parallel-generated-${profile}" --label "parallel" kill_cluster diff --git a/e2e_test/streaming/now.slt b/e2e_test/streaming/now.slt new file mode 100644 index 0000000000000..ad086f2202e7a --- /dev/null +++ b/e2e_test/streaming/now.slt @@ -0,0 +1,48 @@ +# In madsim test, we cannot spawn process. +skipif madsim +# In parallel test, we cannot get the DB name. +skipif parallel +# TODO: Later if we introduce a new `now()`-like function that returns the time of statement execution, +# we'll be able to directly create MV without `./risedev psql` and so that we can remove these `skipif`. +system ok +./risedev psql -c " +create materialized view mv as +select * from generate_series( + to_timestamp($(date +%s)) - interval '10 second', + now(), + interval '1 second' +); +" + +skipif madsim +skipif parallel +statement ok +flush; + +skipif madsim +skipif parallel +query I +select count(*) >= 10 from mv; +---- +t + +skipif madsim +skipif parallel +sleep 2s + +skipif madsim +skipif parallel +statement ok +flush; + +skipif madsim +skipif parallel +query I +select count(*) >= 12 from mv; +---- +t + +skipif madsim +skipif parallel +statement ok +drop materialized view mv; diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto index 223a23813f68d..a43e34bde8df3 100644 --- a/proto/stream_plan.proto +++ b/proto/stream_plan.proto @@ -725,9 +725,21 @@ message RowIdGenNode { uint64 row_id_index = 1; } +message NowModeUpdateCurrent {} + +message NowModeGenerateSeries { + data.Datum start_timestamp = 1; + data.Datum interval = 2; +} + message NowNode { // Persists emitted 'now'. 
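  // In `GenerateSeries` mode, the persisted value is the most recently generated timestamp.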
catalog.Table state_table = 1; + + oneof mode { + NowModeUpdateCurrent update_current = 101; + NowModeGenerateSeries generate_series = 102; + } } message ValuesNode { diff --git a/src/common/src/array/stream_chunk_builder.rs b/src/common/src/array/stream_chunk_builder.rs index a13cc3676792a..c44d313fffffe 100644 --- a/src/common/src/array/stream_chunk_builder.rs +++ b/src/common/src/array/stream_chunk_builder.rs @@ -104,6 +104,11 @@ impl StreamChunkBuilder { } } + /// Get the current number of rows in the builder. + pub fn size(&self) -> usize { + self.size + } + /// Append an iterator of output index and datum to the builder, return a chunk if the builder /// is full. /// diff --git a/src/common/src/util/epoch.rs b/src/common/src/util/epoch.rs index a067c689f669d..56dbdf6c54da9 100644 --- a/src/common/src/util/epoch.rs +++ b/src/common/src/util/epoch.rs @@ -174,10 +174,11 @@ impl EpochPair { Self::new(curr, curr - EPOCH_INC_MIN_STEP_FOR_TEST) } } -/// As most unit tests initializ a new epoch from a random value (e.g. 1, 2, 233 etc.), but the correct epoch in the system is a u64 with the last `EPOCH_AVAILABLE_BITS` bits set to 0. + +/// As most unit tests initialize a new epoch from a random value (e.g. 1, 2, 233 etc.), but the correct epoch in the system is a u64 with the last `EPOCH_AVAILABLE_BITS` bits set to 0. /// This method is to turn a a random epoch into a well shifted value. -pub const fn test_epoch(value: u64) -> u64 { - value << EPOCH_AVAILABLE_BITS +pub const fn test_epoch(value_millis: u64) -> u64 { + value_millis << EPOCH_AVAILABLE_BITS } /// There are numerous operations in our system's unit tests that involve incrementing or decrementing the epoch. diff --git a/src/frontend/planner_test/tests/testdata/input/generate_series_with_now.yaml b/src/frontend/planner_test/tests/testdata/input/generate_series_with_now.yaml new file mode 100644 index 0000000000000..e121aba41ff6f --- /dev/null +++ b/src/frontend/planner_test/tests/testdata/input/generate_series_with_now.yaml @@ -0,0 +1,30 @@ +- sql: | + select * from generate_series( + '2024-06-21 17:36:00'::timestamptz, + now(), + interval '1 hour' + ); + expected_outputs: + - logical_plan + - optimized_logical_plan_for_stream + - stream_plan +- sql: | + select * from generate_series( + '2024-06-21 17:36:00'::timestamp, -- `timestamp` type is not supported + now(), + interval '1 hour' + ); + expected_outputs: + - binder_error +- sql: | + select * from generate_series( + now() - interval '1 hour', + now(), + interval '1 hour' + ); + expected_outputs: + - stream_error +- sql: | + select * from unnest(array[now(), now()]); + expected_outputs: + - stream_error diff --git a/src/frontend/planner_test/tests/testdata/output/expr.yaml b/src/frontend/planner_test/tests/testdata/output/expr.yaml index 4ba572a54e600..f88f7c4d69b76 100644 --- a/src/frontend/planner_test/tests/testdata/output/expr.yaml +++ b/src/frontend/planner_test/tests/testdata/output/expr.yaml @@ -543,7 +543,7 @@ Failed to bind expression: v1 >= now() Caused by: - Invalid input syntax: For streaming queries, `NOW()` function is only allowed in `WHERE`, `HAVING` and `ON`. Found in clause: Some(GroupBy). Please please refer to https://www.risingwave.dev/docs/current/sql-pattern-temporal-filters/ for more information + Invalid input syntax: For streaming queries, `NOW()` function is only allowed in `WHERE`, `HAVING`, `ON` and `FROM`. Found in clause: Some(GroupBy). 
Please please refer to https://www.risingwave.dev/docs/current/sql-pattern-temporal-filters/ for more information - name: forbid now in select for stream sql: | create table t (v1 timestamp with time zone, v2 timestamp with time zone); @@ -552,7 +552,7 @@ Failed to bind expression: now() Caused by: - Invalid input syntax: For streaming queries, `NOW()` function is only allowed in `WHERE`, `HAVING` and `ON`. Found in clause: None. Please please refer to https://www.risingwave.dev/docs/current/sql-pattern-temporal-filters/ for more information + Invalid input syntax: For streaming queries, `NOW()` function is only allowed in `WHERE`, `HAVING`, `ON` and `FROM`. Found in clause: None. Please please refer to https://www.risingwave.dev/docs/current/sql-pattern-temporal-filters/ for more information - name: forbid now in agg filter for stream sql: | create table t (v1 timestamp with time zone, v2 int); @@ -561,7 +561,7 @@ Failed to bind expression: sum(v2) FILTER(WHERE v1 >= now()) Caused by: - Invalid input syntax: For streaming queries, `NOW()` function is only allowed in `WHERE`, `HAVING` and `ON`. Found in clause: Some(Filter). Please please refer to https://www.risingwave.dev/docs/current/sql-pattern-temporal-filters/ for more information + Invalid input syntax: For streaming queries, `NOW()` function is only allowed in `WHERE`, `HAVING`, `ON` and `FROM`. Found in clause: Some(Filter). Please please refer to https://www.risingwave.dev/docs/current/sql-pattern-temporal-filters/ for more information - name: typo pg_teminate_backend sql: | select pg_teminate_backend(1); diff --git a/src/frontend/planner_test/tests/testdata/output/generate_series_with_now.yaml b/src/frontend/planner_test/tests/testdata/output/generate_series_with_now.yaml new file mode 100644 index 0000000000000..4c8d71f987351 --- /dev/null +++ b/src/frontend/planner_test/tests/testdata/output/generate_series_with_now.yaml @@ -0,0 +1,35 @@ +# This file is automatically generated. See `src/frontend/planner_test/README.md` for more information. +- sql: | + select * from generate_series( + '2024-06-21 17:36:00'::timestamptz, + now(), + interval '1 hour' + ); + logical_plan: |- + LogicalProject { exprs: [generate_series] } + └─LogicalTableFunction { table_function: GenerateSeries('2024-06-21 17:36:00':Varchar::Timestamptz, Now, '01:00:00':Interval) } + optimized_logical_plan_for_stream: 'LogicalNow { output: [ts] }' + stream_plan: |- + StreamMaterialize { columns: [generate_series], stream_key: [generate_series], pk_columns: [generate_series], pk_conflict: NoCheck, watermark_columns: [generate_series] } + └─StreamNow { output: [ts] } +- sql: | + select * from generate_series( + '2024-06-21 17:36:00'::timestamp, -- `timestamp` type is not supported + now(), + interval '1 hour' + ); + binder_error: function generate_series(timestamp without time zone, timestamp with time zone, interval) does not exist +- sql: | + select * from generate_series( + now() - interval '1 hour', + now(), + interval '1 hour' + ); + stream_error: |- + Not supported: General `now()` function in streaming queries + HINT: Streaming `now()` is currently only supported in GenerateSeries and TemporalFilter patterns. +- sql: | + select * from unnest(array[now(), now()]); + stream_error: |- + Not supported: General `now()` function in streaming queries + HINT: Streaming `now()` is currently only supported in GenerateSeries and TemporalFilter patterns. 
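For reference, the runtime behavior that the rewritten `LogicalNow { mode: GenerateSeries }` plan encodes can be modeled by the following self-contained sketch. It is only an illustration of the semantics implemented by the executor later in this series, not RisingWave code: the names (`SeriesState`, `on_barrier`) are made up, and plain `i64` milliseconds stand in for `Timestamptz` and `Interval`.

```rust
// Illustrative model of `generate_series(start, now(), step)` in streaming mode:
// `last_ms` plays the role of the executor's single-row state table.
struct SeriesState {
    start_ms: i64,
    step_ms: i64,         // assumed positive
    last_ms: Option<i64>, // last emitted timestamp, persisted across barriers
}

impl SeriesState {
    /// Timestamps to emit for a barrier whose physical time is `now_ms`.
    fn on_barrier(&mut self, now_ms: i64) -> Vec<i64> {
        let mut out = Vec::new();
        if self.last_ms.is_none() {
            // The start timestamp itself is always the first row of the series.
            out.push(self.start_ms);
            self.last_ms = Some(self.start_ms);
        }
        while let Some(last) = self.last_ms {
            let next = last + self.step_ms;
            if next > now_ms {
                break; // only catch up to the barrier time, inclusive
            }
            out.push(next);
            self.last_ms = Some(next);
        }
        out
    }
}

fn main() {
    let mut s = SeriesState { start_ms: 0, step_ms: 1000, last_ms: None };
    assert_eq!(s.on_barrier(2500), vec![0, 1000, 2000]); // first barrier catches up to "now"
    assert_eq!(s.on_barrier(4500), vec![3000, 4000]); // later barriers emit only the delta
}
```

Because each emitted timestamp is strictly larger than the previous one, the output column is monotonically increasing, which is why the stream plan can report it as a watermark column and mark the node as append-only and emit-on-window-close.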
diff --git a/src/frontend/src/binder/expr/function.rs b/src/frontend/src/binder/expr/function.rs index 09c05a29695dc..b9a3b27825abf 100644 --- a/src/frontend/src/binder/expr/function.rs +++ b/src/frontend/src/binder/expr/function.rs @@ -1572,11 +1572,15 @@ impl Binder { if self.is_for_stream() && !matches!( self.context.clause, - Some(Clause::Where) | Some(Clause::Having) | Some(Clause::JoinOn) + Some(Clause::Where) + | Some(Clause::Having) + | Some(Clause::JoinOn) + | Some(Clause::From) ) { return Err(ErrorCode::InvalidInputSyntax(format!( - "For streaming queries, `NOW()` function is only allowed in `WHERE`, `HAVING` and `ON`. Found in clause: {:?}. Please please refer to https://www.risingwave.dev/docs/current/sql-pattern-temporal-filters/ for more information", + "For streaming queries, `NOW()` function is only allowed in `WHERE`, `HAVING`, `ON` and `FROM`. Found in clause: {:?}. \ + Please please refer to https://www.risingwave.dev/docs/current/sql-pattern-temporal-filters/ for more information", self.context.clause )) .into()); diff --git a/src/frontend/src/expr/mod.rs b/src/frontend/src/expr/mod.rs index 9dd5f7be1d53d..89142d0e9b237 100644 --- a/src/frontend/src/expr/mod.rs +++ b/src/frontend/src/expr/mod.rs @@ -413,7 +413,7 @@ macro_rules! impl_has_variant { }; } -impl_has_variant! {InputRef, Literal, FunctionCall, FunctionCallWithLambda, AggCall, Subquery, TableFunction, WindowFunction} +impl_has_variant! {InputRef, Literal, FunctionCall, FunctionCallWithLambda, AggCall, Subquery, TableFunction, WindowFunction, Now} #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct InequalityInputPair { diff --git a/src/frontend/src/expr/type_inference/func.rs b/src/frontend/src/expr/type_inference/func.rs index 9f28dfeb74c8c..0fecee8ab45c0 100644 --- a/src/frontend/src/expr/type_inference/func.rs +++ b/src/frontend/src/expr/type_inference/func.rs @@ -20,6 +20,7 @@ use risingwave_common::types::{DataType, StructType}; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_expr::aggregate::AggKind; pub use risingwave_expr::sig::*; +use risingwave_pb::expr::table_function::PbType as PbTableFuncType; use super::{align_types, cast_ok_base, CastContext}; use crate::error::{ErrorCode, Result}; @@ -36,13 +37,24 @@ pub fn infer_type_with_sigmap( sig_map: &FunctionRegistry, ) -> Result { // special cases - if let FuncName::Scalar(func_type) = func_name - && let Some(res) = infer_type_for_special(func_type, inputs).transpose() - { - return res; - } - if let FuncName::Aggregate(AggKind::Grouping) = func_name { - return Ok(DataType::Int32); + match &func_name { + FuncName::Scalar(func_type) => { + if let Some(res) = infer_type_for_special(*func_type, inputs).transpose() { + return res; + } + } + FuncName::Table(func_type) => { + if let Some(res) = infer_type_for_special_table_function(*func_type, inputs).transpose() + { + return res; + } + } + FuncName::Aggregate(agg_kind) => { + if *agg_kind == AggKind::Grouping { + return Ok(DataType::Int32); + } + } + _ => {} } let actuals = inputs @@ -634,6 +646,34 @@ fn infer_type_for_special( } } +fn infer_type_for_special_table_function( + func_type: PbTableFuncType, + inputs: &mut [ExprImpl], +) -> Result> { + match func_type { + PbTableFuncType::GenerateSeries => { + if inputs.len() < 3 { + // let signature map handle this + return Ok(None); + } + match ( + inputs[0].return_type(), + inputs[1].return_type(), + inputs[2].return_type(), + ) { + (DataType::Timestamptz, DataType::Timestamptz, DataType::Interval) => { + // This is to allow 
`generate_series('2024-06-20 00:00:00'::timestamptz, now(), interval '1 day')`, + // which in streaming mode will be further converted to `StreamNow`. + Ok(Some(DataType::Timestamptz)) + } + // let signature map handle the rest + _ => Ok(None), + } + } + _ => Ok(None), + } +} + /// From all available functions in `sig_map`, find and return the best matching `FuncSign` for the /// provided `func_name` and `inputs`. This not only support exact function signature match, but can /// also match `substr(varchar, smallint)` or even `substr(varchar, unknown)` to `substr(varchar, diff --git a/src/frontend/src/optimizer/logical_optimization.rs b/src/frontend/src/optimizer/logical_optimization.rs index d452626bb9418..931a645b3d680 100644 --- a/src/frontend/src/optimizer/logical_optimization.rs +++ b/src/frontend/src/optimizer/logical_optimization.rs @@ -118,6 +118,14 @@ static DAG_TO_TREE: LazyLock = LazyLock::new(|| { ) }); +static STREAM_GENERATE_SERIES_WITH_NOW: LazyLock = LazyLock::new(|| { + OptimizationStage::new( + "Convert GENERATE_SERIES Ends With NOW", + vec![GenerateSeriesWithNowRule::create()], + ApplyOrder::TopDown, + ) +}); + static TABLE_FUNCTION_TO_PROJECT_SET: LazyLock = LazyLock::new(|| { OptimizationStage::new( "Table Function To Project Set", @@ -572,6 +580,9 @@ impl LogicalOptimizer { } plan = plan.optimize_by_rules(&SET_OPERATION_MERGE); plan = plan.optimize_by_rules(&SET_OPERATION_TO_JOIN); + // Convert `generate_series` ends with `now()` to a `Now` source. Only for streaming mode. + // Should be applied before converting table function to project set. + plan = plan.optimize_by_rules(&STREAM_GENERATE_SERIES_WITH_NOW); // In order to unnest a table function, we need to convert it into a `project_set` first. plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_PROJECT_SET); diff --git a/src/frontend/src/optimizer/plan_node/convert.rs b/src/frontend/src/optimizer/plan_node/convert.rs index db961b3b1e20a..6f98073304d8b 100644 --- a/src/frontend/src/optimizer/plan_node/convert.rs +++ b/src/frontend/src/optimizer/plan_node/convert.rs @@ -84,11 +84,6 @@ pub fn stream_enforce_eowc_requirement( } Ok(StreamEowcSort::new(plan, watermark_col_idx).into()) } - } else if !emit_on_window_close && plan.emit_on_window_close() { - Err(ErrorCode::InternalError( - "Some bad thing happened, the generated plan is not correct.".to_string(), - ) - .into()) } else { Ok(plan) } diff --git a/src/frontend/src/optimizer/plan_node/generic/mod.rs b/src/frontend/src/optimizer/plan_node/generic/mod.rs index 00db5730e8038..392f073371843 100644 --- a/src/frontend/src/optimizer/plan_node/generic/mod.rs +++ b/src/frontend/src/optimizer/plan_node/generic/mod.rs @@ -78,6 +78,8 @@ mod cte_ref; pub use cte_ref::*; mod recursive_union; pub use recursive_union::*; +mod now; +pub use now::*; pub trait DistillUnit { fn distill_with_name<'a>(&self, name: impl Into>) -> XmlNode<'a>; diff --git a/src/frontend/src/optimizer/plan_node/generic/now.rs b/src/frontend/src/optimizer/plan_node/generic/now.rs new file mode 100644 index 0000000000000..911217d064214 --- /dev/null +++ b/src/frontend/src/optimizer/plan_node/generic/now.rs @@ -0,0 +1,106 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use educe::Educe; +use enum_as_inner::EnumAsInner; +use pretty_xmlish::{Pretty, Str, XmlNode}; +use risingwave_common::catalog::{Field, Schema}; +use risingwave_common::types::{DataType, Interval, Timestamptz}; + +use super::{DistillUnit, GenericPlanNode}; +use crate::optimizer::plan_node::utils::childless_record; +use crate::optimizer::property::FunctionalDependencySet; +use crate::OptimizerContextRef; + +#[derive(Debug, Clone, Educe)] +#[educe(PartialEq, Eq, Hash)] +pub struct Now { + #[educe(PartialEq(ignore))] + #[educe(Hash(ignore))] + pub ctx: OptimizerContextRef, + + pub mode: Mode, +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash, EnumAsInner)] +pub enum Mode { + /// Emit current timestamp on startup, update it on barrier. + UpdateCurrent, + /// Generate a series of timestamps starting from `start_timestamp` with `interval`. + /// Keep generating new timestamps on barrier. + GenerateSeries { + start_timestamp: Timestamptz, + interval: Interval, + }, +} + +impl GenericPlanNode for Now { + fn functional_dependency(&self) -> crate::optimizer::property::FunctionalDependencySet { + FunctionalDependencySet::new(1) // only one column and no dependency + } + + fn schema(&self) -> risingwave_common::catalog::Schema { + Schema::new(vec![Field { + data_type: DataType::Timestamptz, + name: String::from(if self.mode.is_update_current() { + "now" + } else { + "ts" + }), + sub_fields: vec![], + type_name: String::default(), + }]) + } + + fn stream_key(&self) -> Option> { + match self.mode { + Mode::UpdateCurrent => Some(vec![]), + Mode::GenerateSeries { .. 
} => Some(vec![0]), + } + } + + fn ctx(&self) -> OptimizerContextRef { + self.ctx.clone() + } +} + +impl Now { + pub fn update_current(ctx: OptimizerContextRef) -> Self { + Self::new_inner(ctx, Mode::UpdateCurrent) + } + + pub fn generate_series( + ctx: OptimizerContextRef, + start_timestamp: Timestamptz, + interval: Interval, + ) -> Self { + Self::new_inner( + ctx, + Mode::GenerateSeries { + start_timestamp, + interval, + }, + ) + } + + fn new_inner(ctx: OptimizerContextRef, mode: Mode) -> Self { + Self { ctx, mode } + } +} + +impl DistillUnit for Now { + fn distill_with_name<'a>(&self, name: impl Into>) -> XmlNode<'a> { + childless_record(name, vec![("mode", Pretty::debug(&self.mode))]) + } +} diff --git a/src/frontend/src/optimizer/plan_node/logical_now.rs b/src/frontend/src/optimizer/plan_node/logical_now.rs index f9c33eb3d9cc1..ea34037c8977a 100644 --- a/src/frontend/src/optimizer/plan_node/logical_now.rs +++ b/src/frontend/src/optimizer/plan_node/logical_now.rs @@ -14,10 +14,8 @@ use pretty_xmlish::XmlNode; use risingwave_common::bail; -use risingwave_common::catalog::{Field, Schema}; -use risingwave_common::types::DataType; -use super::generic::GenericPlanRef; +use super::generic::{self, GenericPlanRef, Mode}; use super::utils::{childless_record, Distill}; use super::{ ColPrunable, ColumnPruningContext, ExprRewritable, Logical, LogicalFilter, PlanBase, PlanRef, @@ -26,30 +24,25 @@ use super::{ use crate::error::Result; use crate::optimizer::plan_node::expr_visitable::ExprVisitable; use crate::optimizer::plan_node::utils::column_names_pretty; -use crate::optimizer::property::FunctionalDependencySet; use crate::utils::ColIndexMapping; -use crate::OptimizerContextRef; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct LogicalNow { pub base: PlanBase, + core: generic::Now, } impl LogicalNow { - pub fn new(ctx: OptimizerContextRef) -> Self { - let schema = Schema::new(vec![Field { - data_type: DataType::Timestamptz, - name: String::from("now"), - sub_fields: vec![], - type_name: String::default(), - }]); - let base = PlanBase::new_logical( - ctx, - schema, - Some(vec![]), - FunctionalDependencySet::default(), - ); - Self { base } + pub fn new(core: generic::Now) -> Self { + let base = PlanBase::new_logical_with_core(&core); + Self { base, core } + } + + pub fn max_one_row(&self) -> bool { + match self.core.mode { + Mode::UpdateCurrent => true, + Mode::GenerateSeries { .. 
} => false, + } } } @@ -91,7 +84,7 @@ impl ToStream for LogicalNow { /// `to_stream` is equivalent to `to_stream_with_dist_required(RequiredDist::Any)` fn to_stream(&self, _ctx: &mut ToStreamContext) -> Result { - Ok(StreamNow::new(self.clone(), self.ctx()).into()) + Ok(StreamNow::new(self.core.clone()).into()) } } diff --git a/src/frontend/src/optimizer/plan_node/logical_project_set.rs b/src/frontend/src/optimizer/plan_node/logical_project_set.rs index 6f31b35a21ee0..8f6966ece6c70 100644 --- a/src/frontend/src/optimizer/plan_node/logical_project_set.rs +++ b/src/frontend/src/optimizer/plan_node/logical_project_set.rs @@ -22,7 +22,7 @@ use super::{ LogicalProject, PlanBase, PlanRef, PlanTreeNodeUnary, PredicatePushdown, StreamProjectSet, ToBatch, ToStream, }; -use crate::error::Result; +use crate::error::{ErrorCode, Result}; use crate::expr::{ collect_input_refs, Expr, ExprImpl, ExprRewriter, ExprVisitor, FunctionCall, InputRef, TableFunction, @@ -400,6 +400,16 @@ impl ToStream for LogicalProjectSet { // TODO: implement to_stream_with_dist_required like LogicalProject fn to_stream(&self, ctx: &mut ToStreamContext) -> Result { + if self.select_list().iter().any(|item| item.has_now()) { + // User may use `now()` in table function in a wrong way, because we allow `now()` in `FROM` clause. + return Err(ErrorCode::NotSupported( + "General `now()` function in streaming queries".to_string(), + "Streaming `now()` is currently only supported in GenerateSeries and TemporalFilter patterns." + .to_string(), + ) + .into()); + } + let new_input = self.input().to_stream(ctx)?; let mut new_logical = self.core.clone(); new_logical.input = new_input; diff --git a/src/frontend/src/optimizer/plan_node/stream_now.rs b/src/frontend/src/optimizer/plan_node/stream_now.rs index d27321c08d06b..22a0d2c5fb0fb 100644 --- a/src/frontend/src/optimizer/plan_node/stream_now.rs +++ b/src/frontend/src/optimizer/plan_node/stream_now.rs @@ -14,46 +14,39 @@ use fixedbitset::FixedBitSet; use pretty_xmlish::XmlNode; -use risingwave_common::catalog::{Field, Schema}; -use risingwave_common::types::DataType; +use risingwave_common::types::Datum; +use risingwave_common::util::value_encoding::DatumToProtoExt; +use risingwave_pb::stream_plan::now_node::PbMode as PbNowMode; use risingwave_pb::stream_plan::stream_node::NodeBody; -use risingwave_pb::stream_plan::NowNode; +use risingwave_pb::stream_plan::{PbNowModeGenerateSeries, PbNowModeUpdateCurrent, PbNowNode}; +use super::generic::Mode; use super::stream::prelude::*; use super::utils::{childless_record, Distill, TableCatalogBuilder}; -use super::{ExprRewritable, LogicalNow, PlanBase, StreamNode}; +use super::{generic, ExprRewritable, PlanBase, StreamNode}; use crate::optimizer::plan_node::expr_visitable::ExprVisitable; use crate::optimizer::plan_node::utils::column_names_pretty; -use crate::optimizer::property::{Distribution, FunctionalDependencySet}; +use crate::optimizer::property::Distribution; use crate::stream_fragmenter::BuildFragmentGraphState; -use crate::OptimizerContextRef; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct StreamNow { pub base: PlanBase, + core: generic::Now, } impl StreamNow { - pub fn new(_logical: LogicalNow, ctx: OptimizerContextRef) -> Self { - let schema = Schema::new(vec![Field { - data_type: DataType::Timestamptz, - name: String::from("now"), - sub_fields: vec![], - type_name: String::default(), - }]); + pub fn new(core: generic::Now) -> Self { let mut watermark_columns = FixedBitSet::with_capacity(1); watermark_columns.set(0, true); - let 
base = PlanBase::new_stream( - ctx, - schema, - Some(vec![]), - FunctionalDependencySet::default(), + let base = PlanBase::new_stream_with_core( + &core, Distribution::Single, - false, - false, // TODO(rc): derive EOWC property from input + core.mode.is_generate_series(), // append only + core.mode.is_generate_series(), // emit on window close watermark_columns, ); - Self { base } + Self { base, core } } } @@ -83,8 +76,18 @@ impl StreamNode for StreamNow { let table_catalog = internal_table_catalog_builder .build(dist_keys, 0) .with_id(state.gen_table_id_wrapped()); - NodeBody::Now(NowNode { + NodeBody::Now(PbNowNode { state_table: Some(table_catalog.to_internal_table_prost()), + mode: Some(match &self.core.mode { + Mode::UpdateCurrent => PbNowMode::UpdateCurrent(PbNowModeUpdateCurrent {}), + Mode::GenerateSeries { + start_timestamp, + interval, + } => PbNowMode::GenerateSeries(PbNowModeGenerateSeries { + start_timestamp: Some(Datum::Some((*start_timestamp).into()).to_protobuf()), + interval: Some(Datum::Some((*interval).into()).to_protobuf()), + }), + }), }) } } diff --git a/src/frontend/src/optimizer/plan_visitor/cardinality_visitor.rs b/src/frontend/src/optimizer/plan_visitor/cardinality_visitor.rs index c81bf539a5713..07459b59b1d5f 100644 --- a/src/frontend/src/optimizer/plan_visitor/cardinality_visitor.rs +++ b/src/frontend/src/optimizer/plan_visitor/cardinality_visitor.rs @@ -174,8 +174,12 @@ impl PlanVisitor for CardinalityVisitor { } } - fn visit_logical_now(&mut self, _plan: &plan_node::LogicalNow) -> Cardinality { - 1.into() + fn visit_logical_now(&mut self, plan: &plan_node::LogicalNow) -> Cardinality { + if plan.max_one_row() { + 1.into() + } else { + Cardinality::unknown() + } } fn visit_logical_expand(&mut self, plan: &plan_node::LogicalExpand) -> Cardinality { diff --git a/src/frontend/src/optimizer/rule/mod.rs b/src/frontend/src/optimizer/rule/mod.rs index 9364a6f2b7f5b..fd06402c7497f 100644 --- a/src/frontend/src/optimizer/rule/mod.rs +++ b/src/frontend/src/optimizer/rule/mod.rs @@ -92,6 +92,7 @@ pub use top_n_on_index_rule::*; mod stream; pub use stream::bushy_tree_join_ordering_rule::*; pub use stream::filter_with_now_to_join_rule::*; +pub use stream::generate_series_with_now_rule::*; pub use stream::split_now_and_rule::*; pub use stream::split_now_or_rule::*; pub use stream::stream_project_merge_rule::*; @@ -203,6 +204,7 @@ macro_rules! 
for_all_rules { , { SplitNowAndRule } , { SplitNowOrRule } , { FilterWithNowToJoinRule } + , { GenerateSeriesWithNowRule } , { TopNOnIndexRule } , { TrivialProjectToValuesRule } , { UnionInputValuesMergeRule } diff --git a/src/frontend/src/optimizer/rule/stream/filter_with_now_to_join_rule.rs b/src/frontend/src/optimizer/rule/stream/filter_with_now_to_join_rule.rs index 498696589c81b..cbdb65b4528a5 100644 --- a/src/frontend/src/optimizer/rule/stream/filter_with_now_to_join_rule.rs +++ b/src/frontend/src/optimizer/rule/stream/filter_with_now_to_join_rule.rs @@ -18,7 +18,7 @@ use risingwave_pb::plan_common::JoinType; use crate::expr::{ try_derive_watermark, ExprRewriter, FunctionCall, InputRef, WatermarkDerivation, }; -use crate::optimizer::plan_node::generic::GenericPlanRef; +use crate::optimizer::plan_node::generic::{self, GenericPlanRef}; use crate::optimizer::plan_node::{LogicalFilter, LogicalJoin, LogicalNow}; use crate::optimizer::rule::{BoxedRule, Rule}; use crate::optimizer::PlanRef; @@ -63,7 +63,7 @@ impl Rule for FilterWithNowToJoinRule { for now_filter in now_filters { new_plan = LogicalJoin::new( new_plan, - LogicalNow::new(plan.ctx()).into(), + LogicalNow::new(generic::Now::update_current(plan.ctx())).into(), JoinType::LeftSemi, Condition { conjunctions: vec![now_filter.into()], diff --git a/src/frontend/src/optimizer/rule/stream/generate_series_with_now_rule.rs b/src/frontend/src/optimizer/rule/stream/generate_series_with_now_rule.rs new file mode 100644 index 0000000000000..665967f6660b0 --- /dev/null +++ b/src/frontend/src/optimizer/rule/stream/generate_series_with_now_rule.rs @@ -0,0 +1,86 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use risingwave_common::types::DataType; +use risingwave_pb::expr::table_function::PbType as PbTableFuncType; + +use crate::expr::{Expr, ExprRewriter}; +use crate::optimizer::plan_node::{generic, LogicalNow}; +use crate::optimizer::rule::{BoxedRule, Rule}; +use crate::PlanRef; + +pub struct GenerateSeriesWithNowRule {} +impl Rule for GenerateSeriesWithNowRule { + fn apply(&self, plan: PlanRef) -> Option { + let ctx = plan.ctx(); + let table_func = plan.as_logical_table_function()?.table_function(); + + if !table_func.args.iter().any(|arg| arg.has_now()) { + return None; + } + + if !(table_func.function_type == PbTableFuncType::GenerateSeries + && table_func.args.len() == 3 + && table_func.args[0].return_type() == DataType::Timestamptz + && table_func.args[1].is_now() + && table_func.args[2].return_type() == DataType::Interval) + { + // only convert `generate_series(const timestamptz, now(), const interval)` + ctx.warn_to_user( + "`now()` is currently only supported in `generate_series(timestamptz, timestamptz, interval)` function as `stop`. \ + You may not using it correctly. Please kindly check the document." 
+ ); + return None; + } + + let start_timestamp = ctx + .session_timezone() + .rewrite_expr(table_func.args[0].clone()) + .try_fold_const() + .transpose() + .ok() + .flatten() + .flatten(); + let interval = ctx + .session_timezone() + .rewrite_expr(table_func.args[2].clone()) + .try_fold_const() + .transpose() + .ok() + .flatten() + .flatten(); + + if start_timestamp.is_none() || interval.is_none() { + ctx.warn_to_user( + "When using `generate_series` with `now()`, the `start` and `step` must be non-NULL constants", + ); + return None; + } + + Some( + LogicalNow::new(generic::Now::generate_series( + ctx, + start_timestamp.unwrap().into_timestamptz(), + interval.unwrap().into_interval(), + )) + .into(), + ) + } +} + +impl GenerateSeriesWithNowRule { + pub fn create() -> BoxedRule { + Box::new(Self {}) + } +} diff --git a/src/frontend/src/optimizer/rule/stream/mod.rs b/src/frontend/src/optimizer/rule/stream/mod.rs index cc86298e766e8..539d9048cff60 100644 --- a/src/frontend/src/optimizer/rule/stream/mod.rs +++ b/src/frontend/src/optimizer/rule/stream/mod.rs @@ -14,6 +14,7 @@ pub(crate) mod bushy_tree_join_ordering_rule; pub(crate) mod filter_with_now_to_join_rule; +pub(crate) mod generate_series_with_now_rule; pub(crate) mod split_now_and_rule; pub(crate) mod split_now_or_rule; pub(crate) mod stream_project_merge_rule; diff --git a/src/stream/src/executor/mod.rs b/src/stream/src/executor/mod.rs index 6d5c5867060de..a6a2152283668 100644 --- a/src/stream/src/executor/mod.rs +++ b/src/stream/src/executor/mod.rs @@ -129,7 +129,7 @@ pub use lookup_union::LookupUnionExecutor; pub use merge::MergeExecutor; pub use mview::*; pub use no_op::NoOpExecutor; -pub use now::NowExecutor; +pub use now::*; pub use over_window::*; pub use project::ProjectExecutor; pub use project_set::*; diff --git a/src/stream/src/executor/now.rs b/src/stream/src/executor/now.rs index 43316fef71094..049eee7f8c724 100644 --- a/src/stream/src/executor/now.rs +++ b/src/stream/src/executor/now.rs @@ -15,30 +15,65 @@ use std::ops::Bound; use std::ops::Bound::Unbounded; +use itertools::Itertools; use risingwave_common::array::Op; use risingwave_common::row; +use risingwave_common::types::{DefaultOrdered, Interval, Timestamptz, ToDatumRef}; +use risingwave_expr::capture_context; +use risingwave_expr::expr::{ + build_func_non_strict, EvalErrorReport, ExpressionBoxExt, InputRefExpression, + LiteralExpression, NonStrictExpression, +}; +use risingwave_expr::expr_context::TIME_ZONE; use tokio::sync::mpsc::UnboundedReceiver; use tokio_stream::wrappers::UnboundedReceiverStream; use crate::executor::prelude::*; +use crate::task::ActorEvalErrorReport; pub struct NowExecutor { data_types: Vec, + mode: NowMode, + eval_error_report: ActorEvalErrorReport, + /// Receiver of barrier channel. barrier_receiver: UnboundedReceiver, state_table: StateTable, } +pub enum NowMode { + /// Emit current timestamp on startup, update it on barrier. + UpdateCurrent, + /// Generate a series of timestamps starting from `start_timestamp` with `interval`. + /// Keep generating new timestamps on barrier. 
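+    /// Timestamps are generated up to (and including) the current barrier time, and the
+    /// last generated timestamp is persisted in the state table so that recovery resumes
+    /// from where the series left off.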
+ GenerateSeries { + start_timestamp: Timestamptz, + interval: Interval, + }, +} + +enum ModeVars { + UpdateCurrent, + GenerateSeries { + chunk_builder: StreamChunkBuilder, + add_interval_expr: NonStrictExpression, + }, +} + impl NowExecutor { pub fn new( data_types: Vec, + mode: NowMode, + eval_error_report: ActorEvalErrorReport, barrier_receiver: UnboundedReceiver, state_table: StateTable, ) -> Self { Self { data_types, + mode, + eval_error_report, barrier_receiver, state_table, } @@ -48,24 +83,43 @@ impl NowExecutor { async fn execute_inner(self) { let Self { data_types, + mode, + eval_error_report, barrier_receiver, mut state_table, } = self; + let max_chunk_size = crate::config::chunk_size(); + // Whether the executor is paused. let mut paused = false; // The last timestamp **sent** to the downstream. let mut last_timestamp: Datum = None; + // Whether the first barrier is handled and `last_timestamp` is initialized. let mut initialized = false; + let mut mode_vars = match &mode { + NowMode::UpdateCurrent => ModeVars::UpdateCurrent, + NowMode::GenerateSeries { interval, .. } => { + // in most cases there won't be more than one row except for the first time + let chunk_builder = StreamChunkBuilder::unlimited(data_types.clone(), Some(1)); + let add_interval_expr = + build_add_interval_expr_captured(*interval, eval_error_report)?; + ModeVars::GenerateSeries { + chunk_builder, + add_interval_expr, + } + } + }; + const MAX_MERGE_BARRIER_SIZE: usize = 64; #[for_await] for barriers in UnboundedReceiverStream::new(barrier_receiver).ready_chunks(MAX_MERGE_BARRIER_SIZE) { - let mut timestamp = None; + let mut curr_timestamp = None; if barriers.len() > 1 { warn!( "handle multiple barriers at once in now executor: {}", @@ -74,7 +128,7 @@ impl NowExecutor { } for barrier in barriers { if !initialized { - // Handle the first barrier. + // Handle the initial barrier. state_table.init_epoch(barrier.epoch); let state_row = { let sub_range: &(Bound, Bound) = @@ -97,7 +151,7 @@ impl NowExecutor { } // Extract timestamp from the current epoch. - timestamp = Some(barrier.get_curr_epoch().as_scalar()); + curr_timestamp = Some(barrier.get_curr_epoch().as_scalar()); // Update paused state. if let Some(mutation) = barrier.mutation.as_deref() { @@ -116,28 +170,94 @@ impl NowExecutor { continue; } - let stream_chunk = if last_timestamp.is_some() { - let last_row = row::once(&last_timestamp); - let row = row::once(×tamp); - state_table.update(last_row, row); + match (&mode, &mut mode_vars) { + (NowMode::UpdateCurrent, ModeVars::UpdateCurrent) => { + let chunk = if last_timestamp.is_some() { + let last_row = row::once(&last_timestamp); + let row = row::once(&curr_timestamp); + state_table.update(last_row, row); + + StreamChunk::from_rows( + &[(Op::Delete, last_row), (Op::Insert, row)], + &data_types, + ) + } else { + let row = row::once(&curr_timestamp); + state_table.insert(row); + + StreamChunk::from_rows(&[(Op::Insert, row)], &data_types) + }; - StreamChunk::from_rows(&[(Op::Delete, last_row), (Op::Insert, row)], &data_types) - } else { - let row = row::once(×tamp); - state_table.insert(row); + yield Message::Chunk(chunk); + last_timestamp.clone_from(&curr_timestamp) + } + ( + NowMode::GenerateSeries { + start_timestamp, .. + }, + ModeVars::GenerateSeries { + chunk_builder, + ref add_interval_expr, + }, + ) => { + if last_timestamp.is_none() { + // We haven't emit any timestamp yet. Let's emit the first one and populate the state table. 
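+                        // (This branch only runs on the very first barrier the executor ever
+                        // sees: after a recovery, `last_timestamp` is restored from the state
+                        // table during initialization, so the series is not re-emitted.)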
+ let first = Some((*start_timestamp).into()); + let first_row = row::once(&first); + let _ = chunk_builder.append_row(Op::Insert, first_row); + state_table.insert(first_row); + last_timestamp = first; + } - StreamChunk::from_rows(&[(Op::Insert, row)], &data_types) - }; + // Now let's step through the timestamps from the last timestamp to the current timestamp. + // We use `last_row` as a temporary cursor to track the progress, and won't touch `last_timestamp` + // until the end of the loop, so that `last_timestamp` is always synced with the state table. + let mut last_row = OwnedRow::new(vec![last_timestamp.clone()]); + + loop { + if chunk_builder.size() >= max_chunk_size { + // Manually yield the chunk when size exceeds the limit. We don't want to use chunk builder + // with limited size here because the initial capacity can be too large for most cases. + // Basically only the first several chunks can potentially exceed the `max_chunk_size`. + if let Some(chunk) = chunk_builder.take() { + yield Message::Chunk(chunk); + } + } + + let next = add_interval_expr.eval_row_infallible(&last_row).await; + if DefaultOrdered(next.to_datum_ref()) + > DefaultOrdered(curr_timestamp.to_datum_ref()) + { + // We only increase the timestamp to the current timestamp. + break; + } + + let next_row = OwnedRow::new(vec![next]); + let _ = chunk_builder.append_row(Op::Insert, &next_row); + last_row = next_row; + } + + if let Some(chunk) = chunk_builder.take() { + yield Message::Chunk(chunk); + } - yield Message::Chunk(stream_chunk); + // Update the last timestamp. + state_table.update(row::once(&last_timestamp), &last_row); + last_timestamp = last_row + .into_inner() + .into_vec() + .into_iter() + .exactly_one() + .unwrap(); + } + _ => unreachable!(), + } yield Message::Watermark(Watermark::new( 0, DataType::Timestamptz, - timestamp.clone().unwrap(), + curr_timestamp.unwrap(), )); - - last_timestamp = timestamp; } } } @@ -148,36 +268,54 @@ impl Execute for NowExecutor { } } +#[capture_context(TIME_ZONE)] +pub fn build_add_interval_expr( + time_zone: &str, + interval: Interval, + eval_error_report: impl EvalErrorReport + 'static, +) -> risingwave_expr::Result { + let timestamptz_input = InputRefExpression::new(DataType::Timestamptz, 0); + let interval = LiteralExpression::new(DataType::Interval, Some(interval.into())); + let time_zone = LiteralExpression::new(DataType::Varchar, Some(time_zone.into())); + + use risingwave_pb::expr::expr_node::PbType as PbExprType; + build_func_non_strict( + PbExprType::AddWithTimeZone, + DataType::Timestamptz, + vec![ + timestamptz_input.boxed(), + interval.boxed(), + time_zone.boxed(), + ], + eval_error_report, + ) +} + #[cfg(test)] mod tests { - use risingwave_common::array::StreamChunk; use risingwave_common::catalog::{ColumnDesc, ColumnId, TableId}; use risingwave_common::test_prelude::StreamChunkTestExt; - use risingwave_common::types::{DataType, ScalarImpl}; + use risingwave_common::types::test_utils::IntervalTestExt; + use risingwave_common::util::epoch::test_epoch; use risingwave_storage::memory::MemoryStateStore; use tokio::sync::mpsc::{unbounded_channel, UnboundedSender}; - use super::NowExecutor; - use crate::common::table::state_table::StateTable; + use super::*; use crate::executor::test_utils::StreamExecutorTestExt; - use crate::executor::{ - Barrier, BoxedMessageStream, Execute, Mutation, StreamExecutorResult, Watermark, - }; #[tokio::test] async fn test_now() -> StreamExecutorResult<()> { let state_store = create_state_store(); - let (tx, mut now_executor) = 
create_executor(&state_store).await; + let (tx, mut now) = create_executor(NowMode::UpdateCurrent, &state_store).await; // Init barrier - tx.send(Barrier::with_prev_epoch_for_test(1 << 16, 1)) - .unwrap(); + tx.send(Barrier::new_test_barrier(test_epoch(1))).unwrap(); // Consume the barrier - now_executor.next_unwrap_ready_barrier()?; + now.next_unwrap_ready_barrier()?; // Consume the data chunk - let chunk_msg = now_executor.next_unwrap_ready_chunk()?; + let chunk_msg = now.next_unwrap_ready_chunk()?; assert_eq!( chunk_msg.compact(), @@ -188,7 +326,7 @@ mod tests { ); // Consume the watermark - let watermark = now_executor.next_unwrap_ready_watermark()?; + let watermark = now.next_unwrap_ready_watermark()?; assert_eq!( watermark, @@ -199,14 +337,17 @@ mod tests { ) ); - tx.send(Barrier::with_prev_epoch_for_test(2 << 16, 1 << 16)) - .unwrap(); + tx.send(Barrier::with_prev_epoch_for_test( + test_epoch(2), + test_epoch(1), + )) + .unwrap(); // Consume the barrier - now_executor.next_unwrap_ready_barrier()?; + now.next_unwrap_ready_barrier()?; // Consume the data chunk - let chunk_msg = now_executor.next_unwrap_ready_chunk()?; + let chunk_msg = now.next_unwrap_ready_chunk()?; assert_eq!( chunk_msg.compact(), @@ -218,7 +359,7 @@ mod tests { ); // Consume the watermark - let watermark = now_executor.next_unwrap_ready_watermark()?; + let watermark = now.next_unwrap_ready_watermark()?; assert_eq!( watermark, @@ -230,21 +371,25 @@ mod tests { ); // No more messages until the next barrier - now_executor.next_unwrap_pending(); + now.next_unwrap_pending(); // Recovery - drop((tx, now_executor)); - let (tx, mut now_executor) = create_executor(&state_store).await; - tx.send(Barrier::with_prev_epoch_for_test(3 << 16, 1 << 16)) - .unwrap(); + drop((tx, now)); + let (tx, mut now) = create_executor(NowMode::UpdateCurrent, &state_store).await; + tx.send(Barrier::with_prev_epoch_for_test( + test_epoch(3), + test_epoch(2), + )) + .unwrap(); // Consume the barrier - now_executor.next_unwrap_ready_barrier()?; + now.next_unwrap_ready_barrier()?; // Consume the data chunk - let chunk_msg = now_executor.next_unwrap_ready_chunk()?; + let chunk_msg = now.next_unwrap_ready_chunk()?; assert_eq!( chunk_msg.compact(), + // the last chunk was not checkpointed so the deleted old value should be `001` StreamChunk::from_pretty( " TZ - 2021-04-01T00:00:00.001Z @@ -253,7 +398,7 @@ mod tests { ); // Consume the watermark - let watermark = now_executor.next_unwrap_ready_watermark()?; + let watermark = now.next_unwrap_ready_watermark()?; assert_eq!( watermark, @@ -265,28 +410,32 @@ mod tests { ); // Recovery with paused - drop((tx, now_executor)); - let (tx, mut now_executor) = create_executor(&state_store).await; - tx.send(Barrier::new_test_barrier(4 << 16).with_mutation(Mutation::Pause)) - .unwrap(); + drop((tx, now)); + let (tx, mut now) = create_executor(NowMode::UpdateCurrent, &state_store).await; + tx.send( + Barrier::with_prev_epoch_for_test(test_epoch(4), test_epoch(3)) + .with_mutation(Mutation::Pause), + ) + .unwrap(); // Consume the barrier - now_executor.next_unwrap_ready_barrier()?; + now.next_unwrap_ready_barrier()?; // There should be no messages until `Resume` - now_executor.next_unwrap_pending(); + now.next_unwrap_pending(); // Resume barrier tx.send( - Barrier::with_prev_epoch_for_test(5 << 16, 4 << 16).with_mutation(Mutation::Resume), + Barrier::with_prev_epoch_for_test(test_epoch(5), test_epoch(4)) + .with_mutation(Mutation::Resume), ) .unwrap(); // Consume the barrier - 
now_executor.next_unwrap_ready_barrier()?; + now.next_unwrap_ready_barrier()?; // Consume the data chunk - let chunk_msg = now_executor.next_unwrap_ready_chunk()?; + let chunk_msg = now.next_unwrap_ready_chunk()?; assert_eq!( chunk_msg.compact(), StreamChunk::from_pretty( @@ -297,7 +446,7 @@ mod tests { ); // Consume the watermark - let watermark = now_executor.next_unwrap_ready_watermark()?; + let watermark = now.next_unwrap_ready_watermark()?; assert_eq!( watermark, @@ -314,29 +463,30 @@ mod tests { #[tokio::test] async fn test_now_start_with_paused() -> StreamExecutorResult<()> { let state_store = create_state_store(); - let (tx, mut now_executor) = create_executor(&state_store).await; + let (tx, mut now) = create_executor(NowMode::UpdateCurrent, &state_store).await; // Init barrier - tx.send(Barrier::with_prev_epoch_for_test(1 << 16, 1).with_mutation(Mutation::Pause)) + tx.send(Barrier::new_test_barrier(test_epoch(1)).with_mutation(Mutation::Pause)) .unwrap(); // Consume the barrier - now_executor.next_unwrap_ready_barrier()?; + now.next_unwrap_ready_barrier()?; // There should be no messages until `Resume` - now_executor.next_unwrap_pending(); + now.next_unwrap_pending(); // Resume barrier tx.send( - Barrier::with_prev_epoch_for_test(2 << 16, 1 << 16).with_mutation(Mutation::Resume), + Barrier::with_prev_epoch_for_test(test_epoch(2), test_epoch(1)) + .with_mutation(Mutation::Resume), ) .unwrap(); // Consume the barrier - now_executor.next_unwrap_ready_barrier()?; + now.next_unwrap_ready_barrier()?; // Consume the data chunk - let chunk_msg = now_executor.next_unwrap_ready_chunk()?; + let chunk_msg = now.next_unwrap_ready_chunk()?; assert_eq!( chunk_msg.compact(), @@ -347,7 +497,7 @@ mod tests { ); // Consume the watermark - let watermark = now_executor.next_unwrap_ready_watermark()?; + let watermark = now.next_unwrap_ready_watermark()?; assert_eq!( watermark, @@ -359,7 +509,121 @@ mod tests { ); // No more messages until the next barrier - now_executor.next_unwrap_pending(); + now.next_unwrap_pending(); + + Ok(()) + } + + #[tokio::test] + async fn test_now_generate_series() -> StreamExecutorResult<()> { + TIME_ZONE::scope("UTC".to_string(), test_now_generate_series_inner()).await + } + + async fn test_now_generate_series_inner() -> StreamExecutorResult<()> { + let start_timestamp = Timestamptz::from_secs(1617235190).unwrap(); // 2021-03-31 23:59:50 UTC + let interval = Interval::from_millis(1000); // 1s interval + + let state_store = create_state_store(); + let (tx, mut now) = create_executor( + NowMode::GenerateSeries { + start_timestamp, + interval, + }, + &state_store, + ) + .await; + + // Init barrier + tx.send(Barrier::new_test_barrier(test_epoch(1000))) + .unwrap(); + now.next_unwrap_ready_barrier()?; + + // Initial timestamps + let chunk = now.next_unwrap_ready_chunk()?; + assert_eq!(chunk.cardinality(), 12); // seconds from 23:59:50 to 00:00:01 (inclusive) + + assert_eq!( + now.next_unwrap_ready_watermark()?, + Watermark::new( + 0, + DataType::Timestamptz, + ScalarImpl::Timestamptz("2021-04-01T00:00:01.000Z".parse().unwrap()) + ) + ); + + tx.send(Barrier::with_prev_epoch_for_test( + test_epoch(2000), + test_epoch(1000), + )) + .unwrap(); + tx.send(Barrier::with_prev_epoch_for_test( + test_epoch(3000), + test_epoch(2000), + )) + .unwrap(); + + now.next_unwrap_ready_barrier()?; + now.next_unwrap_ready_barrier()?; + + let chunk = now.next_unwrap_ready_chunk()?; + assert_eq!( + chunk.compact(), + StreamChunk::from_pretty( + " TZ + + 2021-04-01T00:00:02.000Z + + 
2021-04-01T00:00:03.000Z" + ) + ); + + let watermark = now.next_unwrap_ready_watermark()?; + assert_eq!( + watermark, + Watermark::new( + 0, + DataType::Timestamptz, + ScalarImpl::Timestamptz("2021-04-01T00:00:03.000Z".parse().unwrap()) + ) + ); + + // Recovery + drop((tx, now)); + let (tx, mut now) = create_executor( + NowMode::GenerateSeries { + start_timestamp, + interval, + }, + &state_store, + ) + .await; + + tx.send(Barrier::with_prev_epoch_for_test( + test_epoch(4000), + test_epoch(3000), + )) + .unwrap(); + + now.next_unwrap_ready_barrier()?; + + let chunk = now.next_unwrap_ready_chunk()?; + assert_eq!( + chunk.compact(), + StreamChunk::from_pretty( + " TZ + + 2021-04-01T00:00:02.000Z + + 2021-04-01T00:00:03.000Z + + 2021-04-01T00:00:04.000Z" + ) + ); + + let watermark = now.next_unwrap_ready_watermark()?; + assert_eq!( + watermark, + Watermark::new( + 0, + DataType::Timestamptz, + ScalarImpl::Timestamptz("2021-04-01T00:00:04.000Z".parse().unwrap()) + ) + ); Ok(()) } @@ -369,6 +633,7 @@ mod tests { } async fn create_executor( + mode: NowMode, state_store: &MemoryStateStore, ) -> (UnboundedSender, BoxedMessageStream) { let table_id = TableId::new(1); @@ -384,8 +649,17 @@ mod tests { let (sender, barrier_receiver) = unbounded_channel(); - let now_executor = - NowExecutor::new(vec![DataType::Timestamptz], barrier_receiver, state_table); + let eval_error_report = ActorEvalErrorReport { + actor_context: ActorContext::for_test(123), + identity: "NowExecutor".into(), + }; + let now_executor = NowExecutor::new( + vec![DataType::Timestamptz], + mode, + eval_error_report, + barrier_receiver, + state_table, + ); (sender, now_executor.boxed().execute()) } } diff --git a/src/stream/src/from_proto/now.rs b/src/stream/src/from_proto/now.rs index a917cd136f3b4..9eac7caa13557 100644 --- a/src/stream/src/from_proto/now.rs +++ b/src/stream/src/from_proto/now.rs @@ -12,14 +12,18 @@ // See the License for the specific language governing permissions and // limitations under the License. -use risingwave_pb::stream_plan::NowNode; +use anyhow::Context; +use risingwave_common::types::{DataType, Datum}; +use risingwave_common::util::value_encoding::DatumFromProtoExt; +use risingwave_pb::stream_plan::now_node::PbMode as PbNowMode; +use risingwave_pb::stream_plan::{NowNode, PbNowModeGenerateSeries}; use risingwave_storage::StateStore; use tokio::sync::mpsc::unbounded_channel; use super::ExecutorBuilder; use crate::common::table::state_table::StateTable; use crate::error::StreamResult; -use crate::executor::{Executor, NowExecutor}; +use crate::executor::{Executor, NowExecutor, NowMode}; use crate::task::ExecutorParams; pub struct NowExecutorBuilder; @@ -37,11 +41,43 @@ impl ExecutorBuilder for NowExecutorBuilder { .create_actor_context .register_sender(params.actor_context.id, sender); + let mode = if let Ok(pb_mode) = node.get_mode() { + match pb_mode { + PbNowMode::UpdateCurrent(_) => NowMode::UpdateCurrent, + PbNowMode::GenerateSeries(PbNowModeGenerateSeries { + start_timestamp, + interval, + }) => { + let start_timestamp = Datum::from_protobuf( + start_timestamp.as_ref().unwrap(), + &DataType::Timestamptz, + ) + .context("`start_timestamp` field is not decodable")? + .context("`start_timestamp` field should not be NULL")? + .into_timestamptz(); + let interval = + Datum::from_protobuf(interval.as_ref().unwrap(), &DataType::Interval) + .context("`interval` field is not decodable")? + .context("`interval` field should not be NULL")? 
+ .into_interval(); + NowMode::GenerateSeries { + start_timestamp, + interval, + } + } + } + } else { + // default to `UpdateCurrent` for backward-compatibility + NowMode::UpdateCurrent + }; + let state_table = StateTable::from_table_catalog(node.get_state_table()?, store, None).await; let exec = NowExecutor::new( params.info.schema.data_types(), + mode, + params.eval_error_report, barrier_receiver, state_table, ); diff --git a/src/stream/src/lib.rs b/src/stream/src/lib.rs index 51a735af0f5a7..a199e4c8acb9f 100644 --- a/src/stream/src/lib.rs +++ b/src/stream/src/lib.rs @@ -40,6 +40,7 @@ #![feature(btree_cursors)] #![feature(assert_matches)] #![feature(try_blocks)] +#![feature(result_flattening)] // required by `capture_context` use std::sync::Arc; From 388d182703aa69b752da96a3553bc6bcab601410 Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Tue, 25 Jun 2024 14:52:27 +0800 Subject: [PATCH 04/11] fix: only run e2e of streaming `generate_series` in regular e2e tests (#17435) Signed-off-by: Richard Chien --- ci/scripts/e2e-test-parallel-for-opendal.sh | 6 +-- ci/scripts/e2e-test-parallel-in-memory.sh | 6 +-- ci/scripts/run-e2e-test.sh | 3 ++ e2e_test/streaming/now.slt | 48 --------------------- e2e_test/streaming_now/README | 1 + e2e_test/streaming_now/now.slt | 30 +++++++++++++ 6 files changed, 40 insertions(+), 54 deletions(-) delete mode 100644 e2e_test/streaming/now.slt create mode 100644 e2e_test/streaming_now/README create mode 100644 e2e_test/streaming_now/now.slt diff --git a/ci/scripts/e2e-test-parallel-for-opendal.sh b/ci/scripts/e2e-test-parallel-for-opendal.sh index 606adcf929cd7..a6a6c89e41164 100755 --- a/ci/scripts/e2e-test-parallel-for-opendal.sh +++ b/ci/scripts/e2e-test-parallel-for-opendal.sh @@ -31,7 +31,7 @@ host_args=(-h localhost -p 4565 -h localhost -p 4566 -h localhost -p 4567) echo "--- e2e, ci-3cn-3fe-opendal-fs-backend, streaming" RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \ risedev ci-start ci-3cn-3fe-opendal-fs-backend -sqllogictest "${host_args[@]}" -d dev './e2e_test/streaming/**/*.slt' -j 16 --junit "parallel-opendal-fs-backend-${profile}" +sqllogictest "${host_args[@]}" -d dev './e2e_test/streaming/**/*.slt' -j 16 --junit "parallel-opendal-fs-backend-${profile}" --label "parallel" echo "--- Kill cluster Streaming" risedev ci-kill @@ -41,8 +41,8 @@ rm -rf /tmp/rw_ci echo "--- e2e, ci-3cn-3fe-opendal-fs-backend, batch" RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \ risedev ci-start ci-3cn-3fe-opendal-fs-backend -sqllogictest "${host_args[@]}" -d dev './e2e_test/ddl/**/*.slt' --junit "parallel-opendal-fs-backend-ddl-${profile}" -sqllogictest "${host_args[@]}" -d dev './e2e_test/visibility_mode/*.slt' -j 16 --junit "parallel-opendal-fs-backend-batch-${profile}" +sqllogictest "${host_args[@]}" -d dev './e2e_test/ddl/**/*.slt' --junit "parallel-opendal-fs-backend-ddl-${profile}" --label "parallel" +sqllogictest "${host_args[@]}" -d dev './e2e_test/visibility_mode/*.slt' -j 16 --junit "parallel-opendal-fs-backend-batch-${profile}" --label "parallel" echo "--- Kill cluster Batch" risedev ci-kill diff --git a/ci/scripts/e2e-test-parallel-in-memory.sh b/ci/scripts/e2e-test-parallel-in-memory.sh index fcde15644c2b6..f7e6292bed54e 100755 --- a/ci/scripts/e2e-test-parallel-in-memory.sh +++ b/ci/scripts/e2e-test-parallel-in-memory.sh @@ -28,15 +28,15 @@ host_args=(-h localhost -p 4565 -h localhost -p 4566 -h localhost -p 4567) echo "--- e2e, ci-3cn-3fe-in-memory, streaming" risedev ci-start 
ci-3cn-3fe-in-memory sqllogictest --version -sqllogictest "${host_args[@]}" -d dev './e2e_test/streaming/**/*.slt' -j 16 --junit "parallel-in-memory-streaming-${profile}" --label in-memory +sqllogictest "${host_args[@]}" -d dev './e2e_test/streaming/**/*.slt' -j 16 --junit "parallel-in-memory-streaming-${profile}" --label "in-memory" --label "parallel" echo "--- Kill cluster" risedev ci-kill echo "--- e2e, ci-3cn-3fe-in-memory, batch" risedev ci-start ci-3cn-3fe-in-memory -sqllogictest "${host_args[@]}" -d dev './e2e_test/ddl/**/*.slt' --junit "parallel-in-memory-batch-ddl-${profile}" --label in-memory -sqllogictest "${host_args[@]}" -d dev './e2e_test/batch/**/*.slt' -j 16 --junit "parallel-in-memory-batch-${profile}" --label in-memory +sqllogictest "${host_args[@]}" -d dev './e2e_test/ddl/**/*.slt' --junit "parallel-in-memory-batch-ddl-${profile}" --label "in-memory" --label "parallel" +sqllogictest "${host_args[@]}" -d dev './e2e_test/batch/**/*.slt' -j 16 --junit "parallel-in-memory-batch-${profile}" --label "in-memory" --label "parallel" echo "--- Kill cluster" risedev ci-kill diff --git a/ci/scripts/run-e2e-test.sh b/ci/scripts/run-e2e-test.sh index 4736f4aa53a8a..c64b54ca25a43 100755 --- a/ci/scripts/run-e2e-test.sh +++ b/ci/scripts/run-e2e-test.sh @@ -80,6 +80,9 @@ RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=i cluster_start # Please make sure the regression is expected before increasing the timeout. sqllogictest -p 4566 -d dev './e2e_test/streaming/**/*.slt' --junit "streaming-${profile}" +if [[ "$mode" != "single-node" ]]; then + sqllogictest -p 4566 -d dev './e2e_test/streaming_now/**/*.slt' --junit "streaming-${profile}" +fi sqllogictest -p 4566 -d dev './e2e_test/backfill/sink/different_pk_and_dist_key.slt' echo "--- Kill cluster" diff --git a/e2e_test/streaming/now.slt b/e2e_test/streaming/now.slt deleted file mode 100644 index ad086f2202e7a..0000000000000 --- a/e2e_test/streaming/now.slt +++ /dev/null @@ -1,48 +0,0 @@ -# In madsim test, we cannot spawn process. -skipif madsim -# In parallel test, we cannot get the DB name. -skipif parallel -# TODO: Later if we introduce a new `now()`-like function that returns the time of statement execution, -# we'll be able to directly create MV without `./risedev psql` and so that we can remove these `skipif`. -system ok -./risedev psql -c " -create materialized view mv as -select * from generate_series( - to_timestamp($(date +%s)) - interval '10 second', - now(), - interval '1 second' -); -" - -skipif madsim -skipif parallel -statement ok -flush; - -skipif madsim -skipif parallel -query I -select count(*) >= 10 from mv; ----- -t - -skipif madsim -skipif parallel -sleep 2s - -skipif madsim -skipif parallel -statement ok -flush; - -skipif madsim -skipif parallel -query I -select count(*) >= 12 from mv; ----- -t - -skipif madsim -skipif parallel -statement ok -drop materialized view mv; diff --git a/e2e_test/streaming_now/README b/e2e_test/streaming_now/README new file mode 100644 index 0000000000000..ad5d817e3c958 --- /dev/null +++ b/e2e_test/streaming_now/README @@ -0,0 +1 @@ +Because currently the e2e test for `generate_series(..., now(), ...)` relies on external command `./risedev psql`, it cannot be tested in parallel, madsim and single-node mode. Let's just place it in this special folder and manually run it in `run-e2e-test.sh` to avoid complexity. 
Later if we introduce a new `now()`-like function that returns the time of statement execution, we'll be able to get rid of the external command and directly create MV instead. We will move this to the `streaming` folder at that time.
diff --git a/e2e_test/streaming_now/now.slt b/e2e_test/streaming_now/now.slt
new file mode 100644
index 0000000000000..ea9f85a489786
--- /dev/null
+++ b/e2e_test/streaming_now/now.slt
@@ -0,0 +1,30 @@
+system ok
+./risedev psql -c "
+create materialized view mv as
+select * from generate_series(
+    to_timestamp($(date +%s)) - interval '10 second',
+    now(),
+    interval '1 second'
+);
+"
+
+statement ok
+flush;
+
+query I
+select count(*) >= 10 from mv;
+----
+t
+
+sleep 2s
+
+statement ok
+flush;
+
+query I
+select count(*) >= 12 from mv;
+----
+t
+
+statement ok
+drop materialized view mv;

From 4537c4ca86e29e13cd7b099536947d1139888020 Mon Sep 17 00:00:00 2001
From: xxchan
Date: Tue, 25 Jun 2024 16:49:07 +0800
Subject: [PATCH 05/11] test: fix include header flaky test (#17441)

Signed-off-by: xxchan
---
 .../source_inline/kafka/include_key_as.slt | 33 +++++++------------
 1 file changed, 12 insertions(+), 21 deletions(-)

diff --git a/e2e_test/source_inline/kafka/include_key_as.slt b/e2e_test/source_inline/kafka/include_key_as.slt
index 332e15bd54b03..2258204313b0a 100644
--- a/e2e_test/source_inline/kafka/include_key_as.slt
+++ b/e2e_test/source_inline/kafka/include_key_as.slt
@@ -186,21 +186,9 @@ rpk topic create 'test_additional_columns'
 system ok
 bash -c 'for i in {0..100}; do echo "key\$i:{\\"a\\": \$i}" | rpk topic produce test_additional_columns -f "%k:%v\\n" -H "header1=v1" -H "header2=v2"; done'
 
-statement ok
-create table additional_columns (a int)
-include key as key_col
-include partition as partition_col
-include offset as offset_col
-include timestamp as timestamp_col
-include header as header_col
-WITH (
-    ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
-    topic = 'test_additional_columns')
-FORMAT PLAIN ENCODE JSON
 
-# header with varchar type & non-exist header key
 statement error
-create table additional_columns_1 (a int)
+create table additional_columns (a int)
 include key as key_col
 include partition as partition_col
 include offset as offset_col
 include timestamp as timestamp_col
@@ -220,13 +208,13 @@ Caused by:
   Protocol error: Only header column can have inner field, but got "timestamp"
 
-# header with varchar type & non-exist header key
 statement ok
-create table additional_columns_1 (a int)
+create table additional_columns (a int)
 include key as key_col
 include partition as partition_col
 include offset as offset_col
 include timestamp as timestamp_col
+include header as header_col_combined
 include header 'header1' as header_col_1
 include header 'header2' as header_col_2
 include header 'header2' varchar as header_col_3
 include header 'header3' as header_col_4
 WITH (
     ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
     topic = 'test_additional_columns')
 FORMAT PLAIN ENCODE JSON
-
 # Wait enough time to ensure SourceExecutor consumes all Kafka data.
 sleep 3s
@@ -284,19 +271,20 @@ WHERE key_col IS NOT NULL
     AND partition_col IS NOT NULL
     AND offset_col IS NOT NULL
     AND timestamp_col IS NOT NULL
-    AND header_col IS NOT NULL
+    AND header_col_combined IS NOT NULL
 ----
 101
 
 query ??
-SELECT (header_col[1]).key AS key, (header_col[1]).value::text AS value
-FROM additional_columns limit 1;
+WITH arr AS (SELECT header_col_combined FROM additional_columns limit 1)
+SELECT unnest(header_col_combined) FROM arr ORDER BY 1;
 ----
-header1 \x7631
+(header1,"\\x7631")
+(header2,"\\x7632")
 
 query ????
-select header_col_1, header_col_2, header_col_3, header_col_4 from additional_columns_1 limit 1
+select header_col_1, header_col_2, header_col_3, header_col_4 from additional_columns limit 1
 ----
 \x7631 \x7632 v2 NULL
 
@@ -305,3 +293,6 @@ drop table upsert_students
 
 statement ok
 drop table plain_students
+
+statement ok
+drop table additional_columns

From 22b911df29fea32fb64c1ff7cc033388e855ca03 Mon Sep 17 00:00:00 2001
From: Eric Fu
Date: Tue, 25 Jun 2024 16:53:56 +0800
Subject: [PATCH 06/11] feat(docs): improve docs of multi-cluster deployment (#17437)

---
 grafana/README.md | 18 ++++++++++++------
 1 file changed, 12 insertions(+), 6 deletions(-)

diff --git a/grafana/README.md b/grafana/README.md
index ca3d4da00f9cc..8402421eabb65 100644
--- a/grafana/README.md
+++ b/grafana/README.md
@@ -1,6 +1,6 @@
 # RisingWave Grafana Dashboard
 
-The Grafana dashboard is generated with grafanalib. You'll need
+The Grafana dashboard is generated with `grafanalib`. You'll need
 
 - Python
 - grafanalib
@@ -29,18 +29,25 @@ And don't forget to include the generated `risingwave--dashboard.json` in t
 ./update.sh
 ```
 
-## Advanced Usage
+## Multi-cluster Deployment
 
-We can specify the source uid, dashboard uid, dashboard version, enable namespace filter and enable risingwave_name filter(used in multi-cluster deployment) via env variables.
+The `generate.sh` script supports multi-cluster deployment. The following environment variables are helpful:
 
-For example, we can use the following query to generate dashboard json used in our benchmark cluster:
+- `DASHBOARD_NAMESPACE_FILTER_ENABLED`: When set to `true`, a drop-down list will be added to the Grafana dashboard, and all Prometheus queries will be filtered by the selected namespace.
+- `DASHBOARD_RISINGWAVE_NAME_FILTER_ENABLED`: When set to `true`, a drop-down list will be added to the Grafana dashboard, and all Prometheus queries will be filtered by the selected RisingWave name. This is useful when you have multiple RisingWave instances in the same namespace.
+- `DASHBOARD_SOURCE_UID`: Set to the UID of your Prometheus source.
+- `DASHBOARD_DYNAMIC_SOURCE`: Alternative to `DASHBOARD_SOURCE_UID`. When set to `true`, a drop-down list will be added to the Grafana dashboard to pick any one of the Prometheus sources.
+- `DASHBOARD_UID`: Set to the UID of your Grafana dashboard.
+
+See more details in the `common.py` file.
+
+Examples:
 
 ```bash
 DASHBOARD_NAMESPACE_FILTER_ENABLED=true \
 DASHBOARD_RISINGWAVE_NAME_FILTER_ENABLED=true \
 DASHBOARD_SOURCE_UID= \
 DASHBOARD_UID= \
-DASHBOARD_VERSION= \
 ./generate.sh
 ```
 
@@ -51,6 +58,5 @@ DASHBOARD_NAMESPACE_FILTER_ENABLED=true \
 DASHBOARD_RISINGWAVE_NAME_FILTER_ENABLED=true \
 DASHBOARD_DYNAMIC_SOURCE=true \
 DASHBOARD_UID= \
-DASHBOARD_VERSION= \
 ./generate.sh
 ```

From e5c0a42990d6aa24d4ff853e8a9606483444590b Mon Sep 17 00:00:00 2001
From: stonepage <40830455+st1page@users.noreply.github.com>
Date: Tue, 25 Jun 2024 18:36:32 +0800
Subject: [PATCH 07/11] chore(risedev): validate java 22 (#17447)

---
 src/risedevtool/connector.toml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/risedevtool/connector.toml b/src/risedevtool/connector.toml
index abc23b67abdf8..535b4db843172 100644
--- a/src/risedevtool/connector.toml
+++ b/src/risedevtool/connector.toml
@@ -17,7 +17,7 @@ script = '''
 set -euo pipefail
 
-if !(command -v javac &> /dev/null && [[ "$(javac -version 2>&1 | awk '{print $2}')" =~ ^(11|17|19|20|21) ]]); then
+if !(command -v javac &> /dev/null && [[ "$(javac -version 2>&1 | awk '{print $2}')" =~ ^(11|17|19|20|21|22) ]]); then
   echo "JDK 11+ is not installed. Please install JDK 11+ first."
   exit 1
 fi

From 2adcca3301cb7e6f3af9aaf9e214c30f8456550e Mon Sep 17 00:00:00 2001
From: Noel Kwan <47273164+kwannoel@users.noreply.github.com>
Date: Tue, 25 Jun 2024 18:37:27 +0800
Subject: [PATCH 08/11] fix(simulation): do not run background ddl on skipped tests (#17440)

---
 src/tests/simulation/src/slt.rs | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/src/tests/simulation/src/slt.rs b/src/tests/simulation/src/slt.rs
index e2fdfb6b54bb8..943d9bffcf4ca 100644
--- a/src/tests/simulation/src/slt.rs
+++ b/src/tests/simulation/src/slt.rs
@@ -20,7 +20,7 @@ use anyhow::{bail, Result};
 use itertools::Itertools;
 use rand::{thread_rng, Rng, SeedableRng};
 use rand_chacha::ChaChaRng;
-use sqllogictest::{ParallelTestError, QueryExpect, Record, StatementExpect};
+use sqllogictest::{Condition, ParallelTestError, QueryExpect, Record, StatementExpect};
 
 use crate::client::RisingWave;
 use crate::cluster::{Cluster, KillOpts};
@@ -274,6 +274,11 @@ pub async fn run_slt_task(
         } = &record
             && matches!(cmd, SqlCmd::CreateMaterializedView { ..
}) && !manual_background_ddl_enabled + && conditions.iter().all(|c| { + *c != Condition::SkipIf { + label: "madsim".to_string(), + } + }) { let background_ddl_setting = rng.gen_bool(background_ddl_rate); let set_background_ddl = Record::Statement { From 9b5bcdc337865ac28d87bda9a113980b2e4e3c40 Mon Sep 17 00:00:00 2001 From: xiangjinwu <17769960+xiangjinwu@users.noreply.github.com> Date: Wed, 26 Jun 2024 11:27:37 +0800 Subject: [PATCH 09/11] refactor(source): cleanup unused `ParserProperties::key_encoding_config` (#17416) --- src/bench/sink_bench/main.rs | 1 - src/connector/src/parser/bytes_parser.rs | 1 - .../src/parser/debezium/debezium_parser.rs | 8 ++------ .../src/parser/debezium/simd_json_parser.rs | 1 - .../src/parser/maxwell/simd_json_parser.rs | 1 - src/connector/src/parser/mod.rs | 3 --- src/connector/src/parser/util.rs | 16 ---------------- .../src/source/datagen/source/generator.rs | 1 - .../src/source/datagen/source/reader.rs | 2 -- .../src/source/filesystem/s3/source/reader.rs | 1 - .../src/executor/backfill/cdc/cdc_backfill.rs | 1 - 11 files changed, 2 insertions(+), 34 deletions(-) diff --git a/src/bench/sink_bench/main.rs b/src/bench/sink_bench/main.rs index 1637b5df6c659..c063a8ca0cee7 100644 --- a/src/bench/sink_bench/main.rs +++ b/src/bench/sink_bench/main.rs @@ -209,7 +209,6 @@ impl MockDatagenSource { .unwrap(); let parser_config = ParserConfig { specific: SpecificParserConfig { - key_encoding_config: None, encoding_config: EncodingProperties::Native, protocol_config: ProtocolProperties::Native, }, diff --git a/src/connector/src/parser/bytes_parser.rs b/src/connector/src/parser/bytes_parser.rs index 5df7fa28118d3..eeccf17be7a27 100644 --- a/src/connector/src/parser/bytes_parser.rs +++ b/src/connector/src/parser/bytes_parser.rs @@ -63,7 +63,6 @@ mod tests { async fn test_bytes_parser(get_payload: fn() -> Vec>) { let descs = vec![SourceColumnDesc::simple("id", DataType::Bytea, 0.into())]; let props = SpecificParserConfig { - key_encoding_config: None, encoding_config: EncodingProperties::Bytes(BytesProperties { column_name: None }), protocol_config: ProtocolProperties::Plain, }; diff --git a/src/connector/src/parser/debezium/debezium_parser.rs b/src/connector/src/parser/debezium/debezium_parser.rs index 817c2a788f2be..bc43e556cb4f6 100644 --- a/src/connector/src/parser/debezium/debezium_parser.rs +++ b/src/connector/src/parser/debezium/debezium_parser.rs @@ -19,7 +19,6 @@ use risingwave_common::bail; use super::simd_json_parser::DebeziumJsonAccessBuilder; use super::{DebeziumAvroAccessBuilder, DebeziumAvroParserConfig}; use crate::error::ConnectorResult; -use crate::extract_key_config; use crate::parser::unified::debezium::DebeziumChangeEvent; use crate::parser::unified::json::TimestamptzHandling; use crate::parser::unified::util::apply_row_operation_on_stream_chunk_writer; @@ -89,8 +88,8 @@ impl DebeziumParser { rw_columns: Vec, source_ctx: SourceContextRef, ) -> ConnectorResult { - let (key_config, key_type) = extract_key_config!(props); - let key_builder = build_accessor_builder(key_config, key_type).await?; + let key_builder = + build_accessor_builder(props.encoding_config.clone(), EncodingType::Key).await?; let payload_builder = build_accessor_builder(props.encoding_config, EncodingType::Value).await?; let debezium_props = if let ProtocolProperties::Debezium(props) = props.protocol_config { @@ -114,7 +113,6 @@ impl DebeziumParser { use crate::parser::JsonProperties; let props = SpecificParserConfig { - key_encoding_config: None, encoding_config: 
EncodingProperties::Json(JsonProperties { use_schema_registry: false, timestamptz_handling: None, @@ -225,7 +223,6 @@ mod tests { .collect::>(); let props = SpecificParserConfig { - key_encoding_config: None, encoding_config: EncodingProperties::Json(JsonProperties { use_schema_registry: false, timestamptz_handling: None, @@ -298,7 +295,6 @@ mod tests { .collect::>(); let props = SpecificParserConfig { - key_encoding_config: None, encoding_config: EncodingProperties::Json(JsonProperties { use_schema_registry: false, timestamptz_handling: None, diff --git a/src/connector/src/parser/debezium/simd_json_parser.rs b/src/connector/src/parser/debezium/simd_json_parser.rs index 85399ed772768..f9738eb9e357e 100644 --- a/src/connector/src/parser/debezium/simd_json_parser.rs +++ b/src/connector/src/parser/debezium/simd_json_parser.rs @@ -130,7 +130,6 @@ mod tests { async fn build_parser(rw_columns: Vec) -> DebeziumParser { let props = SpecificParserConfig { - key_encoding_config: None, encoding_config: EncodingProperties::Json(JsonProperties { use_schema_registry: false, timestamptz_handling: None, diff --git a/src/connector/src/parser/maxwell/simd_json_parser.rs b/src/connector/src/parser/maxwell/simd_json_parser.rs index 5db6cdd52e90c..e5bc7291ccf07 100644 --- a/src/connector/src/parser/maxwell/simd_json_parser.rs +++ b/src/connector/src/parser/maxwell/simd_json_parser.rs @@ -35,7 +35,6 @@ mod tests { ]; let props = SpecificParserConfig { - key_encoding_config: None, encoding_config: EncodingProperties::Json(JsonProperties { use_schema_registry: false, timestamptz_handling: None, diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index c11c833457a99..bca1b727d66a8 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -1049,7 +1049,6 @@ pub struct CommonParserConfig { #[derive(Debug, Clone, Default)] pub struct SpecificParserConfig { - pub key_encoding_config: Option, pub encoding_config: EncodingProperties, pub protocol_config: ProtocolProperties, } @@ -1057,7 +1056,6 @@ pub struct SpecificParserConfig { impl SpecificParserConfig { // for test only pub const DEFAULT_PLAIN_JSON: SpecificParserConfig = SpecificParserConfig { - key_encoding_config: None, encoding_config: EncodingProperties::Json(JsonProperties { use_schema_registry: false, timestamptz_handling: None, @@ -1278,7 +1276,6 @@ impl SpecificParserConfig { } }; Ok(Self { - key_encoding_config: None, encoding_config, protocol_config, }) diff --git a/src/connector/src/parser/util.rs b/src/connector/src/parser/util.rs index 30cb1fbf7d62e..590ba927854d3 100644 --- a/src/connector/src/parser/util.rs +++ b/src/connector/src/parser/util.rs @@ -92,22 +92,6 @@ macro_rules! only_parse_payload { }; } -// Extract encoding config and encoding type from ParserProperties -// for message key. -// -// Suppose (A, B) is the combination of key/payload combination: -// For (None, B), key should be the the key setting from B -// For (A, B), key should be the value setting from A -#[macro_export] -macro_rules! extract_key_config { - ($props:ident) => { - match $props.key_encoding_config { - Some(config) => (config, EncodingType::Value), - None => ($props.encoding_config.clone(), EncodingType::Key), - } - }; -} - /// Load raw bytes from: /// * local file, for on-premise or testing. /// * http/https, for common usage. 
diff --git a/src/connector/src/source/datagen/source/generator.rs b/src/connector/src/source/datagen/source/generator.rs index 5716631dff620..600efda2f6255 100644 --- a/src/connector/src/source/datagen/source/generator.rs +++ b/src/connector/src/source/datagen/source/generator.rs @@ -278,7 +278,6 @@ mod tests { use_schema_registry: false, timestamptz_handling: None, }), - key_encoding_config: None, }, data_types, rows_per_second, diff --git a/src/connector/src/source/datagen/source/reader.rs b/src/connector/src/source/datagen/source/reader.rs index d3115f504f32e..992343058e9ef 100644 --- a/src/connector/src/source/datagen/source/reader.rs +++ b/src/connector/src/source/datagen/source/reader.rs @@ -399,7 +399,6 @@ mod tests { state, ParserConfig { specific: SpecificParserConfig { - key_encoding_config: None, encoding_config: EncodingProperties::Native, protocol_config: ProtocolProperties::Native, }, @@ -457,7 +456,6 @@ mod tests { }; let parser_config = ParserConfig { specific: SpecificParserConfig { - key_encoding_config: None, encoding_config: EncodingProperties::Native, protocol_config: ProtocolProperties::Native, }, diff --git a/src/connector/src/source/filesystem/s3/source/reader.rs b/src/connector/src/source/filesystem/s3/source/reader.rs index c3c800d6a5317..0340e584d0b8f 100644 --- a/src/connector/src/source/filesystem/s3/source/reader.rs +++ b/src/connector/src/source/filesystem/s3/source/reader.rs @@ -289,7 +289,6 @@ mod tests { let config = ParserConfig { common: CommonParserConfig { rw_columns: descs }, specific: SpecificParserConfig { - key_encoding_config: None, encoding_config: EncodingProperties::Csv(csv_config), protocol_config: ProtocolProperties::Plain, }, diff --git a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs index 168e619cc956a..4e564130accbb 100644 --- a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs +++ b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs @@ -641,7 +641,6 @@ impl CdcBackfillExecutor { #[try_stream(ok = Message, error = StreamExecutorError)] pub async fn transform_upstream(upstream: BoxedMessageStream, output_columns: &[ColumnDesc]) { let props = SpecificParserConfig { - key_encoding_config: None, encoding_config: EncodingProperties::Json(JsonProperties { use_schema_registry: false, timestamptz_handling: None, From 0830f4f73a14ca3e052f4f9405ffab118864c168 Mon Sep 17 00:00:00 2001 From: xxchan Date: Wed, 26 Jun 2024 11:38:06 +0800 Subject: [PATCH 10/11] test: workaround include_key_as flaky test (#17457) Signed-off-by: xxchan --- e2e_test/source_inline/kafka/include_key_as.slt | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/e2e_test/source_inline/kafka/include_key_as.slt b/e2e_test/source_inline/kafka/include_key_as.slt index 2258204313b0a..919e47bbb4248 100644 --- a/e2e_test/source_inline/kafka/include_key_as.slt +++ b/e2e_test/source_inline/kafka/include_key_as.slt @@ -184,7 +184,7 @@ rpk topic create 'test_additional_columns' # Note: `sh` doesn't have {..} brace expansion system ok -bash -c 'for i in {0..100}; do echo "key\$i:{\\"a\\": \$i}" | rpk topic produce test_additional_columns -f "%k:%v\\n" -H "header1=v1" -H "header2=v2"; done' +bash -c 'for i in {0..10}; do echo "key\$i:{\\"a\\": \$i}" | rpk topic produce test_additional_columns -f "%k:%v\\n" -H "header1=v1" -H "header2=v2"; done' statement error @@ -273,15 +273,16 @@ WHERE key_col IS NOT NULL AND timestamp_col IS NOT NULL AND header_col_combined IS NOT NULL ---- -101 +11 query ?? 
-WITH arr AS (SELECT header_col_combined FROM additional_columns limit 1) -SELECT unnest(header_col_combined) FROM arr ORDER BY 1; +WITH arr AS (SELECT header_col_combined FROM additional_columns), +unnested AS (SELECT unnest(header_col_combined) FROM arr) +select *, count(*) from unnested group by 1 order by 1; ---- -(header1,"\\x7631") -(header2,"\\x7632") +(header1,"\\x7631") 11 +(header2,"\\x7632") 11 query ???? select header_col_1, header_col_2, header_col_3, header_col_4 from additional_columns limit 1 From 6d8c9ab7b41a7f2ef81a8f4181234a9d5ed69a23 Mon Sep 17 00:00:00 2001 From: William Wen <44139337+wenym1@users.noreply.github.com> Date: Wed, 26 Jun 2024 14:02:31 +0800 Subject: [PATCH 11/11] refactor(storage): emit sst on single upload task succeeds and remove sync finish event (#17333) --- .../event_handler/hummock_event_handler.rs | 114 ++--- src/storage/src/hummock/event_handler/mod.rs | 5 +- .../src/hummock/event_handler/uploader.rs | 412 ++++++++++-------- .../src/hummock/store/hummock_storage.rs | 6 +- 4 files changed, 269 insertions(+), 268 deletions(-) diff --git a/src/storage/src/hummock/event_handler/hummock_event_handler.rs b/src/storage/src/hummock/event_handler/hummock_event_handler.rs index 277984d3545dd..53d7167e0167b 100644 --- a/src/storage/src/hummock/event_handler/hummock_event_handler.rs +++ b/src/storage/src/hummock/event_handler/hummock_event_handler.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{HashMap, HashSet, VecDeque}; +use std::collections::{HashMap, VecDeque}; use std::pin::pin; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering::Relaxed; @@ -43,7 +43,7 @@ use crate::hummock::compactor::{await_tree_key, compact, CompactorContext}; use crate::hummock::conflict_detector::ConflictDetector; use crate::hummock::event_handler::refiller::{CacheRefillerEvent, SpawnRefillTask}; use crate::hummock::event_handler::uploader::{ - HummockUploader, SpawnUploadTask, SyncedData, UploadTaskInfo, UploadTaskOutput, UploaderEvent, + HummockUploader, SpawnUploadTask, SyncedData, UploadTaskInfo, UploadTaskOutput, }; use crate::hummock::event_handler::{ HummockEvent, HummockReadVersionRef, HummockVersionUpdate, ReadOnlyReadVersionMapping, @@ -184,8 +184,7 @@ impl HummockEventReceiver { } struct HummockEventHandlerMetrics { - event_handler_on_sync_finish_latency: Histogram, - event_handler_on_spilled_latency: Histogram, + event_handler_on_upload_finish_latency: Histogram, event_handler_on_apply_version_update: Histogram, event_handler_on_recv_version_update: Histogram, } @@ -329,12 +328,9 @@ impl HummockEventHandler { let write_conflict_detector = ConflictDetector::new_from_config(storage_opts); let metrics = HummockEventHandlerMetrics { - event_handler_on_sync_finish_latency: state_store_metrics + event_handler_on_upload_finish_latency: state_store_metrics .event_handler_latency - .with_label_values(&["on_sync_finish"]), - event_handler_on_spilled_latency: state_store_metrics - .event_handler_latency - .with_label_values(&["on_spilled"]), + .with_label_values(&["on_upload_finish"]), event_handler_on_apply_version_update: state_store_metrics .event_handler_latency .with_label_values(&["apply_version"]), @@ -396,40 +392,6 @@ impl HummockEventHandler { // Handler for different events impl HummockEventHandler { - fn handle_epoch_synced(&mut self, epoch: HummockEpoch, data: SyncedData) { - debug!("epoch has been synced: {}.", epoch); - let SyncedData { - 
newly_upload_ssts: newly_uploaded_sstables, - .. - } = data; - { - let newly_uploaded_sstables = newly_uploaded_sstables - .into_iter() - .map(Arc::new) - .collect_vec(); - if !newly_uploaded_sstables.is_empty() { - let related_instance_ids: HashSet<_> = newly_uploaded_sstables - .iter() - .flat_map(|sst| sst.imm_ids().keys().cloned()) - .collect(); - self.for_each_read_version(related_instance_ids, |instance_id, read_version| { - newly_uploaded_sstables - .iter() - // Take rev because newer data come first in `newly_uploaded_sstables` but we apply - // older data first - .rev() - .for_each(|staging_sstable_info| { - if staging_sstable_info.imm_ids().contains_key(&instance_id) { - read_version.update(VersionUpdate::Staging(StagingData::Sst( - staging_sstable_info.clone(), - ))); - } - }); - }); - } - }; - } - /// This function will be performed under the protection of the `read_version_mapping` read /// lock, and add write lock on each `read_version` operation fn for_each_read_version( @@ -441,7 +403,7 @@ impl HummockEventHandler { #[cfg(debug_assertions)] { // check duplication on debug_mode - let mut id_set = HashSet::new(); + let mut id_set = std::collections::HashSet::new(); for instance in instances { assert!(id_set.insert(instance)); } @@ -488,9 +450,8 @@ impl HummockEventHandler { } } - fn handle_data_spilled(&mut self, staging_sstable_info: StagingSstableInfo) { - let staging_sstable_info = Arc::new(staging_sstable_info); - trace!("data_spilled. SST size {}", staging_sstable_info.imm_size()); + fn handle_uploaded_sst_inner(&mut self, staging_sstable_info: Arc) { + trace!("data_flushed. SST size {}", staging_sstable_info.imm_size()); self.for_each_read_version( staging_sstable_info.imm_ids().keys().cloned(), |_, read_version| { @@ -504,7 +465,7 @@ impl HummockEventHandler { fn handle_sync_epoch( &mut self, new_sync_epoch: HummockEpoch, - sync_result_sender: oneshot::Sender>, + sync_result_sender: oneshot::Sender>, ) { debug!( "awaiting for epoch to be synced: {}, max_synced_epoch: {}", @@ -715,8 +676,8 @@ impl HummockEventHandler { pub async fn start_hummock_event_handler_worker(mut self) { loop { tokio::select! { - event = self.uploader.next_event() => { - self.handle_uploader_event(event); + sst = self.uploader.next_uploaded_sst() => { + self.handle_uploaded_sst(sst); } event = self.refiller.next_event() => { let CacheRefillerEvent {pinned_version, new_pinned_version } = event; @@ -748,21 +709,12 @@ impl HummockEventHandler { } } - fn handle_uploader_event(&mut self, event: UploaderEvent) { - match event { - UploaderEvent::SyncFinish(epoch, data) => { - let _timer = self - .metrics - .event_handler_on_sync_finish_latency - .start_timer(); - self.handle_epoch_synced(epoch, data); - } - - UploaderEvent::DataSpilled(staging_sstable_info) => { - let _timer = self.metrics.event_handler_on_spilled_latency.start_timer(); - self.handle_data_spilled(staging_sstable_info); - } - } + fn handle_uploaded_sst(&mut self, sst: Arc) { + let _timer = self + .metrics + .event_handler_on_upload_finish_latency + .start_timer(); + self.handle_uploaded_sst_inner(sst); } /// Gracefully shutdown if returns `true`. 
@@ -925,21 +877,27 @@ impl HummockEventHandler { } pub(super) fn send_sync_result( - sender: oneshot::Sender>, - result: HummockResult<&SyncedData>, + sender: oneshot::Sender>, + result: HummockResult, ) { - let result = result.map( - |SyncedData { - newly_upload_ssts, - uploaded_ssts, - table_watermarks, - }| { + let _ = sender.send(result).inspect_err(|e| { + error!("unable to send sync result. Err: {:?}", e); + }); +} + +impl SyncedData { + pub fn into_sync_result(self) -> SyncResult { + { + let SyncedData { + uploaded_ssts, + table_watermarks, + } = self; let mut sync_size = 0; let mut uncommitted_ssts = Vec::new(); let mut old_value_ssts = Vec::new(); // The newly uploaded `sstable_infos` contains newer data. Therefore, // `newly_upload_ssts` at the front - for sst in newly_upload_ssts.iter().chain(uploaded_ssts.iter()) { + for sst in uploaded_ssts { sync_size += sst.imm_size(); uncommitted_ssts.extend(sst.sstable_infos().iter().cloned()); old_value_ssts.extend(sst.old_value_sstable_infos().iter().cloned()); @@ -950,12 +908,8 @@ pub(super) fn send_sync_result( table_watermarks: table_watermarks.clone(), old_value_ssts, } - }, - ); - - let _ = sender.send(result).inspect_err(|e| { - error!("unable to send sync result. Err: {:?}", e); - }); + } + } } #[cfg(test)] diff --git a/src/storage/src/hummock/event_handler/mod.rs b/src/storage/src/hummock/event_handler/mod.rs index bbf69ae194f72..c88611cd0888a 100644 --- a/src/storage/src/hummock/event_handler/mod.rs +++ b/src/storage/src/hummock/event_handler/mod.rs @@ -18,7 +18,7 @@ use std::sync::Arc; use parking_lot::{RwLock, RwLockReadGuard}; use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::TableId; -use risingwave_hummock_sdk::{HummockEpoch, SyncResult}; +use risingwave_hummock_sdk::HummockEpoch; use thiserror_ext::AsReport; use tokio::sync::oneshot; @@ -36,6 +36,7 @@ use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta}; use super::store::version::HummockReadVersion; use crate::hummock::event_handler::hummock_event_handler::HummockEventSender; +use crate::hummock::event_handler::uploader::SyncedData; #[derive(Debug)] pub struct BufferWriteRequest { @@ -59,7 +60,7 @@ pub enum HummockEvent { /// handle sender. 
SyncEpoch { new_sync_epoch: HummockEpoch, - sync_result_sender: oneshot::Sender>, + sync_result_sender: oneshot::Sender>, }, /// Clear shared buffer and reset all states diff --git a/src/storage/src/hummock/event_handler/uploader.rs b/src/storage/src/hummock/event_handler/uploader.rs index f768aa23dcd89..ee0e975e5a3ea 100644 --- a/src/storage/src/hummock/event_handler/uploader.rs +++ b/src/storage/src/hummock/event_handler/uploader.rs @@ -22,7 +22,6 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{ready, Context, Poll}; -use futures::future::{try_join_all, TryJoinAll}; use futures::FutureExt; use itertools::Itertools; use more_asserts::{assert_ge, assert_gt}; @@ -35,7 +34,7 @@ use risingwave_common::must_match; use risingwave_hummock_sdk::table_watermark::{ TableWatermarks, VnodeWatermark, WatermarkDirection, }; -use risingwave_hummock_sdk::{CompactionGroupId, HummockEpoch, LocalSstableInfo, SyncResult}; +use risingwave_hummock_sdk::{CompactionGroupId, HummockEpoch, LocalSstableInfo}; use thiserror_ext::AsReport; use tokio::sync::oneshot; use tokio::task::JoinHandle; @@ -265,7 +264,10 @@ impl UploadingTask { } /// Poll the result of the uploading task - fn poll_result(&mut self, cx: &mut Context<'_>) -> Poll> { + fn poll_result( + &mut self, + cx: &mut Context<'_>, + ) -> Poll>> { Poll::Ready(match ready!(self.join_handle.poll_unpin(cx)) { Ok(task_result) => task_result .inspect(|_| { @@ -277,13 +279,13 @@ impl UploadingTask { }) .inspect_err(|e| error!(task_info = ?self.task_info, err = ?e.as_report(), "upload task failed")) .map(|output| { - StagingSstableInfo::new( + Arc::new(StagingSstableInfo::new( output.new_value_ssts, output.old_value_ssts, self.task_info.epochs.clone(), self.task_info.imm_ids.clone(), self.task_info.task_size, - ) + )) }), Err(err) => Err(HummockError::other(format!( @@ -294,7 +296,7 @@ impl UploadingTask { } /// Poll the uploading task until it succeeds. If it fails, we will retry it. - fn poll_ok_with_retry(&mut self, cx: &mut Context<'_>) -> Poll { + fn poll_ok_with_retry(&mut self, cx: &mut Context<'_>) -> Poll> { loop { let result = ready!(self.poll_result(cx)); match result { @@ -323,7 +325,7 @@ impl UploadingTask { } impl Future for UploadingTask { - type Output = HummockResult; + type Output = HummockResult>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { self.poll_result(cx) @@ -337,7 +339,7 @@ struct SpilledData { // ordered spilling tasks. Task at the back is spilling older data. uploading_tasks: VecDeque, // ordered spilled data. Data at the back is older. - uploaded_data: VecDeque, + uploaded_data: VecDeque>, } impl SpilledData { @@ -347,7 +349,10 @@ impl SpilledData { /// Poll the successful spill of the oldest uploading task. Return `Poll::Ready(None)` is there /// is no uploading task - fn poll_success_spill(&mut self, cx: &mut Context<'_>) -> Poll> { + fn poll_success_spill( + &mut self, + cx: &mut Context<'_>, + ) -> Poll>> { // only poll the oldest uploading task if there is any if let Some(task) = self.uploading_tasks.back_mut() { let staging_sstable_info = ready!(task.poll_ok_with_retry(cx)); @@ -905,19 +910,17 @@ impl UnsyncData { struct SyncingData { sync_epoch: HummockEpoch, - // TODO: may replace `TryJoinAll` with a future that will abort other join handles once - // one join handle failed. 
- // None means there is no pending uploading tasks - uploading_tasks: Option>, + // task of newer data at the front + uploading_tasks: VecDeque, // newer data at the front - uploaded: VecDeque, + uploaded: VecDeque>, table_watermarks: HashMap, - sync_result_sender: oneshot::Sender>, + sync_result_sender: oneshot::Sender>, } +#[derive(Debug)] pub struct SyncedData { - pub newly_upload_ssts: Vec, - pub uploaded_ssts: VecDeque, + pub uploaded_ssts: VecDeque>, pub table_watermarks: HashMap, } @@ -962,19 +965,23 @@ impl UploaderData { for (_, epoch_data) in self.unsync_data.epoch_data { epoch_data.spilled_data.abort(); } - // TODO: call `abort` on the uploading task join handle of syncing_data for syncing_data in self.syncing_data { + for task in syncing_data.uploading_tasks { + task.join_handle.abort(); + } send_sync_result(syncing_data.sync_result_sender, Err(err())); } } } +struct ErrState { + failed_epoch: HummockEpoch, + reason: String, +} + enum UploaderState { Working(UploaderData), - Err { - failed_epoch: HummockEpoch, - reason: String, - }, + Err(ErrState), } /// An uploader for hummock data. @@ -1081,14 +1088,14 @@ impl HummockUploader { pub(super) fn start_sync_epoch( &mut self, epoch: HummockEpoch, - sync_result_sender: oneshot::Sender>, + sync_result_sender: oneshot::Sender>, ) { let data = match &mut self.state { UploaderState::Working(data) => data, - UploaderState::Err { + UploaderState::Err(ErrState { failed_epoch, reason, - } => { + }) => { let result = Err(HummockError::other(format!( "previous epoch {} failed due to [{}]", failed_epoch, reason @@ -1119,15 +1126,9 @@ impl HummockUploader { .. } = sync_data; - let try_join_all_upload_task = if uploading_tasks.is_empty() { - None - } else { - Some(try_join_all(uploading_tasks)) - }; - data.syncing_data.push_front(SyncingData { sync_epoch: epoch, - uploading_tasks: try_join_all_upload_task, + uploading_tasks, uploaded: uploaded_data, table_watermarks, sync_result_sender, @@ -1139,20 +1140,24 @@ impl HummockUploader { .set(data.syncing_data.len() as _); } - fn add_synced_data(&mut self, epoch: HummockEpoch) { + fn set_max_synced_epoch( + max_synced_epoch: &mut HummockEpoch, + max_syncing_epoch: HummockEpoch, + epoch: HummockEpoch, + ) { assert!( - epoch <= self.max_syncing_epoch, + epoch <= max_syncing_epoch, "epoch {} that has been synced has not started syncing yet. previous max syncing epoch {}", epoch, - self.max_syncing_epoch + max_syncing_epoch ); assert!( - epoch > self.max_synced_epoch, + epoch > *max_synced_epoch, "epoch {} has been synced. previous max synced epoch: {}", epoch, - self.max_synced_epoch + max_synced_epoch ); - self.max_synced_epoch = epoch; + *max_synced_epoch = epoch; } pub(crate) fn update_pinned_version(&mut self, pinned_version: PinnedVersion) { @@ -1246,55 +1251,83 @@ impl HummockUploader { impl UploaderData { /// Poll the syncing task of the syncing data of the oldest epoch. Return `Poll::Ready(None)` if /// there is no syncing data. 
- #[expect(clippy::type_complexity)] fn poll_syncing_task( &mut self, cx: &mut Context<'_>, context: &UploaderContext, - ) -> Poll< - Option<( - HummockEpoch, - HummockResult, - oneshot::Sender>, - )>, - > { - // Only poll the oldest epoch if there is any so that the syncing epoch are finished in - // order - if let Some(syncing_data) = self.syncing_data.back_mut() { - // The syncing task has finished - let result = if let Some(all_tasks) = &mut syncing_data.uploading_tasks { - ready!(all_tasks.poll_unpin(cx)) + mut set_max_synced_epoch: impl FnMut(u64), + ) -> Poll, ErrState>>> { + while let Some(syncing_data) = self.syncing_data.back_mut() { + let sstable_info = if let Some(task) = syncing_data.uploading_tasks.back_mut() { + let result = ready!(task.poll_result(cx)); + let _task = syncing_data.uploading_tasks.pop_back().expect("non-empty"); + let sstable_info = match result { + Ok(sstable_info) => sstable_info, + Err(e) => { + let SyncingData { + sync_epoch, + uploading_tasks, + sync_result_sender, + .. + } = self.syncing_data.pop_back().expect("non-empty"); + for task in uploading_tasks { + task.join_handle.abort(); + } + send_sync_result( + sync_result_sender, + Err(HummockError::other(format!( + "failed sync task: {:?}", + e.as_report() + ))), + ); + + return Poll::Ready(Some(Err(ErrState { + failed_epoch: sync_epoch, + reason: format!("{:?}", e.as_report()), + }))); + } + }; + syncing_data.uploaded.push_front(sstable_info.clone()); + self.unsync_data.ack_flushed(&sstable_info); + Some(sstable_info) } else { - Ok(Vec::new()) + None }; - let syncing_data = self.syncing_data.pop_back().expect("must exist"); - context - .stats - .uploader_syncing_epoch_count - .set(self.syncing_data.len() as _); - let epoch = syncing_data.sync_epoch; - - let result = result.map(|newly_uploaded_sstable_infos| { - // take `rev` so that old data is acked first - for sstable_info in newly_uploaded_sstable_infos.iter().rev() { - self.unsync_data.ack_flushed(sstable_info); - } - SyncedData { - newly_upload_ssts: newly_uploaded_sstable_infos, - uploaded_ssts: syncing_data.uploaded, - table_watermarks: syncing_data.table_watermarks, - } - }); - Poll::Ready(Some((epoch, result, syncing_data.sync_result_sender))) - } else { - Poll::Ready(None) + if syncing_data.uploading_tasks.is_empty() { + let syncing_data = self.syncing_data.pop_back().expect("non-empty"); + let SyncingData { + sync_epoch, + uploading_tasks, + uploaded, + table_watermarks, + sync_result_sender, + } = syncing_data; + assert!(uploading_tasks.is_empty()); + context + .stats + .uploader_syncing_epoch_count + .set(self.syncing_data.len() as _); + set_max_synced_epoch(sync_epoch); + send_sync_result( + sync_result_sender, + Ok(SyncedData { + uploaded_ssts: uploaded, + table_watermarks, + }), + ) + } + + if let Some(sstable_info) = sstable_info { + return Poll::Ready(Some(Ok(sstable_info))); + } } + Poll::Ready(None) } /// Poll the success of the oldest spilled task of unsync spill data. Return `Poll::Ready(None)` if /// there is no spilling task. - fn poll_spill_task(&mut self, cx: &mut Context<'_>) -> Poll> { + fn poll_spill_task(&mut self, cx: &mut Context<'_>) -> Poll>> { // iterator from older epoch to new epoch so that the spill task are finished in epoch order for epoch_data in self.unsync_data.epoch_data.values_mut() { // if None, there is no spilling task. 
Search for the unsync data of the next epoch in @@ -1308,53 +1341,50 @@ impl UploaderData { } } -pub(super) enum UploaderEvent { - // staging sstable info of newer data comes first - SyncFinish(HummockEpoch, SyncedData), - DataSpilled(StagingSstableInfo), -} - impl HummockUploader { - pub(super) fn next_event(&mut self) -> impl Future + '_ { + pub(super) fn next_uploaded_sst( + &mut self, + ) -> impl Future> + '_ { poll_fn(|cx| { let UploaderState::Working(data) = &mut self.state else { return Poll::Pending; }; - if let Some((epoch, result, result_sender)) = - ready!(data.poll_syncing_task(cx, &self.context)) + + if let Some(result) = + ready!( + data.poll_syncing_task(cx, &self.context, |new_synced_epoch| { + Self::set_max_synced_epoch( + &mut self.max_synced_epoch, + self.max_syncing_epoch, + new_synced_epoch, + ) + }) + ) { match result { Ok(data) => { - self.add_synced_data(epoch); - send_sync_result(result_sender, Ok(&data)); - return Poll::Ready(UploaderEvent::SyncFinish(epoch, data)); + return Poll::Ready(data); } Err(e) => { - send_sync_result( - result_sender, - Err(HummockError::other(format!( - "failed sync task: {:?}", - e.as_report() - ))), - ); + let failed_epoch = e.failed_epoch; let data = must_match!(replace( &mut self.state, - UploaderState::Err { - failed_epoch: epoch, - reason: format!("{:?}", e.as_report()), - }, + UploaderState::Err(e), ), UploaderState::Working(data) => data); data.abort(|| { - HummockError::other(format!("previous epoch {} failed to sync", epoch)) + HummockError::other(format!( + "previous epoch {} failed to sync", + failed_epoch + )) }); return Poll::Pending; } - }; + } } if let Some(sstable_info) = ready!(data.poll_spill_task(cx)) { - return Poll::Ready(UploaderEvent::DataSpilled(sstable_info)); + return Poll::Ready(sstable_info); } Poll::Pending @@ -1394,8 +1424,7 @@ pub(crate) mod tests { use crate::hummock::event_handler::uploader::uploader_imm::UploaderImm; use crate::hummock::event_handler::uploader::{ get_payload_imm_ids, HummockUploader, SyncedData, UploadTaskInfo, UploadTaskOutput, - UploadTaskPayload, UploaderContext, UploaderData, UploaderEvent, UploaderState, - UploadingTask, + UploadTaskPayload, UploaderContext, UploaderData, UploaderState, UploadingTask, }; use crate::hummock::event_handler::{LocalInstanceId, TEST_LOCAL_INSTANCE_ID}; use crate::hummock::local_version::pinned_version::PinnedVersion; @@ -1420,10 +1449,6 @@ pub(crate) mod tests { fn data(&self) -> &UploaderData { must_match!(&self.state, UploaderState::Working(data) => data) } - - fn start_sync_epoch_for_test(&mut self, epoch: HummockEpoch) { - self.start_sync_epoch(epoch, oneshot::channel().0) - } } fn test_hummock_version(epoch: HummockEpoch) -> HummockVersion { @@ -1654,24 +1679,34 @@ pub(crate) mod tests { uploader.add_imm(TEST_LOCAL_INSTANCE_ID, imm.clone()); uploader.local_seal_epoch_for_test(TEST_LOCAL_INSTANCE_ID, epoch1); - uploader.start_sync_epoch_for_test(epoch1); + let (sync_tx, sync_rx) = oneshot::channel(); + uploader.start_sync_epoch(epoch1, sync_tx); assert_eq!(epoch1 as HummockEpoch, uploader.max_syncing_epoch); assert_eq!(1, uploader.data().syncing_data.len()); let syncing_data = uploader.data().syncing_data.front().unwrap(); assert_eq!(epoch1 as HummockEpoch, syncing_data.sync_epoch); assert!(syncing_data.uploaded.is_empty()); - assert!(syncing_data.uploading_tasks.is_some()); + assert!(!syncing_data.uploading_tasks.is_empty()); - match uploader.next_event().await { - UploaderEvent::SyncFinish(finished_epoch, data) => { - assert_eq!(epoch1, 
finished_epoch); + let staging_sst = uploader.next_uploaded_sst().await; + assert_eq!(&vec![epoch1], staging_sst.epochs()); + assert_eq!( + &HashMap::from_iter([(TEST_LOCAL_INSTANCE_ID, vec![imm.batch_id()])]), + staging_sst.imm_ids() + ); + assert_eq!( + &dummy_success_upload_output().new_value_ssts, + staging_sst.sstable_infos() + ); + + match sync_rx.await { + Ok(Ok(data)) => { let SyncedData { - newly_upload_ssts, uploaded_ssts, table_watermarks, } = data; - assert_eq!(1, newly_upload_ssts.len()); - let staging_sst = newly_upload_ssts.first().unwrap(); + assert_eq!(1, uploaded_ssts.len()); + let staging_sst = &uploaded_ssts[0]; assert_eq!(&vec![epoch1], staging_sst.epochs()); assert_eq!( &HashMap::from_iter([(TEST_LOCAL_INSTANCE_ID, vec![imm.batch_id()])]), @@ -1681,7 +1716,6 @@ pub(crate) mod tests { &dummy_success_upload_output().new_value_ssts, staging_sst.sstable_infos() ); - assert!(uploaded_ssts.is_empty()); assert!(table_watermarks.is_empty()); } _ => unreachable!(), @@ -1701,14 +1735,15 @@ pub(crate) mod tests { let mut uploader = test_uploader(dummy_success_upload_future); let epoch1 = INITIAL_EPOCH.next_epoch(); - uploader.start_sync_epoch_for_test(epoch1); + let (sync_tx, sync_rx) = oneshot::channel(); + uploader.start_sync_epoch(epoch1, sync_tx); assert_eq!(epoch1, uploader.max_syncing_epoch); - match uploader.next_event().await { - UploaderEvent::SyncFinish(finished_epoch, data) => { - assert_eq!(epoch1, finished_epoch); + assert_uploader_pending(&mut uploader).await; + + match sync_rx.await { + Ok(Ok(data)) => { assert!(data.uploaded_ssts.is_empty()); - assert!(data.newly_upload_ssts.is_empty()); } _ => unreachable!(), }; @@ -1733,14 +1768,15 @@ pub(crate) mod tests { uploader.local_seal_epoch_for_test(TEST_LOCAL_INSTANCE_ID, epoch1); uploader.add_imm(TEST_LOCAL_INSTANCE_ID, imm); - uploader.start_sync_epoch_for_test(epoch1); + let (sync_tx, sync_rx) = oneshot::channel(); + uploader.start_sync_epoch(epoch1, sync_tx); assert_eq!(epoch1, uploader.max_syncing_epoch); - match uploader.next_event().await { - UploaderEvent::SyncFinish(finished_epoch, data) => { - assert_eq!(epoch1, finished_epoch); + assert_uploader_pending(&mut uploader).await; + + match sync_rx.await { + Ok(Ok(data)) => { assert!(data.uploaded_ssts.is_empty()); - assert!(data.newly_upload_ssts.is_empty()); } _ => unreachable!(), }; @@ -1758,9 +1794,11 @@ pub(crate) mod tests { async fn test_uploader_poll_empty() { let mut uploader = test_uploader(dummy_success_upload_future); let data = must_match!(&mut uploader.state, UploaderState::Working(data) => data); - assert!(poll_fn(|cx| data.poll_syncing_task(cx, &uploader.context)) - .await - .is_none()); + assert!( + poll_fn(|cx| data.poll_syncing_task(cx, &uploader.context, |_| unreachable!())) + .await + .is_none() + ); assert!(poll_fn(|cx| data.poll_spill_task(cx)).await.is_none()); } @@ -1784,7 +1822,8 @@ pub(crate) mod tests { assert_eq!(epoch1, uploader.max_syncing_epoch); uploader.init_instance(TEST_LOCAL_INSTANCE_ID, TEST_TABLE_ID, epoch6); - uploader.add_imm(TEST_LOCAL_INSTANCE_ID, gen_imm(epoch6).await); + let imm = gen_imm(epoch6).await; + uploader.add_imm(TEST_LOCAL_INSTANCE_ID, imm.clone()); uploader.update_pinned_version(version2); assert_eq!(epoch2, uploader.max_synced_epoch); assert_eq!(epoch2, uploader.max_syncing_epoch); @@ -1794,18 +1833,25 @@ pub(crate) mod tests { assert_eq!(epoch3, uploader.max_synced_epoch); assert_eq!(epoch3, uploader.max_syncing_epoch); - uploader.start_sync_epoch_for_test(epoch6); + let (sync_tx, sync_rx) = 
+        uploader.start_sync_epoch(epoch6, sync_tx);
         assert_eq!(epoch6, uploader.max_syncing_epoch);
         uploader.update_pinned_version(version4);
         assert_eq!(epoch4, uploader.max_synced_epoch);
         assert_eq!(epoch6, uploader.max_syncing_epoch);
 
-        match uploader.next_event().await {
-            UploaderEvent::SyncFinish(epoch, _) => {
-                assert_eq!(epoch6, epoch);
+        let sst = uploader.next_uploaded_sst().await;
+        assert_eq!(&get_imm_ids([&imm]), sst.imm_ids());
+
+        match sync_rx.await {
+            Ok(Ok(data)) => {
+                assert!(data.table_watermarks.is_empty());
+                assert_eq!(1, data.uploaded_ssts.len());
+                assert_eq!(&get_imm_ids([&imm]), data.uploaded_ssts[0].imm_ids());
             }
-            UploaderEvent::DataSpilled(_) => unreachable!(),
+            _ => unreachable!(),
         }
+
         uploader.update_pinned_version(version5);
         assert_eq!(epoch6, uploader.max_synced_epoch);
         assert_eq!(epoch6, uploader.max_syncing_epoch);
@@ -1881,11 +1927,12 @@ pub(crate) mod tests {
             yield_now().await;
         }
         assert!(
-            poll_fn(|cx| Poll::Ready(uploader.next_event().poll_unpin(cx)))
+            poll_fn(|cx| Poll::Ready(uploader.next_uploaded_sst().poll_unpin(cx)))
                 .await
                 .is_pending()
         )
     }
+
     #[tokio::test]
     async fn test_uploader_finish_in_order() {
         let config = StorageOpts {
@@ -1933,19 +1980,13 @@ pub(crate) mod tests {
         assert_uploader_pending(&mut uploader).await;
 
         finish_tx1.send(()).unwrap();
-        if let UploaderEvent::DataSpilled(sst) = uploader.next_event().await {
-            assert_eq!(&get_payload_imm_ids(&epoch1_spill_payload12), sst.imm_ids());
-            assert_eq!(&vec![epoch1], sst.epochs());
-        } else {
-            unreachable!("")
-        }
+        let sst = uploader.next_uploaded_sst().await;
+        assert_eq!(&get_payload_imm_ids(&epoch1_spill_payload12), sst.imm_ids());
+        assert_eq!(&vec![epoch1], sst.epochs());
 
-        if let UploaderEvent::DataSpilled(sst) = uploader.next_event().await {
-            assert_eq!(&get_payload_imm_ids(&epoch2_spill_payload), sst.imm_ids());
-            assert_eq!(&vec![epoch2], sst.epochs());
-        } else {
-            unreachable!("")
-        }
+        let sst = uploader.next_uploaded_sst().await;
+        assert_eq!(&get_payload_imm_ids(&epoch2_spill_payload), sst.imm_ids());
+        assert_eq!(&vec![epoch2], sst.epochs());
 
         let imm1_3 = gen_imm_with_limiter(epoch1, memory_limiter).await;
         uploader.add_imm(instance_id1, imm1_3.clone());
@@ -1960,7 +2001,8 @@ pub(crate) mod tests {
         let (await_start1_4, finish_tx1_4) =
             new_task_notifier(get_payload_imm_ids(&epoch1_sync_payload));
         uploader.local_seal_epoch_for_test(instance_id1, epoch1);
-        uploader.start_sync_epoch_for_test(epoch1);
+        let (sync_tx1, mut sync_rx1) = oneshot::channel();
+        uploader.start_sync_epoch(epoch1, sync_tx1);
         await_start1_4.await;
 
         let epoch3 = epoch2.next_epoch();
@@ -2012,24 +2054,30 @@ pub(crate) mod tests {
         assert_uploader_pending(&mut uploader).await;
 
         finish_tx1_3.send(()).unwrap();
-        if let UploaderEvent::SyncFinish(epoch, data) = uploader.next_event().await {
-            assert_eq!(epoch1, epoch);
-            assert_eq!(2, data.newly_upload_ssts.len());
-            assert_eq!(1, data.uploaded_ssts.len());
+        let sst = uploader.next_uploaded_sst().await;
+        assert_eq!(&get_payload_imm_ids(&epoch1_spill_payload3), sst.imm_ids());
+
+        assert!(poll_fn(|cx| Poll::Ready(sync_rx1.poll_unpin(cx).is_pending())).await);
+
+        let sst = uploader.next_uploaded_sst().await;
+        assert_eq!(&get_payload_imm_ids(&epoch1_sync_payload), sst.imm_ids());
+
+        if let Ok(Ok(data)) = sync_rx1.await {
+            assert_eq!(3, data.uploaded_ssts.len());
             assert_eq!(
                 &get_payload_imm_ids(&epoch1_sync_payload),
-                data.newly_upload_ssts[0].imm_ids()
+                data.uploaded_ssts[0].imm_ids()
             );
             assert_eq!(
                 &get_payload_imm_ids(&epoch1_spill_payload3),
-                data.newly_upload_ssts[1].imm_ids()
+                data.uploaded_ssts[1].imm_ids()
             );
             assert_eq!(
                 &get_payload_imm_ids(&epoch1_spill_payload12),
-                data.uploaded_ssts[0].imm_ids()
+                data.uploaded_ssts[2].imm_ids()
             );
         } else {
-            unreachable!("should be sync finish");
+            unreachable!()
         }
 
         // current uploader state:
@@ -2039,10 +2087,13 @@ pub(crate) mod tests {
         // syncing: empty
         // synced: epoch1: sst([imm1_4]), sst([imm1_3]), sst([imm1_2, imm1_1])
 
-        uploader.start_sync_epoch_for_test(epoch2);
-        if let UploaderEvent::SyncFinish(epoch, data) = uploader.next_event().await {
-            assert_eq!(epoch2, epoch);
-            assert!(data.newly_upload_ssts.is_empty());
+        let (sync_tx2, sync_rx2) = oneshot::channel();
+        uploader.start_sync_epoch(epoch2, sync_tx2);
+        uploader.local_seal_epoch_for_test(instance_id2, epoch3);
+        let sst = uploader.next_uploaded_sst().await;
+        assert_eq!(&get_payload_imm_ids(&epoch3_spill_payload1), sst.imm_ids());
+
+        if let Ok(Ok(data)) = sync_rx2.await {
             assert_eq!(data.uploaded_ssts.len(), 1);
             assert_eq!(
                 &get_payload_imm_ids(&epoch2_spill_payload),
@@ -2053,21 +2104,6 @@ pub(crate) mod tests {
         }
         assert_eq!(epoch2, uploader.max_synced_epoch);
 
-        // current uploader state:
-        // unsealed: epoch3: imm: imm3_3, uploading: [imm3_2], [imm3_1]
-        //           epoch4: imm: imm4
-        // sealed: empty
-        // syncing: empty
-        // synced: epoch1: sst([imm1_4]), sst([imm1_3]), sst([imm1_2, imm1_1])
-        //         epoch2: sst([imm2])
-
-        uploader.local_seal_epoch_for_test(instance_id2, epoch3);
-        if let UploaderEvent::DataSpilled(sst) = uploader.next_event().await {
-            assert_eq!(&get_payload_imm_ids(&epoch3_spill_payload1), sst.imm_ids());
-        } else {
-            unreachable!("should be data spilled");
-        }
-
         // current uploader state:
         // unsealed: epoch4: imm: imm4
         // sealed: imm: imm3_3, uploading: [imm3_2], uploaded: sst([imm3_1])
@@ -2080,7 +2116,8 @@ pub(crate) mod tests {
         let epoch4_sync_payload = HashMap::from_iter([(instance_id1, vec![imm4, imm3_3])]);
         let (await_start4_with_3_3, finish_tx4_with_3_3) =
             new_task_notifier(get_payload_imm_ids(&epoch4_sync_payload));
-        uploader.start_sync_epoch_for_test(epoch4);
+        let (sync_tx4, mut sync_rx4) = oneshot::channel();
+        uploader.start_sync_epoch(epoch4, sync_tx4);
         await_start4_with_3_3.await;
 
         // current uploader state:
@@ -2091,25 +2128,30 @@ pub(crate) mod tests {
        //          epoch2: sst([imm2])
 
         assert_uploader_pending(&mut uploader).await;
+
         finish_tx3_2.send(()).unwrap();
-        assert_uploader_pending(&mut uploader).await;
+        let sst = uploader.next_uploaded_sst().await;
+        assert_eq!(&get_payload_imm_ids(&epoch3_spill_payload2), sst.imm_ids());
+
         finish_tx4_with_3_3.send(()).unwrap();
+        assert!(poll_fn(|cx| Poll::Ready(sync_rx4.poll_unpin(cx).is_pending())).await);
+
+        let sst = uploader.next_uploaded_sst().await;
+        assert_eq!(&get_payload_imm_ids(&epoch4_sync_payload), sst.imm_ids());
 
-        if let UploaderEvent::SyncFinish(epoch, data) = uploader.next_event().await {
-            assert_eq!(epoch4, epoch);
-            assert_eq!(2, data.newly_upload_ssts.len());
+        if let Ok(Ok(data)) = sync_rx4.await {
+            assert_eq!(3, data.uploaded_ssts.len());
             assert_eq!(
                 &get_payload_imm_ids(&epoch4_sync_payload),
-                data.newly_upload_ssts[0].imm_ids()
+                data.uploaded_ssts[0].imm_ids()
             );
             assert_eq!(
                 &get_payload_imm_ids(&epoch3_spill_payload2),
-                data.newly_upload_ssts[1].imm_ids()
+                data.uploaded_ssts[1].imm_ids()
             );
-            assert_eq!(1, data.uploaded_ssts.len());
             assert_eq!(
                 &get_payload_imm_ids(&epoch3_spill_payload1),
-                data.uploaded_ssts[0].imm_ids(),
+                data.uploaded_ssts[2].imm_ids(),
             )
         } else {
unreachable!("should be sync finish"); diff --git a/src/storage/src/hummock/store/hummock_storage.rs b/src/storage/src/hummock/store/hummock_storage.rs index 7f9f956b62ddf..b4a3c55bbf829 100644 --- a/src/storage/src/hummock/store/hummock_storage.rs +++ b/src/storage/src/hummock/store/hummock_storage.rs @@ -570,7 +570,11 @@ impl StateStore for HummockStorage { sync_result_sender: tx, }) .expect("should send success"); - rx.map(|recv_result| Ok(recv_result.expect("should wait success")?)) + rx.map(|recv_result| { + Ok(recv_result + .expect("should wait success")? + .into_sync_result()) + }) } fn seal_epoch(&self, epoch: u64, is_checkpoint: bool) {