From 6d02b187495674db172c9bcbaed014ad3e646be9 Mon Sep 17 00:00:00 2001 From: TennyZhuang Date: Thu, 4 Jan 2024 18:46:50 +0800 Subject: [PATCH 1/7] fix(stream/materialize): enforce update_cache when modify fixed_changes (#14364) Signed-off-by: TennyZhuang --- src/stream/src/executor/mview/materialize.rs | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/src/stream/src/executor/mview/materialize.rs b/src/stream/src/executor/mview/materialize.rs index ddc985528d7f4..9b8707a5ea52a 100644 --- a/src/stream/src/executor/mview/materialize.rs +++ b/src/stream/src/executor/mview/materialize.rs @@ -443,26 +443,30 @@ impl MaterializeCache { let mut fixed_changes = MaterializeBuffer::new(); for (op, key, value) in row_ops { let mut update_cache = false; + let fixed_changes = &mut fixed_changes; + // When you change the fixed_changes buffer, you must mark `update_cache` to true so the cache and downstream can be consistent. + let fixed_changes = || { + update_cache = true; + fixed_changes + }; match op { Op::Insert | Op::UpdateInsert => { match conflict_behavior { ConflictBehavior::Overwrite => { match self.force_get(&key) { - Some(old_row) => fixed_changes.update( + Some(old_row) => fixed_changes().update( key.clone(), old_row.row.clone(), value.clone(), ), - None => fixed_changes.insert(key.clone(), value.clone()), + None => fixed_changes().insert(key.clone(), value.clone()), }; - update_cache = true; } ConflictBehavior::IgnoreConflict => { match self.force_get(&key) { Some(_) => (), None => { - fixed_changes.insert(key.clone(), value.clone()); - update_cache = true; + fixed_changes().insert(key.clone(), value.clone()); } }; } @@ -487,18 +491,16 @@ impl MaterializeCache { ConflictBehavior::Overwrite => { match self.force_get(&key) { Some(old_row) => { - fixed_changes.delete(key.clone(), old_row.row.clone()); + fixed_changes().delete(key.clone(), old_row.row.clone()); } None => (), // delete a nonexistent value }; - update_cache = true; } 
ConflictBehavior::IgnoreConflict => { match self.force_get(&key) { Some(old_row) => { if old_row.row == value { - fixed_changes.delete(key.clone(), old_row.row.clone()); - update_cache = true; + fixed_changes().delete(key.clone(), old_row.row.clone()); } } None => (), // delete a nonexistent value From 788d6cd803373aec621fa053cfd4a8742c332c9a Mon Sep 17 00:00:00 2001 From: stonepage <40830455+st1page@users.noreply.github.com> Date: Thu, 4 Jan 2024 19:10:57 +0800 Subject: [PATCH 2/7] test: reorganize the sink to table e2e test (#14158) --- ci/scripts/e2e-sink-test.sh | 2 +- .../basic.slt} | 5 +- .../sink/sink_into_table/dml_on_source.slt | 216 +++++++++++++++++ .../sink/sink_into_table/dml_on_target.slt | 224 ++++++++++++++++++ e2e_test/sink/sink_into_table/rename.slt | 48 ++++ e2e_test/sink/sink_into_table/syntax.slt | 23 ++ 6 files changed, 515 insertions(+), 3 deletions(-) rename e2e_test/sink/{sink_into_table.slt => sink_into_table/basic.slt} (99%) create mode 100644 e2e_test/sink/sink_into_table/dml_on_source.slt create mode 100644 e2e_test/sink/sink_into_table/dml_on_target.slt create mode 100644 e2e_test/sink/sink_into_table/rename.slt create mode 100644 e2e_test/sink/sink_into_table/syntax.slt diff --git a/ci/scripts/e2e-sink-test.sh b/ci/scripts/e2e-sink-test.sh index 92f3e30f6cf4c..c20f422532b79 100755 --- a/ci/scripts/e2e-sink-test.sh +++ b/ci/scripts/e2e-sink-test.sh @@ -61,7 +61,7 @@ sqllogictest -p 4566 -d dev './e2e_test/sink/append_only_sink.slt' sqllogictest -p 4566 -d dev './e2e_test/sink/create_sink_as.slt' sqllogictest -p 4566 -d dev './e2e_test/sink/blackhole_sink.slt' sqllogictest -p 4566 -d dev './e2e_test/sink/remote/types.slt' -sqllogictest -p 4566 -d dev './e2e_test/sink/sink_into_table.slt' +sqllogictest -p 4566 -d dev './e2e_test/sink/sink_into_table/*.slt' sleep 1 echo "--- testing remote sinks" diff --git a/e2e_test/sink/sink_into_table.slt b/e2e_test/sink/sink_into_table/basic.slt similarity index 99% rename from 
e2e_test/sink/sink_into_table.slt rename to e2e_test/sink/sink_into_table/basic.slt index 15a6a37e17c4c..1bc5a47907077 100644 --- a/e2e_test/sink/sink_into_table.slt +++ b/e2e_test/sink/sink_into_table/basic.slt @@ -1,8 +1,6 @@ statement ok SET RW_IMPLICIT_FLUSH TO true; -# currently, we only support single sink into table - statement ok create table t_simple (v1 int, v2 int); @@ -456,3 +454,6 @@ drop table t_b; statement ok drop table t_c; + +statement ok +drop table t_m; diff --git a/e2e_test/sink/sink_into_table/dml_on_source.slt b/e2e_test/sink/sink_into_table/dml_on_source.slt new file mode 100644 index 0000000000000..18503bb9251e9 --- /dev/null +++ b/e2e_test/sink/sink_into_table/dml_on_source.slt @@ -0,0 +1,216 @@ +statement ok +SET RW_IMPLICIT_FLUSH TO true; + +# "nao" means non append only +# "npk" means no primary key + +statement ok +create table t_nao_npk (k int, v1 int, v2 int); + +statement ok +create table t_ao_npk (k int, v1 int, v2 int) APPEND ONLY; + +statement ok +create table t_nao_pk (k int, v1 int, v2 int, primary key(k)); + +statement ok +create table t_ao_source (k int, v1 int, v2 int) APPEND ONLY; + +statement ok +create table t_nao_source (k int, v1 int, v2 int); + +# sinks to t_nao_npk +statement ok +create sink s1_to_t_nao_npk into t_nao_npk +as select k, v1, v2 +from t_ao_source +with ( + type = 'append-only', +); + +statement error Only append-only sinks can sink to a table without primary keys. +create sink s2_to_t_nao_npk into t_nao_npk +as select k, v1, v2 +from t_nao_source; + +statement ok +create sink s2_to_t_nao_npk into t_nao_npk +as select k, v1, v2 +from t_nao_source +with ( + type = 'append-only', + force_append_only='true' +); + +# sinks to t_ao_npk +statement ok +create sink s1_to_t_ao_npk into t_ao_npk +as select k, v1, v2 +from t_ao_source +with ( + type = 'append-only', +); + +statement error Only append-only sinks can sink to a table without primary keys. 
+create sink s2_to_t_ao_npk into t_ao_npk +as select k, v1, v2 +from t_nao_source; + +statement ok +create sink s2_to_t_ao_npk into t_ao_npk +as select k, v1, v2 +from t_nao_source +with ( + type = 'append-only', + force_append_only='true' +); + +# sinks to t_nao_pk +statement ok +create sink s1_to_t_nao_pk into t_nao_pk +as select k, v1, v2 +from t_ao_source +with ( + type = 'append-only', +); + +statement ok +create sink s2_to_t_nao_pk into t_nao_pk +as select k*10 as k, v1, v2 +from t_nao_source; + +statement ok +create sink s3_to_t_nao_pk into t_nao_pk +as select k*100 as k, v1, v2 +from t_nao_source +with ( + type = 'append-only', + force_append_only='true' +); + +statement ok +insert into t_ao_source values (1, 10, 100); + + +query III rowsort +SELECT * FROM t_nao_npk; +---- +1 10 100 + +query III rowsort +SELECT * FROM t_ao_npk; +---- +1 10 100 + +query III rowsort +SELECT * FROM t_nao_pk; +---- +1 10 100 + + +statement ok +insert into t_nao_source values (2, 20, 200); + + +query III rowsort +SELECT * FROM t_nao_npk; +---- +1 10 100 +2 20 200 + +query III rowsort +SELECT * FROM t_ao_npk; +---- +1 10 100 +2 20 200 + +query III rowsort +SELECT * FROM t_nao_pk; +---- +1 10 100 +20 20 200 +200 20 200 + +statement ok +update t_nao_source set v2 = 1000 where v1 = 20; + +query III rowsort +SELECT * FROM t_nao_npk; +---- +1 10 100 +2 20 1000 +2 20 200 + +query III rowsort +SELECT * FROM t_ao_npk; +---- +1 10 100 +2 20 1000 +2 20 200 + +query III rowsort +SELECT * FROM t_nao_pk; +---- +1 10 100 +20 20 1000 +200 20 1000 + +statement ok +delete from t_nao_source where v1 = 20; + +query III rowsort +SELECT * FROM t_nao_npk; +---- +1 10 100 +2 20 1000 +2 20 200 + +query III rowsort +SELECT * FROM t_ao_npk; +---- +1 10 100 +2 20 1000 +2 20 200 + +query III rowsort +SELECT * FROM t_nao_pk; +---- +1 10 100 +200 20 1000 + + +statement ok +drop sink s3_to_t_nao_pk; + +statement ok +drop sink s2_to_t_nao_pk; + +statement ok +drop sink s1_to_t_nao_pk; + +statement ok +drop sink 
s2_to_t_ao_npk; + +statement ok +drop sink s1_to_t_ao_npk; + +statement ok +drop sink s2_to_t_nao_npk; + +statement ok +drop sink s1_to_t_nao_npk; + +statement ok +drop table t_ao_source + +statement ok +drop table t_nao_npk + +statement ok +drop table t_ao_npk + +statement ok +drop table t_nao_source + +statement ok +drop table t_nao_pk diff --git a/e2e_test/sink/sink_into_table/dml_on_target.slt b/e2e_test/sink/sink_into_table/dml_on_target.slt new file mode 100644 index 0000000000000..eb3867bef8d1b --- /dev/null +++ b/e2e_test/sink/sink_into_table/dml_on_target.slt @@ -0,0 +1,224 @@ +statement ok +SET RW_IMPLICIT_FLUSH TO true; + +# "nao" means non append only +# "npk" means no primary key + +statement ok +create table t_nao_npk (k int, v1 int, v2 int); + +statement ok +create table t_ao_npk (k int, v1 int, v2 int) APPEND ONLY; + +statement ok +create table t_nao_pk (k int, v1 int, v2 int, primary key(k)); + +statement ok +create table t_ao_source (k int, v1 int, v2 int) APPEND ONLY; + +statement ok +create table t_nao_source (k int, v1 int, v2 int); + +# sinks to t_nao_npk +statement ok +create sink s1_to_t_nao_npk into t_nao_npk +as select k, v1, v2 +from t_ao_source +with ( + type = 'append-only', +); + +statement error Only append-only sinks can sink to a table without primary keys. +create sink s2_to_t_nao_npk into t_nao_npk +as select k, v1, v2 +from t_nao_source; + +statement ok +create sink s2_to_t_nao_npk into t_nao_npk +as select k, v1, v2 +from t_nao_source +with ( + type = 'append-only', + force_append_only='true' +); + +# sinks to t_ao_npk +statement ok +create sink s1_to_t_ao_npk into t_ao_npk +as select k, v1, v2 +from t_ao_source +with ( + type = 'append-only', +); + +statement error Only append-only sinks can sink to a table without primary keys. 
+create sink s2_to_t_ao_npk into t_ao_npk +as select k, v1, v2 +from t_nao_source; + +statement ok +create sink s2_to_t_ao_npk into t_ao_npk +as select k, v1, v2 +from t_nao_source +with ( + type = 'append-only', + force_append_only='true' +); + +# sinks to t_nao_pk +statement ok +create sink s1_to_t_nao_pk into t_nao_pk +as select k, v1, v2 +from t_ao_source +with ( + type = 'append-only', +); + +statement ok +create sink s2_to_t_nao_pk into t_nao_pk +as select k*10 as k, v1, v2 +from t_nao_source; + +statement ok +create sink s3_to_t_nao_pk into t_nao_pk +as select k*100 as k, v1, v2 +from t_nao_source +with ( + type = 'append-only', + force_append_only='true' +); + +statement ok +insert into t_ao_source values (1, 10, 100); + +statement ok +insert into t_nao_source values (2, 20, 200); + +# test on t_ao_npk +query III rowsort +SELECT * FROM t_ao_npk; +---- +1 10 100 +2 20 200 + +statement ok +Insert INTO t_ao_npk values (4, 123, 123); + +query III rowsort +SELECT * FROM t_ao_npk; +---- +1 10 100 +2 20 200 +4 123 123 + +# test on t_nao_npk +query III rowsort +SELECT * FROM t_nao_npk; +---- +1 10 100 +2 20 200 + +statement ok +Insert INTO t_nao_npk values (4, 123, 123); + +query III rowsort +SELECT * FROM t_nao_npk; +---- +1 10 100 +2 20 200 +4 123 123 + +statement ok +update t_nao_npk set v2 = 1000 where v1 = 20; + +query III rowsort +SELECT * FROM t_nao_npk; +---- +1 10 100 +2 20 1000 +4 123 123 + +statement ok +delete from t_nao_npk where k = 2; + +query III rowsort +SELECT * FROM t_nao_npk; +---- +1 10 100 +4 123 123 + +# test on t_nao_pk +query III rowsort +SELECT * FROM t_nao_pk; +---- +1 10 100 +20 20 200 +200 20 200 + +statement ok +Insert INTO t_nao_pk values (4, 123, 123); + +query III rowsort +SELECT * FROM t_nao_pk; +---- +1 10 100 +20 20 200 +200 20 200 +4 123 123 + +statement ok +update t_nao_pk set v2 = 1000 where v1 = 20; + +query III rowsort +SELECT * FROM t_nao_pk; +---- +1 10 100 +20 20 1000 +200 20 1000 +4 123 123 + +statement ok +delete from 
t_nao_pk where k = 20; + +query III rowsort +SELECT * FROM t_nao_pk; +---- +1 10 100 +200 20 1000 +4 123 123 + +statement ok +drop sink s3_to_t_nao_pk; + +statement ok +drop sink s2_to_t_nao_pk; + +statement ok +drop sink s1_to_t_nao_pk; + +statement ok +drop sink s2_to_t_ao_npk; + +statement ok +drop sink s1_to_t_ao_npk; + +statement ok +drop sink s2_to_t_nao_npk; + +statement ok +drop sink s1_to_t_nao_npk; + +statement ok +drop table t_ao_source + +statement ok +drop table t_nao_npk + +statement ok +drop table t_ao_npk + +statement ok +drop table t_nao_source + +statement ok +drop table t_nao_pk diff --git a/e2e_test/sink/sink_into_table/rename.slt b/e2e_test/sink/sink_into_table/rename.slt new file mode 100644 index 0000000000000..2d6f35ca31dd7 --- /dev/null +++ b/e2e_test/sink/sink_into_table/rename.slt @@ -0,0 +1,48 @@ +statement ok +SET RW_IMPLICIT_FLUSH TO true; + +statement ok +create table source(v int) append only; + +statement ok +create table target(v int); + +statement ok +create sink s into target as select * from source with (type = 'append-only'); + +statement ok +SELECT name, definition from rw_sinks; + +statement ok +ALTER SINK s RENAME TO s1; + +query TT +SELECT name, definition from rw_sinks; +---- +s1 CREATE SINK s1 INTO target AS SELECT * FROM source WITH (type = 'append-only') + +statement ok +ALTER TABLE source RENAME TO src; + +statement ok +ALTER TABLE target RENAME TO tar; + +query TT +SELECT name, definition from rw_tables; +---- +tar CREATE TABLE tar (v INT) +src CREATE TABLE src (v INT) APPEND ONLY + +query TT +SELECT name, definition from rw_sinks; +---- +s1 CREATE SINK s1 INTO target AS SELECT * FROM src AS source WITH (type = 'append-only') + +statement ok +drop sink s1; + +statement ok +drop table src; + +statement ok +drop table tar; diff --git a/e2e_test/sink/sink_into_table/syntax.slt b/e2e_test/sink/sink_into_table/syntax.slt new file mode 100644 index 0000000000000..7878999f5b3a4 --- /dev/null +++ 
b/e2e_test/sink/sink_into_table/syntax.slt @@ -0,0 +1,23 @@ +statement ok +SET RW_IMPLICIT_FLUSH TO true; + +statement ok +create table t_simple (v1 int, v2 int); + +statement ok +create table m_simple (v1 int primary key, v2 int); + +statement error unsupported sink type table +create sink s from t_simple with (connector = 'table'); + +statement error connector table is not supported by FORMAT ... ENCODE ... syntax +create sink s into m_simple as select v1, v2 from t_simple with (type = 'upsert') FORMAT PLAIN ENCODE NATIVE; + +statement error connector table is not supported by FORMAT ... ENCODE ... syntax +create sink s into m_simple as select v1, v2 from t_simple with (type = 'upsert') FORMAT PLAIN ENCODE JSON; + +statement ok +drop table t_simple; + +statement ok +drop table m_simple; From 2513cbab0fff771eb61bf130be5ef1a0f0bcf694 Mon Sep 17 00:00:00 2001 From: xxchan Date: Fri, 5 Jan 2024 12:08:12 +0800 Subject: [PATCH 3/7] test: fix flaky sink_into_table test (#14372) --- e2e_test/sink/sink_into_table/rename.slt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/e2e_test/sink/sink_into_table/rename.slt b/e2e_test/sink/sink_into_table/rename.slt index 2d6f35ca31dd7..19b24c9c20b4a 100644 --- a/e2e_test/sink/sink_into_table/rename.slt +++ b/e2e_test/sink/sink_into_table/rename.slt @@ -27,11 +27,11 @@ ALTER TABLE source RENAME TO src; statement ok ALTER TABLE target RENAME TO tar; -query TT +query TT rowsort SELECT name, definition from rw_tables; ---- -tar CREATE TABLE tar (v INT) src CREATE TABLE src (v INT) APPEND ONLY +tar CREATE TABLE tar (v INT) query TT SELECT name, definition from rw_sinks; From cb825f0f3f1611b8ef82daf032bc0f184fc9ac88 Mon Sep 17 00:00:00 2001 From: xxchan Date: Fri, 5 Jan 2024 12:08:43 +0800 Subject: [PATCH 4/7] test: bump sqllogictest & fix backwards-compat-test/e2e-tests (#14354) --- Makefile.toml | 2 +- .../{delete.slt => delete.slt.part} | 0 .../validate_restart.slt | 8 ++ .../{delete.slt => delete.slt.part} | 0 
.../{insert.slt => insert.slt.part} | 0 .../validate_restart.slt | 8 ++ ci/Dockerfile | 4 +- ci/build-ci-image.sh | 2 +- ci/docker-compose.yml | 10 +-- e2e_test/batch/duckdb/all.slt.part | 9 +- e2e_test/batch/local_mode.slt | 1 - e2e_test/extended_mode/type.slt | 2 +- .../batch/rank_func/row_number_old.slt.part | 84 +++++++++++++++++++ .../rank_func/row_number_old.slt.part | 84 +++++++++++++++++++ ...umber_old.part => row_number_old.slt.part} | 0 15 files changed, 197 insertions(+), 17 deletions(-) rename backwards-compat-tests/slt/nexmark-backwards-compat/{delete.slt => delete.slt.part} (100%) rename backwards-compat-tests/slt/tpch-backwards-compat/{delete.slt => delete.slt.part} (100%) rename backwards-compat-tests/slt/tpch-backwards-compat/{insert.slt => insert.slt.part} (100%) create mode 100644 e2e_test/over_window/generated/batch/rank_func/row_number_old.slt.part create mode 100644 e2e_test/over_window/generated/streaming/rank_func/row_number_old.slt.part rename e2e_test/over_window/templates/rank_func/{row_number_old.part => row_number_old.slt.part} (100%) diff --git a/Makefile.toml b/Makefile.toml index 9aec344d74d1f..347f2234e5fda 100644 --- a/Makefile.toml +++ b/Makefile.toml @@ -1332,7 +1332,7 @@ echo "All processes has exited." 
[tasks.slt] category = "RiseDev - SQLLogicTest" -install_crate = { version = "0.18.0", crate_name = "sqllogictest-bin", binary = "sqllogictest", test_arg = [ +install_crate = { version = "0.19.1", crate_name = "sqllogictest-bin", binary = "sqllogictest", test_arg = [ "--help", ], install_command = "binstall" } dependencies = ["check-risedev-env-file"] diff --git a/backwards-compat-tests/slt/nexmark-backwards-compat/delete.slt b/backwards-compat-tests/slt/nexmark-backwards-compat/delete.slt.part similarity index 100% rename from backwards-compat-tests/slt/nexmark-backwards-compat/delete.slt rename to backwards-compat-tests/slt/nexmark-backwards-compat/delete.slt.part diff --git a/backwards-compat-tests/slt/nexmark-backwards-compat/validate_restart.slt b/backwards-compat-tests/slt/nexmark-backwards-compat/validate_restart.slt index 093f88f4fdbbe..7a0aede6e2236 100644 --- a/backwards-compat-tests/slt/nexmark-backwards-compat/validate_restart.slt +++ b/backwards-compat-tests/slt/nexmark-backwards-compat/validate_restart.slt @@ -1,9 +1,17 @@ include ../nexmark/test_mv_result.slt.part include ./delete.slt.part + +statement ok +flush; + include ../nexmark/insert_person.slt.part include ../nexmark/insert_auction.slt.part include ../nexmark/insert_bid.slt.part + +statement ok +flush; + include ../nexmark/test_mv_result.slt.part include ../nexmark/drop_views.slt.part diff --git a/backwards-compat-tests/slt/tpch-backwards-compat/delete.slt b/backwards-compat-tests/slt/tpch-backwards-compat/delete.slt.part similarity index 100% rename from backwards-compat-tests/slt/tpch-backwards-compat/delete.slt rename to backwards-compat-tests/slt/tpch-backwards-compat/delete.slt.part diff --git a/backwards-compat-tests/slt/tpch-backwards-compat/insert.slt b/backwards-compat-tests/slt/tpch-backwards-compat/insert.slt.part similarity index 100% rename from backwards-compat-tests/slt/tpch-backwards-compat/insert.slt rename to backwards-compat-tests/slt/tpch-backwards-compat/insert.slt.part 
diff --git a/backwards-compat-tests/slt/tpch-backwards-compat/validate_restart.slt b/backwards-compat-tests/slt/tpch-backwards-compat/validate_restart.slt index 7c7334cc222d3..81e00f92f0ca1 100644 --- a/backwards-compat-tests/slt/tpch-backwards-compat/validate_restart.slt +++ b/backwards-compat-tests/slt/tpch-backwards-compat/validate_restart.slt @@ -23,7 +23,15 @@ include ../tpch/q22.slt.part # Test deletes and updates should work as per normal. include ./delete.slt.part + +statement ok +flush; + include ./insert.slt.part + +statement ok +flush; + include ../tpch/q1.slt.part include ../tpch/q2.slt.part include ../tpch/q3.slt.part diff --git a/ci/Dockerfile b/ci/Dockerfile index 427e6d68dc116..810a3289b66a8 100644 --- a/ci/Dockerfile +++ b/ci/Dockerfile @@ -48,8 +48,8 @@ ENV CARGO_REGISTRIES_CRATES_IO_PROTOCOL=sparse RUN curl -L --proto '=https' --tlsv1.2 -sSf https://raw.githubusercontent.com/cargo-bins/cargo-binstall/main/install-from-binstall-release.sh | bash RUN cargo binstall -y --no-symlinks cargo-llvm-cov cargo-nextest cargo-hakari cargo-sort cargo-cache cargo-audit \ cargo-make@0.36.10 \ - sqllogictest-bin@0.18.0 \ - && cargo install sccache \ + sqllogictest-bin@0.19.1 \ + sccache@0.7.4 \ && cargo cache -a \ && rm -rf "/root/.cargo/registry/index" \ && rm -rf "/root/.cargo/registry/cache" \ diff --git a/ci/build-ci-image.sh b/ci/build-ci-image.sh index 93718b1e181be..bbdc5d7bfa9ed 100755 --- a/ci/build-ci-image.sh +++ b/ci/build-ci-image.sh @@ -10,7 +10,7 @@ cat ../rust-toolchain # shellcheck disable=SC2155 # REMEMBER TO ALSO UPDATE ci/docker-compose.yml -export BUILD_ENV_VERSION=v20231226 +export BUILD_ENV_VERSION=v20240104_1 export BUILD_TAG="public.ecr.aws/x5u3w5h6/rw-build-env:${BUILD_ENV_VERSION}" diff --git a/ci/docker-compose.yml b/ci/docker-compose.yml index e4ba36d5b79a2..0ebb9e77eeb20 100644 --- a/ci/docker-compose.yml +++ b/ci/docker-compose.yml @@ -71,7 +71,7 @@ services: retries: 5 source-test-env: - image: 
public.ecr.aws/x5u3w5h6/rw-build-env:v20231226 + image: public.ecr.aws/x5u3w5h6/rw-build-env:v20240104_1 depends_on: - mysql - db @@ -81,7 +81,7 @@ services: - ..:/risingwave sink-test-env: - image: public.ecr.aws/x5u3w5h6/rw-build-env:v20231226 + image: public.ecr.aws/x5u3w5h6/rw-build-env:v20240104_1 depends_on: - mysql - db @@ -93,12 +93,12 @@ services: - ..:/risingwave rw-build-env: - image: public.ecr.aws/x5u3w5h6/rw-build-env:v20231226 + image: public.ecr.aws/x5u3w5h6/rw-build-env:v20240104_1 volumes: - ..:/risingwave ci-flamegraph-env: - image: public.ecr.aws/x5u3w5h6/rw-build-env:v20231226 + image: public.ecr.aws/x5u3w5h6/rw-build-env:v20240104_1 # NOTE(kwannoel): This is used in order to permit # syscalls for `nperf` (perf_event_open), # so it can do CPU profiling. @@ -109,7 +109,7 @@ services: - ..:/risingwave regress-test-env: - image: public.ecr.aws/x5u3w5h6/rw-build-env:v20231226 + image: public.ecr.aws/x5u3w5h6/rw-build-env:v20240104_1 depends_on: db: condition: service_healthy diff --git a/e2e_test/batch/duckdb/all.slt.part b/e2e_test/batch/duckdb/all.slt.part index 68adce73dea24..ca5d1b5201c45 100644 --- a/e2e_test/batch/duckdb/all.slt.part +++ b/e2e_test/batch/duckdb/all.slt.part @@ -1,9 +1,6 @@ -include ./aggregate/*.slt.part -include ./aggregate/*/*.slt.part -include ./join/*.slt.part -include ./join/*/*.slt.part -include ./conjunction/*.slt.part -include ./conjunction/*/*.slt.part +include ./aggregate/**/*.slt.part +include ./join/**/*.slt.part +include ./conjunction/**/*.slt.part include ./limit/*.slt.part include ./select/*.slt.part include ./cte/*.slt.part diff --git a/e2e_test/batch/local_mode.slt b/e2e_test/batch/local_mode.slt index 5de337518d433..c3818989443a7 100644 --- a/e2e_test/batch/local_mode.slt +++ b/e2e_test/batch/local_mode.slt @@ -5,7 +5,6 @@ statement ok SET QUERY_MODE TO local; include ./basic/*.slt.part -include ./basic/local/*.slt.part include ./duckdb/all.slt.part include ./order/*.slt.part include ./join/*.slt.part diff 
--git a/e2e_test/extended_mode/type.slt b/e2e_test/extended_mode/type.slt index 2271ecb51c5c9..b172fcf389abc 100644 --- a/e2e_test/extended_mode/type.slt +++ b/e2e_test/extended_mode/type.slt @@ -21,7 +21,7 @@ SET RW_IMPLICIT_FLUSH TO true; include ../batch/types/boolean.slt.part include ../batch/types/cast.slt.part include ../batch/types/date.slt -include ../batch/types/intercal.slt.part +include ../batch/types/interval.slt.part include ../batch/types/number_arithmetic.slt.part include ../batch/types/temporal_arithmetic.slt.part include ../batch/types/time.slt.part diff --git a/e2e_test/over_window/generated/batch/rank_func/row_number_old.slt.part b/e2e_test/over_window/generated/batch/rank_func/row_number_old.slt.part new file mode 100644 index 0000000000000..24f47a8116451 --- /dev/null +++ b/e2e_test/over_window/generated/batch/rank_func/row_number_old.slt.part @@ -0,0 +1,84 @@ +# This file is generated by `gen.py`. Do not edit it manually! + +statement ok +create table t ( + id int + , p1 int + , p2 int + , time int + , v1 int + , v2 int +); + +statement ok +create view v as +select + * + , row_number() over (partition by p1 order by time, id) as out1 + , row_number() over (partition by p1 order by p2 desc, id) as out2 +from t; + +statement ok +insert into t values + (100001, 100, 200, 1, 701, 805) +, (100002, 100, 200, 2, 700, 806) +, (100003, 100, 208, 2, 723, 807) +, (100004, 103, 200, 2, 702, 808); + +query II +select * from v order by id; +---- +100001 100 200 1 701 805 1 2 +100002 100 200 2 700 806 2 3 +100003 100 208 2 723 807 3 1 +100004 103 200 2 702 808 1 1 + +statement ok +insert into t values + (100005, 100, 200, 3, 717, 810) +, (100006, 105, 204, 5, 703, 828); + +query II +select * from v order by id; +---- +100001 100 200 1 701 805 1 2 +100002 100 200 2 700 806 2 3 +100003 100 208 2 723 807 3 1 +100004 103 200 2 702 808 1 1 +100005 100 200 3 717 810 4 4 +100006 105 204 5 703 828 1 1 + +statement ok +update t set v1 = 799 where id = 100002; -- 
value change + +statement ok +update t set p2 = 200 where id = 100003; -- partition change + +statement ok +update t set "time" = 1 where id = 100005; -- order change + +query iiiiiii +select * from v order by id; +---- +100001 100 200 1 701 805 1 1 +100002 100 200 2 799 806 3 2 +100003 100 200 2 723 807 4 3 +100004 103 200 2 702 808 1 1 +100005 100 200 1 717 810 2 4 +100006 105 204 5 703 828 1 1 + +statement ok +delete from t where time = 2; + +query iiiiiii +select * from v order by id; +---- +100001 100 200 1 701 805 1 1 +100005 100 200 1 717 810 2 2 +100006 105 204 5 703 828 1 1 + +statement ok +drop view v; + +statement ok +drop table t; diff --git a/e2e_test/over_window/generated/streaming/rank_func/row_number_old.slt.part b/e2e_test/over_window/generated/streaming/rank_func/row_number_old.slt.part new file mode 100644 index 0000000000000..39dec472b5b12 --- /dev/null +++ b/e2e_test/over_window/generated/streaming/rank_func/row_number_old.slt.part @@ -0,0 +1,84 @@ +# This file is generated by `gen.py`. Do not edit it manually! 
+ +statement ok +create table t ( + id int + , p1 int + , p2 int + , time int + , v1 int + , v2 int +); + +statement ok +create materialized view v as +select + * + , row_number() over (partition by p1 order by time, id) as out1 + , row_number() over (partition by p1 order by p2 desc, id) as out2 +from t; + +statement ok +insert into t values + (100001, 100, 200, 1, 701, 805) +, (100002, 100, 200, 2, 700, 806) +, (100003, 100, 208, 2, 723, 807) +, (100004, 103, 200, 2, 702, 808); + +query II +select * from v order by id; +---- +100001 100 200 1 701 805 1 2 +100002 100 200 2 700 806 2 3 +100003 100 208 2 723 807 3 1 +100004 103 200 2 702 808 1 1 + +statement ok +insert into t values + (100005, 100, 200, 3, 717, 810) +, (100006, 105, 204, 5, 703, 828); + +query II +select * from v order by id; +---- +100001 100 200 1 701 805 1 2 +100002 100 200 2 700 806 2 3 +100003 100 208 2 723 807 3 1 +100004 103 200 2 702 808 1 1 +100005 100 200 3 717 810 4 4 +100006 105 204 5 703 828 1 1 + +statement ok +update t set v1 = 799 where id = 100002; -- value change + +statement ok +update t set p2 = 200 where id = 100003; -- partition change + +statement ok +update t set "time" = 1 where id = 100005; -- order change + +query iiiiiii +select * from v order by id; +---- +100001 100 200 1 701 805 1 1 +100002 100 200 2 799 806 3 2 +100003 100 200 2 723 807 4 3 +100004 103 200 2 702 808 1 1 +100005 100 200 1 717 810 2 4 +100006 105 204 5 703 828 1 1 + +statement ok +delete from t where time = 2; + +query iiiiiii +select * from v order by id; +---- +100001 100 200 1 701 805 1 1 +100005 100 200 1 717 810 2 2 +100006 105 204 5 703 828 1 1 + +statement ok +drop materialized view v; + +statement ok +drop table t; diff --git a/e2e_test/over_window/templates/rank_func/row_number_old.part b/e2e_test/over_window/templates/rank_func/row_number_old.slt.part similarity index 100% rename from e2e_test/over_window/templates/rank_func/row_number_old.part rename to 
e2e_test/over_window/templates/rank_func/row_number_old.slt.part From 7a00e58eb76954ece665bacce2cca2e2330dacc8 Mon Sep 17 00:00:00 2001 From: August Date: Fri, 5 Jan 2024 12:16:42 +0800 Subject: [PATCH 5/7] fix: Do not recover background streaming jobs whose table fragments have been marked as created (#14367) Co-authored-by: zwang28 <84491488@qq.com> --- ci/scripts/cron-e2e-test.sh | 2 +- src/meta/src/barrier/recovery.rs | 66 +++++++++++------------ src/meta/src/manager/catalog/fragment.rs | 69 ------------------------ 3 files changed, 31 insertions(+), 106 deletions(-) diff --git a/ci/scripts/cron-e2e-test.sh b/ci/scripts/cron-e2e-test.sh index 2e8c56c3c1d5f..52c7a6faa366c 100755 --- a/ci/scripts/cron-e2e-test.sh +++ b/ci/scripts/cron-e2e-test.sh @@ -4,7 +4,7 @@ set -euo pipefail source ci/scripts/common.sh -export RUN_COMPACTION=1; +export RUN_COMPACTION=0; export RUN_META_BACKUP=1; export RUN_DELETE_RANGE=1; source ci/scripts/run-e2e-test.sh diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs index a122699bf56c3..4a5b13c03caee 100644 --- a/src/meta/src/barrier/recovery.rs +++ b/src/meta/src/barrier/recovery.rs @@ -44,7 +44,7 @@ use crate::barrier::progress::CreateMviewProgressTracker; use crate::barrier::{CheckpointControl, Command, GlobalBarrierManager}; use crate::controller::catalog::ReleaseContext; use crate::manager::{MetadataManager, WorkerId}; -use crate::model::{ActorId, BarrierManagerState, MetadataModel, MigrationPlan, TableFragments}; +use crate::model::{BarrierManagerState, MetadataModel, MigrationPlan, TableFragments}; use crate::stream::{build_actor_connector_splits, RescheduleOptions, TableResizePolicy}; use crate::MetaResult; @@ -149,48 +149,44 @@ impl GlobalBarrierManager { unreachable!() }; let mviews = mgr.catalog_manager.list_creating_background_mvs().await; - let creating_mview_ids = mviews.iter().map(|m| TableId::new(m.id)).collect_vec(); - let mview_definitions = mviews - .into_iter() - .map(|m| 
(TableId::new(m.id), m.definition)) - .collect::>(); + let mut mview_definitions = HashMap::new(); + let mut table_map = HashMap::new(); + let mut table_fragment_map = HashMap::new(); + let mut upstream_mv_counts = HashMap::new(); let mut senders = HashMap::new(); let mut receivers = Vec::new(); - for table_id in creating_mview_ids.iter().copied() { - let (finished_tx, finished_rx) = oneshot::channel(); - senders.insert( - table_id, - Notifier { - finished: Some(finished_tx), - ..Default::default() - }, - ); - + for mview in mviews { + let table_id = TableId::new(mview.id); let fragments = mgr .fragment_manager .select_table_fragments_by_table_id(&table_id) .await?; let internal_table_ids = fragments.internal_table_ids(); let internal_tables = mgr.catalog_manager.get_tables(&internal_table_ids).await; - let table = mgr.catalog_manager.get_tables(&[table_id.table_id]).await; - assert_eq!(table.len(), 1, "should only have 1 materialized table"); - let table = table.into_iter().next().unwrap(); - receivers.push((table, internal_tables, finished_rx)); + if fragments.is_created() { + // If the mview is already created, we don't need to recover it. 
+ mgr.catalog_manager + .finish_create_table_procedure(internal_tables, mview) + .await?; + tracing::debug!("notified frontend for stream job {}", table_id.table_id); + } else { + table_map.insert(table_id, fragments.backfill_actor_ids()); + mview_definitions.insert(table_id, mview.definition.clone()); + upstream_mv_counts.insert(table_id, fragments.dependent_table_ids()); + table_fragment_map.insert(table_id, fragments); + let (finished_tx, finished_rx) = oneshot::channel(); + senders.insert( + table_id, + Notifier { + finished: Some(finished_tx), + ..Default::default() + }, + ); + receivers.push((mview, internal_tables, finished_rx)); + } } - let table_map = mgr - .fragment_manager - .get_table_id_stream_scan_actor_mapping(&creating_mview_ids) - .await; - let table_fragment_map = mgr - .fragment_manager - .get_table_id_table_fragment_map(&creating_mview_ids) - .await?; - let upstream_mv_counts = mgr - .fragment_manager - .get_upstream_relation_counts(&creating_mview_ids) - .await; let version_stats = self.hummock_manager.get_version_stats().await; // If failed, enter recovery mode. 
{ @@ -252,6 +248,7 @@ impl GlobalBarrierManager { let mut receivers = Vec::new(); let mut table_fragment_map = HashMap::new(); let mut mview_definitions = HashMap::new(); + let mut table_map = HashMap::new(); let mut upstream_mv_counts = HashMap::new(); for mview in &mviews { let (finished_tx, finished_rx) = oneshot::channel(); @@ -270,15 +267,12 @@ impl GlobalBarrierManager { .await?; let table_fragments = TableFragments::from_protobuf(table_fragments); upstream_mv_counts.insert(table_id, table_fragments.dependent_table_ids()); + table_map.insert(table_id, table_fragments.backfill_actor_ids()); table_fragment_map.insert(table_id, table_fragments); mview_definitions.insert(table_id, mview.definition.clone()); receivers.push((mview.table_id, finished_rx)); } - let table_map: HashMap> = table_fragment_map - .iter() - .map(|(id, tf)| (*id, tf.actor_ids().into_iter().collect())) - .collect(); let version_stats = self.hummock_manager.get_version_stats().await; // If failed, enter recovery mode. { diff --git a/src/meta/src/manager/catalog/fragment.rs b/src/meta/src/manager/catalog/fragment.rs index a4128aaeb8f95..e5ba6503d79de 100644 --- a/src/meta/src/manager/catalog/fragment.rs +++ b/src/meta/src/manager/catalog/fragment.rs @@ -33,7 +33,6 @@ use risingwave_pb::stream_plan::stream_node::NodeBody; use risingwave_pb::stream_plan::update_mutation::MergeUpdate; use risingwave_pb::stream_plan::{ DispatchStrategy, Dispatcher, DispatcherType, FragmentTypeFlag, StreamActor, StreamNode, - StreamScanType, }; use tokio::sync::{RwLock, RwLockReadGuard}; @@ -170,74 +169,6 @@ impl FragmentManager { map.values().cloned().collect() } - /// The `table_ids` here should correspond to stream jobs. - /// We get their corresponding table fragment, and from there, - /// we get the actors that are in the table fragment. 
- pub async fn get_table_id_stream_scan_actor_mapping( - &self, - table_ids: &[TableId], - ) -> HashMap> { - let map = &self.core.read().await.table_fragments; - let mut table_map = HashMap::new(); - // TODO(kwannoel): Can this be unified with `PlanVisitor`? - fn has_backfill(stream_node: &StreamNode) -> bool { - let is_backfill = if let Some(node) = &stream_node.node_body - && let Some(node) = node.as_stream_scan() - { - node.stream_scan_type == StreamScanType::Backfill as i32 - || node.stream_scan_type == StreamScanType::ArrangementBackfill as i32 - } else { - false - }; - is_backfill || stream_node.get_input().iter().any(has_backfill) - } - for table_id in table_ids { - if let Some(table_fragment) = map.get(table_id) { - let mut actors = HashSet::new(); - for fragment in table_fragment.fragments.values() { - for actor in &fragment.actors { - if let Some(node) = &actor.nodes - && has_backfill(node) - { - actors.insert(actor.actor_id); - } else { - tracing::trace!("ignoring actor: {:?}", actor); - } - } - } - table_map.insert(*table_id, actors); - } - } - table_map - } - - /// Gets the counts for each upstream relation that each stream job - /// indicated by `table_ids` depends on. - /// For example in the following query: - /// ```sql - /// CREATE MATERIALIZED VIEW m1 AS - /// SELECT * FROM t1 JOIN t2 ON t1.a = t2.a JOIN t3 ON t2.b = t3.b - /// ``` - /// - /// We have t1 occurring once, and t2 occurring once. 
- pub async fn get_upstream_relation_counts( - &self, - table_ids: &[TableId], - ) -> HashMap> { - let map = &self.core.read().await.table_fragments; - let mut upstream_relation_counts = HashMap::new(); - for table_id in table_ids { - if let Some(table_fragments) = map.get(table_id) { - let dependent_ids = table_fragments.dependent_table_ids(); - let r = upstream_relation_counts.insert(*table_id, dependent_ids); - assert!(r.is_none(), "Each table_id should be unique!") - } else { - upstream_relation_counts.insert(*table_id, HashMap::new()); - } - } - upstream_relation_counts - } - pub fn get_mv_id_to_internal_table_ids_mapping(&self) -> Option)>> { match self.core.try_read() { Ok(core) => Some( From 19b996b11282aa8bd929b708c47f78954ddf17d3 Mon Sep 17 00:00:00 2001 From: Li0k Date: Fri, 5 Jan 2024 12:36:08 +0800 Subject: [PATCH 6/7] chore(storage): ctl support cancel specific task (#14325) --- proto/hummock.proto | 10 ++++++++++ src/ctl/src/cmd_impl/hummock/compaction_group.rs | 11 +++++++++++ src/ctl/src/lib.rs | 8 ++++++++ src/meta/service/src/hummock_service.rs | 14 ++++++++++++++ src/rpc_client/src/meta_client.rs | 11 +++++++++++ 5 files changed, 54 insertions(+) diff --git a/proto/hummock.proto b/proto/hummock.proto index ccbfef42278f1..58007117811d5 100644 --- a/proto/hummock.proto +++ b/proto/hummock.proto @@ -750,6 +750,15 @@ message ListCompactTaskProgressResponse { repeated CompactTaskProgress task_progress = 1; } +message CancelCompactTaskRequest { + uint64 task_id = 1; + CompactTask.TaskStatus task_status = 2; +} + +message CancelCompactTaskResponse { + bool ret = 1; +} + service HummockManagerService { rpc UnpinVersionBefore(UnpinVersionBeforeRequest) returns (UnpinVersionBeforeResponse); rpc GetCurrentVersion(GetCurrentVersionRequest) returns (GetCurrentVersionResponse); @@ -788,6 +797,7 @@ service HummockManagerService { rpc GetCompactionScore(GetCompactionScoreRequest) returns (GetCompactionScoreResponse); rpc 
ListCompactTaskAssignment(ListCompactTaskAssignmentRequest) returns (ListCompactTaskAssignmentResponse); rpc ListCompactTaskProgress(ListCompactTaskProgressRequest) returns (ListCompactTaskProgressResponse); + rpc CancelCompactTask(CancelCompactTaskRequest) returns (CancelCompactTaskResponse); } message CompactionConfig { diff --git a/src/ctl/src/cmd_impl/hummock/compaction_group.rs b/src/ctl/src/cmd_impl/hummock/compaction_group.rs index 60ec391e9b7c5..068472dd1ce4a 100644 --- a/src/ctl/src/cmd_impl/hummock/compaction_group.rs +++ b/src/ctl/src/cmd_impl/hummock/compaction_group.rs @@ -18,6 +18,7 @@ use comfy_table::{Row, Table}; use itertools::Itertools; use risingwave_hummock_sdk::compaction_group::StateTableId; use risingwave_hummock_sdk::{CompactionGroupId, HummockContextId}; +use risingwave_pb::hummock::compact_task::TaskStatus; use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig; use crate::CtlContext; @@ -260,3 +261,13 @@ pub async fn get_compaction_score( println!("{table}"); Ok(()) } + +pub async fn cancel_compact_task(context: &CtlContext, task_id: u64) -> anyhow::Result<()> { + let meta_client = context.meta_client().await?; + let ret = meta_client + .cancel_compact_task(task_id, TaskStatus::ManualCanceled) + .await?; + println!("cancel_compact_task {} ret {:?}", task_id, ret); + + Ok(()) +} diff --git a/src/ctl/src/lib.rs b/src/ctl/src/lib.rs index 004b36302cf3b..95c9aa25a6b7a 100644 --- a/src/ctl/src/lib.rs +++ b/src/ctl/src/lib.rs @@ -266,6 +266,11 @@ enum HummockCommands { ValidateVersion, /// Rebuild table stats RebuildTableStats, + + CancelCompactTask { + #[clap(short, long)] + task_id: u64, + }, } #[derive(Subcommand)] @@ -655,6 +660,9 @@ pub async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> { Commands::Hummock(HummockCommands::RebuildTableStats) => { cmd_impl::hummock::rebuild_table_stats(context).await?; } + Commands::Hummock(HummockCommands::CancelCompactTask { task_id }) => 
{ + cmd_impl::hummock::cancel_compact_task(context, task_id).await?; + } Commands::Table(TableCommands::Scan { mv_name, data_dir }) => { cmd_impl::table::scan(context, mv_name, data_dir).await? } diff --git a/src/meta/service/src/hummock_service.rs b/src/meta/service/src/hummock_service.rs index 599b7a7cdb79d..a082b723dd124 100644 --- a/src/meta/service/src/hummock_service.rs +++ b/src/meta/service/src/hummock_service.rs @@ -651,6 +651,20 @@ impl HummockManagerService for HummockServiceImpl { task_progress, })) } + + async fn cancel_compact_task( + &self, + request: Request, + ) -> Result, Status> { + let request = request.into_inner(); + let ret = self + .hummock_manager + .cancel_compact_task(request.task_id, request.task_status()) + .await?; + + let response = Response::new(CancelCompactTaskResponse { ret }); + return Ok(response); + } } #[cfg(test)] diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index d8318d79b63b1..2dc51961c901e 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -55,6 +55,7 @@ use risingwave_pb::ddl_service::alter_owner_request::Object; use risingwave_pb::ddl_service::ddl_service_client::DdlServiceClient; use risingwave_pb::ddl_service::drop_table_request::SourceId; use risingwave_pb::ddl_service::*; +use risingwave_pb::hummock::compact_task::TaskStatus; use risingwave_pb::hummock::get_compaction_score_response::PickerInfo; use risingwave_pb::hummock::hummock_manager_service_client::HummockManagerServiceClient; use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig; @@ -1250,6 +1251,15 @@ impl MetaClient { }) .join(); } + + pub async fn cancel_compact_task(&self, task_id: u64, task_status: TaskStatus) -> Result { + let req = CancelCompactTaskRequest { + task_id, + task_status: task_status as _, + }; + let resp = self.inner.cancel_compact_task(req).await?; + Ok(resp.ret) + } } #[async_trait] @@ -1891,6 +1901,7 @@ macro_rules! 
for_all_meta_rpc { ,{ hummock_client, list_hummock_meta_config, ListHummockMetaConfigRequest, ListHummockMetaConfigResponse } ,{ hummock_client, list_compact_task_assignment, ListCompactTaskAssignmentRequest, ListCompactTaskAssignmentResponse } ,{ hummock_client, list_compact_task_progress, ListCompactTaskProgressRequest, ListCompactTaskProgressResponse } + ,{ hummock_client, cancel_compact_task, CancelCompactTaskRequest, CancelCompactTaskResponse} ,{ user_client, create_user, CreateUserRequest, CreateUserResponse } ,{ user_client, update_user, UpdateUserRequest, UpdateUserResponse } ,{ user_client, drop_user, DropUserRequest, DropUserResponse } From 5a3d2652a63c9d75e596b2a21e7b732d279fcfce Mon Sep 17 00:00:00 2001 From: Kexiang Wang Date: Fri, 5 Jan 2024 00:19:07 -0500 Subject: [PATCH 7/7] feat(connector): support local fs source (#14312) Co-authored-by: KeXiangWang --- ci/scripts/e2e-source-test.sh | 3 + ci/scripts/notify.py | 3 +- ci/workflows/main-cron.yml | 16 +++ e2e_test/s3/posix_fs_source.py | 136 ++++++++++++++++++ e2e_test/s3/run_csv.py | 1 - e2e_test/source/opendal/data/data1.csv | 6 + e2e_test/source/opendal/data/data2.csv | 6 + e2e_test/source/opendal/posix_fs.slt | 33 +++++ src/connector/src/macros.rs | 1 + src/connector/src/source/base.rs | 12 +- .../source/filesystem/opendal_source/mod.rs | 41 ++++++ .../opendal_source/posix_fs_source.rs | 58 ++++++++ src/connector/src/source/mod.rs | 4 +- src/connector/with_options_source.yaml | 9 ++ src/frontend/src/handler/create_source.rs | 5 +- src/source/src/connector_source.rs | 7 +- .../src/executor/source/fetch_executor.rs | 7 +- src/stream/src/from_proto/source/fs_fetch.rs | 15 +- 18 files changed, 353 insertions(+), 10 deletions(-) create mode 100644 e2e_test/s3/posix_fs_source.py create mode 100644 e2e_test/source/opendal/data/data1.csv create mode 100644 e2e_test/source/opendal/data/data2.csv create mode 100644 e2e_test/source/opendal/posix_fs.slt create mode 100644 
src/connector/src/source/filesystem/opendal_source/posix_fs_source.rs diff --git a/ci/scripts/e2e-source-test.sh b/ci/scripts/e2e-source-test.sh index c1624230abc7e..64144d051ad58 100755 --- a/ci/scripts/e2e-source-test.sh +++ b/ci/scripts/e2e-source-test.sh @@ -56,6 +56,9 @@ echo "--- inline cdc test" export MYSQL_HOST=mysql MYSQL_TCP_PORT=3306 MYSQL_PWD=123456 sqllogictest -p 4566 -d dev './e2e_test/source/cdc_inline/**/*.slt' +echo "--- opendal source test" +sqllogictest -p 4566 -d dev './e2e_test/source/opendal/**/*.slt' + echo "--- mysql & postgres cdc validate test" sqllogictest -p 4566 -d dev './e2e_test/source/cdc/cdc.validate.mysql.slt' sqllogictest -p 4566 -d dev './e2e_test/source/cdc/cdc.validate.postgres.slt' diff --git a/ci/scripts/notify.py b/ci/scripts/notify.py index 818dfce72143a..5266998b0045f 100755 --- a/ci/scripts/notify.py +++ b/ci/scripts/notify.py @@ -19,7 +19,8 @@ "e2e-java-binding-tests": ["yiming"], "e2e-clickhouse-sink-tests": ["bohan"], "e2e-pulsar-sink-tests": ["renjie"], - "s3-source-test-for-opendal-fs-engine": ["congyi"], + "s3-source-test-for-opendal-fs-engine": ["congyi", "kexiang"], + "s3-source-tests": ["congyi", "kexiang"], "pulsar-source-tests": ["renjie"], "connector-node-integration-test": ["siyuan"], } diff --git a/ci/workflows/main-cron.yml b/ci/workflows/main-cron.yml index 75f58eadf2492..653578e4688e2 100644 --- a/ci/workflows/main-cron.yml +++ b/ci/workflows/main-cron.yml @@ -489,6 +489,22 @@ steps: timeout_in_minutes: 25 retry: *auto-retry + - label: "PosixFs source on OpenDAL fs engine (csv parser)" + command: "ci/scripts/s3-source-test.sh -p ci-release -s 'posix_fs_source.py csv_without_header'" + if: | + !(build.pull_request.labels includes "ci/main-cron/skip-ci") && build.env("CI_STEPS") == null + || build.pull_request.labels includes "ci/run-s3-source-tests" + || build.env("CI_STEPS") =~ /(^|,)s3-source-tests?(,|$$)/ + depends_on: build + plugins: + - docker-compose#v4.9.0: + run: rw-build-env + config: 
ci/docker-compose.yml + mount-buildkite-agent: true + - ./ci/plugins/upload-failure-logs + timeout_in_minutes: 25 + retry: *auto-retry + + - label: "S3 source on OpenDAL fs engine" + key: "s3-source-test-for-opendal-fs-engine" + command: "ci/scripts/s3-source-test-for-opendal-fs-engine.sh -p ci-release -s run" diff --git a/e2e_test/s3/posix_fs_source.py b/e2e_test/s3/posix_fs_source.py new file mode 100644 index 0000000000000..a7cea46fa496a --- /dev/null +++ b/e2e_test/s3/posix_fs_source.py @@ -0,0 +1,136 @@ +import os +import sys +import csv +import random +import psycopg2 +import opendal + +from time import sleep +from io import StringIO +from functools import partial + +def gen_data(file_num, item_num_per_file): + assert item_num_per_file % 2 == 0, \ + f'item_num_per_file should be even to ensure sum(mark) == 0: {item_num_per_file}' + return [ + [{ + 'id': file_id * item_num_per_file + item_id, + 'name': f'{file_id}_{item_id}', + 'sex': item_id % 2, + 'mark': (-1) ** (item_id % 2), + } for item_id in range(item_num_per_file)] + for file_id in range(file_num) + ] + +def format_csv(data, with_header): + csv_files = [] + + for file_data in data: + ostream = StringIO() + writer = csv.DictWriter(ostream, fieldnames=file_data[0].keys()) + if with_header: + writer.writeheader() + for item_data in file_data: + writer.writerow(item_data) + csv_files.append(ostream.getvalue()) + return csv_files + + +def do_test(file_num, item_num_per_file, prefix, fmt): + conn = psycopg2.connect( + host="localhost", + port="4566", + user="root", + database="dev" + ) + + # Open a cursor to execute SQL statements + cur = conn.cursor() + + def _table(): + return f'posix_fs_test_{fmt}' + + def _encode(): + return f"CSV (delimiter = ',', without_header = {str('without' in fmt).lower()})" + + # Create the table backed by the posix_fs source + cur.execute(f'''CREATE TABLE {_table()}( + id int, + name TEXT, + sex int, + mark int, + ) WITH ( + connector = 'posix_fs', + match_pattern = '{prefix}*.{fmt}', + posix_fs.root = 
'/tmp', + ) FORMAT PLAIN ENCODE {_encode()};''') + + total_rows = file_num * item_num_per_file + MAX_RETRIES = 40 + for retry_no in range(MAX_RETRIES): + cur.execute(f'select count(*) from {_table()}') + result = cur.fetchone() + if result[0] == total_rows: + break + print(f"[retry {retry_no}] Now got {result[0]} rows in table, {total_rows} expected, wait 30s") + sleep(30) + + stmt = f'select count(*), sum(id), sum(sex), sum(mark) from {_table()}' + print(f'Execute {stmt}') + cur.execute(stmt) + result = cur.fetchone() + + print('Got:', result) + + def _assert_eq(field, got, expect): + assert got == expect, f'{field} assertion failed: got {got}, expect {expect}.' + + _assert_eq('count(*)', result[0], total_rows) + _assert_eq('sum(id)', result[1], (total_rows - 1) * total_rows / 2) + _assert_eq('sum(sex)', result[2], total_rows / 2) + _assert_eq('sum(mark)', result[3], 0) + + print('Test pass') + + cur.execute(f'drop table {_table()}') + cur.close() + conn.close() + + +if __name__ == "__main__": + FILE_NUM = 4001 + ITEM_NUM_PER_FILE = 2 + data = gen_data(FILE_NUM, ITEM_NUM_PER_FILE) + + fmt = sys.argv[1] + FORMATTER = { + 'csv_with_header': partial(format_csv, with_header=True), + 'csv_without_header': partial(format_csv, with_header=False), + } + assert fmt in FORMATTER, f"Unsupported format: {fmt}" + formatted_files = FORMATTER[fmt](data) + + run_id = str(random.randint(1000, 9999)) + _local = lambda idx: f'data_{idx}.{fmt}' + _posix = lambda idx: f"{run_id}_data_{idx}.{fmt}" + # put local files + op = opendal.Operator("fs", root="/tmp") + + print("write file to /tmp") + for idx, file_str in enumerate(formatted_files): + with open(_local(idx), "w") as f: + f.write(file_str) + os.fsync(f.fileno()) + file_name = _posix(idx) + file_bytes = file_str.encode('utf-8') + op.write(file_name, file_bytes) + + # do test + print("do test") + do_test(FILE_NUM, ITEM_NUM_PER_FILE, run_id, fmt) + + # clean up local files + print("clean up local files in /tmp") + for idx, _ in 
enumerate(formatted_files): + file_name = _posix(idx) + op.delete(file_name) diff --git a/e2e_test/s3/run_csv.py b/e2e_test/s3/run_csv.py index b721e3c796066..a6c0dc37bc4ca 100644 --- a/e2e_test/s3/run_csv.py +++ b/e2e_test/s3/run_csv.py @@ -1,5 +1,4 @@ import os -import string import json import string from time import sleep diff --git a/e2e_test/source/opendal/data/data1.csv b/e2e_test/source/opendal/data/data1.csv new file mode 100644 index 0000000000000..9279ffa24aa25 --- /dev/null +++ b/e2e_test/source/opendal/data/data1.csv @@ -0,0 +1,6 @@ +carat,cut,color,depth +0.25,Ideal,E,61.4 +0.22,Premium,I,62.0 +0.28,Good,J,63.1 +0.23,Very Good,H,57.5 +0.30,Fair,E,64.7 diff --git a/e2e_test/source/opendal/data/data2.csv b/e2e_test/source/opendal/data/data2.csv new file mode 100644 index 0000000000000..b5e39f73f2d51 --- /dev/null +++ b/e2e_test/source/opendal/data/data2.csv @@ -0,0 +1,6 @@ +carat,cut,color,depth +1.25,Ideal,E,61.4 +1.22,Premium,I,62.0 +1.28,Good,J,63.1 +1.23,Very Good,H,57.5 +1.30,Fair,E,64.7 diff --git a/e2e_test/source/opendal/posix_fs.slt b/e2e_test/source/opendal/posix_fs.slt new file mode 100644 index 0000000000000..3fc572a1a1cc8 --- /dev/null +++ b/e2e_test/source/opendal/posix_fs.slt @@ -0,0 +1,33 @@ +statement ok +SET RW_IMPLICIT_FLUSH TO true; + +statement ok +CREATE TABLE diamonds ( + carat FLOAT, + cut TEXT, + color TEXT, + depth FLOAT, +) WITH ( + connector = 'posix_fs', + match_pattern = 'data*.csv', + posix_fs.root = 'e2e_test/source/opendal/data', +) FORMAT PLAIN ENCODE CSV ( without_header = 'false', delimiter = ','); + +sleep 10s + +query TTTT rowsort +select * from diamonds; +---- +0.22 Premium I 62 +0.23 Very Good H 57.5 +0.25 Ideal E 61.4 +0.28 Good J 63.1 +0.3 Fair E 64.7 +1.22 Premium I 62 +1.23 Very Good H 57.5 +1.25 Ideal E 61.4 +1.28 Good J 63.1 +1.3 Fair E 64.7 + +statement ok +DROP TABLE diamonds; diff --git a/src/connector/src/macros.rs b/src/connector/src/macros.rs index 47839f6dc5d5a..9a2383dbb4a96 100644 --- 
a/src/connector/src/macros.rs +++ b/src/connector/src/macros.rs @@ -35,6 +35,7 @@ macro_rules! for_all_classified_sources { { S3, $crate::source::filesystem::S3Properties, $crate::source::filesystem::FsSplit }, { Gcs, $crate::source::filesystem::opendal_source::GcsProperties , $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalGcs> }, { OpendalS3, $crate::source::filesystem::opendal_source::OpendalS3Properties, $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalS3> }, + { PosixFs, $crate::source::filesystem::opendal_source::PosixFsProperties, $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalPosixFs> }, { Test, $crate::source::test_source::TestSourceProperties, $crate::source::test_source::TestSourceSplit} } $( diff --git a/src/connector/src/source/base.rs b/src/connector/src/source/base.rs index 1a4a594d44285..b6093a351783b 100644 --- a/src/connector/src/source/base.rs +++ b/src/connector/src/source/base.rs @@ -42,7 +42,7 @@ use super::google_pubsub::GooglePubsubMeta; use super::kafka::KafkaMeta; use super::monitor::SourceMetrics; use super::nexmark::source::message::NexmarkMeta; -use super::OPENDAL_S3_CONNECTOR; +use super::{OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR}; use crate::parser::ParserConfig; pub(crate) use crate::source::common::CommonSplitReader; use crate::source::filesystem::FsPageItem; @@ -386,14 +386,20 @@ impl ConnectorProperties { pub fn is_new_fs_connector_b_tree_map(with_properties: &BTreeMap) -> bool { with_properties .get(UPSTREAM_SOURCE_KEY) - .map(|s| s.eq_ignore_ascii_case(OPENDAL_S3_CONNECTOR)) + .map(|s| { + s.eq_ignore_ascii_case(OPENDAL_S3_CONNECTOR) + || s.eq_ignore_ascii_case(POSIX_FS_CONNECTOR) + }) .unwrap_or(false) } pub fn is_new_fs_connector_hash_map(with_properties: &HashMap) -> bool { with_properties .get(UPSTREAM_SOURCE_KEY) - .map(|s| s.eq_ignore_ascii_case(OPENDAL_S3_CONNECTOR)) + .map(|s| { + 
s.eq_ignore_ascii_case(OPENDAL_S3_CONNECTOR) + || s.eq_ignore_ascii_case(POSIX_FS_CONNECTOR) + }) .unwrap_or(false) } } diff --git a/src/connector/src/source/filesystem/opendal_source/mod.rs b/src/connector/src/source/filesystem/opendal_source/mod.rs index 93e09be83ebf1..d6223c467d08b 100644 --- a/src/connector/src/source/filesystem/opendal_source/mod.rs +++ b/src/connector/src/source/filesystem/opendal_source/mod.rs @@ -15,6 +15,7 @@ use std::collections::HashMap; pub mod gcs_source; +pub mod posix_fs_source; pub mod s3_source; use serde::Deserialize; @@ -31,6 +32,7 @@ use crate::source::{SourceProperties, UnknownFields}; pub const GCS_CONNECTOR: &str = "gcs"; // The new s3_v2 will use opendal. pub const OPENDAL_S3_CONNECTOR: &str = "s3_v2"; +pub const POSIX_FS_CONNECTOR: &str = "posix_fs"; #[derive(Clone, Debug, Deserialize, PartialEq, WithOptions)] pub struct GcsProperties { @@ -89,6 +91,17 @@ impl OpendalSource for OpendalGcs { } } +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct OpendalPosixFs; + +impl OpendalSource for OpendalPosixFs { + type Properties = PosixFsProperties; + + fn new_enumerator(properties: Self::Properties) -> anyhow::Result> { + OpendalEnumerator::new_posix_fs_source(properties) + } +} + #[derive(Clone, Debug, Deserialize, PartialEq, with_options::WithOptions)] pub struct OpendalS3Properties { #[serde(flatten)] @@ -115,3 +128,31 @@ impl SourceProperties for OpendalS3Properties { const SOURCE_NAME: &'static str = OPENDAL_S3_CONNECTOR; } + +#[derive(Clone, Debug, Deserialize, PartialEq, WithOptions)] +pub struct PosixFsProperties { + // The root directory of the files to search. The files will be searched recursively. + #[serde(rename = "posix_fs.root")] + pub root: String, + + // The glob pattern to match files under root directory. 
+ #[serde(rename = "match_pattern", default)] + pub match_pattern: Option, + + #[serde(flatten)] + pub unknown_fields: HashMap, +} + +impl UnknownFields for PosixFsProperties { + fn unknown_fields(&self) -> HashMap { + self.unknown_fields.clone() + } +} + +impl SourceProperties for PosixFsProperties { + type Split = OpendalFsSplit; + type SplitEnumerator = OpendalEnumerator; + type SplitReader = OpendalReader; + + const SOURCE_NAME: &'static str = POSIX_FS_CONNECTOR; +} diff --git a/src/connector/src/source/filesystem/opendal_source/posix_fs_source.rs b/src/connector/src/source/filesystem/opendal_source/posix_fs_source.rs new file mode 100644 index 0000000000000..748230ba5a16e --- /dev/null +++ b/src/connector/src/source/filesystem/opendal_source/posix_fs_source.rs @@ -0,0 +1,58 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::marker::PhantomData; + +use anyhow::Context; +use opendal::layers::{LoggingLayer, RetryLayer}; +use opendal::services::Fs; +use opendal::Operator; + +use super::opendal_enumerator::OpendalEnumerator; +use super::{OpendalSource, PosixFsProperties}; + +// Posix fs source should only be used for testing. +// For a single-CN cluster, the behavior is well-defined. It will read from the local file system. +// For a multi-CN cluster, each CN will read from its own local file system under the given directory. + +impl OpendalEnumerator { + /// create opendal posix fs source. 
+ pub fn new_posix_fs_source(posix_fs_properties: PosixFsProperties) -> anyhow::Result { + // Create Fs builder. + let mut builder = Fs::default(); + + builder.root(&posix_fs_properties.root); + + let op: Operator = Operator::new(builder)? + .layer(LoggingLayer::default()) + .layer(RetryLayer::default()) + .finish(); + + let (prefix, matcher) = if let Some(pattern) = posix_fs_properties.match_pattern.as_ref() { + // TODO(Kexiang): Currently, FsLister in opendal does not support a prefix. (Seems a bug in opendal) + // So we assign prefix to empty string. + let matcher = glob::Pattern::new(pattern) + .with_context(|| format!("Invalid match_pattern: {}", pattern))?; + (Some(String::new()), Some(matcher)) + } else { + (None, None) + }; + Ok(Self { + op, + prefix, + matcher, + marker: PhantomData, + }) + } +} diff --git a/src/connector/src/source/mod.rs b/src/connector/src/source/mod.rs index 9dd1ef5b76cde..6cdc7d30e277a 100644 --- a/src/connector/src/source/mod.rs +++ b/src/connector/src/source/mod.rs @@ -39,7 +39,9 @@ pub use manager::{SourceColumnDesc, SourceColumnType}; pub use crate::parser::additional_columns::{ get_connector_compatible_additional_columns, CompatibleAdditionalColumnsFn, }; -pub use crate::source::filesystem::opendal_source::{GCS_CONNECTOR, OPENDAL_S3_CONNECTOR}; +pub use crate::source::filesystem::opendal_source::{ + GCS_CONNECTOR, OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR, +}; pub use crate::source::filesystem::S3_CONNECTOR; pub use crate::source::nexmark::NEXMARK_CONNECTOR; pub use crate::source::pulsar::PULSAR_CONNECTOR; diff --git a/src/connector/with_options_source.yaml b/src/connector/with_options_source.yaml index 6ee69ccd2ab18..98a45599b56f2 100644 --- a/src/connector/with_options_source.yaml +++ b/src/connector/with_options_source.yaml @@ -455,6 +455,15 @@ OpendalS3Properties: fields: field_type: String required: false default: Default::default +PosixFsProperties: + fields: + - name: posix_fs.root + field_type: String + required: true + - name: 
match_pattern + field_type: String + required: false + default: Default::default PubsubProperties: fields: - name: pubsub.split_count diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index 80e526b01c15f..c0d3248b5af8a 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -45,7 +45,7 @@ use risingwave_connector::source::test_source::TEST_CONNECTOR; use risingwave_connector::source::{ get_connector_compatible_additional_columns, GCS_CONNECTOR, GOOGLE_PUBSUB_CONNECTOR, KAFKA_CONNECTOR, KINESIS_CONNECTOR, NATS_CONNECTOR, NEXMARK_CONNECTOR, OPENDAL_S3_CONNECTOR, - PULSAR_CONNECTOR, S3_CONNECTOR, + POSIX_FS_CONNECTOR, PULSAR_CONNECTOR, S3_CONNECTOR, }; use risingwave_pb::catalog::{ PbSchemaRegistryNameStrategy, PbSource, StreamSourceInfo, WatermarkDesc, @@ -917,6 +917,9 @@ static CONNECTORS_COMPATIBLE_FORMATS: LazyLock hashmap!( Format::Plain => vec![Encode::Csv, Encode::Json], ), + POSIX_FS_CONNECTOR => hashmap!( + Format::Plain => vec![Encode::Csv], + ), MYSQL_CDC_CONNECTOR => hashmap!( Format::Debezium => vec![Encode::Json], // support source stream job diff --git a/src/source/src/connector_source.rs b/src/source/src/connector_source.rs index ecb1846c01249..441a91836bb0a 100644 --- a/src/source/src/connector_source.rs +++ b/src/source/src/connector_source.rs @@ -30,7 +30,7 @@ use risingwave_connector::dispatch_source_prop; use risingwave_connector::parser::{CommonParserConfig, ParserConfig, SpecificParserConfig}; use risingwave_connector::source::filesystem::opendal_source::opendal_enumerator::OpendalEnumerator; use risingwave_connector::source::filesystem::opendal_source::{ - OpendalGcs, OpendalS3, OpendalSource, + OpendalGcs, OpendalPosixFs, OpendalS3, OpendalSource, }; use risingwave_connector::source::filesystem::FsPageItem; use risingwave_connector::source::{ @@ -103,6 +103,11 @@ impl ConnectorSource { OpendalEnumerator::new_s3_source(prop.s3_properties, 
prop.assume_role)?; Ok(build_opendal_fs_list_stream(lister)) } + ConnectorProperties::PosixFs(prop) => { + let lister: OpendalEnumerator = + OpendalEnumerator::new_posix_fs_source(*prop)?; + Ok(build_opendal_fs_list_stream(lister)) + } other => bail!("Unsupported source: {:?}", other), } } diff --git a/src/stream/src/executor/source/fetch_executor.rs b/src/stream/src/executor/source/fetch_executor.rs index 32cd08e9c87fe..633dce3b0b9a8 100644 --- a/src/stream/src/executor/source/fetch_executor.rs +++ b/src/stream/src/executor/source/fetch_executor.rs @@ -26,7 +26,7 @@ use risingwave_common::hash::VnodeBitmapExt; use risingwave_common::row::{OwnedRow, Row}; use risingwave_common::types::{ScalarRef, ScalarRefImpl}; use risingwave_connector::source::filesystem::opendal_source::{ - OpendalGcs, OpendalS3, OpendalSource, + OpendalGcs, OpendalPosixFs, OpendalS3, OpendalSource, }; use risingwave_connector::source::filesystem::OpendalFsSplit; use risingwave_connector::source::{ @@ -125,6 +125,11 @@ impl FsFetchExecutor { OpendalFsSplit::restore_from_json(jsonb_ref.to_owned_scalar())?; SplitImpl::from(split) } + risingwave_connector::source::ConnectorProperties::PosixFs(_) => { + let split: OpendalFsSplit = + OpendalFsSplit::restore_from_json(jsonb_ref.to_owned_scalar())?; + SplitImpl::from(split) + } _ => unreachable!(), }, _ => unreachable!(), diff --git a/src/stream/src/from_proto/source/fs_fetch.rs b/src/stream/src/from_proto/source/fs_fetch.rs index dc6b718f566ea..8d8cb78a80b19 100644 --- a/src/stream/src/from_proto/source/fs_fetch.rs +++ b/src/stream/src/from_proto/source/fs_fetch.rs @@ -15,7 +15,9 @@ use std::sync::Arc; use risingwave_common::catalog::{ColumnId, TableId}; -use risingwave_connector::source::filesystem::opendal_source::{OpendalGcs, OpendalS3}; +use risingwave_connector::source::filesystem::opendal_source::{ + OpendalGcs, OpendalPosixFs, OpendalS3, +}; use risingwave_connector::source::{ConnectorProperties, SourceCtrlOpts}; use 
risingwave_pb::stream_plan::StreamFsFetchNode; use risingwave_source::source_desc::SourceDescBuilder; @@ -110,6 +112,17 @@ impl ExecutorBuilder for FsFetchExecutorBuilder { ) .boxed() } + risingwave_connector::source::ConnectorProperties::PosixFs(_) => { + FsFetchExecutor::<_, OpendalPosixFs>::new( + params.actor_context.clone(), + params.info, + stream_source_core, + upstream, + source_ctrl_opts, + params.env.connector_params(), + ) + .boxed() + } _ => unreachable!(), }; let rate_limit = source.rate_limit.map(|x| x as _);