From c5c2119901fa0ff17fd56a878293359f770b1f14 Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Thu, 14 Nov 2024 16:12:21 +0800 Subject: [PATCH 1/7] fix(ci): delete unused python install before embedded UDF tests (#19386) Signed-off-by: Richard Chien --- ci/scripts/run-e2e-test.sh | 1 - 1 file changed, 1 deletion(-) diff --git a/ci/scripts/run-e2e-test.sh b/ci/scripts/run-e2e-test.sh index b3956998bd0e..a8601fbb0ebe 100755 --- a/ci/scripts/run-e2e-test.sh +++ b/ci/scripts/run-e2e-test.sh @@ -136,7 +136,6 @@ sqllogictest -p 4566 -d dev './e2e_test/udf/external_udf.slt' pkill java echo "--- e2e, $mode, embedded udf" -python3 -m pip install --break-system-packages flask waitress sqllogictest -p 4566 -d dev './e2e_test/udf/wasm_udf.slt' sqllogictest -p 4566 -d dev './e2e_test/udf/rust_udf.slt' sqllogictest -p 4566 -d dev './e2e_test/udf/js_udf.slt' From 9aded7178aefb5436009b4cebd5e8f46b59429b2 Mon Sep 17 00:00:00 2001 From: William Wen <44139337+wenym1@users.noreply.github.com> Date: Thu, 14 Nov 2024 17:00:59 +0800 Subject: [PATCH 2/7] refactor(meta): merge drop and cancel streaming job command (#19369) --- src/meta/src/barrier/checkpoint/control.rs | 28 ++++++++------ src/meta/src/barrier/command.rs | 39 ++++++++++---------- src/meta/src/barrier/context/context_impl.rs | 19 ---------- src/meta/src/barrier/progress.rs | 2 +- src/meta/src/barrier/schedule.rs | 25 ++++--------- src/meta/src/stream/stream_manager.rs | 11 +++--- 6 files changed, 52 insertions(+), 72 deletions(-) diff --git a/src/meta/src/barrier/checkpoint/control.rs b/src/meta/src/barrier/checkpoint/control.rs index beb77b3217ad..31150554cc68 100644 --- a/src/meta/src/barrier/checkpoint/control.rs +++ b/src/meta/src/barrier/checkpoint/control.rs @@ -758,20 +758,26 @@ impl DatabaseCheckpointControl { (None, vec![]) }; - if let Some(table_to_cancel) = command.as_ref().and_then(Command::table_to_cancel) - && self + for table_to_cancel in command + .as_ref() + .map(Command::tables_to_drop) + .into_iter() + .flatten() + { + if self .creating_streaming_job_controls .contains_key(&table_to_cancel) - { - warn!( - table_id = table_to_cancel.table_id, - "ignore cancel command on creating streaming job" - ); - for notifier in notifiers { - notifier - .notify_start_failed(anyhow!("cannot cancel creating streaming job, the job will continue creating until created or recovery").into()); + { + warn!( + table_id = table_to_cancel.table_id, + "ignore cancel command on creating streaming job" + ); + for notifier in notifiers { + notifier + .notify_start_failed(anyhow!("cannot cancel creating streaming job, the job will continue creating until created or recovery").into()); + } + return Ok(()); } - return Ok(()); } if let Some(Command::RescheduleFragment { .. }) = &command { diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs index 73ebc8d44629..d2dd3058544c 100644 --- a/src/meta/src/barrier/command.rs +++ b/src/meta/src/barrier/command.rs @@ -232,6 +232,7 @@ pub enum Command { /// After the barrier is collected, it notifies the local stream manager of compute nodes to /// drop actors, and then delete the table fragments info from meta store. DropStreamingJobs { + table_fragments_ids: HashSet, actors: Vec, unregistered_state_table_ids: HashSet, unregistered_fragment_ids: HashSet, @@ -253,11 +254,6 @@ pub enum Command { MergeSnapshotBackfillStreamingJobs( HashMap, ), - /// `CancelStreamingJob` command generates a `Stop` barrier including the actors of the given - /// table fragment. 
- /// - /// The collecting and cleaning part works exactly the same as `DropStreamingJobs` command. - CancelStreamingJob(TableFragments), /// `Reschedule` command generates a `Update` barrier by the [`Reschedule`] of each fragment. /// Mainly used for scaling and migration. @@ -313,6 +309,18 @@ impl Command { Self::Resume(reason) } + pub fn cancel(table_fragments: &TableFragments) -> Self { + Self::DropStreamingJobs { + table_fragments_ids: HashSet::from_iter([table_fragments.table_id()]), + actors: table_fragments.actor_ids(), + unregistered_state_table_ids: table_fragments + .all_table_ids() + .map(TableId::new) + .collect(), + unregistered_fragment_ids: table_fragments.fragment_ids().collect(), + } + } + pub(crate) fn fragment_changes(&self) -> Option> { match self { Command::Flush => None, @@ -352,13 +360,6 @@ impl Command { Some(changes) } - Command::CancelStreamingJob(table_fragments) => Some( - table_fragments - .fragments - .values() - .map(|fragment| (fragment.fragment_id, CommandFragmentChanges::RemoveFragment)) - .collect(), - ), Command::RescheduleFragment { reschedules, .. } => Some( reschedules .iter() @@ -726,11 +727,6 @@ impl Command { })) } - Command::CancelStreamingJob(table_fragments) => { - let actors = table_fragments.actor_ids(); - Some(Mutation::Stop(StopMutation { actors })) - } - Command::ReplaceTable(ReplaceTablePlan { old_table_fragments, merge_updates, @@ -1013,10 +1009,15 @@ impl Command { } /// For `CancelStreamingJob`, returns the table id of the target table. - pub fn table_to_cancel(&self) -> Option { + pub fn tables_to_drop(&self) -> impl Iterator + '_ { match self { - Command::CancelStreamingJob(table_fragments) => Some(table_fragments.table_id()), + Command::DropStreamingJobs { + table_fragments_ids, + .. + } => Some(table_fragments_ids.iter().cloned()), _ => None, } + .into_iter() + .flatten() } } diff --git a/src/meta/src/barrier/context/context_impl.rs b/src/meta/src/barrier/context/context_impl.rs index 947c8a08ad3f..fee2a31550cc 100644 --- a/src/meta/src/barrier/context/context_impl.rs +++ b/src/meta/src/barrier/context/context_impl.rs @@ -15,7 +15,6 @@ use std::sync::Arc; use futures::future::try_join_all; -use risingwave_common::catalog::TableId; use risingwave_pb::common::WorkerNode; use risingwave_pb::hummock::HummockVersionStats; use risingwave_pb::meta::PausedReason; @@ -163,24 +162,6 @@ impl CommandContext { .await?; } - Command::CancelStreamingJob(table_fragments) => { - tracing::debug!(id = ?table_fragments.table_id(), "cancelling stream job"); - - // NOTE(kwannoel): At this point, meta has already registered the table ids. - // We should unregister them. - // This is required for background ddl, for foreground ddl this is a no-op. - // Foreground ddl is handled entirely by stream manager, so it will unregister - // the table ids on failure. - // On the other hand background ddl could be handled by barrier manager. - // It won't clean the tables on failure, - // since the failure could be recoverable. - // As such it needs to be handled here. 
- barrier_manager_context - .hummock_manager - .unregister_table_ids(table_fragments.all_table_ids().map(TableId::new)) - .await?; - } - Command::CreateStreamingJob { info, job_type } => { let CreateStreamingJobCommandInfo { table_fragments, diff --git a/src/meta/src/barrier/progress.rs b/src/meta/src/barrier/progress.rs index 36d1a9a0b242..a40d526bc9ee 100644 --- a/src/meta/src/barrier/progress.rs +++ b/src/meta/src/barrier/progress.rs @@ -447,7 +447,7 @@ impl CreateMviewProgressTracker { .flat_map(|resp| resp.create_mview_progress.iter()), version_stats, ); - if let Some(table_id) = command.and_then(Command::table_to_cancel) { + for table_id in command.map(Command::tables_to_drop).into_iter().flatten() { // the cancelled command is possibly stashed in `finished_commands` and waiting // for checkpoint, we should also clear it. self.cancel_command(table_id); diff --git a/src/meta/src/barrier/schedule.rs b/src/meta/src/barrier/schedule.rs index ebffb56efe5a..2b3b78ede2f7 100644 --- a/src/meta/src/barrier/schedule.rs +++ b/src/meta/src/barrier/schedule.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{HashSet, VecDeque}; +use std::collections::VecDeque; use std::iter::once; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -32,7 +32,6 @@ use super::notifier::Notifier; use super::{Command, Scheduled}; use crate::barrier::context::GlobalBarrierWorkerContext; use crate::hummock::HummockManagerRef; -use crate::model::ActorId; use crate::rpc::metrics::MetaMetrics; use crate::{MetaError, MetaResult}; @@ -106,9 +105,7 @@ impl ScheduledQueue { if let QueueStatus::Blocked(reason) = &self.status && !matches!( scheduled.command, - Command::DropStreamingJobs { .. } - | Command::CancelStreamingJob(_) - | Command::DropSubscription { .. } + Command::DropStreamingJobs { .. } | Command::DropSubscription { .. } ) { return Err(MetaError::unavailable(reason)); @@ -400,9 +397,7 @@ impl ScheduledBarriers { impl ScheduledBarriers { /// Pre buffered drop and cancel command, return true if any. pub(super) fn pre_apply_drop_cancel(&self) -> bool { - let (dropped_actors, cancelled) = self.pre_apply_drop_cancel_scheduled(); - - !dropped_actors.is_empty() || !cancelled.is_empty() + self.pre_apply_drop_cancel_scheduled() } /// Mark command scheduler as blocked and abort all queued scheduled command and notify with @@ -425,22 +420,18 @@ impl ScheduledBarriers { /// Try to pre apply drop and cancel scheduled command and return them if any. /// It should only be called in recovery. - pub(super) fn pre_apply_drop_cancel_scheduled(&self) -> (Vec, HashSet) { + pub(super) fn pre_apply_drop_cancel_scheduled(&self) -> bool { let mut queue = self.inner.queue.lock(); assert_matches!(queue.status, QueueStatus::Blocked(_)); - let (mut dropped_actors, mut cancel_table_ids) = (vec![], HashSet::new()); + let mut applied = false; while let Some(ScheduledQueueItem { notifiers, command, .. }) = queue.queue.pop_front() { match command { - Command::DropStreamingJobs { actors, .. } => { - dropped_actors.extend(actors); - } - Command::CancelStreamingJob(table_fragments) => { - let table_id = table_fragments.table_id(); - cancel_table_ids.insert(table_id); + Command::DropStreamingJobs { .. } => { + applied = true; } Command::DropSubscription { .. 
} => {} _ => { @@ -451,7 +442,7 @@ impl ScheduledBarriers { notify.notify_collected(); }); } - (dropped_actors, cancel_table_ids) + applied } } diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index 509fffefd99b..d15a73ecfa9c 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -299,10 +299,7 @@ impl GlobalStreamManager { .await?; self.barrier_scheduler - .run_command( - database_id, - Command::CancelStreamingJob(table_fragments), - ) + .run_command(database_id, Command::cancel(&table_fragments)) .await?; } else { // streaming job is already completed. @@ -514,6 +511,10 @@ impl GlobalStreamManager { .run_command( database_id, Command::DropStreamingJobs { + table_fragments_ids: streaming_job_ids + .iter() + .map(|job_id| TableId::new(*job_id as _)) + .collect(), actors: removed_actors, unregistered_state_table_ids: state_table_ids .into_iter() @@ -576,7 +577,7 @@ impl GlobalStreamManager { if let Some(database_id) = database_id { self.barrier_scheduler - .run_command(DatabaseId::new(database_id as _), Command::CancelStreamingJob(fragment)) + .run_command(DatabaseId::new(database_id as _), Command::cancel(&fragment)) .await?; } }; From 1a97b4cccf266594477e487c063de76f2a085944 Mon Sep 17 00:00:00 2001 From: Noel Kwan <47273164+kwannoel@users.noreply.github.com> Date: Thu, 14 Nov 2024 17:35:07 +0800 Subject: [PATCH 3/7] feat(frontend): support iceberg predicate pushdown (#19228) --- ci/scripts/e2e-iceberg-sink-v2-test.sh | 1 + ci/workflows/main-cron.yml | 3 +- ci/workflows/pull-request.yml | 2 +- .../iceberg/start_spark_connect_server.sh | 2 +- .../test_case/iceberg_predicate_pushdown.slt | 143 ++++++++ .../test_case/iceberg_predicate_pushdown.toml | 11 + src/connector/src/source/iceberg/mod.rs | 42 ++- src/frontend/src/optimizer/mod.rs | 8 + .../src/optimizer/plan_node/batch_filter.rs | 6 + .../optimizer/plan_node/batch_iceberg_scan.rs | 46 ++- .../batch/batch_iceberg_predicate_pushdown.rs | 305 ++++++++++++++++++ src/frontend/src/optimizer/rule/batch/mod.rs | 1 + src/frontend/src/optimizer/rule/mod.rs | 2 + src/frontend/src/scheduler/plan_fragmenter.rs | 55 +++- 14 files changed, 599 insertions(+), 28 deletions(-) create mode 100644 e2e_test/iceberg/test_case/iceberg_predicate_pushdown.slt create mode 100644 e2e_test/iceberg/test_case/iceberg_predicate_pushdown.toml create mode 100644 src/frontend/src/optimizer/rule/batch/batch_iceberg_predicate_pushdown.rs diff --git a/ci/scripts/e2e-iceberg-sink-v2-test.sh b/ci/scripts/e2e-iceberg-sink-v2-test.sh index bcb530ae9fdd..27fc92a789d1 100755 --- a/ci/scripts/e2e-iceberg-sink-v2-test.sh +++ b/ci/scripts/e2e-iceberg-sink-v2-test.sh @@ -49,6 +49,7 @@ poetry run python main.py -t ./test_case/iceberg_select_empty_table.toml poetry run python main.py -t ./test_case/iceberg_source_equality_delete.toml poetry run python main.py -t ./test_case/iceberg_source_position_delete.toml poetry run python main.py -t ./test_case/iceberg_source_all_delete.toml +poetry run python main.py -t ./test_case/iceberg_predicate_pushdown.toml echo "--- Kill cluster" diff --git a/ci/workflows/main-cron.yml b/ci/workflows/main-cron.yml index 3f3cd705f09f..e8a0fa32f101 100644 --- a/ci/workflows/main-cron.yml +++ b/ci/workflows/main-cron.yml @@ -414,14 +414,13 @@ steps: depends_on: - "build" - "build-other" - plugins: - docker-compose#v5.1.0: run: rw-build-env config: ci/docker-compose.yml mount-buildkite-agent: true - ./ci/plugins/upload-failure-logs - timeout_in_minutes: 7 + 
timeout_in_minutes: 9 retry: *auto-retry - label: "end-to-end iceberg sink v2 test (release)" diff --git a/ci/workflows/pull-request.yml b/ci/workflows/pull-request.yml index 2991397dfb1c..e10ffb2d0091 100644 --- a/ci/workflows/pull-request.yml +++ b/ci/workflows/pull-request.yml @@ -270,7 +270,7 @@ steps: config: ci/docker-compose.yml mount-buildkite-agent: true - ./ci/plugins/upload-failure-logs - timeout_in_minutes: 15 + timeout_in_minutes: 17 retry: *auto-retry - label: "end-to-end iceberg cdc test" diff --git a/e2e_test/iceberg/start_spark_connect_server.sh b/e2e_test/iceberg/start_spark_connect_server.sh index 8f0c2640a1b5..7996899f7a4d 100755 --- a/e2e_test/iceberg/start_spark_connect_server.sh +++ b/e2e_test/iceberg/start_spark_connect_server.sh @@ -11,7 +11,7 @@ PACKAGES="$PACKAGES,org.apache.spark:spark-connect_2.12:$SPARK_VERSION" SPARK_FILE="spark-${SPARK_VERSION}-bin-hadoop3.tgz" if [ ! -d "spark-${SPARK_VERSION}-bin-hadoop3" ];then - wget https://dlcdn.apache.org/spark/spark-${SPARK_VERSION}/$SPARK_FILE + wget --no-verbose https://dlcdn.apache.org/spark/spark-${SPARK_VERSION}/$SPARK_FILE tar -xzf $SPARK_FILE --no-same-owner fi diff --git a/e2e_test/iceberg/test_case/iceberg_predicate_pushdown.slt b/e2e_test/iceberg/test_case/iceberg_predicate_pushdown.slt new file mode 100644 index 000000000000..2075d129a8a1 --- /dev/null +++ b/e2e_test/iceberg/test_case/iceberg_predicate_pushdown.slt @@ -0,0 +1,143 @@ +statement ok +set sink_decouple = false; + +statement ok +set streaming_parallelism=4; + +statement ok +drop table if exists s1 cascade; + +statement ok +CREATE TABLE s1 (i1 int, i2 varchar, i3 varchar); + +statement ok +insert into s1 select x, 'some str', 'another str' from generate_series(1, 500) t(x); + +statement ok +insert into s1 select x, null as y, null as z from generate_series(501, 1000) t(x); + +statement ok +flush; + +statement ok +CREATE MATERIALIZED VIEW mv1 AS SELECT * FROM s1; + +statement ok +CREATE SINK sink1 AS select * from mv1 WITH ( + connector = 'iceberg', + type = 'append-only', + force_append_only = 'true', + database.name = 'demo_db', + table.name = 't1', + catalog.name = 'demo', + catalog.type = 'storage', + warehouse.path = 's3a://icebergdata/demo', + s3.endpoint = 'http://127.0.0.1:9301', + s3.region = 'us-east-1', + s3.access.key = 'hummockadmin', + s3.secret.key = 'hummockadmin', + commit_checkpoint_interval = 1, + create_table_if_not_exists = 'true' +); + +statement ok +drop source if exists iceberg_t1_source; + +statement ok +CREATE SOURCE iceberg_t1_source +WITH ( + connector = 'iceberg', + s3.endpoint = 'http://127.0.0.1:9301', + s3.region = 'us-east-1', + s3.access.key = 'hummockadmin', + s3.secret.key = 'hummockadmin', + s3.path.style.access = 'true', + catalog.type = 'storage', + warehouse.path = 's3a://icebergdata/demo', + database.name = 'demo_db', + table.name = 't1', +); + +statement ok +flush; + +query I +select * from iceberg_t1_source order by i1 limit 1; +---- +1 some str another str + +query I +select count(*) from iceberg_t1_source; +---- +1000 + +query I +select * from iceberg_t1_source where i1 > 990 order by i1; +---- +991 NULL NULL +992 NULL NULL +993 NULL NULL +994 NULL NULL +995 NULL NULL +996 NULL NULL +997 NULL NULL +998 NULL NULL +999 NULL NULL +1000 NULL NULL + +query I +explain select * from iceberg_t1_source where i1 > 500 and i1 < 600 and i1 >= 550 and i1 <= 590 and i1 != 570 and i1 = 580; +---- + BatchExchange { order: [], dist: Single } + └─BatchIcebergScan { source: iceberg_t1_source, columns: [i1, i2, i3], 
predicate: (((((i1 = 580) AND (i1 > 500)) AND (i1 < 600)) AND (i1 >= 550)) AND (i1 <= 590)) AND (i1 != 570) } + +query I +select i1 from iceberg_t1_source where i1 > 500 and i1 < 600 and i1 >= 550 and i1 <= 590 and i1 != 570 and i1 = 580; +---- +580 + +query I +explain select * from iceberg_t1_source where i1 in (1, 2, 3, 4, 5); +---- + BatchExchange { order: [], dist: Single } + └─BatchIcebergScan { source: iceberg_t1_source, columns: [i1, i2, i3], predicate: i1 IN (5, 4, 1, 3, 2) } + +query I +select i1 from iceberg_t1_source where i1 in (1, 2, 3, 4, 5) order by i1; +---- +1 +2 +3 +4 +5 + +query I +select count(*), i2, i3 from iceberg_t1_source where i2 = 'some str' and i3 = 'another str' group by i2, i3; +---- +500 some str another str + +query I +explain select i1 from iceberg_t1_source where i1 > 500 and i2 = i3; +---- + BatchExchange { order: [], dist: Single } + └─BatchProject { exprs: [i1] } + └─BatchFilter { predicate: (i2 = i3) } + └─BatchIcebergScan { source: iceberg_t1_source, columns: [i1, i2, i3], predicate: i1 > 500 } + +query I +select i1 from iceberg_t1_source where i1 > 500 and i2 = i3; +---- + +# Empty splits should not panic +query I +select i1 from iceberg_t1_source where i1 > 1001; +---- + +statement ok +DROP SINK sink1; + +statement ok +DROP SOURCE iceberg_t1_source; + +statement ok +DROP TABLE s1 cascade; diff --git a/e2e_test/iceberg/test_case/iceberg_predicate_pushdown.toml b/e2e_test/iceberg/test_case/iceberg_predicate_pushdown.toml new file mode 100644 index 000000000000..c08dcbb827db --- /dev/null +++ b/e2e_test/iceberg/test_case/iceberg_predicate_pushdown.toml @@ -0,0 +1,11 @@ +init_sqls = [ + 'CREATE SCHEMA IF NOT EXISTS demo_db', + 'DROP TABLE IF EXISTS demo_db.t1', +] + +slt = 'test_case/iceberg_predicate_pushdown.slt' + +drop_sqls = [ + 'DROP TABLE IF EXISTS demo_db.t1', + 'DROP SCHEMA IF EXISTS demo_db', +] diff --git a/src/connector/src/source/iceberg/mod.rs b/src/connector/src/source/iceberg/mod.rs index 60a26e43e1d3..aeb642c80a01 100644 --- a/src/connector/src/source/iceberg/mod.rs +++ b/src/connector/src/source/iceberg/mod.rs @@ -19,6 +19,7 @@ use std::collections::HashMap; use anyhow::anyhow; use async_trait::async_trait; use futures_async_stream::for_await; +use iceberg::expr::Predicate as IcebergPredicate; use iceberg::scan::FileScanTask; use iceberg::spec::TableMetadata; use iceberg::table::Table; @@ -137,6 +138,19 @@ pub struct IcebergSplit { pub position_delete_files: Vec, } +impl IcebergSplit { + pub fn empty(table_meta: TableMetadataJsonStr) -> Self { + Self { + split_id: 0, + snapshot_id: 0, + table_meta, + files: vec![], + equality_delete_files: vec![], + position_delete_files: vec![], + } + } +} + impl SplitMetaData for IcebergSplit { fn id(&self) -> SplitId { self.split_id.to_string().into() @@ -189,6 +203,7 @@ impl IcebergSplitEnumerator { schema: Schema, time_traval_info: Option, batch_parallelism: usize, + predicate: IcebergPredicate, ) -> ConnectorResult> { if batch_parallelism == 0 { bail!("Batch parallelism is 0. Cannot split the iceberg files."); @@ -199,14 +214,9 @@ impl IcebergSplitEnumerator { let current_snapshot = table.metadata().current_snapshot(); if current_snapshot.is_none() { // If there is no snapshot, we will return a mock `IcebergSplit` with empty files. 
- return Ok(vec![IcebergSplit { - split_id: 0, - snapshot_id: 0, - table_meta: TableMetadataJsonStr::serialize(table.metadata()), - files: vec![], - equality_delete_files: vec![], - position_delete_files: vec![], - }]); + return Ok(vec![IcebergSplit::empty(TableMetadataJsonStr::serialize( + table.metadata(), + ))]); } let snapshot_id = match time_traval_info { @@ -246,11 +256,15 @@ impl IcebergSplitEnumerator { let require_names = Self::get_require_field_names(&table, snapshot_id, &schema).await?; + let table_schema = table.metadata().current_schema(); + tracing::debug!("iceberg_table_schema: {:?}", table_schema); + let mut position_delete_files = vec![]; let mut data_files = vec![]; let mut equality_delete_files = vec![]; let scan = table .scan() + .with_filter(predicate) .snapshot_id(snapshot_id) .select(require_names) .build() @@ -302,10 +316,18 @@ impl IcebergSplitEnumerator { .files .push(data_files[split_num * split_size + i].clone()); } - Ok(splits + let splits = splits .into_iter() .filter(|split| !split.files.is_empty()) - .collect_vec()) + .collect_vec(); + + if splits.is_empty() { + return Ok(vec![IcebergSplit::empty(TableMetadataJsonStr::serialize( + table.metadata(), + ))]); + } + + Ok(splits) } /// The required field names are the intersection of the output shema and the equality delete columns. diff --git a/src/frontend/src/optimizer/mod.rs b/src/frontend/src/optimizer/mod.rs index e08f6b2c4dd4..30d51bb93326 100644 --- a/src/frontend/src/optimizer/mod.rs +++ b/src/frontend/src/optimizer/mod.rs @@ -402,6 +402,14 @@ impl PlanRoot { ApplyOrder::BottomUp, )); + // For iceberg scan, we do iceberg predicate pushdown + // BatchFilter -> BatchIcebergScan + let plan = plan.optimize_by_rules(&OptimizationStage::new( + "Iceberg Predicate Pushdown", + vec![BatchIcebergPredicatePushDownRule::create()], + ApplyOrder::BottomUp, + )); + assert_eq!(plan.convention(), Convention::Batch); Ok(plan) } diff --git a/src/frontend/src/optimizer/plan_node/batch_filter.rs b/src/frontend/src/optimizer/plan_node/batch_filter.rs index ff89eacd485c..6404fd852e6d 100644 --- a/src/frontend/src/optimizer/plan_node/batch_filter.rs +++ b/src/frontend/src/optimizer/plan_node/batch_filter.rs @@ -45,6 +45,12 @@ impl BatchFilter { pub fn predicate(&self) -> &Condition { &self.core.predicate } + + pub fn clone_with_predicate(&self, predicate: Condition) -> Self { + let mut core = self.core.clone(); + core.predicate = predicate; + Self::new(core) + } } impl_distill_by_unit!(BatchFilter, core, "BatchFilter"); diff --git a/src/frontend/src/optimizer/plan_node/batch_iceberg_scan.rs b/src/frontend/src/optimizer/plan_node/batch_iceberg_scan.rs index 4333fcaa3e90..815b711faa29 100644 --- a/src/frontend/src/optimizer/plan_node/batch_iceberg_scan.rs +++ b/src/frontend/src/optimizer/plan_node/batch_iceberg_scan.rs @@ -12,8 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use std::hash::{Hash, Hasher}; use std::rc::Rc; +use iceberg::expr::Predicate as IcebergPredicate; use pretty_xmlish::{Pretty, XmlNode}; use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::IcebergScanNode; @@ -29,10 +31,36 @@ use crate::error::Result; use crate::optimizer::plan_node::expr_visitable::ExprVisitable; use crate::optimizer::property::{Distribution, Order}; -#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[derive(Debug, Clone)] pub struct BatchIcebergScan { pub base: PlanBase, pub core: generic::Source, + pub predicate: IcebergPredicate, +} + +impl PartialEq for BatchIcebergScan { + fn eq(&self, other: &Self) -> bool { + if self.predicate == IcebergPredicate::AlwaysTrue + && other.predicate == IcebergPredicate::AlwaysTrue + { + self.base == other.base && self.core == other.core + } else { + panic!("BatchIcebergScan::eq: comparing non-AlwaysTrue predicates is not supported") + } + } +} + +impl Eq for BatchIcebergScan {} + +impl Hash for BatchIcebergScan { + fn hash(&self, state: &mut H) { + if self.predicate != IcebergPredicate::AlwaysTrue { + panic!("BatchIcebergScan::hash: hashing non-AlwaysTrue predicates is not supported") + } else { + self.base.hash(state); + self.core.hash(state); + } + } } impl BatchIcebergScan { @@ -44,7 +72,11 @@ impl BatchIcebergScan { Order::any(), ); - Self { base, core } + Self { + base, + core, + predicate: IcebergPredicate::AlwaysTrue, + } } pub fn column_names(&self) -> Vec<&str> { @@ -62,6 +94,15 @@ impl BatchIcebergScan { Self { base, core: self.core.clone(), + predicate: self.predicate.clone(), + } + } + + pub fn clone_with_predicate(&self, predicate: IcebergPredicate) -> Self { + Self { + base: self.base.clone(), + core: self.core.clone(), + predicate, } } @@ -78,6 +119,7 @@ impl Distill for BatchIcebergScan { let fields = vec![ ("source", src), ("columns", column_names_pretty(self.schema())), + ("predicate", Pretty::from(self.predicate.to_string())), ]; childless_record("BatchIcebergScan", fields) } diff --git a/src/frontend/src/optimizer/rule/batch/batch_iceberg_predicate_pushdown.rs b/src/frontend/src/optimizer/rule/batch/batch_iceberg_predicate_pushdown.rs new file mode 100644 index 000000000000..8df8777d5938 --- /dev/null +++ b/src/frontend/src/optimizer/rule/batch/batch_iceberg_predicate_pushdown.rs @@ -0,0 +1,305 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). 
+ +use chrono::Datelike; +use iceberg::expr::{Predicate as IcebergPredicate, Reference}; +use iceberg::spec::Datum as IcebergDatum; +use risingwave_common::catalog::Field; +use risingwave_common::types::{Decimal, ScalarImpl}; + +use crate::expr::{ExprImpl, ExprType, Literal}; +use crate::optimizer::plan_node::generic::GenericPlanRef; +use crate::optimizer::plan_node::{BatchFilter, BatchIcebergScan, PlanTreeNodeUnary}; +use crate::optimizer::rule::{BoxedRule, Rule}; +use crate::optimizer::PlanRef; +use crate::utils::Condition; + +/// NOTE(kwannoel): We do predicate pushdown to the iceberg-sdk here. +/// zone-map is used to evaluate predicates on iceberg tables. +/// Without zone-map, iceberg-sdk will still apply the predicate on its own. +/// See: . +pub struct BatchIcebergPredicatePushDownRule {} + +impl Rule for BatchIcebergPredicatePushDownRule { + fn apply(&self, plan: PlanRef) -> Option { + let filter: &BatchFilter = plan.as_batch_filter()?; + let input = filter.input(); + let scan: &BatchIcebergScan = input.as_batch_iceberg_scan()?; + // NOTE(kwannoel): We only fill iceberg predicate here. + assert_eq!(scan.predicate, IcebergPredicate::AlwaysTrue); + + let predicate = filter.predicate().clone(); + let (iceberg_predicate, rw_predicate) = + rw_predicate_to_iceberg_predicate(predicate, scan.schema().fields()); + let scan = scan.clone_with_predicate(iceberg_predicate); + if rw_predicate.always_true() { + Some(scan.into()) + } else { + let filter = filter + .clone_with_input(scan.into()) + .clone_with_predicate(rw_predicate); + Some(filter.into()) + } + } +} + +fn rw_literal_to_iceberg_datum(literal: &Literal) -> Option { + let Some(scalar) = literal.get_data() else { + return None; + }; + match scalar { + ScalarImpl::Bool(b) => Some(IcebergDatum::bool(*b)), + ScalarImpl::Int32(i) => Some(IcebergDatum::int(*i)), + ScalarImpl::Int64(i) => Some(IcebergDatum::long(*i)), + ScalarImpl::Float32(f) => Some(IcebergDatum::float(*f)), + ScalarImpl::Float64(f) => Some(IcebergDatum::double(*f)), + ScalarImpl::Decimal(d) => { + let Decimal::Normalized(d) = d else { + return None; + }; + let Ok(d) = IcebergDatum::decimal(*d) else { + return None; + }; + Some(d) + } + ScalarImpl::Date(d) => { + let Ok(datum) = IcebergDatum::date_from_ymd(d.0.year(), d.0.month(), d.0.day()) else { + return None; + }; + Some(datum) + } + ScalarImpl::Timestamp(t) => Some(IcebergDatum::timestamp_nanos( + t.0.and_utc().timestamp_nanos_opt()?, + )), + ScalarImpl::Timestamptz(t) => Some(IcebergDatum::timestamptz_micros(t.timestamp_micros())), + ScalarImpl::Utf8(s) => Some(IcebergDatum::string(s)), + ScalarImpl::Bytea(b) => Some(IcebergDatum::binary(b.clone())), + _ => None, + } +} + +fn rw_expr_to_iceberg_predicate(expr: &ExprImpl, fields: &[Field]) -> Option { + match expr { + ExprImpl::Literal(l) => match l.get_data() { + Some(ScalarImpl::Bool(b)) => { + if *b { + Some(IcebergPredicate::AlwaysTrue) + } else { + Some(IcebergPredicate::AlwaysFalse) + } + } + _ => None, + }, + ExprImpl::FunctionCall(f) => { + let args = f.inputs(); + match f.func_type() { + ExprType::Not => { + let arg = rw_expr_to_iceberg_predicate(&args[0], fields)?; + Some(IcebergPredicate::negate(arg)) + } + ExprType::And => { + let arg0 = rw_expr_to_iceberg_predicate(&args[0], fields)?; + let arg1 = rw_expr_to_iceberg_predicate(&args[1], fields)?; + Some(IcebergPredicate::and(arg0, arg1)) + } + ExprType::Or => { + let arg0 = rw_expr_to_iceberg_predicate(&args[0], fields)?; + let arg1 = rw_expr_to_iceberg_predicate(&args[1], fields)?; + 
Some(IcebergPredicate::or(arg0, arg1)) + } + ExprType::Equal => match [&args[0], &args[1]] { + [ExprImpl::InputRef(lhs), ExprImpl::Literal(rhs)] + | [ExprImpl::Literal(rhs), ExprImpl::InputRef(lhs)] => { + let column_name = &fields[lhs.index].name; + let reference = Reference::new(column_name); + let datum = rw_literal_to_iceberg_datum(rhs)?; + Some(reference.equal_to(datum)) + } + _ => None, + }, + ExprType::NotEqual => match [&args[0], &args[1]] { + [ExprImpl::InputRef(lhs), ExprImpl::Literal(rhs)] + | [ExprImpl::Literal(rhs), ExprImpl::InputRef(lhs)] => { + let column_name = &fields[lhs.index].name; + let reference = Reference::new(column_name); + let datum = rw_literal_to_iceberg_datum(rhs)?; + Some(reference.not_equal_to(datum)) + } + _ => None, + }, + ExprType::GreaterThan => match [&args[0], &args[1]] { + [ExprImpl::InputRef(lhs), ExprImpl::Literal(rhs)] => { + let column_name = &fields[lhs.index].name; + let reference = Reference::new(column_name); + let datum = rw_literal_to_iceberg_datum(rhs)?; + Some(reference.greater_than(datum)) + } + [ExprImpl::Literal(rhs), ExprImpl::InputRef(lhs)] => { + let column_name = &fields[lhs.index].name; + let reference = Reference::new(column_name); + let datum = rw_literal_to_iceberg_datum(rhs)?; + Some(reference.less_than_or_equal_to(datum)) + } + _ => None, + }, + ExprType::GreaterThanOrEqual => match [&args[0], &args[1]] { + [ExprImpl::InputRef(lhs), ExprImpl::Literal(rhs)] => { + let column_name = &fields[lhs.index].name; + let reference = Reference::new(column_name); + let datum = rw_literal_to_iceberg_datum(rhs)?; + Some(reference.greater_than_or_equal_to(datum)) + } + [ExprImpl::Literal(rhs), ExprImpl::InputRef(lhs)] => { + let column_name = &fields[lhs.index].name; + let reference = Reference::new(column_name); + let datum = rw_literal_to_iceberg_datum(rhs)?; + Some(reference.less_than(datum)) + } + _ => None, + }, + ExprType::LessThan => match [&args[0], &args[1]] { + [ExprImpl::InputRef(lhs), ExprImpl::Literal(rhs)] => { + let column_name = &fields[lhs.index].name; + let reference = Reference::new(column_name); + let datum = rw_literal_to_iceberg_datum(rhs)?; + Some(reference.less_than(datum)) + } + [ExprImpl::Literal(rhs), ExprImpl::InputRef(lhs)] => { + let column_name = &fields[lhs.index].name; + let reference = Reference::new(column_name); + let datum = rw_literal_to_iceberg_datum(rhs)?; + Some(reference.greater_than_or_equal_to(datum)) + } + _ => None, + }, + ExprType::LessThanOrEqual => match [&args[0], &args[1]] { + [ExprImpl::InputRef(lhs), ExprImpl::Literal(rhs)] => { + let column_name = &fields[lhs.index].name; + let reference = Reference::new(column_name); + let datum = rw_literal_to_iceberg_datum(rhs)?; + Some(reference.less_than_or_equal_to(datum)) + } + [ExprImpl::Literal(rhs), ExprImpl::InputRef(lhs)] => { + let column_name = &fields[lhs.index].name; + let reference = Reference::new(column_name); + let datum = rw_literal_to_iceberg_datum(rhs)?; + Some(reference.greater_than(datum)) + } + _ => None, + }, + ExprType::IsNull => match &args[0] { + ExprImpl::InputRef(lhs) => { + let column_name = &fields[lhs.index].name; + let reference = Reference::new(column_name); + Some(reference.is_null()) + } + _ => None, + }, + ExprType::IsNotNull => match &args[0] { + ExprImpl::InputRef(lhs) => { + let column_name = &fields[lhs.index].name; + let reference = Reference::new(column_name); + Some(reference.is_not_null()) + } + _ => None, + }, + ExprType::In => match &args[0] { + ExprImpl::InputRef(lhs) => { + let column_name = 
&fields[lhs.index].name; + let reference = Reference::new(column_name); + let mut datums = Vec::with_capacity(args.len() - 1); + for arg in &args[1..] { + if let ExprImpl::Literal(l) = arg { + if let Some(datum) = rw_literal_to_iceberg_datum(l) { + datums.push(datum); + } else { + return None; + } + } else { + return None; + } + } + Some(reference.is_in(datums)) + } + _ => None, + }, + _ => None, + } + } + _ => None, + } +} +fn rw_predicate_to_iceberg_predicate( + predicate: Condition, + fields: &[Field], +) -> (IcebergPredicate, Condition) { + if predicate.always_true() { + return (IcebergPredicate::AlwaysTrue, predicate); + } + + let mut conjunctions = predicate.conjunctions; + let mut ignored_conjunctions: Vec = Vec::with_capacity(conjunctions.len()); + + let mut iceberg_condition_root = None; + while let Some(conjunction) = conjunctions.pop() { + match rw_expr_to_iceberg_predicate(&conjunction, fields) { + iceberg_predicate @ Some(_) => { + iceberg_condition_root = iceberg_predicate; + break; + } + None => { + ignored_conjunctions.push(conjunction); + continue; + } + } + } + + let mut iceberg_condition_root = match iceberg_condition_root { + Some(p) => p, + None => { + return ( + IcebergPredicate::AlwaysTrue, + Condition { + conjunctions: ignored_conjunctions, + }, + ) + } + }; + + for rw_condition in conjunctions { + match rw_expr_to_iceberg_predicate(&rw_condition, fields) { + Some(iceberg_predicate) => { + iceberg_condition_root = iceberg_condition_root.and(iceberg_predicate) + } + None => ignored_conjunctions.push(rw_condition), + } + } + ( + iceberg_condition_root, + Condition { + conjunctions: ignored_conjunctions, + }, + ) +} + +impl BatchIcebergPredicatePushDownRule { + pub fn create() -> BoxedRule { + Box::new(BatchIcebergPredicatePushDownRule {}) + } +} diff --git a/src/frontend/src/optimizer/rule/batch/mod.rs b/src/frontend/src/optimizer/rule/batch/mod.rs index 6061c985b669..c4d31faf3cfb 100644 --- a/src/frontend/src/optimizer/rule/batch/mod.rs +++ b/src/frontend/src/optimizer/rule/batch/mod.rs @@ -12,5 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub mod batch_iceberg_predicate_pushdown; pub(crate) mod batch_project_merge_rule; pub mod batch_push_limit_to_scan_rule; diff --git a/src/frontend/src/optimizer/rule/mod.rs b/src/frontend/src/optimizer/rule/mod.rs index 7468f1c96524..e9bd08e6c679 100644 --- a/src/frontend/src/optimizer/rule/mod.rs +++ b/src/frontend/src/optimizer/rule/mod.rs @@ -165,6 +165,7 @@ mod table_function_to_mysql_query_rule; mod table_function_to_postgres_query_rule; mod values_extract_project_rule; +pub use batch::batch_iceberg_predicate_pushdown::*; pub use batch::batch_push_limit_to_scan_rule::*; pub use pull_up_correlated_predicate_agg_rule::*; pub use source_to_iceberg_scan_rule::*; @@ -248,6 +249,7 @@ macro_rules! 
for_all_rules { , { AggCallMergeRule } , { ValuesExtractProjectRule } , { BatchPushLimitToScanRule } + , { BatchIcebergPredicatePushDownRule } , { PullUpCorrelatedPredicateAggRule } , { SourceToKafkaScanRule } , { SourceToIcebergScanRule } diff --git a/src/frontend/src/scheduler/plan_fragmenter.rs b/src/frontend/src/scheduler/plan_fragmenter.rs index 90984750bc46..9cec27601a24 100644 --- a/src/frontend/src/scheduler/plan_fragmenter.rs +++ b/src/frontend/src/scheduler/plan_fragmenter.rs @@ -22,6 +22,7 @@ use anyhow::anyhow; use async_recursion::async_recursion; use enum_as_inner::EnumAsInner; use futures::TryStreamExt; +use iceberg::expr::Predicate as IcebergPredicate; use itertools::Itertools; use pgwire::pg_server::SessionId; use risingwave_batch::error::BatchError; @@ -268,11 +269,25 @@ impl Query { } } +#[derive(Debug, Clone)] +pub enum SourceFetchParameters { + IcebergPredicate(IcebergPredicate), + KafkaTimebound { + lower: Option, + upper: Option, + }, + Empty, +} + #[derive(Debug, Clone)] pub struct SourceFetchInfo { pub schema: Schema, + /// These are user-configured connector properties. + /// e.g. host, username, etc... pub connector: ConnectorProperties, - pub timebound: (Option, Option), + /// These parameters are internally derived by the plan node. + /// e.g. predicate pushdown for iceberg, timebound for kafka. + pub fetch_parameters: SourceFetchParameters, pub as_of: Option, } @@ -295,13 +310,16 @@ impl SourceScanInfo { unreachable!("Never call complete when SourceScanInfo is already complete") } }; - match fetch_info.connector { - ConnectorProperties::Kafka(prop) => { + match (fetch_info.connector, fetch_info.fetch_parameters) { + ( + ConnectorProperties::Kafka(prop), + SourceFetchParameters::KafkaTimebound { lower, upper }, + ) => { let mut kafka_enumerator = KafkaSplitEnumerator::new(*prop, SourceEnumeratorContext::dummy().into()) .await?; let split_info = kafka_enumerator - .list_splits_batch(fetch_info.timebound.0, fetch_info.timebound.1) + .list_splits_batch(lower, upper) .await? 
.into_iter() .map(SplitImpl::Kafka) @@ -309,7 +327,7 @@ impl SourceScanInfo { Ok(SourceScanInfo::Complete(split_info)) } - ConnectorProperties::OpendalS3(prop) => { + (ConnectorProperties::OpendalS3(prop), SourceFetchParameters::Empty) => { let lister: OpendalEnumerator = OpendalEnumerator::new_s3_source(prop.s3_properties, prop.assume_role)?; let stream = build_opendal_fs_list_for_batch(lister); @@ -322,7 +340,7 @@ impl SourceScanInfo { Ok(SourceScanInfo::Complete(res)) } - ConnectorProperties::Gcs(prop) => { + (ConnectorProperties::Gcs(prop), SourceFetchParameters::Empty) => { let lister: OpendalEnumerator = OpendalEnumerator::new_gcs_source(*prop)?; let stream = build_opendal_fs_list_for_batch(lister); @@ -331,7 +349,7 @@ impl SourceScanInfo { Ok(SourceScanInfo::Complete(res)) } - ConnectorProperties::Azblob(prop) => { + (ConnectorProperties::Azblob(prop), SourceFetchParameters::Empty) => { let lister: OpendalEnumerator = OpendalEnumerator::new_azblob_source(*prop)?; let stream = build_opendal_fs_list_for_batch(lister); @@ -340,7 +358,10 @@ impl SourceScanInfo { Ok(SourceScanInfo::Complete(res)) } - ConnectorProperties::Iceberg(prop) => { + ( + ConnectorProperties::Iceberg(prop), + SourceFetchParameters::IcebergPredicate(predicate), + ) => { let iceberg_enumerator = IcebergSplitEnumerator::new(*prop, SourceEnumeratorContext::dummy().into()) .await?; @@ -369,7 +390,12 @@ impl SourceScanInfo { }; let split_info = iceberg_enumerator - .list_splits_batch(fetch_info.schema, time_travel_info, batch_parallelism) + .list_splits_batch( + fetch_info.schema, + time_travel_info, + batch_parallelism, + predicate, + ) .await? .into_iter() .map(SplitImpl::Iceberg) @@ -1068,7 +1094,10 @@ impl BatchPlanFragmenter { return Ok(Some(SourceScanInfo::new(SourceFetchInfo { schema: batch_kafka_scan.base.schema().clone(), connector: property, - timebound: timestamp_bound, + fetch_parameters: SourceFetchParameters::KafkaTimebound { + lower: timestamp_bound.0, + upper: timestamp_bound.1, + }, as_of: None, }))); } @@ -1082,7 +1111,9 @@ impl BatchPlanFragmenter { return Ok(Some(SourceScanInfo::new(SourceFetchInfo { schema: batch_iceberg_scan.base.schema().clone(), connector: property, - timebound: (None, None), + fetch_parameters: SourceFetchParameters::IcebergPredicate( + batch_iceberg_scan.predicate.clone(), + ), as_of, }))); } @@ -1097,7 +1128,7 @@ impl BatchPlanFragmenter { return Ok(Some(SourceScanInfo::new(SourceFetchInfo { schema: source_node.base.schema().clone(), connector: property, - timebound: (None, None), + fetch_parameters: SourceFetchParameters::Empty, as_of, }))); } From fa99969ebbfbd46476455d7dc33a553aa18ab68a Mon Sep 17 00:00:00 2001 From: StrikeW Date: Thu, 14 Nov 2024 17:41:11 +0800 Subject: [PATCH 4/7] fix(pg-cdc): write transactional WAL message as heartbeat action (#19385) --- .../src/main/resources/postgres.properties | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/java/connector-node/risingwave-connector-service/src/main/resources/postgres.properties b/java/connector-node/risingwave-connector-service/src/main/resources/postgres.properties index 89701280b246..3922ad2fdf47 100644 --- a/java/connector-node/risingwave-connector-service/src/main/resources/postgres.properties +++ b/java/connector-node/risingwave-connector-service/src/main/resources/postgres.properties @@ -19,9 +19,9 @@ publication.autocreate.mode=disabled publication.name=${publication.name:-rw_publication} # default heartbeat interval 5 mins heartbeat.interval.ms=${debezium.heartbeat.interval.ms:-300000} 
-# emit a WAL message to the replication stream +# emit a transactional WAL message to the replication stream # see https://github.com/risingwavelabs/risingwave/issues/16697 for more details -heartbeat.action.query=SELECT pg_logical_emit_message(false, 'heartbeat', now()::varchar) +heartbeat.action.query=SELECT pg_logical_emit_message(true, 'heartbeat', now()::varchar) # In sharing cdc source mode, we will subscribe to multiple tables in the given database, # so here we set ${table.name} to a default value `RW_CDC_Sharing` just for display. name=${hostname}:${port}:${database.name}.${schema.name}.${table.name:-RW_CDC_Sharing} From c1435dd7bc1f5281bee14a2845e925c9f5e76347 Mon Sep 17 00:00:00 2001 From: August Date: Thu, 14 Nov 2024 17:46:32 +0800 Subject: [PATCH 5/7] fix: fix system acl column type to support `\l` command (#19379) --- .../batch/catalog/slash_l_database.slt.part | 17 +++++++++++++ .../src/catalog/system_catalog/mod.rs | 25 ++++++++----------- .../system_catalog/pg_catalog/pg_database.rs | 2 +- .../system_catalog/pg_catalog/pg_namespace.rs | 2 +- .../pg_catalog/pg_tablespace.rs | 2 +- .../rw_catalog/rw_connections.rs | 4 +-- .../system_catalog/rw_catalog/rw_databases.rs | 2 +- .../system_catalog/rw_catalog/rw_functions.rs | 2 +- .../system_catalog/rw_catalog/rw_indexes.rs | 4 +-- .../rw_catalog/rw_internal_tables.rs | 2 +- .../rw_catalog/rw_materialized_views.rs | 2 +- .../system_catalog/rw_catalog/rw_relations.rs | 2 +- .../system_catalog/rw_catalog/rw_schemas.rs | 2 +- .../system_catalog/rw_catalog/rw_secrets.rs | 4 +-- .../system_catalog/rw_catalog/rw_sinks.rs | 2 +- .../system_catalog/rw_catalog/rw_sources.rs | 2 +- .../rw_catalog/rw_subscriptions.rs | 2 +- .../rw_catalog/rw_system_tables.rs | 2 +- .../system_catalog/rw_catalog/rw_tables.rs | 2 +- .../system_catalog/rw_catalog/rw_views.rs | 2 +- 20 files changed, 48 insertions(+), 36 deletions(-) create mode 100644 e2e_test/batch/catalog/slash_l_database.slt.part diff --git a/e2e_test/batch/catalog/slash_l_database.slt.part b/e2e_test/batch/catalog/slash_l_database.slt.part new file mode 100644 index 000000000000..1b1d463029cc --- /dev/null +++ b/e2e_test/batch/catalog/slash_l_database.slt.part @@ -0,0 +1,17 @@ +# wrapped test of `\l` command for better consistency. 
+query T +SELECT count(*) > 0 +FROM +(SELECT + d.datname AS "Name", + pg_catalog.pg_get_userbyid (d.datdba) AS "Owner", + pg_catalog.pg_encoding_to_char (d.encoding) AS "Encoding", + d.datcollate AS "Collate", + d.datctype AS "Ctype", + pg_catalog.array_to_string (d.datacl, E'\n') AS "Access privileges" +FROM + pg_catalog.pg_database AS d +ORDER BY + 1); +---- +t diff --git a/src/frontend/src/catalog/system_catalog/mod.rs b/src/frontend/src/catalog/system_catalog/mod.rs index 432266d1871a..b374567c4372 100644 --- a/src/frontend/src/catalog/system_catalog/mod.rs +++ b/src/frontend/src/catalog/system_catalog/mod.rs @@ -237,9 +237,8 @@ fn get_acl_items( for_dml_table: bool, users: &Vec, username_map: &HashMap, -) -> String { - let mut res = String::from("{"); - let mut empty_flag = true; +) -> Vec { + let mut res = vec![]; let super_privilege = available_prost_privilege(*object, for_dml_table); for user in users { let privileges = if user.is_super { @@ -263,25 +262,21 @@ fn get_acl_items( }) }); for (granted_by, actions) in grantor_map { - if empty_flag { - empty_flag = false; - } else { - res.push(','); - } - res.push_str(&user.name); - res.push('='); + let mut aclitem = String::new(); + aclitem.push_str(&user.name); + aclitem.push('='); for (action, option) in actions { - res.push_str(&AclMode::from(action).to_string()); + aclitem.push_str(&AclMode::from(action).to_string()); if option { - res.push('*'); + aclitem.push('*'); } } - res.push('/'); + aclitem.push('/'); // should be able to query grantor's name - res.push_str(username_map.get(&granted_by).unwrap()); + aclitem.push_str(username_map.get(&granted_by).unwrap()); + res.push(aclitem); } } - res.push('}'); res } diff --git a/src/frontend/src/catalog/system_catalog/pg_catalog/pg_database.rs b/src/frontend/src/catalog/system_catalog/pg_catalog/pg_database.rs index b0510d024434..183200af8a49 100644 --- a/src/frontend/src/catalog/system_catalog/pg_catalog/pg_database.rs +++ b/src/frontend/src/catalog/system_catalog/pg_catalog/pg_database.rs @@ -58,5 +58,5 @@ struct PgDatabase { datallowconn: bool, datconnlimit: i32, dattablespace: i32, - datacl: String, + datacl: Vec, } diff --git a/src/frontend/src/catalog/system_catalog/pg_catalog/pg_namespace.rs b/src/frontend/src/catalog/system_catalog/pg_catalog/pg_namespace.rs index 69506da1ecd6..e01023f75d79 100644 --- a/src/frontend/src/catalog/system_catalog/pg_catalog/pg_namespace.rs +++ b/src/frontend/src/catalog/system_catalog/pg_catalog/pg_namespace.rs @@ -28,5 +28,5 @@ struct PgNamespace { oid: i32, nspname: String, nspowner: i32, - nspacl: String, + nspacl: Vec, } diff --git a/src/frontend/src/catalog/system_catalog/pg_catalog/pg_tablespace.rs b/src/frontend/src/catalog/system_catalog/pg_catalog/pg_tablespace.rs index 81ef923bfa24..f6d35011a27f 100644 --- a/src/frontend/src/catalog/system_catalog/pg_catalog/pg_tablespace.rs +++ b/src/frontend/src/catalog/system_catalog/pg_catalog/pg_tablespace.rs @@ -24,6 +24,6 @@ struct PgTablespace { oid: i32, spcname: String, spcowner: i32, - spcacl: String, + spcacl: Vec, spcoptions: String, } diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_connections.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_connections.rs index 2af0b29b16f7..fcc7e8efc338 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_connections.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_connections.rs @@ -27,7 +27,7 @@ struct RwConnection { owner: i32, type_: String, provider: String, - acl: String, + acl: Vec, } 
#[system_catalog(table, "rw_catalog.rw_connections")] @@ -44,7 +44,7 @@ fn read_rw_connections(reader: &SysCatalogReaderImpl) -> Result, } #[system_catalog(table, "rw_catalog.rw_databases")] diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_functions.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_functions.rs index 9f002dcab6f1..ce34bfcef42b 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_functions.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_functions.rs @@ -31,7 +31,7 @@ struct RwFunction { return_type_id: i32, language: String, link: Option, - acl: String, + acl: Vec, always_retry_on_network_error: bool, } diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_indexes.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_indexes.rs index 558e628a3fbf..b1d42a1ba0c6 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_indexes.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_indexes.rs @@ -29,7 +29,7 @@ struct RwIndex { schema_id: i32, owner: i32, definition: String, - acl: String, + acl: Vec, initialized_at: Option, created_at: Option, initialized_at_cluster_version: Option, @@ -76,7 +76,7 @@ fn read_rw_indexes(reader: &SysCatalogReaderImpl) -> Result> { schema_id: schema.id() as i32, owner: index.index_table.owner as i32, definition: index.index_table.create_sql(), - acl: "".into(), + acl: vec![], initialized_at: index.initialized_at_epoch.map(|e| e.as_timestamptz()), created_at: index.created_at_epoch.map(|e| e.as_timestamptz()), initialized_at_cluster_version: index.initialized_at_cluster_version.clone(), diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_internal_tables.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_internal_tables.rs index 9ea91bfa5073..989226d104a5 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_internal_tables.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_internal_tables.rs @@ -27,7 +27,7 @@ struct RwInternalTable { schema_id: i32, owner: i32, definition: String, - acl: String, + acl: Vec, initialized_at: Option, created_at: Option, initialized_at_cluster_version: Option, diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_materialized_views.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_materialized_views.rs index a0e8d98b24b6..a1ce41737ab8 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_materialized_views.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_materialized_views.rs @@ -29,7 +29,7 @@ struct RwMaterializedView { owner: i32, definition: String, append_only: bool, - acl: String, + acl: Vec, initialized_at: Option, created_at: Option, initialized_at_cluster_version: Option, diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_relations.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_relations.rs index abe39dbc329a..a500020e6289 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_relations.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_relations.rs @@ -37,5 +37,5 @@ struct RwRelation { schema_id: i32, owner: i32, definition: String, - acl: String, + acl: Vec, } diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_schemas.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_schemas.rs index 8d8786e0b109..adaf6563d4ae 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_schemas.rs +++ 
b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_schemas.rs @@ -26,7 +26,7 @@ struct RwSchema { id: i32, name: String, owner: i32, - acl: String, + acl: Vec, } #[system_catalog(table, "rw_catalog.rw_schemas")] diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_secrets.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_secrets.rs index 09c9a98798e0..33a43c3de51f 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_secrets.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_secrets.rs @@ -24,7 +24,7 @@ struct RwSecret { id: i32, name: String, owner: i32, - acl: String, + acl: Vec, } #[system_catalog(table, "rw_catalog.rw_secrets")] @@ -38,7 +38,7 @@ fn read_rw_view_info(reader: &SysCatalogReaderImpl) -> Result> { id: secret.id.secret_id() as i32, name: secret.name.clone(), owner: secret.owner as i32, - acl: "".into(), + acl: vec![], }) }) .collect()) diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_sinks.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_sinks.rs index a7bee63805fc..e382a5b7dfaf 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_sinks.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_sinks.rs @@ -31,7 +31,7 @@ struct RwSink { sink_type: String, connection_id: Option, definition: String, - acl: String, + acl: Vec, initialized_at: Option, created_at: Option, initialized_at_cluster_version: Option, diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_sources.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_sources.rs index 40df3dfc3a84..bdcfe355057a 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_sources.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_sources.rs @@ -35,7 +35,7 @@ struct RwSource { associated_table_id: Option, connection_id: Option, definition: String, - acl: String, + acl: Vec, initialized_at: Option, created_at: Option, initialized_at_cluster_version: Option, diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_subscriptions.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_subscriptions.rs index 95d22630475a..c5c1e108e3ef 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_subscriptions.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_subscriptions.rs @@ -27,7 +27,7 @@ struct RwSubscription { schema_id: i32, owner: i32, definition: String, - acl: String, + acl: Vec, initialized_at: Option, created_at: Option, initialized_at_cluster_version: Option, diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_system_tables.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_system_tables.rs index 0fda14d72687..17d3001e3263 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_system_tables.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_system_tables.rs @@ -28,7 +28,7 @@ struct SystemTable { schema_id: i32, owner: i32, definition: Option, - acl: String, + acl: Vec, } #[system_catalog(table, "rw_catalog.rw_system_tables")] diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_tables.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_tables.rs index 78416c97b71a..d991315d26b7 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_tables.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_tables.rs @@ -28,7 +28,7 @@ struct RwTable { owner: i32, definition: String, append_only: bool, - acl: String, + acl: Vec, initialized_at: Option, created_at: Option, 
initialized_at_cluster_version: Option, diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_views.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_views.rs index 7c156d783a1c..2141f808362f 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_views.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_views.rs @@ -27,7 +27,7 @@ struct RwView { schema_id: i32, owner: i32, definition: String, - acl: String, + acl: Vec, } #[system_catalog(table, "rw_catalog.rw_views")] From daed1f243da61ad8132f6f14a702dce6936b825d Mon Sep 17 00:00:00 2001 From: Dylan Date: Thu, 14 Nov 2024 18:20:02 +0800 Subject: [PATCH 6/7] feat(iceberg): make wrehouse.path optional for iceberg rest catalog (#19380) --- .../src/connector_common/iceberg/mod.rs | 122 +++++++++++------- src/connector/src/sink/iceberg/mod.rs | 26 ++-- src/connector/with_options_sink.yaml | 2 +- src/connector/with_options_source.yaml | 2 +- 4 files changed, 97 insertions(+), 55 deletions(-) diff --git a/src/connector/src/connector_common/iceberg/mod.rs b/src/connector/src/connector_common/iceberg/mod.rs index 3d1b016577c6..d10a9eefb68a 100644 --- a/src/connector/src/connector_common/iceberg/mod.rs +++ b/src/connector/src/connector_common/iceberg/mod.rs @@ -49,7 +49,7 @@ pub struct IcebergCommon { pub secret_key: String, /// Path of iceberg warehouse, only applicable in storage catalog. #[serde(rename = "warehouse.path")] - pub warehouse_path: String, + pub warehouse_path: Option, /// Catalog name, can be omitted for storage catalog, but /// must be set for other catalogs. #[serde(rename = "catalog.name")] @@ -142,23 +142,31 @@ impl IcebergCommon { self.secret_key.clone().to_string(), ); - let (bucket, _) = { - let url = Url::parse(&self.warehouse_path) - .with_context(|| format!("Invalid warehouse path: {}", self.warehouse_path))?; - let bucket = url - .host_str() - .with_context(|| { - format!( - "Invalid s3 path: {}, bucket is missing", - self.warehouse_path - ) - })? - .to_string(); - let root = url.path().trim_start_matches('/').to_string(); - (bucket, root) - }; + match &self.warehouse_path { + Some(warehouse_path) => { + let (bucket, _) = { + let url = Url::parse(warehouse_path).with_context(|| { + format!("Invalid warehouse path: {}", warehouse_path) + })?; + let bucket = url + .host_str() + .with_context(|| { + format!("Invalid s3 path: {}, bucket is missing", warehouse_path) + })? 
+ .to_string(); + let root = url.path().trim_start_matches('/').to_string(); + (bucket, root) + }; + + iceberg_configs.insert("iceberg.table.io.bucket".to_string(), bucket); + } + None => { + if catalog_type != "rest" { + bail!("`warehouse.path` must be set in {} catalog", &catalog_type); + } + } + } - iceberg_configs.insert("iceberg.table.io.bucket".to_string(), bucket); // #TODO // Support load config file iceberg_configs.insert( @@ -176,7 +184,9 @@ impl IcebergCommon { java_catalog_configs.insert("uri".to_string(), uri.to_string()); } - java_catalog_configs.insert("warehouse".to_string(), self.warehouse_path.clone()); + if let Some(warehouse_path) = &self.warehouse_path { + java_catalog_configs.insert("warehouse".to_string(), warehouse_path.clone()); + } java_catalog_configs.extend(java_catalog_props.clone()); // Currently we only support s3, so let's set it to s3 @@ -241,6 +251,7 @@ impl IcebergCommon { /// icelake mod v1 { + use anyhow::anyhow; use icelake::catalog::{load_catalog, CatalogRef}; use icelake::{Table, TableIdentifier}; @@ -268,7 +279,9 @@ mod v1 { "storage" => { iceberg_configs.insert( format!("iceberg.catalog.{}.warehouse", self.catalog_name()), - self.warehouse_path.clone(), + self.warehouse_path.clone().ok_or_else(|| { + anyhow!("`warehouse.path` must be set in storage catalog") + })?, ); iceberg_configs.insert(CATALOG_TYPE.to_string(), "storage".into()); } @@ -318,28 +331,36 @@ mod v1 { ); } - let (bucket, root) = { - let url = Url::parse(&self.warehouse_path) - .with_context(|| format!("Invalid warehouse path: {}", self.warehouse_path))?; - let bucket = url - .host_str() - .with_context(|| { - format!( - "Invalid s3 path: {}, bucket is missing", - self.warehouse_path - ) - })? - .to_string(); - let root = url.path().trim_start_matches('/').to_string(); - (bucket, root) - }; + match &self.warehouse_path { + Some(warehouse_path) => { + let (bucket, root) = { + let url = Url::parse(warehouse_path).with_context(|| { + format!("Invalid warehouse path: {}", warehouse_path) + })?; + let bucket = url + .host_str() + .with_context(|| { + format!("Invalid s3 path: {}, bucket is missing", warehouse_path) + })? + .to_string(); + let root = url.path().trim_start_matches('/').to_string(); + (bucket, root) + }; - iceberg_configs.insert("iceberg.table.io.bucket".to_string(), bucket); + iceberg_configs.insert("iceberg.table.io.bucket".to_string(), bucket); - // Only storage catalog should set this. - if catalog_type == "storage" { - iceberg_configs.insert("iceberg.table.io.root".to_string(), root); + // Only storage catalog should set this. + if catalog_type == "storage" { + iceberg_configs.insert("iceberg.table.io.root".to_string(), root); + } + } + None => { + if catalog_type == "storage" { + bail!("`warehouse.path` must be set in storage catalog"); + } + } } + // #TODO // Support load config file iceberg_configs.insert( @@ -416,6 +437,7 @@ mod v1 { /// iceberg-rust mod v2 { + use anyhow::anyhow; use iceberg::spec::TableMetadata; use iceberg::table::Table as TableV2; use iceberg::{Catalog as CatalogV2, TableIdent}; @@ -442,7 +464,9 @@ mod v2 { match self.catalog_type() { "storage" => { let config = storage_catalog::StorageCatalogConfig::builder() - .warehouse(self.warehouse_path.clone()) + .warehouse(self.warehouse_path.clone().ok_or_else(|| { + anyhow!("`warehouse.path` must be set in storage catalog") + })?) 
.access_key(self.access_key.clone()) .secret_key(self.secret_key.clone()) .region(self.region.clone()) @@ -468,12 +492,18 @@ mod v2 { S3_SECRET_ACCESS_KEY.to_string(), self.secret_key.clone().to_string(), ); - let config = iceberg_catalog_rest::RestCatalogConfig::builder() + let config_builder = iceberg_catalog_rest::RestCatalogConfig::builder() .uri(self.catalog_uri.clone().with_context(|| { "`catalog.uri` must be set in rest catalog".to_string() })?) - .props(iceberg_configs) - .build(); + .props(iceberg_configs); + + let config = match &self.warehouse_path { + Some(warehouse_path) => { + config_builder.warehouse(warehouse_path.clone()).build() + } + None => config_builder.build(), + }; let catalog = iceberg_catalog_rest::RestCatalog::new(config); Ok(Arc::new(catalog)) } @@ -509,7 +539,9 @@ mod v2 { self.secret_key.clone().to_string(), ); let config_builder = iceberg_catalog_glue::GlueCatalogConfig::builder() - .warehouse(self.warehouse_path.clone()) + .warehouse(self.warehouse_path.clone().ok_or_else(|| { + anyhow!("`warehouse.path` must be set in glue catalog") + })?) .props(iceberg_configs); let config = if let Some(uri) = self.catalog_uri.as_deref() { config_builder.uri(uri.to_string()).build() @@ -575,7 +607,9 @@ mod v2 { match self.catalog_type() { "storage" => { let config = storage_catalog::StorageCatalogConfig::builder() - .warehouse(self.warehouse_path.clone()) + .warehouse(self.warehouse_path.clone().ok_or_else(|| { + anyhow!("`warehouse.path` must be set in storage catalog") + })?) .access_key(self.access_key.clone()) .secret_key(self.secret_key.clone()) .region(self.region.clone()) diff --git a/src/connector/src/sink/iceberg/mod.rs b/src/connector/src/sink/iceberg/mod.rs index e2f00afd5b52..0c878ae1ba6d 100644 --- a/src/connector/src/sink/iceberg/mod.rs +++ b/src/connector/src/sink/iceberg/mod.rs @@ -281,18 +281,26 @@ impl IcebergSink { let location = { let mut names = namespace.clone().inner(); names.push(self.config.common.table_name.to_string()); - if self.config.common.warehouse_path.ends_with('/') { - format!("{}{}", self.config.common.warehouse_path, names.join("/")) - } else { - format!("{}/{}", self.config.common.warehouse_path, names.join("/")) + match &self.config.common.warehouse_path { + Some(warehouse_path) => { + if warehouse_path.ends_with('/') { + Some(format!("{}{}", warehouse_path, names.join("/"))) + } else { + Some(format!("{}/{}", warehouse_path, names.join("/"))) + } + } + None => None, } }; - let table_creation = TableCreation::builder() + let table_creation_builder = TableCreation::builder() .name(self.config.common.table_name.clone()) - .schema(iceberg_schema) - .location(location) - .build(); + .schema(iceberg_schema); + + let table_creation = match location { + Some(location) => table_creation_builder.location(location).build(), + None => table_creation_builder.build(), + }; catalog .create_table(&namespace, table_creation) @@ -998,7 +1006,7 @@ mod test { let expected_iceberg_config = IcebergConfig { common: IcebergCommon { - warehouse_path: "s3://iceberg".to_string(), + warehouse_path: Some("s3://iceberg".to_string()), catalog_uri: Some("jdbc://postgresql://postgres:5432/iceberg".to_string()), region: Some("us-east-1".to_string()), endpoint: Some("http://127.0.0.1:9301".to_string()), diff --git a/src/connector/with_options_sink.yaml b/src/connector/with_options_sink.yaml index 2c286695b312..31579dfd7032 100644 --- a/src/connector/with_options_sink.yaml +++ b/src/connector/with_options_sink.yaml @@ -394,7 +394,7 @@ IcebergConfig: - name: 
warehouse.path field_type: String comments: Path of iceberg warehouse, only applicable in storage catalog. - required: true + required: false - name: catalog.name field_type: String comments: |- diff --git a/src/connector/with_options_source.yaml b/src/connector/with_options_source.yaml index 41c27a1af7eb..75972546b299 100644 --- a/src/connector/with_options_source.yaml +++ b/src/connector/with_options_source.yaml @@ -99,7 +99,7 @@ IcebergProperties: - name: warehouse.path field_type: String comments: Path of iceberg warehouse, only applicable in storage catalog. - required: true + required: false - name: catalog.name field_type: String comments: |- From c92069481d2aba5a77ffb673b48c7e27b5a698a6 Mon Sep 17 00:00:00 2001 From: William Wen <44139337+wenym1@users.noreply.github.com> Date: Thu, 14 Nov 2024 18:43:01 +0800 Subject: [PATCH 7/7] refactor(barrier): add control request to explicitly create partial graph (#19383) --- proto/stream_service.proto | 12 +++- src/meta/src/barrier/checkpoint/control.rs | 18 +++--- src/meta/src/barrier/context/context_impl.rs | 6 +- src/meta/src/barrier/context/mod.rs | 4 +- src/meta/src/barrier/rpc.rs | 61 ++++++++++++++----- src/meta/src/barrier/worker.rs | 7 ++- src/rpc_client/src/stream_client.rs | 9 +-- src/stream/src/task/barrier_manager.rs | 52 ++++++++++++---- .../src/task/barrier_manager/managed_state.rs | 51 ++++++++++------ src/stream/src/task/stream_manager.rs | 8 ++- 10 files changed, 160 insertions(+), 68 deletions(-) diff --git a/proto/stream_service.proto b/proto/stream_service.proto index 8b006626cd74..62cc8746aeca 100644 --- a/proto/stream_service.proto +++ b/proto/stream_service.proto @@ -63,10 +63,19 @@ message WaitEpochCommitResponse { } message StreamingControlStreamRequest { - message InitRequest { + message InitialPartialGraph { + uint64 partial_graph_id = 1; repeated stream_plan.SubscriptionUpstreamInfo subscriptions = 2; } + message InitRequest { + repeated InitialPartialGraph graphs = 1; + } + + message CreatePartialGraphRequest { + uint64 partial_graph_id = 1; + } + message RemovePartialGraphRequest { repeated uint64 partial_graph_ids = 1; } @@ -75,6 +84,7 @@ message StreamingControlStreamRequest { InitRequest init = 1; InjectBarrierRequest inject_barrier = 2; RemovePartialGraphRequest remove_partial_graph = 3; + CreatePartialGraphRequest create_partial_graph = 4; } } diff --git a/src/meta/src/barrier/checkpoint/control.rs b/src/meta/src/barrier/checkpoint/control.rs index 31150554cc68..1489738c2f9e 100644 --- a/src/meta/src/barrier/checkpoint/control.rs +++ b/src/meta/src/barrier/checkpoint/control.rs @@ -24,7 +24,6 @@ use risingwave_meta_model::WorkerId; use risingwave_pb::ddl_service::DdlProgress; use risingwave_pb::hummock::HummockVersionStats; use risingwave_pb::meta::PausedReason; -use risingwave_pb::stream_plan::PbSubscriptionUpstreamInfo; use risingwave_pb::stream_service::BarrierCompleteResponse; use tracing::{debug, warn}; @@ -40,7 +39,7 @@ use crate::barrier::schedule::{NewBarrier, PeriodicBarriers}; use crate::barrier::utils::collect_creating_job_commit_epoch_info; use crate::barrier::{ BarrierKind, Command, CreateStreamingJobCommandInfo, CreateStreamingJobType, - SnapshotBackfillInfo, TracedEpoch, + InflightSubscriptionInfo, SnapshotBackfillInfo, TracedEpoch, }; use crate::manager::ActiveStreamingWorkerNodes; use crate::rpc::metrics::GLOBAL_META_METRICS; @@ -147,6 +146,7 @@ impl CheckpointControl { } else { new_database.state.in_flight_prev_epoch().clone() }; + control_stream_manager.add_partial_graph(database_id, 
None)?; (entry.insert(new_database), max_prev_epoch) } Command::Flush @@ -276,10 +276,12 @@ impl CheckpointControl { .for_each(|database| database.create_mview_tracker.abort_all()); } - pub(crate) fn subscriptions(&self) -> impl Iterator + '_ { - self.databases - .values() - .flat_map(|database| &database.state.inflight_subscription_info) + pub(crate) fn subscriptions( + &self, + ) -> impl Iterator + '_ { + self.databases.iter().map(|(database_id, database)| { + (*database_id, &database.state.inflight_subscription_info) + }) } } @@ -828,8 +830,10 @@ impl DatabaseCheckpointControl { .expect("checked Some") .to_mutation(None) .expect("should have some mutation in `CreateStreamingJob` command"); + let job_id = info.table_fragments.table_id(); + control_stream_manager.add_partial_graph(self.database_id, Some(job_id))?; self.creating_streaming_job_controls.insert( - info.table_fragments.table_id(), + job_id, CreatingStreamingJobControl::new( info.clone(), snapshot_backfill_info.clone(), diff --git a/src/meta/src/barrier/context/context_impl.rs b/src/meta/src/barrier/context/context_impl.rs index fee2a31550cc..2ccdee278286 100644 --- a/src/meta/src/barrier/context/context_impl.rs +++ b/src/meta/src/barrier/context/context_impl.rs @@ -18,7 +18,7 @@ use futures::future::try_join_all; use risingwave_pb::common::WorkerNode; use risingwave_pb::hummock::HummockVersionStats; use risingwave_pb::meta::PausedReason; -use risingwave_pb::stream_plan::SubscriptionUpstreamInfo; +use risingwave_pb::stream_service::streaming_control_stream_request::PbInitRequest; use risingwave_pb::stream_service::WaitEpochCommitRequest; use risingwave_rpc_client::StreamingControlHandle; @@ -70,9 +70,9 @@ impl GlobalBarrierWorkerContext for GlobalBarrierWorkerContextImpl { async fn new_control_stream( &self, node: &WorkerNode, - subscriptions: impl Iterator, + init_request: &PbInitRequest, ) -> MetaResult { - self.new_control_stream_impl(node, subscriptions).await + self.new_control_stream_impl(node, init_request).await } async fn reload_runtime_info(&self) -> MetaResult { diff --git a/src/meta/src/barrier/context/mod.rs b/src/meta/src/barrier/context/mod.rs index e69b9644de8d..7306c1617162 100644 --- a/src/meta/src/barrier/context/mod.rs +++ b/src/meta/src/barrier/context/mod.rs @@ -21,7 +21,7 @@ use std::sync::Arc; use arc_swap::ArcSwap; use risingwave_pb::common::WorkerNode; use risingwave_pb::hummock::HummockVersionStats; -use risingwave_pb::stream_plan::SubscriptionUpstreamInfo; +use risingwave_pb::stream_service::streaming_control_stream_request::PbInitRequest; use risingwave_rpc_client::StreamingControlHandle; use crate::barrier::command::CommandContext; @@ -60,7 +60,7 @@ pub(super) trait GlobalBarrierWorkerContext: Send + Sync + 'static { async fn new_control_stream( &self, node: &WorkerNode, - subscriptions: impl Iterator, + init_request: &PbInitRequest, ) -> MetaResult; async fn reload_runtime_info(&self) -> MetaResult; diff --git a/src/meta/src/barrier/rpc.rs b/src/meta/src/barrier/rpc.rs index ea84625b1933..dfb9f1cc13d3 100644 --- a/src/meta/src/barrier/rpc.rs +++ b/src/meta/src/barrier/rpc.rs @@ -30,7 +30,9 @@ use risingwave_pb::common::{ActorInfo, WorkerNode}; use risingwave_pb::meta::PausedReason; use risingwave_pb::stream_plan::barrier_mutation::Mutation; use risingwave_pb::stream_plan::{Barrier, BarrierMutation, StreamActor, SubscriptionUpstreamInfo}; -use risingwave_pb::stream_service::streaming_control_stream_request::RemovePartialGraphRequest; +use 
risingwave_pb::stream_service::streaming_control_stream_request::{ + CreatePartialGraphRequest, PbInitRequest, PbInitialPartialGraph, RemovePartialGraphRequest, +}; use risingwave_pb::stream_service::{ streaming_control_stream_request, streaming_control_stream_response, BarrierCompleteResponse, InjectBarrierRequest, StreamingControlStreamRequest, @@ -94,7 +96,7 @@ impl ControlStreamManager { pub(super) async fn add_worker( &mut self, node: WorkerNode, - subscriptions: impl Iterator, + initial_subscriptions: impl Iterator, context: &impl GlobalBarrierWorkerContext, ) { let node_id = node.id as WorkerId; @@ -106,13 +108,10 @@ impl ControlStreamManager { let mut backoff = ExponentialBackoff::from_millis(100) .max_delay(Duration::from_secs(3)) .factor(5); - let subscriptions = subscriptions.collect_vec(); + let init_request = Self::collect_init_request(initial_subscriptions); const MAX_RETRY: usize = 5; for i in 1..=MAX_RETRY { - match context - .new_control_stream(&node, subscriptions.iter().cloned()) - .await - { + match context.new_control_stream(&node, &init_request).await { Ok(handle) => { assert!(self .nodes @@ -141,16 +140,14 @@ impl ControlStreamManager { pub(super) async fn reset( &mut self, - subscriptions: impl Iterator, + initial_subscriptions: impl Iterator, nodes: &HashMap, context: &impl GlobalBarrierWorkerContext, ) -> MetaResult<()> { - let subscriptions = subscriptions.cloned().collect_vec(); - let subscriptions = &subscriptions; + let init_request = Self::collect_init_request(initial_subscriptions); + let init_request = &init_request; let nodes = try_join_all(nodes.iter().map(|(worker_id, node)| async move { - let handle = context - .new_control_stream(node, subscriptions.iter().flatten()) - .await?; + let handle = context.new_control_stream(node, init_request).await?; Result::<_, MetaError>::Ok(( *worker_id, ControlStreamNode { @@ -270,6 +267,19 @@ impl ControlStreamManager { tracing::debug!(?errors, "collected stream errors"); errors } + + fn collect_init_request( + initial_subscriptions: impl Iterator, + ) -> PbInitRequest { + PbInitRequest { + graphs: initial_subscriptions + .map(|(database_id, info)| PbInitialPartialGraph { + partial_graph_id: to_partial_graph_id(database_id, None), + subscriptions: info.into_iter().collect_vec(), + }) + .collect(), + } + } } impl ControlStreamManager { @@ -436,6 +446,27 @@ impl ControlStreamManager { Ok(node_need_collect) } + pub(super) fn add_partial_graph( + &mut self, + database_id: DatabaseId, + creating_job_id: Option, + ) -> MetaResult<()> { + let partial_graph_id = to_partial_graph_id(database_id, creating_job_id); + self.nodes.iter().try_for_each(|(_, node)| { + node.handle + .request_sender + .send(StreamingControlStreamRequest { + request: Some( + streaming_control_stream_request::Request::CreatePartialGraph( + CreatePartialGraphRequest { partial_graph_id }, + ), + ), + }) + .map_err(|_| anyhow!("failed to add partial graph")) + })?; + Ok(()) + } + pub(super) fn remove_partial_graph( &mut self, database_id: DatabaseId, @@ -472,14 +503,14 @@ impl GlobalBarrierWorkerContextImpl { pub(super) async fn new_control_stream_impl( &self, node: &WorkerNode, - subscriptions: impl Iterator, + init_request: &PbInitRequest, ) -> MetaResult { let handle = self .env .stream_client_pool() .get(node) .await? 
- .start_streaming_control(subscriptions) + .start_streaming_control(init_request.clone()) .await?; Ok(handle) } diff --git a/src/meta/src/barrier/worker.rs b/src/meta/src/barrier/worker.rs index ee0cdd97fb61..f6999e5ce923 100644 --- a/src/meta/src/barrier/worker.rs +++ b/src/meta/src/barrier/worker.rs @@ -14,7 +14,7 @@ use std::collections::HashMap; use std::mem::replace; -use std::sync::Arc; +use std::sync::{Arc, LazyLock}; use std::time::Duration; use arc_swap::ArcSwap; @@ -46,7 +46,7 @@ use crate::barrier::rpc::{merge_node_rpc_errors, ControlStreamManager}; use crate::barrier::schedule::PeriodicBarriers; use crate::barrier::{ schedule, BarrierKind, BarrierManagerRequest, BarrierManagerStatus, - BarrierWorkerRuntimeInfoSnapshot, RecoveryReason, TracedEpoch, + BarrierWorkerRuntimeInfoSnapshot, InflightSubscriptionInfo, RecoveryReason, TracedEpoch, }; use crate::error::MetaErrorInner; use crate::hummock::HummockManagerRef; @@ -558,9 +558,10 @@ impl GlobalBarrierWorker { let mut control_stream_manager = ControlStreamManager::new(self.env.clone()); let reset_start_time = Instant::now(); + let empty_subscriptions = LazyLock::new(InflightSubscriptionInfo::default); control_stream_manager .reset( - subscription_infos.values(), + database_fragment_infos.keys().map(|database_id| (*database_id, subscription_infos.get(database_id).unwrap_or_else(|| &*empty_subscriptions))), active_streaming_nodes.current(), &*self.context, ) diff --git a/src/rpc_client/src/stream_client.rs b/src/rpc_client/src/stream_client.rs index 6484adb1c921..9b83ab82d7fe 100644 --- a/src/rpc_client/src/stream_client.rs +++ b/src/rpc_client/src/stream_client.rs @@ -21,9 +21,8 @@ use futures::TryStreamExt; use risingwave_common::config::MAX_CONNECTION_WINDOW_SIZE; use risingwave_common::monitor::{EndpointExt, TcpConfig}; use risingwave_common::util::addr::HostAddr; -use risingwave_pb::stream_plan::SubscriptionUpstreamInfo; use risingwave_pb::stream_service::stream_service_client::StreamServiceClient; -use risingwave_pb::stream_service::streaming_control_stream_request::InitRequest; +use risingwave_pb::stream_service::streaming_control_stream_request::PbInitRequest; use risingwave_pb::stream_service::streaming_control_stream_response::InitResponse; use risingwave_pb::stream_service::*; use tokio_stream::wrappers::UnboundedReceiverStream; @@ -86,13 +85,11 @@ pub type StreamingControlHandle = impl StreamClient { pub async fn start_streaming_control( &self, - subscriptions: impl Iterator, + init_request: PbInitRequest, ) -> Result { let first_request = StreamingControlStreamRequest { request: Some(streaming_control_stream_request::Request::Init( - InitRequest { - subscriptions: subscriptions.collect(), - }, + init_request, )), }; let mut client = self.0.to_owned(); diff --git a/src/stream/src/task/barrier_manager.rs b/src/stream/src/task/barrier_manager.rs index 0bfdcdd59117..155320281421 100644 --- a/src/stream/src/task/barrier_manager.rs +++ b/src/stream/src/task/barrier_manager.rs @@ -52,7 +52,9 @@ use risingwave_common::util::runtime::BackgroundShutdownRuntime; use risingwave_hummock_sdk::table_stats::to_prost_table_stats_map; use risingwave_hummock_sdk::{LocalSstableInfo, SyncResult}; use risingwave_pb::stream_plan::barrier::BarrierKind; -use risingwave_pb::stream_service::streaming_control_stream_request::{InitRequest, Request}; +use risingwave_pb::stream_service::streaming_control_stream_request::{ + InitRequest, InitialPartialGraph, Request, +}; use risingwave_pb::stream_service::streaming_control_stream_response::{ 
InitResponse, ShutdownResponse, }; @@ -64,7 +66,9 @@ use risingwave_pb::stream_service::{ use crate::executor::exchange::permit::Receiver; use crate::executor::monitor::StreamingMetrics; use crate::executor::{Barrier, BarrierInner, StreamExecutorError}; -use crate::task::barrier_manager::managed_state::ManagedBarrierStateDebugInfo; +use crate::task::barrier_manager::managed_state::{ + ManagedBarrierStateDebugInfo, PartialGraphManagedBarrierState, +}; use crate::task::barrier_manager::progress::BackfillState; /// If enabled, all actors will be grouped in the same tracing span within one epoch. @@ -273,7 +277,10 @@ pub(super) struct LocalBarrierWorker { } impl LocalBarrierWorker { - pub(super) fn new(actor_manager: Arc) -> Self { + pub(super) fn new( + actor_manager: Arc, + initial_partial_graphs: Vec, + ) -> Self { let (event_tx, event_rx) = unbounded_channel(); let (failure_tx, failure_rx) = unbounded_channel(); let shared_context = Arc::new(SharedContext::new( @@ -284,7 +291,11 @@ impl LocalBarrierWorker { }, )); Self { - state: ManagedBarrierState::new(actor_manager.clone(), shared_context.clone()), + state: ManagedBarrierState::new( + actor_manager.clone(), + shared_context.clone(), + initial_partial_graphs, + ), control_stream_handle: ControlStreamHandle::empty(), actor_manager, current_shared_context: shared_context, @@ -327,8 +338,7 @@ impl LocalBarrierWorker { match actor_op { LocalActorOperation::NewControlStream { handle, init_request } => { self.control_stream_handle.reset_stream_with_err(Status::internal("control stream has been reset to a new one")); - self.reset().await; - self.state.add_subscriptions(init_request.subscriptions); + self.reset(init_request.graphs).await; self.control_stream_handle = handle; self.control_stream_handle.send_response(StreamingControlStreamResponse { response: Some(streaming_control_stream_response::Response::Init(InitResponse {})) @@ -379,6 +389,10 @@ impl LocalBarrierWorker { ); Ok(()) } + Request::CreatePartialGraph(req) => { + self.add_partial_graph(PartialGraphId::new(req.partial_graph_id)); + Ok(()) + } Request::Init(_) => { unreachable!() } @@ -557,9 +571,20 @@ impl LocalBarrierWorker { } } + pub(super) fn add_partial_graph(&mut self, partial_graph_id: PartialGraphId) { + assert!(self + .state + .graph_states + .insert( + partial_graph_id, + PartialGraphManagedBarrierState::new(&self.actor_manager) + ) + .is_none()); + } + /// Reset all internal states. 
- pub(super) fn reset_state(&mut self) { - *self = Self::new(self.actor_manager.clone()); + pub(super) fn reset_state(&mut self, initial_partial_graphs: Vec) { + *self = Self::new(self.actor_manager.clone(), initial_partial_graphs); } /// When a [`crate::executor::StreamConsumer`] (typically [`crate::executor::DispatchExecutor`]) get a barrier, it should report @@ -659,7 +684,7 @@ impl LocalBarrierWorker { await_tree_reg, runtime: runtime.into(), }); - let worker = LocalBarrierWorker::new(actor_manager); + let worker = LocalBarrierWorker::new(actor_manager, vec![]); tokio::spawn(worker.run(actor_op_rx)) } } @@ -842,7 +867,9 @@ pub(crate) mod barrier_test_utils { use assert_matches::assert_matches; use futures::StreamExt; - use risingwave_pb::stream_service::streaming_control_stream_request::InitRequest; + use risingwave_pb::stream_service::streaming_control_stream_request::{ + InitRequest, PbInitialPartialGraph, + }; use risingwave_pb::stream_service::{ streaming_control_stream_request, streaming_control_stream_response, InjectBarrierRequest, StreamingControlStreamRequest, StreamingControlStreamResponse, @@ -876,7 +903,10 @@ pub(crate) mod barrier_test_utils { UnboundedReceiverStream::new(request_rx).boxed(), ), init_request: InitRequest { - subscriptions: vec![], + graphs: vec![PbInitialPartialGraph { + partial_graph_id: u64::MAX, + subscriptions: vec![], + }], }, }); diff --git a/src/stream/src/task/barrier_manager/managed_state.rs b/src/stream/src/task/barrier_manager/managed_state.rs index 462f78233f25..cd6bb924f478 100644 --- a/src/stream/src/task/barrier_manager/managed_state.rs +++ b/src/stream/src/task/barrier_manager/managed_state.rs @@ -142,6 +142,7 @@ mod await_epoch_completed_future { use await_epoch_completed_future::*; use risingwave_pb::stream_plan::SubscriptionUpstreamInfo; +use risingwave_pb::stream_service::streaming_control_stream_request::InitialPartialGraph; use risingwave_pb::stream_service::InjectBarrierRequest; fn sync_epoch( @@ -372,6 +373,8 @@ pub(super) struct PartialGraphManagedBarrierState { prev_barrier_table_ids: Option<(EpochPair, HashSet)>, + mv_depended_subscriptions: HashMap>, + /// Record the progress updates of creating mviews for each epoch of concurrent checkpoints. 
/// /// This is updated by [`super::CreateMviewProgressReporter::update`] and will be reported to meta @@ -390,7 +393,15 @@ pub(super) struct PartialGraphManagedBarrierState { } impl PartialGraphManagedBarrierState { - fn new( + pub(super) fn new(actor_manager: &StreamActorManager) -> Self { + Self::new_inner( + actor_manager.env.state_store(), + actor_manager.streaming_metrics.clone(), + actor_manager.await_tree_reg.clone(), + ) + } + + fn new_inner( state_store: StateStoreImpl, streaming_metrics: Arc, barrier_await_tree_reg: Option, @@ -398,6 +409,7 @@ impl PartialGraphManagedBarrierState { Self { epoch_barrier_state_map: Default::default(), prev_barrier_table_ids: None, + mv_depended_subscriptions: Default::default(), create_mview_progress: Default::default(), await_epoch_completed_futures: Default::default(), state_store, @@ -408,7 +420,7 @@ impl PartialGraphManagedBarrierState { #[cfg(test)] pub(crate) fn for_test() -> Self { - Self::new( + Self::new_inner( StateStoreImpl::for_test(), Arc::new(StreamingMetrics::unused()), None, @@ -425,8 +437,6 @@ pub(crate) struct ManagedBarrierState { pub(super) graph_states: HashMap, - mv_depended_subscriptions: HashMap>, - actor_manager: Arc, current_shared_context: Arc, @@ -437,11 +447,18 @@ impl ManagedBarrierState { pub(super) fn new( actor_manager: Arc, current_shared_context: Arc, + initial_partial_graphs: Vec, ) -> Self { Self { actor_states: Default::default(), - graph_states: Default::default(), - mv_depended_subscriptions: Default::default(), + graph_states: initial_partial_graphs + .into_iter() + .map(|graph| { + let mut state = PartialGraphManagedBarrierState::new(&actor_manager); + state.add_subscriptions(graph.subscriptions); + (PartialGraphId::new(graph.partial_graph_id), state) + }) + .collect(), actor_manager, current_shared_context, } @@ -506,7 +523,9 @@ impl ManagedBarrierState { .expect("should exist") .register_barrier_sender(tx) } +} +impl PartialGraphManagedBarrierState { pub(super) fn add_subscriptions(&mut self, subscriptions: Vec) { for subscription_to_add in subscriptions { if !self @@ -557,14 +576,14 @@ impl ManagedBarrierState { } } } +} +impl ManagedBarrierState { pub(super) fn transform_to_issued( &mut self, barrier: &Barrier, request: InjectBarrierRequest, ) -> StreamResult<()> { - self.add_subscriptions(request.subscriptions_to_add); - self.remove_subscriptions(request.subscriptions_to_remove); let partial_graph_id = PartialGraphId::new(request.partial_graph_id); let actor_to_stop = barrier.all_stop_actors(); let is_stop_actor = |actor_id| { @@ -574,14 +593,11 @@ impl ManagedBarrierState { }; let graph_state = self .graph_states - .entry(partial_graph_id) - .or_insert_with(|| { - PartialGraphManagedBarrierState::new( - self.actor_manager.env.state_store(), - self.actor_manager.streaming_metrics.clone(), - self.actor_manager.await_tree_reg.clone(), - ) - }); + .get_mut(&partial_graph_id) + .expect("should exist"); + + graph_state.add_subscriptions(request.subscriptions_to_add); + graph_state.remove_subscriptions(request.subscriptions_to_remove); graph_state.transform_to_issued( barrier, @@ -590,7 +606,8 @@ impl ManagedBarrierState { ); let mut new_actors = HashSet::new(); - let subscriptions = LazyCell::new(|| Arc::new(self.mv_depended_subscriptions.clone())); + let subscriptions = + LazyCell::new(|| Arc::new(graph_state.mv_depended_subscriptions.clone())); for actor in request.actors_to_build { let actor_id = actor.actor_id; assert!(!is_stop_actor(actor_id)); diff --git a/src/stream/src/task/stream_manager.rs 
b/src/stream/src/task/stream_manager.rs index 361c5d9582d6..648afb81ebc8 100644 --- a/src/stream/src/task/stream_manager.rs +++ b/src/stream/src/task/stream_manager.rs @@ -33,7 +33,9 @@ use risingwave_pb::plan_common::StorageTableDesc; use risingwave_pb::stream_plan; use risingwave_pb::stream_plan::stream_node::NodeBody; use risingwave_pb::stream_plan::{StreamActor, StreamNode, StreamScanNode, StreamScanType}; -use risingwave_pb::stream_service::streaming_control_stream_request::InitRequest; +use risingwave_pb::stream_service::streaming_control_stream_request::{ + InitRequest, InitialPartialGraph, +}; use risingwave_pb::stream_service::{ StreamingControlStreamRequest, StreamingControlStreamResponse, }; @@ -248,7 +250,7 @@ impl LocalStreamManager { impl LocalBarrierWorker { /// Force stop all actors on this worker, and then drop their resources. - pub(super) async fn reset(&mut self) { + pub(super) async fn reset(&mut self, initial_partial_graphs: Vec) { self.state.abort_actors().await; if let Some(m) = self.actor_manager.await_tree_reg.as_ref() { m.clear(); @@ -260,8 +262,8 @@ impl LocalBarrierWorker { .verbose_instrument_await("store_clear_shared_buffer") .await } - self.reset_state(); self.actor_manager.env.dml_manager_ref().clear(); + self.reset_state(initial_partial_graphs); } }
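
Note on patch 6/7: with this change, `warehouse.path` may only be omitted for the `rest` catalog; the storage and glue catalog paths still reject a missing path, and the non-rest branch bails out with the same error message. Below is a minimal standalone sketch of that rule, not part of the patch; the helper name `validate_warehouse_path` is hypothetical and only mirrors the checks scattered across `iceberg_configs` and the v1/v2 catalog loaders.

    use anyhow::bail;

    /// Hypothetical helper mirroring the validation added in patch 6/7:
    /// only the `rest` catalog may leave `warehouse.path` unset.
    fn validate_warehouse_path(
        catalog_type: &str,
        warehouse_path: Option<&str>,
    ) -> anyhow::Result<()> {
        match (warehouse_path, catalog_type) {
            // An explicit warehouse path is always accepted.
            (Some(_), _) => Ok(()),
            // The rest catalog can resolve the warehouse from the catalog service.
            (None, "rest") => Ok(()),
            // All other catalog types still require it.
            (None, other) => bail!("`warehouse.path` must be set in {} catalog", other),
        }
    }

For example, `validate_warehouse_path("storage", None)` fails with the same message the connector now returns, while `validate_warehouse_path("rest", None)` succeeds.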
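
Note on patch 7/7: the compute node no longer creates a partial graph lazily on the first `InjectBarrierRequest` (`transform_to_issued` now does `get_mut(..).expect("should exist")`), so meta has to announce every partial graph up front, either in `InitRequest.graphs` or via an explicit `CreatePartialGraphRequest` sent before the first barrier for that graph. The sketch below illustrates the resulting request ordering on the control stream using the generated prost types; the graph ids are placeholders, and the barrier injection itself is only indicated by a comment.

    use risingwave_pb::stream_service::streaming_control_stream_request::{
        CreatePartialGraphRequest, InitRequest, InitialPartialGraph, Request,
    };
    use risingwave_pb::stream_service::StreamingControlStreamRequest;

    /// Hypothetical illustration of the control-stream handshake after this series.
    fn control_stream_requests() -> Vec<StreamingControlStreamRequest> {
        let database_graph_id = 1u64; // placeholder id for a database-level graph
        let creating_job_graph_id = 2u64; // placeholder id for a snapshot-backfill job
        vec![
            // 1. `Init` now carries one `InitialPartialGraph` per existing database,
            //    each with its current subscriptions, instead of a flat subscription list.
            StreamingControlStreamRequest {
                request: Some(Request::Init(InitRequest {
                    graphs: vec![InitialPartialGraph {
                        partial_graph_id: database_graph_id,
                        subscriptions: vec![],
                    }],
                })),
            },
            // 2. Any additional partial graph (e.g. for a creating streaming job) must be
            //    created explicitly before barriers are injected into it.
            StreamingControlStreamRequest {
                request: Some(Request::CreatePartialGraph(CreatePartialGraphRequest {
                    partial_graph_id: creating_job_graph_id,
                })),
            },
            // 3. Subsequent `InjectBarrier` / `RemovePartialGraph` requests may then
            //    reference either partial graph id.
        ]
    }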