From 9f12b10b34fd73a7b3b984c7d1fbeb0b721e583f Mon Sep 17 00:00:00 2001
From: Bugen Zhao
Date: Mon, 28 Oct 2024 18:19:39 +0800
Subject: [PATCH 1/9] fix(meta): fix vnode count data type when querying system
 table `rw_fragments` (#19155)

Signed-off-by: Bugen Zhao
---
 src/meta/src/controller/utils.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/meta/src/controller/utils.rs b/src/meta/src/controller/utils.rs
index 6cf09e754c24f..2240f701a47bf 100644
--- a/src/meta/src/controller/utils.rs
+++ b/src/meta/src/controller/utils.rs
@@ -273,7 +273,7 @@ pub struct FragmentDesc {
     pub state_table_ids: I32Array,
     pub upstream_fragment_id: I32Array,
     pub parallelism: i64,
-    pub vnode_count: i64,
+    pub vnode_count: i32,
 }
 
 /// List all objects that are using the given one in a cascade way. It runs a recursive CTE to find all the dependencies.

From 0d013eab1bba8cd86c575057e165586262362636 Mon Sep 17 00:00:00 2001
From: Richard Chien
Date: Mon, 28 Oct 2024 18:20:27 +0800
Subject: [PATCH 2/9] perf(over window): pass through `Update`s not affecting
 window function outputs (#19056)

Signed-off-by: Richard Chien
---
 .../over_window/generated/batch/main.slt.part |   1 +
 .../batch/no_effect_updates/mod.slt.part      |  94 ++++++++++++++++
 .../generated/streaming/main.slt.part         |   1 +
 .../streaming/no_effect_updates/mod.slt.part  |  94 ++++++++++++++++
 e2e_test/over_window/templates/main.slt.part  |   1 +
 .../templates/no_effect_updates/mod.slt.part  |  92 ++++++++++++++++
 src/common/src/array/stream_record.rs         |   2 +-
 src/stream/src/common/table/state_table.rs    |   8 +-
 .../src/executor/over_window/general.rs       | 102 +++++++++++++++---
 .../executor/over_window/over_partition.rs    |  15 +--
 10 files changed, 384 insertions(+), 26 deletions(-)
 create mode 100644 e2e_test/over_window/generated/batch/no_effect_updates/mod.slt.part
 create mode 100644 e2e_test/over_window/generated/streaming/no_effect_updates/mod.slt.part
 create mode 100644 e2e_test/over_window/templates/no_effect_updates/mod.slt.part

diff --git a/e2e_test/over_window/generated/batch/main.slt.part b/e2e_test/over_window/generated/batch/main.slt.part
index 5e91838cfac89..dcd341e3e714f 100644
--- a/e2e_test/over_window/generated/batch/main.slt.part
+++ b/e2e_test/over_window/generated/batch/main.slt.part
@@ -9,3 +9,4 @@ include ./expr_in_win_func/mod.slt.part
 include ./agg_in_win_func/mod.slt.part
 include ./opt_agg_then_join/mod.slt.part
 include ./with_filter/mod.slt.part
+include ./no_effect_updates/mod.slt.part
diff --git a/e2e_test/over_window/generated/batch/no_effect_updates/mod.slt.part b/e2e_test/over_window/generated/batch/no_effect_updates/mod.slt.part
new file mode 100644
index 0000000000000..827b54113b962
--- /dev/null
+++ b/e2e_test/over_window/generated/batch/no_effect_updates/mod.slt.part
@@ -0,0 +1,94 @@
+# This file is generated by `gen.py`. Do not edit it manually!
+
+# Test handling of updates having no effect on window function outputs.
+ +statement ok +create table t ( + id int + , foo int + , bar int +); + +statement ok +create view v1 as +select + * + , rank() over (partition by 1::int order by foo) as r1 +from t; + +statement ok +create view v2 as +select + * + , rank() over (partition by 1::int order by bar) as r2 +from t; + +statement ok +insert into t values + (100001, 701, 805) +, (100002, 700, 806) +, (100003, 723, 807) +, (100004, 702, 808); + +query iii +select * from v1 order by r1, id; +---- +100002 700 806 1 +100001 701 805 2 +100004 702 808 3 +100003 723 807 4 + +query iii +select * from v2 order by r2, id; +---- +100001 701 805 1 +100002 700 806 2 +100003 723 807 3 +100004 702 808 4 + +statement ok +update t set foo = 733 where id = 100001; + +query iii +select * from v1 order by r1, id; +---- +100002 700 806 1 +100004 702 808 2 +100003 723 807 3 +100001 733 805 4 + +query iii +select * from v2 order by r2, id; +---- +100001 733 805 1 +100002 700 806 2 +100003 723 807 3 +100004 702 808 4 + +statement ok +update t set bar = 804 where id = 100001; + +query iii +select * from v1 order by r1, id; +---- +100002 700 806 1 +100004 702 808 2 +100003 723 807 3 +100001 733 804 4 + +query iii +select * from v2 order by r2, id; +---- +100001 733 804 1 +100002 700 806 2 +100003 723 807 3 +100004 702 808 4 + +statement ok +drop view v1; + +statement ok +drop view v2; + +statement ok +drop table t; diff --git a/e2e_test/over_window/generated/streaming/main.slt.part b/e2e_test/over_window/generated/streaming/main.slt.part index 5e91838cfac89..dcd341e3e714f 100644 --- a/e2e_test/over_window/generated/streaming/main.slt.part +++ b/e2e_test/over_window/generated/streaming/main.slt.part @@ -9,3 +9,4 @@ include ./expr_in_win_func/mod.slt.part include ./agg_in_win_func/mod.slt.part include ./opt_agg_then_join/mod.slt.part include ./with_filter/mod.slt.part +include ./no_effect_updates/mod.slt.part diff --git a/e2e_test/over_window/generated/streaming/no_effect_updates/mod.slt.part b/e2e_test/over_window/generated/streaming/no_effect_updates/mod.slt.part new file mode 100644 index 0000000000000..0261fbb1cc454 --- /dev/null +++ b/e2e_test/over_window/generated/streaming/no_effect_updates/mod.slt.part @@ -0,0 +1,94 @@ +# This file is generated by `gen.py`. Do not edit it manually! + +# Test handling of updates having no effect on window function outputs. 
+ +statement ok +create table t ( + id int + , foo int + , bar int +); + +statement ok +create materialized view v1 as +select + * + , rank() over (partition by 1::int order by foo) as r1 +from t; + +statement ok +create materialized view v2 as +select + * + , rank() over (partition by 1::int order by bar) as r2 +from t; + +statement ok +insert into t values + (100001, 701, 805) +, (100002, 700, 806) +, (100003, 723, 807) +, (100004, 702, 808); + +query iii +select * from v1 order by r1, id; +---- +100002 700 806 1 +100001 701 805 2 +100004 702 808 3 +100003 723 807 4 + +query iii +select * from v2 order by r2, id; +---- +100001 701 805 1 +100002 700 806 2 +100003 723 807 3 +100004 702 808 4 + +statement ok +update t set foo = 733 where id = 100001; + +query iii +select * from v1 order by r1, id; +---- +100002 700 806 1 +100004 702 808 2 +100003 723 807 3 +100001 733 805 4 + +query iii +select * from v2 order by r2, id; +---- +100001 733 805 1 +100002 700 806 2 +100003 723 807 3 +100004 702 808 4 + +statement ok +update t set bar = 804 where id = 100001; + +query iii +select * from v1 order by r1, id; +---- +100002 700 806 1 +100004 702 808 2 +100003 723 807 3 +100001 733 804 4 + +query iii +select * from v2 order by r2, id; +---- +100001 733 804 1 +100002 700 806 2 +100003 723 807 3 +100004 702 808 4 + +statement ok +drop materialized view v1; + +statement ok +drop materialized view v2; + +statement ok +drop table t; diff --git a/e2e_test/over_window/templates/main.slt.part b/e2e_test/over_window/templates/main.slt.part index 7f7ad74ab7424..e7ec434b8cfaf 100644 --- a/e2e_test/over_window/templates/main.slt.part +++ b/e2e_test/over_window/templates/main.slt.part @@ -7,3 +7,4 @@ include ./expr_in_win_func/mod.slt.part include ./agg_in_win_func/mod.slt.part include ./opt_agg_then_join/mod.slt.part include ./with_filter/mod.slt.part +include ./no_effect_updates/mod.slt.part diff --git a/e2e_test/over_window/templates/no_effect_updates/mod.slt.part b/e2e_test/over_window/templates/no_effect_updates/mod.slt.part new file mode 100644 index 0000000000000..32627f5f38f0c --- /dev/null +++ b/e2e_test/over_window/templates/no_effect_updates/mod.slt.part @@ -0,0 +1,92 @@ +# Test handling of updates having no effect on window function outputs. 
+ +statement ok +create table t ( + id int + , foo int + , bar int +); + +statement ok +create $view_type v1 as +select + * + , rank() over (partition by 1::int order by foo) as r1 +from t; + +statement ok +create $view_type v2 as +select + * + , rank() over (partition by 1::int order by bar) as r2 +from t; + +statement ok +insert into t values + (100001, 701, 805) +, (100002, 700, 806) +, (100003, 723, 807) +, (100004, 702, 808); + +query iii +select * from v1 order by r1, id; +---- +100002 700 806 1 +100001 701 805 2 +100004 702 808 3 +100003 723 807 4 + +query iii +select * from v2 order by r2, id; +---- +100001 701 805 1 +100002 700 806 2 +100003 723 807 3 +100004 702 808 4 + +statement ok +update t set foo = 733 where id = 100001; + +query iii +select * from v1 order by r1, id; +---- +100002 700 806 1 +100004 702 808 2 +100003 723 807 3 +100001 733 805 4 + +query iii +select * from v2 order by r2, id; +---- +100001 733 805 1 +100002 700 806 2 +100003 723 807 3 +100004 702 808 4 + +statement ok +update t set bar = 804 where id = 100001; + +query iii +select * from v1 order by r1, id; +---- +100002 700 806 1 +100004 702 808 2 +100003 723 807 3 +100001 733 804 4 + +query iii +select * from v2 order by r2, id; +---- +100001 733 804 1 +100002 700 806 2 +100003 723 807 3 +100004 702 808 4 + +statement ok +drop $view_type v1; + +statement ok +drop $view_type v2; + +statement ok +drop table t; diff --git a/src/common/src/array/stream_record.rs b/src/common/src/array/stream_record.rs index 1037f92b286ce..cfd474017d933 100644 --- a/src/common/src/array/stream_record.rs +++ b/src/common/src/array/stream_record.rs @@ -28,7 +28,7 @@ pub enum RecordType { } /// Generic type to represent a row change. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Copy)] pub enum Record { Insert { new_row: R }, Delete { old_row: R }, diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs index 55957af9e6e5b..0ecb0eac90718 100644 --- a/src/stream/src/common/table/state_table.rs +++ b/src/stream/src/common/table/state_table.rs @@ -644,16 +644,16 @@ where pub async fn get_encoded_row(&self, pk: impl Row) -> StreamExecutorResult> { assert!(pk.len() <= self.pk_indices.len()); - if self.prefix_hint_len != 0 { - debug_assert_eq!(self.prefix_hint_len, pk.len()); - } - let serialized_pk = serialize_pk_with_vnode(&pk, &self.pk_serde, self.compute_vnode_by_pk(&pk)); let prefix_hint = if self.prefix_hint_len != 0 && self.prefix_hint_len == pk.len() { Some(serialized_pk.slice(VirtualNode::SIZE..)) } else { + #[cfg(debug_assertions)] + if self.prefix_hint_len != 0 { + warn!("prefix_hint_len is not equal to pk.len(), may not be able to utilize bloom filter"); + } None }; diff --git a/src/stream/src/executor/over_window/general.rs b/src/stream/src/executor/over_window/general.rs index be71950694a04..1a5073b6e9546 100644 --- a/src/stream/src/executor/over_window/general.rs +++ b/src/stream/src/executor/over_window/general.rs @@ -17,6 +17,7 @@ use std::marker::PhantomData; use std::ops::RangeInclusive; use delta_btree_map::Change; +use itertools::Itertools; use risingwave_common::array::stream_record::Record; use risingwave_common::array::Op; use risingwave_common::row::RowExt; @@ -149,6 +150,8 @@ pub(super) struct Calls { pub(super) range_frames: Vec, pub(super) start_is_unbounded: bool, pub(super) end_is_unbounded: bool, + /// Deduplicated indices of all arguments of all calls. 
+ pub(super) all_arg_indices: Vec, } impl Calls { @@ -171,12 +174,19 @@ impl Calls { .iter() .any(|call| call.frame.bounds.end_is_unbounded()); + let all_arg_indices = calls + .iter() + .flat_map(|call| call.args.val_indices().iter().copied()) + .dedup() + .collect(); + Self { calls, super_rows_frame_bounds, range_frames, start_is_unbounded, end_is_unbounded, + all_arg_indices, } } @@ -326,8 +336,12 @@ impl OverWindowExecutor { chunk: StreamChunk, metrics: &'a OverWindowMetrics, ) { - // (deduped) partition key => changes happened in the partition. - let mut deltas: BTreeMap, PartitionDelta> = BTreeMap::new(); + // (deduped) partition key => ( + // significant changes happened in the partition, + // no-effect changes happened in the partition, + // ) + let mut deltas: BTreeMap, (PartitionDelta, PartitionDelta)> = + BTreeMap::new(); // input pk of update records of which the order key is changed. let mut key_change_updated_pks = HashSet::new(); @@ -336,16 +350,16 @@ impl OverWindowExecutor { match record { Record::Insert { new_row } => { let part_key = this.get_partition_key(new_row).into(); - let part_delta = deltas.entry(part_key).or_default(); - part_delta.insert( + let (delta, _) = deltas.entry(part_key).or_default(); + delta.insert( this.row_to_cache_key(new_row)?, Change::Insert(new_row.into_owned_row()), ); } Record::Delete { old_row } => { let part_key = this.get_partition_key(old_row).into(); - let part_delta = deltas.entry(part_key).or_default(); - part_delta.insert(this.row_to_cache_key(old_row)?, Change::Delete); + let (delta, _) = deltas.entry(part_key).or_default(); + delta.insert(this.row_to_cache_key(old_row)?, Change::Delete); } Record::Update { old_row, new_row } => { let old_part_key = this.get_partition_key(old_row).into(); @@ -354,23 +368,31 @@ impl OverWindowExecutor { let new_state_key = this.row_to_cache_key(new_row)?; if old_part_key == new_part_key && old_state_key == new_state_key { // not a key-change update - let part_delta = deltas.entry(old_part_key).or_default(); - part_delta.insert(old_state_key, Change::Insert(new_row.into_owned_row())); + let (delta, no_effect_delta) = deltas.entry(old_part_key).or_default(); + if old_row.project(&this.calls.all_arg_indices) + == new_row.project(&this.calls.all_arg_indices) + { + // partition key, order key and arguments are all the same + no_effect_delta + .insert(old_state_key, Change::Insert(new_row.into_owned_row())); + } else { + delta.insert(old_state_key, Change::Insert(new_row.into_owned_row())); + } } else if old_part_key == new_part_key { // order-change update, split into delete + insert, will be merged after // building changes key_change_updated_pks.insert(this.get_input_pk(old_row)); - let part_delta = deltas.entry(old_part_key).or_default(); - part_delta.insert(old_state_key, Change::Delete); - part_delta.insert(new_state_key, Change::Insert(new_row.into_owned_row())); + let (delta, _) = deltas.entry(old_part_key).or_default(); + delta.insert(old_state_key, Change::Delete); + delta.insert(new_state_key, Change::Insert(new_row.into_owned_row())); } else { // partition-change update, split into delete + insert // NOTE(rc): Since we append partition key to logical pk, we can't merge the // delete + insert back to update later. // TODO: IMO this behavior is problematic. Deep discussion is needed. 
- let old_part_delta = deltas.entry(old_part_key).or_default(); + let (old_part_delta, _) = deltas.entry(old_part_key).or_default(); old_part_delta.insert(old_state_key, Change::Delete); - let new_part_delta = deltas.entry(new_part_key).or_default(); + let (new_part_delta, _) = deltas.entry(new_part_key).or_default(); new_part_delta .insert(new_state_key, Change::Insert(new_row.into_owned_row())); } @@ -384,7 +406,7 @@ impl OverWindowExecutor { let mut chunk_builder = StreamChunkBuilder::new(this.chunk_size, this.schema.data_types()); // Build final changes partition by partition. - for (part_key, delta) in deltas { + for (part_key, (delta, no_effect_delta)) in deltas { vars.stats.cache_lookup += 1; if !vars.cached_partitions.contains(&part_key.0) { vars.stats.cache_miss += 1; @@ -392,6 +414,54 @@ impl OverWindowExecutor { .put(part_key.0.clone(), new_empty_partition_cache()); } let mut cache = vars.cached_partitions.get_mut(&part_key).unwrap(); + + // First, handle `Update`s that don't affect window function outputs. + // Be careful that changes in `delta` may (though we believe unlikely) affect the + // window function outputs of rows in `no_effect_delta`, so before handling `delta` + // we need to write all changes to state table, range cache and chunk builder. + for (key, change) in no_effect_delta { + let new_row = change.into_insert().unwrap(); // new row of an `Update` + + let (old_row, from_cache) = if let Some(old_row) = cache.inner().get(&key).cloned() + { + // Got old row from range cache. + (old_row, true) + } else { + // Retrieve old row from state table. + let table_pk = (&new_row).project(this.state_table.pk_indices()); + // The accesses to the state table is ordered by table PK, so ideally we + // can leverage the block cache under the hood. + if let Some(old_row) = this.state_table.get_row(table_pk).await? { + (old_row, false) + } else { + consistency_panic!(?part_key, ?key, ?new_row, "updating non-existing row"); + continue; + } + }; + + // concatenate old outputs + let input_len = new_row.len(); + let new_row = OwnedRow::new( + new_row + .into_iter() + .chain(old_row.as_inner().iter().skip(input_len).cloned()) // chain old outputs + .collect(), + ); + + // apply & emit the change + let record = Record::Update { + old_row: &old_row, + new_row: &new_row, + }; + if let Some(chunk) = chunk_builder.append_record(record.as_ref()) { + yield chunk; + } + this.state_table.write_record(record); + if from_cache { + cache.insert(key, new_row); + } + } + let mut partition = OverPartition::new( &part_key, &mut cache, @@ -406,6 +476,10 @@ impl OverWindowExecutor { }, ); + if delta.is_empty() { + continue; + } + // Build changes for current partition. 
let (part_changes, accessed_range) = partition.build_changes(&this.state_table, delta).await?; diff --git a/src/stream/src/executor/over_window/over_partition.rs b/src/stream/src/executor/over_window/over_partition.rs index 8406d094d7ff7..93f17703f3073 100644 --- a/src/stream/src/executor/over_window/over_partition.rs +++ b/src/stream/src/executor/over_window/over_partition.rs @@ -398,13 +398,16 @@ impl<'a, S: StateStore> OverPartition<'a, S> { let input_schema_len = table.get_data_types().len() - self.calls.len(); let calls = self.calls; + // return values let mut part_changes = BTreeMap::new(); + let mut accessed_range: Option> = None; + + // stats let mut accessed_entry_count = 0; let mut compute_count = 0; let mut same_output_count = 0; - // Find affected ranges, this also ensures that all rows in the affected ranges are loaded - // into the cache. + // Find affected ranges, this also ensures that all rows in the affected ranges are loaded into the cache. let (part_with_delta, affected_ranges) = self.find_affected_ranges(table, &mut delta).await?; @@ -424,8 +427,6 @@ impl<'a, S: StateStore> OverPartition<'a, S> { } } - let mut accessed_range: Option> = None; - for AffectedRange { first_frame_start, first_curr_key, @@ -595,13 +596,13 @@ impl<'a, S: StateStore> OverPartition<'a, S> { 'a: 'delta, 's: 'delta, { - self.ensure_delta_in_cache(table, delta).await?; - let delta = &*delta; // let's make it immutable - if delta.is_empty() { return Ok((DeltaBTreeMap::new(self.range_cache.inner(), delta), vec![])); } + self.ensure_delta_in_cache(table, delta).await?; + let delta = &*delta; // let's make it immutable + let delta_first = delta.first_key_value().unwrap().0.as_normal_expect(); let delta_last = delta.last_key_value().unwrap().0.as_normal_expect(); From 0935cf2dff746165b03dc5aa15a942f2934ce7a2 Mon Sep 17 00:00:00 2001 From: William Wen <44139337+wenym1@users.noreply.github.com> Date: Mon, 28 Oct 2024 20:44:14 +0800 Subject: [PATCH 3/9] refactor(meta): decouple global barrier worker from context with trait (#19034) --- src/meta/node/src/server.rs | 8 +- src/meta/src/barrier/command.rs | 7 +- .../barrier/creating_job/barrier_control.rs | 12 +- src/meta/src/barrier/creating_job/mod.rs | 15 +- src/meta/src/barrier/mod.rs | 375 +++++++++++------- src/meta/src/barrier/progress.rs | 10 +- src/meta/src/barrier/recovery.rs | 264 ++++++------ src/meta/src/barrier/rpc.rs | 89 +++-- src/meta/src/barrier/schedule.rs | 126 +++--- src/meta/src/manager/metadata.rs | 51 ++- 10 files changed, 577 insertions(+), 380 deletions(-) diff --git a/src/meta/node/src/server.rs b/src/meta/node/src/server.rs index 1e9a71cdda2b0..ff91fe9aa9eed 100644 --- a/src/meta/node/src/server.rs +++ b/src/meta/node/src/server.rs @@ -440,11 +440,8 @@ pub async fn start_service_as_election_leader( None }; - let (barrier_scheduler, scheduled_barriers) = BarrierScheduler::new_pair( - hummock_manager.clone(), - meta_metrics.clone(), - system_params_reader.checkpoint_frequency() as usize, - ); + let (barrier_scheduler, scheduled_barriers) = + BarrierScheduler::new_pair(hummock_manager.clone(), meta_metrics.clone()); // Initialize services. 
let backup_manager = BackupManager::new( @@ -498,7 +495,6 @@ pub async fn start_service_as_election_leader( hummock_manager.clone(), source_manager.clone(), sink_manager.clone(), - meta_metrics.clone(), scale_controller.clone(), ) .await; diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs index ef4a571f67d11..f1f0bda1bd265 100644 --- a/src/meta/src/barrier/command.rs +++ b/src/meta/src/barrier/command.rs @@ -41,7 +41,8 @@ use risingwave_pb::stream_service::WaitEpochCommitRequest; use tracing::warn; use super::info::{CommandFragmentChanges, InflightGraphInfo}; -use crate::barrier::{BarrierInfo, GlobalBarrierWorkerContext, InflightSubscriptionInfo}; +use crate::barrier::info::BarrierInfo; +use crate::barrier::{GlobalBarrierWorkerContextImpl, InflightSubscriptionInfo}; use crate::controller::fragment::InflightFragmentInfo; use crate::manager::{DdlType, StreamingJob}; use crate::model::{ActorId, DispatcherId, FragmentId, TableFragments, TableParallelism}; @@ -926,7 +927,7 @@ impl Command { impl CommandContext { pub async fn wait_epoch_commit( &self, - barrier_manager_context: &GlobalBarrierWorkerContext, + barrier_manager_context: &GlobalBarrierWorkerContextImpl, ) -> MetaResult<()> { let table_id = self.table_ids_to_commit.iter().next().cloned(); // try wait epoch on an existing random table id @@ -956,7 +957,7 @@ impl CommandContext { /// the given command. pub async fn post_collect( &self, - barrier_manager_context: &GlobalBarrierWorkerContext, + barrier_manager_context: &GlobalBarrierWorkerContextImpl, ) -> MetaResult<()> { match &self.command { Command::Plain(_) => {} diff --git a/src/meta/src/barrier/creating_job/barrier_control.rs b/src/meta/src/barrier/creating_job/barrier_control.rs index 83a1dd9cfb5e7..7d238c37d4ebb 100644 --- a/src/meta/src/barrier/creating_job/barrier_control.rs +++ b/src/meta/src/barrier/creating_job/barrier_control.rs @@ -25,7 +25,7 @@ use risingwave_meta_model::WorkerId; use risingwave_pb::stream_service::BarrierCompleteResponse; use tracing::debug; -use crate::rpc::metrics::MetaMetrics; +use crate::rpc::metrics::GLOBAL_META_METRICS; #[derive(Debug)] struct CreatingStreamingJobEpochState { @@ -57,7 +57,7 @@ pub(super) struct CreatingStreamingJobBarrierControl { } impl CreatingStreamingJobBarrierControl { - pub(super) fn new(table_id: TableId, backfill_epoch: u64, metrics: &MetaMetrics) -> Self { + pub(super) fn new(table_id: TableId, backfill_epoch: u64) -> Self { let table_id_str = format!("{}", table_id.table_id); Self { table_id, @@ -68,16 +68,16 @@ impl CreatingStreamingJobBarrierControl { pending_barriers_to_complete: Default::default(), completing_barrier: None, - consuming_snapshot_barrier_latency: metrics + consuming_snapshot_barrier_latency: GLOBAL_META_METRICS .snapshot_backfill_barrier_latency .with_guarded_label_values(&[&table_id_str, "consuming_snapshot"]), - consuming_log_store_barrier_latency: metrics + consuming_log_store_barrier_latency: GLOBAL_META_METRICS .snapshot_backfill_barrier_latency .with_guarded_label_values(&[&table_id_str, "consuming_log_store"]), - wait_commit_latency: metrics + wait_commit_latency: GLOBAL_META_METRICS .snapshot_backfill_wait_commit_latency .with_guarded_label_values(&[&table_id_str]), - inflight_barrier_num: metrics + inflight_barrier_num: GLOBAL_META_METRICS .snapshot_backfill_inflight_barrier_num .with_guarded_label_values(&[&table_id_str]), } diff --git a/src/meta/src/barrier/creating_job/mod.rs b/src/meta/src/barrier/creating_job/mod.rs index 1ffdfb25fa13e..5a75d5499d451 100644 --- 
a/src/meta/src/barrier/creating_job/mod.rs +++ b/src/meta/src/barrier/creating_job/mod.rs @@ -33,11 +33,11 @@ use crate::barrier::creating_job::barrier_control::CreatingStreamingJobBarrierCo use crate::barrier::creating_job::status::{ CreatingJobInjectBarrierInfo, CreatingStreamingJobStatus, }; -use crate::barrier::info::InflightGraphInfo; +use crate::barrier::info::{BarrierInfo, InflightGraphInfo}; use crate::barrier::progress::CreateMviewProgressTracker; use crate::barrier::rpc::ControlStreamManager; -use crate::barrier::{BarrierInfo, Command, CreateStreamingJobCommandInfo, SnapshotBackfillInfo}; -use crate::rpc::metrics::MetaMetrics; +use crate::barrier::{Command, CreateStreamingJobCommandInfo, SnapshotBackfillInfo}; +use crate::rpc::metrics::GLOBAL_META_METRICS; use crate::MetaResult; #[derive(Debug)] @@ -60,7 +60,6 @@ impl CreatingStreamingJobControl { snapshot_backfill_info: SnapshotBackfillInfo, backfill_epoch: u64, version_stat: &HummockVersionStats, - metrics: &MetaMetrics, initial_mutation: Mutation, ) -> Self { info!( @@ -81,11 +80,7 @@ impl CreatingStreamingJobControl { Self { info, snapshot_backfill_info, - barrier_control: CreatingStreamingJobBarrierControl::new( - table_id, - backfill_epoch, - metrics, - ), + barrier_control: CreatingStreamingJobBarrierControl::new(table_id, backfill_epoch), backfill_epoch, graph_info: InflightGraphInfo::new(fragment_info), status: CreatingStreamingJobStatus::ConsumingSnapshot { @@ -98,7 +93,7 @@ impl CreatingStreamingJobControl { pending_non_checkpoint_barriers: vec![], initial_barrier_info: Some((actors_to_create, initial_mutation)), }, - upstream_lag: metrics + upstream_lag: GLOBAL_META_METRICS .snapshot_backfill_lag .with_guarded_label_values(&[&table_id_str]), } diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index f5835f5f4f4c9..d138f7f286afb 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -29,6 +29,7 @@ use risingwave_common::catalog::TableId; use risingwave_common::system_param::reader::SystemParamsRead; use risingwave_common::system_param::PAUSE_ON_NEXT_BOOTSTRAP_KEY; use risingwave_common::{bail, must_match}; +use risingwave_connector::source::SplitImpl; use risingwave_hummock_sdk::change_log::build_table_change_log_delta; use risingwave_hummock_sdk::sstable_info::SstableInfo; use risingwave_hummock_sdk::table_stats::from_prost_table_stats_map; @@ -37,11 +38,14 @@ use risingwave_hummock_sdk::table_watermark::{ }; use risingwave_hummock_sdk::{HummockSstableObjectId, HummockVersionId, LocalSstableInfo}; use risingwave_meta_model::WorkerId; +use risingwave_pb::common::WorkerNode; use risingwave_pb::ddl_service::DdlProgress; use risingwave_pb::hummock::HummockVersionStats; use risingwave_pb::meta::{PausedReason, PbRecoveryStatus}; +use risingwave_pb::stream_plan::StreamActor; use risingwave_pb::stream_service::barrier_complete_response::CreateMviewProgress; use risingwave_pb::stream_service::BarrierCompleteResponse; +use risingwave_rpc_client::StreamingControlHandle; use thiserror_ext::AsReport; use tokio::sync::mpsc::unbounded_channel; use tokio::sync::oneshot::{Receiver, Sender}; @@ -55,7 +59,7 @@ use crate::barrier::creating_job::{CompleteJobType, CreatingStreamingJobControl} use crate::barrier::info::{BarrierInfo, InflightGraphInfo}; use crate::barrier::progress::{CreateMviewProgressTracker, TrackingCommand, TrackingJob}; use crate::barrier::rpc::{merge_node_rpc_errors, ControlStreamManager}; -use crate::barrier::schedule::ScheduledBarriers; +use 
crate::barrier::schedule::{PeriodicBarriers, ScheduledBarriers}; use crate::barrier::state::BarrierWorkerState; use crate::error::MetaErrorInner; use crate::hummock::{CommitEpochInfo, HummockManagerRef, NewTableFragmentInfo}; @@ -64,7 +68,8 @@ use crate::manager::{ ActiveStreamingWorkerChange, ActiveStreamingWorkerNodes, LocalNotification, MetaSrvEnv, MetadataManager, }; -use crate::rpc::metrics::MetaMetrics; +use crate::model::{ActorId, TableFragments}; +use crate::rpc::metrics::GLOBAL_META_METRICS; use crate::stream::{ScaleControllerRef, SourceManagerRef}; use crate::{MetaError, MetaResult}; @@ -128,7 +133,6 @@ enum BarrierManagerStatus { struct Scheduled { command: Command, notifiers: Vec, - send_latency_timer: HistogramTimer, span: tracing::Span, /// Choose a different barrier(checkpoint == true) according to it checkpoint: bool, @@ -151,8 +155,11 @@ pub(crate) enum BarrierManagerRequest { GetDdlProgress(Sender>), } -#[derive(Clone)] -struct GlobalBarrierWorkerContext { +struct GlobalBarrierWorkerContextImpl { + scheduled_barriers: ScheduledBarriers, + + status: Arc>, + metadata_manager: MetadataManager, hummock_manager: HummockManagerRef, @@ -163,8 +170,6 @@ struct GlobalBarrierWorkerContext { sink_manager: SinkCoordinatorManager, - pub(super) metrics: Arc, - env: MetaSrvEnv, } @@ -176,7 +181,6 @@ impl GlobalBarrierManager { hummock_manager: HummockManagerRef, source_manager: SourceManagerRef, sink_manager: SinkCoordinatorManager, - meta_metrics: Arc, scale_controller: ScaleControllerRef, ) -> (Arc, JoinHandle<()>, oneshot::Sender<()>) { let (request_tx, request_rx) = unbounded_channel(); @@ -187,13 +191,12 @@ impl GlobalBarrierManager { hummock_manager, source_manager, sink_manager, - meta_metrics, scale_controller, request_rx, ) .await; let manager = Self { - status: barrier_worker.status.clone(), + status: barrier_worker.context.status.clone(), hummock_manager: barrier_worker.context.hummock_manager.clone(), request_tx, metadata_manager: barrier_worker.context.metadata_manager.clone(), @@ -203,6 +206,108 @@ impl GlobalBarrierManager { } } +trait GlobalBarrierWorkerContext: Send + Sync + 'static { + fn commit_epoch( + &self, + commit_info: CommitEpochInfo, + ) -> impl Future> + Send + '_; + + async fn next_scheduled(&self) -> Scheduled; + fn abort_and_mark_blocked(&self, recovery_reason: RecoveryReason); + fn mark_ready(&self); + + fn post_collect_command<'a>( + &'a self, + command: &'a CommandContext, + ) -> impl Future> + Send + 'a; + + async fn notify_creating_job_failed(&self, err: &MetaError); + + fn finish_creating_job( + &self, + job: TrackingJob, + ) -> impl Future> + Send + '_; + + async fn new_control_stream( + &self, + node: &WorkerNode, + mv_depended_subscriptions: &HashMap>, + ) -> MetaResult; + + async fn reload_runtime_info( + &self, + ) -> MetaResult<( + ActiveStreamingWorkerNodes, + InflightGraphInfo, + InflightSubscriptionInfo, + Option, + HashMap>, + HashMap>, + HashMap, + HummockVersionStats, + )>; +} + +impl GlobalBarrierWorkerContext for GlobalBarrierWorkerContextImpl { + async fn commit_epoch(&self, commit_info: CommitEpochInfo) -> MetaResult { + self.hummock_manager.commit_epoch(commit_info).await?; + Ok(self.hummock_manager.get_version_stats().await) + } + + async fn next_scheduled(&self) -> Scheduled { + self.scheduled_barriers.next_scheduled().await + } + + fn abort_and_mark_blocked(&self, recovery_reason: RecoveryReason) { + self.set_status(BarrierManagerStatus::Recovering(recovery_reason)); + + // Mark blocked and abort buffered schedules, they might be 
dirty already. + self.scheduled_barriers + .abort_and_mark_blocked("cluster is under recovering"); + } + + fn mark_ready(&self) { + self.scheduled_barriers.mark_ready(); + self.set_status(BarrierManagerStatus::Running); + } + + async fn post_collect_command<'a>(&'a self, command: &'a CommandContext) -> MetaResult<()> { + command.post_collect(self).await + } + + async fn notify_creating_job_failed(&self, err: &MetaError) { + self.metadata_manager.notify_finish_failed(err).await + } + + async fn finish_creating_job(&self, job: TrackingJob) -> MetaResult<()> { + job.finish(&self.metadata_manager).await + } + + async fn new_control_stream( + &self, + node: &WorkerNode, + mv_depended_subscriptions: &HashMap>, + ) -> MetaResult { + self.new_control_stream_impl(node, mv_depended_subscriptions) + .await + } + + async fn reload_runtime_info( + &self, + ) -> MetaResult<( + ActiveStreamingWorkerNodes, + InflightGraphInfo, + InflightSubscriptionInfo, + Option, + HashMap>, + HashMap>, + HashMap, + HummockVersionStats, + )> { + self.reload_runtime_info_impl().await + } +} + /// [`crate::barrier::GlobalBarrierWorker`] sends barriers to all registered compute nodes and /// collect them, with monotonic increasing epoch numbers. On compute nodes, `LocalBarrierManager` /// in `risingwave_stream` crate will serve these requests and dispatch them to source actors. @@ -212,19 +317,17 @@ impl GlobalBarrierManager { /// accepting [`Command`] that carries info to build `Mutation`. To keep the consistency between /// barrier manager and meta store, some actions like "drop materialized view" or "create mv on mv" /// must be done in barrier manager transactional using [`Command`]. -struct GlobalBarrierWorker { +struct GlobalBarrierWorker { /// Enable recovery or not when failover. enable_recovery: bool, /// The queue of scheduled barriers. - scheduled_barriers: schedule::ScheduledBarriers, + periodic_barriers: PeriodicBarriers, /// The max barrier nums in flight in_flight_barrier_nums: usize, - context: GlobalBarrierWorkerContext, - - status: Arc>, + context: Arc, env: MetaSrvEnv, @@ -257,24 +360,21 @@ struct CheckpointControl { hummock_version_stats: HummockVersionStats, create_mview_tracker: CreateMviewProgressTracker, - - context: GlobalBarrierWorkerContext, } impl CheckpointControl { - async fn new( - context: GlobalBarrierWorkerContext, + fn new( create_mview_tracker: CreateMviewProgressTracker, state: BarrierWorkerState, + hummock_version_stats: HummockVersionStats, ) -> Self { Self { state, command_ctx_queue: Default::default(), completing_barrier: None, creating_streaming_job_controls: Default::default(), - hummock_version_stats: context.hummock_manager.get_version_stats().await, + hummock_version_stats, create_mview_tracker, - context, } } @@ -288,14 +388,13 @@ impl CheckpointControl { /// Update the metrics of barrier nums. 
fn update_barrier_nums_metrics(&self) { - self.context.metrics.in_flight_barrier_nums.set( + GLOBAL_META_METRICS.in_flight_barrier_nums.set( self.command_ctx_queue .values() .filter(|x| x.state.is_inflight()) .count() as i64, ); - self.context - .metrics + GLOBAL_META_METRICS .all_barrier_nums .set(self.total_command_num() as i64); } @@ -329,7 +428,7 @@ impl CheckpointControl { node_to_collect: HashSet, creating_jobs_to_wait: HashSet, ) { - let timer = self.context.metrics.barrier_latency.start_timer(); + let timer = GLOBAL_META_METRICS.barrier_latency.start_timer(); if let Some((_, node)) = self.command_ctx_queue.last_key_value() { assert_eq!( @@ -429,7 +528,7 @@ impl CheckpointControl { } } -impl GlobalBarrierWorker { +impl GlobalBarrierWorker { /// We need to make sure there are no changes when doing recovery pub async fn clear_on_err(&mut self, err: &MetaError) { // join spawned completing command to finish no matter it succeeds or not. @@ -473,7 +572,10 @@ impl GlobalBarrierWorker { .map(|(command, _)| command.barrier_info.prev_epoch()), task.creating_job_epochs.clone(), ); - match self.context.clone().complete_barrier(task).await { + match task + .complete_barrier(&*self.context, self.env.clone()) + .await + { Ok(hummock_version_stats) => { self.checkpoint_control .ack_completed(BarrierCompleteOutput { @@ -562,7 +664,7 @@ enum CompletingTask { Err(MetaError), } -impl GlobalBarrierWorker { +impl GlobalBarrierWorker { /// Create a new [`crate::barrier::GlobalBarrierWorker`]. pub async fn new( scheduled_barriers: schedule::ScheduledBarriers, @@ -571,7 +673,6 @@ impl GlobalBarrierWorker { hummock_manager: HummockManagerRef, source_manager: SourceManagerRef, sink_manager: SinkCoordinatorManager, - metrics: Arc, scale_controller: ScaleControllerRef, request_rx: mpsc::UnboundedReceiver, ) -> Self { @@ -585,32 +686,45 @@ impl GlobalBarrierWorker { None, ); - let active_streaming_nodes = ActiveStreamingWorkerNodes::uninitialized(); + let active_streaming_nodes = + ActiveStreamingWorkerNodes::uninitialized(metadata_manager.clone()); let tracker = CreateMviewProgressTracker::default(); let status = Arc::new(ArcSwap::new(Arc::new(BarrierManagerStatus::Starting))); - let context = GlobalBarrierWorkerContext { + let context = Arc::new(GlobalBarrierWorkerContextImpl { + scheduled_barriers, + status, metadata_manager, hummock_manager, source_manager, scale_controller, sink_manager, - metrics, env: env.clone(), - }; + }); + + let control_stream_manager = ControlStreamManager::new(env.clone()); + let checkpoint_control = CheckpointControl::new( + tracker, + initial_invalid_state, + context.hummock_manager.get_version_stats().await, + ); - let control_stream_manager = ControlStreamManager::new(context.clone()); - let checkpoint_control = - CheckpointControl::new(context.clone(), tracker, initial_invalid_state).await; + let checkpoint_frequency = env.system_params_reader().await.checkpoint_frequency() as _; + let interval = + Duration::from_millis(env.system_params_reader().await.barrier_interval_ms() as u64); + let periodic_barriers = PeriodicBarriers::new(interval, checkpoint_frequency); + tracing::info!( + "Starting barrier scheduler with: checkpoint_frequency={:?}", + checkpoint_frequency, + ); Self { enable_recovery, - scheduled_barriers, + periodic_barriers, in_flight_barrier_nums, context, - status, env, checkpoint_control, completing_task: CompletingTask::None, @@ -652,15 +766,9 @@ impl GlobalBarrierWorker { } /// Start an infinite loop to take scheduled barriers and send them. 
- async fn run(mut self, mut shutdown_rx: Receiver<()>) { - // Initialize the barrier manager. - let interval = Duration::from_millis( - self.env.system_params_reader().await.barrier_interval_ms() as u64, - ); - self.scheduled_barriers.set_min_interval(interval); + async fn run(mut self, shutdown_rx: Receiver<()>) { tracing::info!( - "Starting barrier manager with: interval={:?}, enable_recovery={}, in_flight_barrier_nums={}", - interval, + "Starting barrier manager with: enable_recovery={}, in_flight_barrier_nums={}", self.enable_recovery, self.in_flight_barrier_nums, ); @@ -686,7 +794,6 @@ impl GlobalBarrierWorker { // consistency. // Even if there's no actor to recover, we still go through the recovery process to // inject the first `Initial` barrier. - self.set_status(BarrierManagerStatus::Recovering(RecoveryReason::Bootstrap)); let span = tracing::info_span!("bootstrap_recovery"); crate::telemetry::report_event( risingwave_pb::telemetry::TelemetryEventStage::Recovery, @@ -700,11 +807,17 @@ impl GlobalBarrierWorker { let paused = self.take_pause_on_bootstrap().await.unwrap_or(false); let paused_reason = paused.then_some(PausedReason::Manual); - self.recovery(paused_reason, None).instrument(span).await; + self.recovery(paused_reason, None, RecoveryReason::Bootstrap) + .instrument(span) + .await; } - self.set_status(BarrierManagerStatus::Running); + self.run_inner(shutdown_rx).await + } +} +impl GlobalBarrierWorker { + async fn run_inner(mut self, mut shutdown_rx: Receiver<()>) { let (local_notification_tx, mut local_notification_rx) = tokio::sync::mpsc::unbounded_channel(); self.env @@ -747,48 +860,7 @@ impl GlobalBarrierWorker { changed_worker = self.active_streaming_nodes.changed() => { #[cfg(debug_assertions)] { - use risingwave_pb::common::WorkerNode; - match self - .context - .metadata_manager - .list_active_streaming_compute_nodes() - .await - { - Ok(worker_nodes) => { - let ignore_irrelevant_info = |node: &WorkerNode| { - ( - node.id, - WorkerNode { - id: node.id, - r#type: node.r#type, - host: node.host.clone(), - parallelism: node.parallelism, - property: node.property.clone(), - resource: node.resource.clone(), - ..Default::default() - }, - ) - }; - let worker_nodes: HashMap<_, _> = - worker_nodes.iter().map(ignore_irrelevant_info).collect(); - let curr_worker_nodes: HashMap<_, _> = self - .active_streaming_nodes - .current() - .values() - .map(ignore_irrelevant_info) - .collect(); - if worker_nodes != curr_worker_nodes { - warn!( - ?worker_nodes, - ?curr_worker_nodes, - "different to global snapshot" - ); - } - } - Err(e) => { - warn!(e = ?e.as_report(), "fail to list_active_streaming_compute_nodes to compare with local snapshot"); - } - } + self.active_streaming_nodes.validate_change().await; } info!(?changed_worker, "worker changed"); @@ -797,7 +869,7 @@ impl GlobalBarrierWorker { .on_new_worker_node_map(self.active_streaming_nodes.current()); self.checkpoint_control.creating_streaming_job_controls.values().for_each(|job| job.on_new_worker_node_map(self.active_streaming_nodes.current())); if let ActiveStreamingWorkerChange::Add(node) | ActiveStreamingWorkerChange::Update(node) = changed_worker { - self.control_stream_manager.add_worker(node, &self.checkpoint_control.state.inflight_subscription_info).await; + self.control_stream_manager.add_worker(node, &self.checkpoint_control.state.inflight_subscription_info, &*self.context).await; } } @@ -806,8 +878,8 @@ impl GlobalBarrierWorker { match notification { // Handle barrier interval and checkpoint frequency changes. 
LocalNotification::SystemParamsChange(p) => { - self.scheduled_barriers.set_min_interval(Duration::from_millis(p.barrier_interval_ms() as u64)); - self.scheduled_barriers + self.periodic_barriers.set_min_interval(Duration::from_millis(p.barrier_interval_ms() as u64)); + self.periodic_barriers .set_checkpoint_frequency(p.checkpoint_frequency() as usize) }, // Handle adhoc recovery triggered by user. @@ -820,9 +892,11 @@ impl GlobalBarrierWorker { complete_result = self .completing_task .next_completed_barrier( - &mut self.scheduled_barriers, + &mut self.periodic_barriers, &mut self.checkpoint_control, - &mut self.control_stream_manager + &mut self.control_stream_manager, + &self.context, + &self.env, ) => { match complete_result { Ok(output) => { @@ -843,7 +917,7 @@ impl GlobalBarrierWorker { let errors = self.control_stream_manager.collect_errors(worker_id, e).await; let err = merge_node_rpc_errors("get error from control stream", errors); if let Some(failed_barrier) = failed_barrier { - self.context.report_collect_failure(failed_barrier, &err); + self.report_collect_failure(failed_barrier, &err); } self.failure_recovery(err).await; } else { @@ -852,7 +926,7 @@ impl GlobalBarrierWorker { } } } - scheduled = self.scheduled_barriers.next_barrier(), + scheduled = self.periodic_barriers.next_barrier(&*self.context), if self .checkpoint_control .can_inject_barrier(self.in_flight_barrier_nums) => { @@ -877,7 +951,6 @@ impl CheckpointControl { let Scheduled { mut command, mut notifiers, - send_latency_timer, checkpoint, span, } = scheduled; @@ -948,7 +1021,6 @@ impl CheckpointControl { snapshot_backfill_info.clone(), barrier_info.prev_epoch(), &self.hummock_version_stats, - &self.context.metrics, mutation, ), ); @@ -977,8 +1049,6 @@ impl CheckpointControl { }); span.record("epoch", barrier_info.curr_epoch.value().0); - send_latency_timer.observe_duration(); - for creating_job in &mut self.creating_streaming_job_controls.values_mut() { creating_job.on_new_command(control_stream_manager, &command, &barrier_info)?; } @@ -1019,19 +1089,13 @@ impl CheckpointControl { } } -impl GlobalBarrierWorker { +impl GlobalBarrierWorker { /// Set barrier manager status. - fn set_status(&self, new_status: BarrierManagerStatus) { - self.status.store(Arc::new(new_status)); - } async fn failure_recovery(&mut self, err: MetaError) { self.clear_on_err(&err).await; if self.enable_recovery { - self.set_status(BarrierManagerStatus::Recovering(RecoveryReason::Failover( - err.clone(), - ))); let span = tracing::info_span!( "failure_recovery", error = %err.as_report(), @@ -1046,10 +1110,13 @@ impl GlobalBarrierWorker { None, ); + let reason = RecoveryReason::Failover(err.clone()); + // No need to clean dirty tables for barrier recovery, // The foreground stream job should cleanup their own tables. - self.recovery(None, Some(err)).instrument(span).await; - self.set_status(BarrierManagerStatus::Running); + self.recovery(None, Some(err), reason) + .instrument(span) + .await; } else { panic!("failed to execute barrier: {}", err.as_report()); } @@ -1059,7 +1126,6 @@ impl GlobalBarrierWorker { let err = MetaErrorInner::AdhocRecovery.into(); self.clear_on_err(&err).await; - self.set_status(BarrierManagerStatus::Recovering(RecoveryReason::Adhoc)); let span = tracing::info_span!( "adhoc_recovery", error = %err.as_report(), @@ -1076,8 +1142,9 @@ impl GlobalBarrierWorker { // No need to clean dirty tables for barrier recovery, // The foreground stream job should cleanup their own tables. 
- self.recovery(None, Some(err)).instrument(span).await; - self.set_status(BarrierManagerStatus::Running); + self.recovery(None, Some(err), RecoveryReason::Adhoc) + .instrument(span) + .await; } } @@ -1091,7 +1158,11 @@ pub struct CompleteBarrierTask { creating_job_epochs: Vec<(TableId, u64)>, } -impl GlobalBarrierWorkerContext { +impl GlobalBarrierWorkerContextImpl { + fn set_status(&self, new_status: BarrierManagerStatus) { + self.status.store(Arc::new(new_status)); + } + fn collect_creating_job_commit_epoch_info( commit_info: &mut CommitEpochInfo, epoch: u64, @@ -1122,44 +1193,57 @@ impl GlobalBarrierWorkerContext { }); }; } +} - async fn complete_barrier(self, task: CompleteBarrierTask) -> MetaResult { - let result: MetaResult<()> = try { - let wait_commit_timer = self.metrics.barrier_wait_commit_latency.start_timer(); - self.hummock_manager.commit_epoch(task.commit_info).await?; - if let Some((command_ctx, _)) = &task.command_context { - command_ctx.post_collect(&self).await?; +impl CompleteBarrierTask { + async fn complete_barrier( + self, + context: &impl GlobalBarrierWorkerContext, + env: MetaSrvEnv, + ) -> MetaResult { + let result: MetaResult = try { + let wait_commit_timer = GLOBAL_META_METRICS + .barrier_wait_commit_latency + .start_timer(); + let version_stats = context.commit_epoch(self.commit_info).await?; + if let Some((command_ctx, _)) = &self.command_context { + context.post_collect_command(command_ctx).await?; } wait_commit_timer.observe_duration(); + version_stats }; - { - if let Err(e) = result { - for notifier in task.notifiers { - notifier.notify_collection_failed(e.clone()); + let version_stats = { + let version_stats = match result { + Ok(version_stats) => version_stats, + Err(e) => { + for notifier in self.notifiers { + notifier.notify_collection_failed(e.clone()); + } + return Err(e); } - return Err(e); - } - task.notifiers.into_iter().for_each(|notifier| { + }; + self.notifiers.into_iter().for_each(|notifier| { notifier.notify_collected(); }); try_join_all( - task.finished_jobs + self.finished_jobs .into_iter() - .map(|finished_job| finished_job.finish(&self.metadata_manager)), + .map(|finished_job| context.finish_creating_job(finished_job)), ) .await?; - if let Some((command_ctx, enqueue_time)) = task.command_context { + if let Some((command_ctx, enqueue_time)) = self.command_context { let duration_sec = enqueue_time.stop_and_record(); - self.report_complete_event(duration_sec, &command_ctx); - self.metrics + Self::report_complete_event(env, duration_sec, &command_ctx); + GLOBAL_META_METRICS .last_committed_barrier_time .set(command_ctx.barrier_info.curr_epoch.value().as_unix_secs() as i64); } - } + version_stats + }; - Ok(self.hummock_manager.get_version_stats().await) + Ok(version_stats) } } @@ -1204,8 +1288,8 @@ impl CreateMviewProgressTracker { } } -impl GlobalBarrierWorkerContext { - fn report_complete_event(&self, duration_sec: f64, command_ctx: &CommandContext) { +impl CompleteBarrierTask { + fn report_complete_event(env: MetaSrvEnv, duration_sec: f64, command_ctx: &CommandContext) { // Record barrier latency in event log. 
use risingwave_pb::meta::event_log; let event = event_log::EventBarrierComplete { @@ -1215,8 +1299,7 @@ impl GlobalBarrierWorkerContext { command: command_ctx.command.to_string(), barrier_kind: command_ctx.barrier_info.kind.as_str_name().to_string(), }; - self.env - .event_log_manager_ref() + env.event_log_manager_ref() .add_event_logs(vec![event_log::Event::BarrierComplete(event)]); } } @@ -1255,7 +1338,7 @@ impl CheckpointControl { fn next_complete_barrier_task( &mut self, - mut context: Option<(&mut ScheduledBarriers, &mut ControlStreamManager)>, + mut context: Option<(&mut PeriodicBarriers, &mut ControlStreamManager)>, ) -> Option { // `Vec::new` is a const fn, and do not have memory allocation, and therefore is lightweight enough let mut creating_jobs_task = vec![]; @@ -1372,7 +1455,7 @@ impl CheckpointControl { if !creating_jobs_task.is_empty() { let task = task.get_or_insert_default(); for (table_id, epoch, resps, is_first_time) in creating_jobs_task { - GlobalBarrierWorkerContext::collect_creating_job_commit_epoch_info( + GlobalBarrierWorkerContextImpl::collect_creating_job_commit_epoch_info( &mut task.commit_info, epoch, resps, @@ -1393,9 +1476,11 @@ impl CheckpointControl { impl CompletingTask { pub(super) fn next_completed_barrier<'a>( &'a mut self, - scheduled_barriers: &mut ScheduledBarriers, + scheduled_barriers: &mut PeriodicBarriers, checkpoint_control: &mut CheckpointControl, control_stream_manager: &mut ControlStreamManager, + context: &Arc, + env: &MetaSrvEnv, ) -> impl Future> + 'a { // If there is no completing barrier, try to start completing the earliest barrier if // it has been collected. @@ -1409,8 +1494,10 @@ impl CompletingTask { .command_context .as_ref() .map(|(command, _)| command.barrier_info.prev_epoch()); + let context = context.clone(); + let env = env.clone(); let join_handle = - tokio::spawn(checkpoint_control.context.clone().complete_barrier(task)); + tokio::spawn(async move { task.complete_barrier(&*context, env).await }); *self = CompletingTask::Completing { command_prev_epoch, join_handle, @@ -1505,7 +1592,7 @@ impl GlobalBarrierManager { } } -impl GlobalBarrierWorkerContext { +impl GlobalBarrierWorkerContextImpl { /// Resolve actor information from cluster, fragment manager and `ChangedTableId`. /// We use `changed_table_id` to modify the actors to be sent or collected. Because these actor /// will create or drop before this barrier flow through them. diff --git a/src/meta/src/barrier/progress.rs b/src/meta/src/barrier/progress.rs index 592f1920b68b9..3bb936a8eb2b2 100644 --- a/src/meta/src/barrier/progress.rs +++ b/src/meta/src/barrier/progress.rs @@ -233,8 +233,7 @@ impl TrackingJob { Ok(()) } TrackingJob::Recovered(recovered) => { - recovered - .metadata_manager + metadata_manager .catalog_controller .finish_streaming_job(recovered.id, None) .await?; @@ -268,7 +267,6 @@ impl std::fmt::Debug for TrackingJob { pub struct RecoveredTrackingJob { pub id: ObjectId, - pub metadata_manager: MetadataManager, } /// The command tracking by the [`CreateMviewProgressTracker`]. @@ -303,8 +301,7 @@ impl CreateMviewProgressTracker { /// 2. `Backfill` position. 
pub fn recover( mview_map: HashMap, - version_stats: HummockVersionStats, - metadata_manager: MetadataManager, + version_stats: &HummockVersionStats, ) -> Self { let mut actor_map = HashMap::new(); let mut progress_map = HashMap::new(); @@ -323,11 +320,10 @@ impl CreateMviewProgressTracker { backfill_upstream_types, table_fragments.dependent_table_ids(), definition, - &version_stats, + version_stats, ); let tracking_job = TrackingJob::Recovered(RecoveredTrackingJob { id: creating_table_id.table_id as i32, - metadata_manager: metadata_manager.clone(), }); progress_map.insert(creating_table_id, (progress, tracking_job)); } diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs index ffd3c74ab6a11..f2d28a086d742 100644 --- a/src/meta/src/barrier/recovery.rs +++ b/src/meta/src/barrier/recovery.rs @@ -22,7 +22,9 @@ use risingwave_common::catalog::TableId; use risingwave_common::config::DefaultParallelism; use risingwave_common::hash::WorkerSlotId; use risingwave_common::util::epoch::Epoch; +use risingwave_connector::source::SplitImpl; use risingwave_meta_model::{StreamingParallelism, WorkerId}; +use risingwave_pb::hummock::HummockVersionStats; use risingwave_pb::meta::subscribe_response::{Info, Operation}; use risingwave_pb::meta::{PausedReason, Recovery}; use risingwave_pb::stream_plan::barrier_mutation::Mutation; @@ -32,21 +34,22 @@ use tokio::time::Instant; use tokio_retry::strategy::{jitter, ExponentialBackoff}; use tracing::{debug, info, warn, Instrument}; -use super::{CheckpointControl, GlobalBarrierWorker, GlobalBarrierWorkerContext, TracedEpoch}; +use super::{ + CheckpointControl, GlobalBarrierWorker, GlobalBarrierWorkerContext, RecoveryReason, TracedEpoch, +}; use crate::barrier::info::{BarrierInfo, InflightGraphInfo, InflightSubscriptionInfo}; use crate::barrier::progress::CreateMviewProgressTracker; use crate::barrier::rpc::ControlStreamManager; use crate::barrier::schedule::ScheduledBarriers; use crate::barrier::state::BarrierWorkerState; -use crate::barrier::BarrierKind; +use crate::barrier::{BarrierKind, GlobalBarrierWorkerContextImpl}; use crate::manager::ActiveStreamingWorkerNodes; -use crate::model::{TableFragments, TableParallelism}; +use crate::model::{ActorId, TableFragments, TableParallelism}; +use crate::rpc::metrics::GLOBAL_META_METRICS; use crate::stream::{build_actor_connector_splits, RescheduleOptions, TableResizePolicy}; use crate::{model, MetaError, MetaResult}; -impl GlobalBarrierWorker { - // Migration timeout. - const RECOVERY_FORCE_MIGRATION_TIMEOUT: Duration = Duration::from_secs(300); +impl GlobalBarrierWorker { // Retry base interval in milliseconds. const RECOVERY_RETRY_BASE_INTERVAL: u64 = 20; // Retry max interval. @@ -61,7 +64,7 @@ impl GlobalBarrierWorker { } } -impl GlobalBarrierWorkerContext { +impl GlobalBarrierWorkerContextImpl { /// Clean catalogs for creating streaming jobs that are in foreground mode or table fragments not persisted. 
async fn clean_dirty_streaming_jobs(&self) -> MetaResult<()> { self.metadata_manager @@ -89,7 +92,9 @@ impl GlobalBarrierWorkerContext { } // FIXME: didn't consider Values here - async fn recover_background_mv_progress(&self) -> MetaResult { + async fn recover_background_mv_progress( + &self, + ) -> MetaResult> { let mgr = &self.metadata_manager; let mviews = mgr .catalog_controller @@ -107,25 +112,22 @@ impl GlobalBarrierWorkerContext { mview_map.insert(table_id, (mview.definition.clone(), table_fragments)); } - let version_stats = self.hummock_manager.get_version_stats().await; // If failed, enter recovery mode. - Ok(CreateMviewProgressTracker::recover( - mview_map, - version_stats, - mgr.clone(), - )) + Ok(mview_map) } +} +impl ScheduledBarriers { /// Pre buffered drop and cancel command, return true if any. - fn pre_apply_drop_cancel(&self, scheduled_barriers: &ScheduledBarriers) -> bool { - let (dropped_actors, cancelled) = scheduled_barriers.pre_apply_drop_cancel_scheduled(); + fn pre_apply_drop_cancel(&self) -> bool { + let (dropped_actors, cancelled) = self.pre_apply_drop_cancel_scheduled(); !dropped_actors.is_empty() || !cancelled.is_empty() } } -impl GlobalBarrierWorker { +impl GlobalBarrierWorker { /// Recovery the whole cluster from the latest epoch. /// /// If `paused_reason` is `Some`, all data sources (including connectors and DMLs) will be @@ -133,56 +135,57 @@ impl GlobalBarrierWorker { /// the cluster or `risectl` command. Used for debugging purpose. /// /// Returns the new state of the barrier manager after recovery. - pub async fn recovery(&mut self, paused_reason: Option, err: Option) { - // Mark blocked and abort buffered schedules, they might be dirty already. - self.scheduled_barriers - .abort_and_mark_blocked("cluster is under recovering"); + pub async fn recovery( + &mut self, + paused_reason: Option, + err: Option, + recovery_reason: RecoveryReason, + ) { + self.context.abort_and_mark_blocked(recovery_reason); // Clear all control streams to release resources (connections to compute nodes) first. self.control_stream_manager.clear(); - tracing::info!("recovery start!"); - let retry_strategy = Self::get_retry_strategy(); - - // We take retry into consideration because this is the latency user sees for a cluster to - // get recovered. - let recovery_timer = self.context.metrics.recovery_latency.start_timer(); - - let new_state = tokio_retry::Retry::spawn(retry_strategy, || { - async { - let recovery_result: MetaResult<_> = try { - if let Some(err) = &err { - self.context - .metadata_manager - .notify_finish_failed(err) - .await; - } + self.recovery_inner(paused_reason, err).await; + self.context.mark_ready(); + } +} - self.context - .clean_dirty_streaming_jobs() +impl GlobalBarrierWorkerContextImpl { + pub(super) async fn reload_runtime_info_impl( + &self, + ) -> MetaResult<( + ActiveStreamingWorkerNodes, + InflightGraphInfo, + InflightSubscriptionInfo, + Option, + HashMap>, + HashMap>, + HashMap, + HummockVersionStats, + )> { + { + { + { + self.clean_dirty_streaming_jobs() .await .context("clean dirty streaming jobs")?; // Mview progress needs to be recovered. tracing::info!("recovering mview progress"); - let tracker = self - .context + let background_mview_progress = self .recover_background_mv_progress() .await .context("recover mview progress should not fail")?; tracing::info!("recovered mview progress"); // This is a quick path to accelerate the process of dropping and canceling streaming jobs. 
- let _ = self - .context - .pre_apply_drop_cancel(&self.scheduled_barriers); + let _ = self.scheduled_barriers.pre_apply_drop_cancel(); - let mut active_streaming_nodes = ActiveStreamingWorkerNodes::new_snapshot( - self.context.metadata_manager.clone(), - ) - .await?; + let mut active_streaming_nodes = + ActiveStreamingWorkerNodes::new_snapshot(self.metadata_manager.clone()) + .await?; let background_streaming_jobs = self - .context .metadata_manager .list_background_creating_jobs() .await?; @@ -194,44 +197,37 @@ impl GlobalBarrierWorker { let mut info = if !self.env.opts.disable_automatic_parallelism_control && background_streaming_jobs.is_empty() { - self.context - .scale_actors(&active_streaming_nodes) + self.scale_actors(&active_streaming_nodes) .await .inspect_err(|err| { warn!(error = %err.as_report(), "scale actors failed"); })?; - self.context.resolve_graph_info().await.inspect_err(|err| { + self.resolve_graph_info().await.inspect_err(|err| { warn!(error = %err.as_report(), "resolve actor info failed"); })? } else { // Migrate actors in expired CN to newly joined one. - self.context - .migrate_actors(&mut active_streaming_nodes) + self.migrate_actors(&mut active_streaming_nodes) .await .inspect_err(|err| { warn!(error = %err.as_report(), "migrate actors failed"); })? }; - if self - .context - .pre_apply_drop_cancel(&self.scheduled_barriers) - { - info = self.context.resolve_graph_info().await.inspect_err(|err| { + if self.scheduled_barriers.pre_apply_drop_cancel() { + info = self.resolve_graph_info().await.inspect_err(|err| { warn!(error = %err.as_report(), "resolve actor info failed"); })? } let info = info; - self.context - .purge_state_table_from_hummock(&info.existing_table_ids().collect()) + self.purge_state_table_from_hummock(&info.existing_table_ids().collect()) .await .context("purge state table from hummock")?; - let (prev_epoch, version_id) = self - .context + let prev_epoch = self .hummock_manager .on_current_version(|version| { let state_table_info = version.state_table_info.info(); @@ -257,44 +253,23 @@ impl GlobalBarrierWorker { ); } } - ( - committed_epoch.map(|committed_epoch| { - TracedEpoch::new(Epoch::from(committed_epoch)) - }), - version.id, - ) + committed_epoch.map(|committed_epoch| { + TracedEpoch::new(Epoch::from(committed_epoch)) + }) }) .await; - let mut control_stream_manager = - ControlStreamManager::new(self.context.clone()); - let subscription_info = InflightSubscriptionInfo { mv_depended_subscriptions: self - .context .metadata_manager .get_mv_depended_subscriptions() .await?, }; - let reset_start_time = Instant::now(); - control_stream_manager - .reset( - version_id, - &subscription_info, - active_streaming_nodes.current(), - ) - .await - .inspect_err(|err| { - warn!(error = %err.as_report(), "reset compute nodes failed"); - })?; - info!(elapsed=?reset_start_time.elapsed(), "control stream reset"); - - self.context.sink_manager.reset().await; + self.sink_manager.reset().await; // update and build all actors. 
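// Reading aid for the wide tuple assembled in this function and returned just
// below: the real code keeps a plain tuple, and the elided type parameters are
// filled in here from context, so treat them as assumptions.
struct RuntimeInfoSnapshot {
    active_streaming_nodes: ActiveStreamingWorkerNodes,
    graph_info: InflightGraphInfo,
    subscription_info: InflightSubscriptionInfo,
    prev_epoch: Option<TracedEpoch>,
    /// Actors to (re)build, keyed by worker node.
    node_actors: HashMap<WorkerId, Vec<StreamActor>>,
    /// Source split assignments, keyed by actor.
    source_split_assignments: HashMap<ActorId, Vec<SplitImpl>>,
    /// Definition and fragments of background (backfilling) mviews.
    background_mview_progress: HashMap<TableId, (String, TableFragments)>,
    version_stats: HummockVersionStats,
}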
let node_actors = self - .context .load_all_actors(&info, &active_streaming_nodes) .await .inspect_err(|err| { @@ -302,8 +277,67 @@ impl GlobalBarrierWorker { })?; // get split assignments for all actors - let source_split_assignments = - self.context.source_manager.list_assignments().await; + let source_split_assignments = self.source_manager.list_assignments().await; + Ok(( + active_streaming_nodes, + info, + subscription_info, + prev_epoch, + node_actors, + source_split_assignments, + background_mview_progress, + self.hummock_manager.get_version_stats().await, + )) + } + } + } + } +} + +impl GlobalBarrierWorker { + async fn recovery_inner( + &mut self, + paused_reason: Option, + err: Option, + ) { + tracing::info!("recovery start!"); + let retry_strategy = Self::get_retry_strategy(); + + // We take retry into consideration because this is the latency user sees for a cluster to + // get recovered. + let recovery_timer = GLOBAL_META_METRICS.recovery_latency.start_timer(); + + let new_state = tokio_retry::Retry::spawn(retry_strategy, || async { + if let Some(err) = &err { + self.context.notify_creating_job_failed(err).await; + }; + let ( + active_streaming_nodes, + info, + subscription_info, + prev_epoch, + node_actors, + source_split_assignments, + background_mview_progress, + version_stats, + ) = self.context.reload_runtime_info().await?; + + let mut control_stream_manager = ControlStreamManager::new(self.env.clone()); + let reset_start_time = Instant::now(); + control_stream_manager + .reset( + &subscription_info, + active_streaming_nodes.current(), + &*self.context, + ) + .await + .inspect_err(|err| { + warn!(error = %err.as_report(), "reset compute nodes failed"); + })?; + info!(elapsed=?reset_start_time.elapsed(), "control stream reset"); + + let recovery_result: MetaResult<_> = try { + { let mutation = Mutation::Add(AddMutation { // Actors built during recovery is not treated as newly added actors. 
actor_dispatchers: Default::default(), @@ -334,9 +368,8 @@ impl GlobalBarrierWorker { )?; debug!(?node_to_collect, "inject initial barrier"); while !node_to_collect.is_empty() { - let (worker_id, result) = control_stream_manager - .next_collect_barrier_response() - .await; + let (worker_id, result) = + control_stream_manager.next_collect_barrier_response().await; let resp = result?; assert_eq!(resp.epoch, barrier_info.prev_epoch()); assert!(node_to_collect.remove(&worker_id)); @@ -352,30 +385,34 @@ impl GlobalBarrierWorker { BarrierWorkerState::new(new_epoch, info, subscription_info, paused_reason), active_streaming_nodes, control_stream_manager, - tracker, + CreateMviewProgressTracker::recover( + background_mview_progress, + &version_stats, + ), + version_stats, ) - }; - if recovery_result.is_err() { - self.context.metrics.recovery_failure_cnt.inc(); } - recovery_result + }; + if recovery_result.is_err() { + GLOBAL_META_METRICS.recovery_failure_cnt.inc(); } - .instrument(tracing::info_span!("recovery_attempt")) + recovery_result }) + .instrument(tracing::info_span!("recovery_attempt")) .await .expect("Retry until recovery success."); recovery_timer.observe_duration(); - self.scheduled_barriers.mark_ready(); let create_mview_tracker: CreateMviewProgressTracker; - let state: BarrierWorkerState; + let version_stats: HummockVersionStats; ( state, self.active_streaming_nodes, self.control_stream_manager, create_mview_tracker, + version_stats, ) = new_state; tracing::info!( @@ -385,7 +422,7 @@ impl GlobalBarrierWorker { ); self.checkpoint_control = - CheckpointControl::new(self.context.clone(), create_mview_tracker, state).await; + CheckpointControl::new(create_mview_tracker, state, version_stats); self.env .notification_manager() @@ -393,7 +430,10 @@ impl GlobalBarrierWorker { } } -impl GlobalBarrierWorkerContext { +impl GlobalBarrierWorkerContextImpl { + // Migration timeout. + const RECOVERY_FORCE_MIGRATION_TIMEOUT: Duration = Duration::from_secs(300); + /// Migrate actors in expired CNs to newly joined ones, return true if any actor is migrated. 
async fn migrate_actors( &self, @@ -452,7 +492,7 @@ impl GlobalBarrierWorkerContext { let mut available_size = new_worker_slots.len(); if available_size < to_migration_size - && start.elapsed() > GlobalBarrierWorker::RECOVERY_FORCE_MIGRATION_TIMEOUT + && start.elapsed() > Self::RECOVERY_FORCE_MIGRATION_TIMEOUT { let mut factor = 2; @@ -512,7 +552,7 @@ impl GlobalBarrierWorkerContext { let changed = active_nodes .wait_changed( Duration::from_millis(5000), - GlobalBarrierWorker::RECOVERY_FORCE_MIGRATION_TIMEOUT, + Self::RECOVERY_FORCE_MIGRATION_TIMEOUT, |active_nodes| { let current_nodes = active_nodes .current() @@ -756,7 +796,7 @@ mod tests { // total 10, assigned custom, actual 5, default full -> fixed(5) assert_eq!( TableParallelism::Fixed(5), - GlobalBarrierWorkerContext::derive_target_parallelism( + GlobalBarrierWorkerContextImpl::derive_target_parallelism( 10, TableParallelism::Custom, Some(5), @@ -767,7 +807,7 @@ mod tests { // total 10, assigned custom, actual 10, default full -> adaptive assert_eq!( TableParallelism::Adaptive, - GlobalBarrierWorkerContext::derive_target_parallelism( + GlobalBarrierWorkerContextImpl::derive_target_parallelism( 10, TableParallelism::Custom, Some(10), @@ -778,7 +818,7 @@ mod tests { // total 10, assigned custom, actual 11, default full -> adaptive assert_eq!( TableParallelism::Adaptive, - GlobalBarrierWorkerContext::derive_target_parallelism( + GlobalBarrierWorkerContextImpl::derive_target_parallelism( 10, TableParallelism::Custom, Some(11), @@ -789,7 +829,7 @@ mod tests { // total 10, assigned fixed(5), actual _, default full -> fixed(5) assert_eq!( TableParallelism::Adaptive, - GlobalBarrierWorkerContext::derive_target_parallelism( + GlobalBarrierWorkerContextImpl::derive_target_parallelism( 10, TableParallelism::Custom, None, @@ -800,7 +840,7 @@ mod tests { // total 10, assigned adaptive, actual _, default full -> adaptive assert_eq!( TableParallelism::Adaptive, - GlobalBarrierWorkerContext::derive_target_parallelism( + GlobalBarrierWorkerContextImpl::derive_target_parallelism( 10, TableParallelism::Adaptive, None, @@ -811,7 +851,7 @@ mod tests { // total 10, assigned adaptive, actual 5, default 5 -> fixed(5) assert_eq!( TableParallelism::Fixed(5), - GlobalBarrierWorkerContext::derive_target_parallelism( + GlobalBarrierWorkerContextImpl::derive_target_parallelism( 10, TableParallelism::Adaptive, Some(5), @@ -822,7 +862,7 @@ mod tests { // total 10, assigned adaptive, actual 6, default 5 -> adaptive assert_eq!( TableParallelism::Adaptive, - GlobalBarrierWorkerContext::derive_target_parallelism( + GlobalBarrierWorkerContextImpl::derive_target_parallelism( 10, TableParallelism::Adaptive, Some(6), diff --git a/src/meta/src/barrier/rpc.rs b/src/meta/src/barrier/rpc.rs index 62c2615eb37c4..70de1554c6faa 100644 --- a/src/meta/src/barrier/rpc.rs +++ b/src/meta/src/barrier/rpc.rs @@ -25,7 +25,6 @@ use futures::StreamExt; use itertools::Itertools; use risingwave_common::catalog::TableId; use risingwave_common::util::tracing::TracingContext; -use risingwave_hummock_sdk::HummockVersionId; use risingwave_meta_model::WorkerId; use risingwave_pb::common::{ActorInfo, WorkerNode}; use risingwave_pb::meta::PausedReason; @@ -44,8 +43,12 @@ use tokio_retry::strategy::ExponentialBackoff; use tracing::{error, info, warn}; use uuid::Uuid; -use super::{Command, GlobalBarrierWorkerContext, InflightSubscriptionInfo}; +use super::{ + Command, GlobalBarrierWorker, GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl, + InflightSubscriptionInfo, +}; use 
crate::barrier::info::{BarrierInfo, InflightGraphInfo}; +use crate::manager::MetaSrvEnv; use crate::{MetaError, MetaResult}; const COLLECT_ERROR_TIMEOUT: Duration = Duration::from_secs(3); @@ -56,15 +59,15 @@ struct ControlStreamNode { } pub(super) struct ControlStreamManager { - context: GlobalBarrierWorkerContext, nodes: HashMap, + env: MetaSrvEnv, } impl ControlStreamManager { - pub(super) fn new(context: GlobalBarrierWorkerContext) -> Self { + pub(super) fn new(env: MetaSrvEnv) -> Self { Self { - context, nodes: Default::default(), + env, } } @@ -72,34 +75,34 @@ impl ControlStreamManager { &mut self, node: WorkerNode, subscription: &InflightSubscriptionInfo, + context: &impl GlobalBarrierWorkerContext, ) { let node_id = node.id as WorkerId; if self.nodes.contains_key(&node_id) { warn!(id = node.id, host = ?node.host, "node already exists"); return; } - let version_id = self - .context - .hummock_manager - .on_current_version(|version| version.id) - .await; let node_host = node.host.clone().unwrap(); let mut backoff = ExponentialBackoff::from_millis(100) .max_delay(Duration::from_secs(3)) .factor(5); const MAX_RETRY: usize = 5; for i in 1..=MAX_RETRY { - match self - .context - .new_control_stream_node( - node.clone(), - version_id, - &subscription.mv_depended_subscriptions, - ) + match context + .new_control_stream(&node, &subscription.mv_depended_subscriptions) .await { - Ok(stream_node) => { - assert!(self.nodes.insert(node_id, stream_node).is_none()); + Ok(handle) => { + assert!(self + .nodes + .insert( + node_id, + ControlStreamNode { + worker: node.clone(), + handle, + } + ) + .is_none()); info!(?node_host, "add control stream worker"); return; } @@ -117,20 +120,21 @@ impl ControlStreamManager { pub(super) async fn reset( &mut self, - version_id: HummockVersionId, subscriptions: &InflightSubscriptionInfo, nodes: &HashMap, + context: &impl GlobalBarrierWorkerContext, ) -> MetaResult<()> { - let nodes = try_join_all(nodes.iter().map(|(worker_id, node)| async { - let node = self - .context - .new_control_stream_node( - node.clone(), - version_id, - &subscriptions.mv_depended_subscriptions, - ) + let nodes = try_join_all(nodes.iter().map(|(worker_id, node)| async move { + let handle = context + .new_control_stream(node, &subscriptions.mv_depended_subscriptions) .await?; - Result::<_, MetaError>::Ok((*worker_id, node)) + Result::<_, MetaError>::Ok(( + *worker_id, + ControlStreamNode { + worker: node.clone(), + handle, + }, + )) })) .await?; self.nodes.clear(); @@ -143,7 +147,7 @@ impl ControlStreamManager { /// Clear all nodes and response streams in the manager. 
pub(super) fn clear(&mut self) { - *self = Self::new(self.context.clone()); + *self = Self::new(self.env.clone()); } async fn next_response( @@ -406,8 +410,7 @@ impl ControlStreamManager { cur_epoch: barrier_info.curr_epoch.value().0, error: e.to_report_string(), }; - self.context - .env + self.env .event_log_manager_ref() .add_event_logs(vec![event_log::Event::InjectBarrierFail(event)]); })?; @@ -435,26 +438,28 @@ impl ControlStreamManager { } } -impl GlobalBarrierWorkerContext { - async fn new_control_stream_node( +impl GlobalBarrierWorkerContextImpl { + pub(super) async fn new_control_stream_impl( &self, - node: WorkerNode, - initial_version_id: HummockVersionId, + node: &WorkerNode, mv_depended_subscriptions: &HashMap>, - ) -> MetaResult { + ) -> MetaResult { + let initial_version_id = self + .hummock_manager + .on_current_version(|version| version.id) + .await; let handle = self .env .stream_client_pool() - .get(&node) + .get(node) .await? .start_streaming_control(initial_version_id, mv_depended_subscriptions) .await?; - Ok(ControlStreamNode { - worker: node.clone(), - handle, - }) + Ok(handle) } +} +impl GlobalBarrierWorker { /// Send barrier-complete-rpc and wait for responses from all CNs pub(super) fn report_collect_failure(&self, barrier_info: &BarrierInfo, error: &MetaError) { // Record failure in event log. diff --git a/src/meta/src/barrier/schedule.rs b/src/meta/src/barrier/schedule.rs index fa1044bfb5fe8..c6e75b84c303b 100644 --- a/src/meta/src/barrier/schedule.rs +++ b/src/meta/src/barrier/schedule.rs @@ -20,6 +20,7 @@ use std::time::{Duration, Instant}; use anyhow::{anyhow, Context}; use assert_matches::assert_matches; use parking_lot::Mutex; +use prometheus::HistogramTimer; use risingwave_common::catalog::TableId; use risingwave_hummock_sdk::HummockVersionId; use risingwave_pb::meta::PausedReason; @@ -28,7 +29,7 @@ use tokio::sync::{oneshot, watch}; use tokio::time::Interval; use super::notifier::Notifier; -use super::{Command, Scheduled}; +use super::{Command, GlobalBarrierWorkerContext, Scheduled}; use crate::hummock::HummockManagerRef; use crate::model::ActorId; use crate::rpc::metrics::MetaMetrics; @@ -56,8 +57,16 @@ enum QueueStatus { Blocked(String), } +struct ScheduledQueueItem { + command: Command, + checkpoint: bool, + notifiers: Vec, + send_latency_timer: HistogramTimer, + span: tracing::Span, +} + pub(super) struct ScheduledQueue { - queue: VecDeque, + queue: VecDeque, status: QueueStatus, } @@ -81,7 +90,7 @@ impl ScheduledQueue { self.queue.len() } - fn push_back(&mut self, scheduled: Scheduled) -> MetaResult<()> { + fn push_back(&mut self, scheduled: ScheduledQueueItem) -> MetaResult<()> { // We don't allow any command to be scheduled when the queue is blocked, except for dropping streaming jobs. // Because we allow dropping streaming jobs when the cluster is under recovery, so we have to buffer the drop // command and execute it when the cluster is ready to clean up it. @@ -100,6 +109,18 @@ impl ScheduledQueue { } } +fn tracing_span() -> tracing::Span { + if tracing::Span::current().is_none() { + tracing::Span::none() + } else { + tracing::info_span!( + "barrier", + checkpoint = tracing::field::Empty, + epoch = tracing::field::Empty + ) + } +} + impl Inner { /// Create a new scheduled barrier with the given `checkpoint`, `command` and `notifiers`. 
fn new_scheduled( @@ -107,15 +128,11 @@ impl Inner { checkpoint: bool, command: Command, notifiers: impl IntoIterator, - ) -> Scheduled { + ) -> ScheduledQueueItem { // Create a span only if we're being traced, instead of for every periodic barrier. - let span = if tracing::Span::current().is_none() { - tracing::Span::none() - } else { - tracing::info_span!("barrier", checkpoint, epoch = tracing::field::Empty) - }; + let span = tracing_span(); - Scheduled { + ScheduledQueueItem { command, notifiers: notifiers.into_iter().collect(), send_latency_timer: self.metrics.barrier_send_latency.start_timer(), @@ -141,12 +158,7 @@ impl BarrierScheduler { pub fn new_pair( hummock_manager: HummockManagerRef, metrics: Arc, - checkpoint_frequency: usize, ) -> (Self, ScheduledBarriers) { - tracing::info!( - "Starting barrier scheduler with: checkpoint_frequency={:?}", - checkpoint_frequency, - ); let inner = Arc::new(Inner { queue: Mutex::new(ScheduledQueue::new()), changed_tx: watch::channel(()).0, @@ -158,18 +170,12 @@ impl BarrierScheduler { inner: inner.clone(), hummock_manager, }, - ScheduledBarriers { - num_uncheckpointed_barrier: 0, - force_checkpoint: false, - checkpoint_frequency, - inner, - min_interval: None, - }, + ScheduledBarriers { inner }, ) } /// Push a scheduled barrier into the queue. - fn push(&self, scheduleds: impl IntoIterator) -> MetaResult<()> { + fn push(&self, scheduleds: impl IntoIterator) -> MetaResult<()> { let mut queue = self.inner.queue.lock(); for scheduled in scheduleds { queue.push_back(scheduled)?; @@ -210,7 +216,7 @@ impl BarrierScheduler { ) -> MetaResult<()> { let mut queue = self.inner.queue.lock(); match queue.queue.front_mut() { - Some(Scheduled { + Some(ScheduledQueueItem { notifiers, checkpoint, .. @@ -330,9 +336,13 @@ impl BarrierScheduler { } /// The receiver side of the barrier scheduling queue. -/// Held by the [`super::GlobalBarrierWorker`] to execute these commands. pub struct ScheduledBarriers { - min_interval: Option, + inner: Arc, +} + +/// Held by the [`super::GlobalBarrierWorker`] to execute these commands. +pub(super) struct PeriodicBarriers { + min_interval: Interval, /// Force checkpoint in next barrier. force_checkpoint: bool, @@ -340,36 +350,46 @@ pub struct ScheduledBarriers { /// The numbers of barrier (checkpoint = false) since the last barrier (checkpoint = true) num_uncheckpointed_barrier: usize, checkpoint_frequency: usize, - inner: Arc, } -impl ScheduledBarriers { +impl PeriodicBarriers { + pub(super) fn new(min_interval: Duration, checkpoint_frequency: usize) -> PeriodicBarriers { + Self { + min_interval: tokio::time::interval(min_interval), + force_checkpoint: false, + num_uncheckpointed_barrier: 0, + checkpoint_frequency, + } + } + pub(super) fn set_min_interval(&mut self, min_interval: Duration) { - let set_new_interval = match &self.min_interval { - None => true, - Some(prev_min_interval) => min_interval != prev_min_interval.period(), - }; + let set_new_interval = min_interval != self.min_interval.period(); if set_new_interval { let mut min_interval = tokio::time::interval(min_interval); min_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); - self.min_interval = Some(min_interval); + self.min_interval = min_interval; } } - pub(super) async fn next_barrier(&mut self) -> Scheduled { + pub(super) async fn next_barrier( + &mut self, + context: &impl GlobalBarrierWorkerContext, + ) -> Scheduled { let checkpoint = self.try_get_checkpoint(); let scheduled = select! 
{ biased; - mut scheduled = self.inner.next_scheduled() => { - if let Some(min_interval) = &mut self.min_interval { - min_interval.reset(); - } + mut scheduled = context.next_scheduled() => { + self.min_interval.reset(); scheduled.checkpoint = scheduled.checkpoint || checkpoint; scheduled }, - _ = self.min_interval.as_mut().expect("should have set min interval").tick() => { - self.inner - .new_scheduled(checkpoint, Command::barrier(), std::iter::empty()) + _ = self.min_interval.tick() => { + Scheduled { + command: Command::barrier(), + notifiers: vec![], + span: tracing_span(), + checkpoint, + } } }; self.update_num_uncheckpointed_barrier(scheduled.checkpoint); @@ -377,14 +397,20 @@ impl ScheduledBarriers { } } -impl Inner { - async fn next_scheduled(&self) -> Scheduled { +impl ScheduledBarriers { + pub(super) async fn next_scheduled(&self) -> Scheduled { loop { - let mut rx = self.changed_tx.subscribe(); + let mut rx = self.inner.changed_tx.subscribe(); { - let mut queue = self.queue.lock(); - if let Some(scheduled) = queue.queue.pop_front() { - break scheduled; + let mut queue = self.inner.queue.lock(); + if let Some(item) = queue.queue.pop_front() { + item.send_latency_timer.observe_duration(); + break Scheduled { + command: item.command, + notifiers: item.notifiers, + span: item.span, + checkpoint: item.checkpoint, + }; } } rx.changed().await.unwrap(); @@ -398,7 +424,7 @@ impl ScheduledBarriers { pub(super) fn abort_and_mark_blocked(&self, reason: impl Into + Copy) { let mut queue = self.inner.queue.lock(); queue.mark_blocked(reason.into()); - while let Some(Scheduled { notifiers, .. }) = queue.queue.pop_front() { + while let Some(ScheduledQueueItem { notifiers, .. }) = queue.queue.pop_front() { notifiers .into_iter() .for_each(|notify| notify.notify_collection_failed(anyhow!(reason.into()).into())) @@ -418,7 +444,7 @@ impl ScheduledBarriers { assert_matches!(queue.status, QueueStatus::Blocked(_)); let (mut dropped_actors, mut cancel_table_ids) = (vec![], HashSet::new()); - while let Some(Scheduled { + while let Some(ScheduledQueueItem { notifiers, command, .. }) = queue.queue.pop_front() { @@ -440,7 +466,9 @@ impl ScheduledBarriers { } (dropped_actors, cancel_table_ids) } +} +impl PeriodicBarriers { /// Whether the barrier(checkpoint = true) should be injected. 
fn try_get_checkpoint(&self) -> bool { self.num_uncheckpointed_barrier + 1 >= self.checkpoint_frequency || self.force_checkpoint diff --git a/src/meta/src/manager/metadata.rs b/src/meta/src/manager/metadata.rs index 48dfbfaf55c0a..fd94167d8dc2e 100644 --- a/src/meta/src/manager/metadata.rs +++ b/src/meta/src/manager/metadata.rs @@ -58,13 +58,16 @@ pub(crate) enum ActiveStreamingWorkerChange { pub struct ActiveStreamingWorkerNodes { worker_nodes: HashMap, rx: UnboundedReceiver, + #[cfg_attr(not(debug_assertions), expect(dead_code))] + meta_manager: MetadataManager, } impl ActiveStreamingWorkerNodes { - pub(crate) fn uninitialized() -> Self { + pub(crate) fn uninitialized(meta_manager: MetadataManager) -> Self { Self { worker_nodes: Default::default(), rx: unbounded_channel().1, + meta_manager, } } @@ -76,6 +79,7 @@ impl ActiveStreamingWorkerNodes { Ok(Self { worker_nodes: nodes.into_iter().map(|node| (node.id as _, node)).collect(), rx, + meta_manager, }) } @@ -191,6 +195,51 @@ impl ActiveStreamingWorkerNodes { ret } + + #[cfg(debug_assertions)] + pub(crate) async fn validate_change(&self) { + use risingwave_pb::common::WorkerNode; + use thiserror_ext::AsReport; + match self + .meta_manager + .list_active_streaming_compute_nodes() + .await + { + Ok(worker_nodes) => { + let ignore_irrelevant_info = |node: &WorkerNode| { + ( + node.id, + WorkerNode { + id: node.id, + r#type: node.r#type, + host: node.host.clone(), + parallelism: node.parallelism, + property: node.property.clone(), + resource: node.resource.clone(), + ..Default::default() + }, + ) + }; + let worker_nodes: HashMap<_, _> = + worker_nodes.iter().map(ignore_irrelevant_info).collect(); + let curr_worker_nodes: HashMap<_, _> = self + .current() + .values() + .map(ignore_irrelevant_info) + .collect(); + if worker_nodes != curr_worker_nodes { + warn!( + ?worker_nodes, + ?curr_worker_nodes, + "different to global snapshot" + ); + } + } + Err(e) => { + warn!(e = ?e.as_report(), "fail to list_active_streaming_compute_nodes to compare with local snapshot"); + } + } + } } impl MetadataManager { From b0dad757e8a3de951ef03ce83feffbb3a1b43c48 Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Tue, 29 Oct 2024 01:45:43 +0800 Subject: [PATCH 4/9] perf(over window): skip remaining affected rows when rank is not changed (#18950) Signed-off-by: Richard Chien --- src/expr/core/src/window_function/kind.rs | 5 ++-- .../plan_node/logical_over_window.rs | 2 +- .../optimizer/rule/over_window_split_rule.rs | 2 +- .../rule/over_window_to_topn_rule.rs | 2 +- .../src/executor/over_window/general.rs | 10 ++++++++ .../executor/over_window/over_partition.rs | 25 +++++++++++++++++-- 6 files changed, 39 insertions(+), 7 deletions(-) diff --git a/src/expr/core/src/window_function/kind.rs b/src/expr/core/src/window_function/kind.rs index 9abea9b8655a4..6bf5ea8c45e5f 100644 --- a/src/expr/core/src/window_function/kind.rs +++ b/src/expr/core/src/window_function/kind.rs @@ -13,6 +13,7 @@ // limitations under the License. use anyhow::Context; +use enum_as_inner::EnumAsInner; use parse_display::{Display, FromStr}; use risingwave_common::bail; @@ -20,7 +21,7 @@ use crate::aggregate::AggType; use crate::Result; /// Kind of window functions. -#[derive(Debug, Display, FromStr /* for builtin */, Clone, PartialEq, Eq, Hash)] +#[derive(Debug, Display, FromStr /* for builtin */, Clone, PartialEq, Eq, Hash, EnumAsInner)] #[display(style = "snake_case")] pub enum WindowFuncKind { // General-purpose window functions. 
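The `EnumAsInner` derive added above is what keeps the rename in the next hunk honest: once the hand-written `is_rank` becomes `is_numbering`, later hunks in this patch can still call `kind.is_rank()`, which then resolves to the derive-generated per-variant helper (true only for `WindowFuncKind::Rank`). A minimal illustration, assuming `enum-as-inner` generates `is_*` helpers for each variant as its documentation describes:

    fn numbering_vs_rank() {
        let kind = WindowFuncKind::DenseRank;
        // Hand-written predicate (renamed below): RowNumber | Rank | DenseRank.
        assert!(kind.is_numbering());
        // Derive-generated helper: matches exactly the `Rank` variant.
        assert!(!kind.is_rank());
    }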
@@ -64,7 +65,7 @@ impl WindowFuncKind { } impl WindowFuncKind { - pub fn is_rank(&self) -> bool { + pub fn is_numbering(&self) -> bool { matches!(self, Self::RowNumber | Self::Rank | Self::DenseRank) } } diff --git a/src/frontend/src/optimizer/plan_node/logical_over_window.rs b/src/frontend/src/optimizer/plan_node/logical_over_window.rs index a790e006ddd97..7273e7d418515 100644 --- a/src/frontend/src/optimizer/plan_node/logical_over_window.rs +++ b/src/frontend/src/optimizer/plan_node/logical_over_window.rs @@ -278,7 +278,7 @@ impl LogicalOverWindow { let rewritten_selected_items = over_window_builder.rewrite_selected_items(select_exprs)?; for window_func in &window_functions { - if window_func.kind.is_rank() && window_func.order_by.sort_exprs.is_empty() { + if window_func.kind.is_numbering() && window_func.order_by.sort_exprs.is_empty() { return Err(ErrorCode::InvalidInputSyntax(format!( "window rank function without order by: {:?}", window_func diff --git a/src/frontend/src/optimizer/rule/over_window_split_rule.rs b/src/frontend/src/optimizer/rule/over_window_split_rule.rs index b8114f85302ca..85510a5ff9746 100644 --- a/src/frontend/src/optimizer/rule/over_window_split_rule.rs +++ b/src/frontend/src/optimizer/rule/over_window_split_rule.rs @@ -36,7 +36,7 @@ impl Rule for OverWindowSplitRule { .iter() .enumerate() .map(|(idx, func)| { - let func_seq = if func.kind.is_rank() { + let func_seq = if func.kind.is_numbering() { rank_func_seq += 1; rank_func_seq } else { diff --git a/src/frontend/src/optimizer/rule/over_window_to_topn_rule.rs b/src/frontend/src/optimizer/rule/over_window_to_topn_rule.rs index 76fbb649038a6..10ab630a64e3a 100644 --- a/src/frontend/src/optimizer/rule/over_window_to_topn_rule.rs +++ b/src/frontend/src/optimizer/rule/over_window_to_topn_rule.rs @@ -70,7 +70,7 @@ impl Rule for OverWindowToTopNRule { return None; } let window_func = &over_window.window_functions()[0]; - if !window_func.kind.is_rank() { + if !window_func.kind.is_numbering() { // Only rank functions can be converted to TopN. return None; } diff --git a/src/stream/src/executor/over_window/general.rs b/src/stream/src/executor/over_window/general.rs index 1a5073b6e9546..16bc8065f2a00 100644 --- a/src/stream/src/executor/over_window/general.rs +++ b/src/stream/src/executor/over_window/general.rs @@ -152,6 +152,11 @@ pub(super) struct Calls { pub(super) end_is_unbounded: bool, /// Deduplicated indices of all arguments of all calls. pub(super) all_arg_indices: Vec, + + // TODO(rc): The following flags are used to optimize for `row_number`, `rank` and `dense_rank`. + // We should try our best to remove these flags while maintaining the performance in the future. 
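// `numbering_only`: every window call in this executor is row_number / rank /
// dense_rank, which is the precondition for the early exit added later in
// `over_partition.rs` — once a row past the delta keeps its old output, the
// remaining rows cannot change either.
// `has_rank`: at least one call is `rank()`, whose ties require the stricter
// order-key comparison instead of a plain key comparison before stopping.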
+ pub(super) numbering_only: bool, + pub(super) has_rank: bool, } impl Calls { @@ -180,6 +185,9 @@ impl Calls { .dedup() .collect(); + let numbering_only = calls.iter().all(|call| call.kind.is_numbering()); + let has_rank = calls.iter().any(|call| call.kind.is_rank()); + Self { calls, super_rows_frame_bounds, @@ -187,6 +195,8 @@ impl Calls { start_is_unbounded, end_is_unbounded, all_arg_indices, + numbering_only, + has_rank, } } diff --git a/src/stream/src/executor/over_window/over_partition.rs b/src/stream/src/executor/over_window/over_partition.rs index 93f17703f3073..c2f8ea895dccb 100644 --- a/src/stream/src/executor/over_window/over_partition.rs +++ b/src/stream/src/executor/over_window/over_partition.rs @@ -395,8 +395,10 @@ impl<'a, S: StateStore> OverPartition<'a, S> { BTreeMap>, Option>, )> { - let input_schema_len = table.get_data_types().len() - self.calls.len(); let calls = self.calls; + let input_schema_len = table.get_data_types().len() - calls.len(); + let numbering_only = calls.numbering_only; + let has_rank = calls.has_rank; // return values let mut part_changes = BTreeMap::new(); @@ -413,6 +415,7 @@ impl<'a, S: StateStore> OverPartition<'a, S> { let snapshot = part_with_delta.snapshot(); let delta = part_with_delta.delta(); + let last_delta_key = delta.last_key_value().map(|(k, _)| k.as_normal_expect()); // Generate delete changes first, because deletes are skipped during iteration over // `part_with_delta` in the next step. @@ -442,6 +445,8 @@ impl<'a, S: StateStore> OverPartition<'a, S> { assert!(last_curr_key.is_normal()); assert!(last_frame_end.is_normal()); + let last_delta_key = last_delta_key.unwrap(); + if let Some(accessed_range) = accessed_range.as_mut() { let min_start = first_frame_start .as_normal_expect() @@ -504,12 +509,28 @@ impl<'a, S: StateStore> OverPartition<'a, S> { let (key, row) = curr_key_cursor .key_value() .expect("cursor must be valid until `last_curr_key`"); + let mut should_continue = true; + let output = states.slide_no_evict_hint()?; compute_count += 1; let old_output = &row.as_inner()[input_schema_len..]; if !old_output.is_empty() && old_output == output { same_output_count += 1; + + if numbering_only { + if has_rank { + // It's possible that an `Insert` doesn't affect it's ties but affects + // all the following rows, so we need to check the `order_key`. 
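// Worked example with illustrative values: for `rank() over (order by foo)`
// with existing rows foo = 10, 20, 30 ranked 1, 2, 3, inserting another row
// with foo = 20 leaves the existing 20 tied at rank 2 (its output is
// unchanged), yet 30 still moves from rank 3 to rank 4. So an unchanged row
// that merely ties the inserted key must not end the scan; only an unchanged
// row whose order key is strictly greater than the last delta key guarantees
// that nothing after it can change.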
+ if key.as_normal_expect().order_key > last_delta_key.order_key { + // there won't be any more changes after this point, we can stop early + should_continue = false; + } + } else if key.as_normal_expect() >= last_delta_key { + // there won't be any more changes after this point, we can stop early + should_continue = false; + } + } } let new_row = OwnedRow::new( @@ -542,7 +563,7 @@ impl<'a, S: StateStore> OverPartition<'a, S> { curr_key_cursor.move_next(); - key != last_curr_key + should_continue && key != last_curr_key } {} } From d44814ddba49a768c72578c785a8681e70b5e3a6 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue, 29 Oct 2024 09:41:03 +0800 Subject: [PATCH 5/9] chore(deps): Bump notify from 6.1.1 to 7.0.0 (#19137) Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- Cargo.lock | 43 +++++++++++++++++++++++++++++++++---------- src/meta/Cargo.toml | 2 +- 2 files changed, 34 insertions(+), 11 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 997fe3a8d99ae..e4fa7107f874b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3267,7 +3267,7 @@ dependencies = [ "bitflags 1.3.2", "crossterm_winapi", "libc", - "mio", + "mio 0.8.11", "parking_lot 0.12.1", "signal-hook", "signal-hook-mio", @@ -6525,9 +6525,9 @@ dependencies = [ [[package]] name = "inotify" -version = "0.9.6" +version = "0.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8069d3ec154eb856955c1c0fbffefbf5f3c40a104ec912d4797314c1801abff" +checksum = "fdd168d97690d0b8c412d6b6c10360277f4d7ee495c5d0d5d5fe0854923255cc" dependencies = [ "bitflags 1.3.2", "inotify-sys", @@ -7575,6 +7575,19 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "mio" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "80e04d1dcff3aae0704555fe5fee3bcfaf3d1fdf8a7e521d5b9d2b42acb52cec" +dependencies = [ + "hermit-abi 0.3.9", + "libc", + "log", + "wasi", + "windows-sys 0.52.0", +] + [[package]] name = "mirai-annotations" version = "1.12.0" @@ -7739,7 +7752,7 @@ dependencies = [ "keyed_priority_queue", "lazy_static", "lru 0.12.0", - "mio", + "mio 0.8.11", "mysql_common", "native-tls", "once_cell", @@ -7920,9 +7933,9 @@ checksum = "61807f77802ff30975e01f4f071c8ba10c022052f98b3294119f3e615d13e5be" [[package]] name = "notify" -version = "6.1.1" +version = "7.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6205bd8bb1e454ad2e27422015fb5e4f2bcc7e08fa8f27058670d208324a4d2d" +checksum = "c533b4c39709f9ba5005d8002048266593c1cfaf3c5f0739d5b8ab0c6c504009" dependencies = [ "bitflags 2.6.0", "filetime", @@ -7931,9 +7944,19 @@ dependencies = [ "kqueue", "libc", "log", - "mio", + "mio 1.0.2", + "notify-types", "walkdir", - "windows-sys 0.48.0", + "windows-sys 0.52.0", +] + +[[package]] +name = "notify-types" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7393c226621f817964ffb3dc5704f9509e107a8b024b489cc2c1b217378785df" +dependencies = [ + "instant", ] [[package]] @@ -13393,7 +13416,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "29ad2e15f37ec9a6cc544097b78a1ec90001e9f71b81338ca39f430adaca99af" dependencies = [ "libc", - "mio", + "mio 0.8.11", "signal-hook", ] @@ -14489,7 +14512,7 @@ dependencies = [ "backtrace", "bytes", "libc", - "mio", + "mio 0.8.11", "num_cpus", "parking_lot 0.12.1", "pin-project-lite", diff --git a/src/meta/Cargo.toml 
b/src/meta/Cargo.toml index d3174b7ddfb19..defe4e14c8fd8 100644 --- a/src/meta/Cargo.toml +++ b/src/meta/Cargo.toml @@ -42,7 +42,7 @@ jsonbb = { workspace = true } maplit = "1.0.2" memcomparable = { version = "0.2" } mime_guess = "2" -notify = { version = "6", default-features = false, features = ["macos_fsevent"] } +notify = { version = "7", default-features = false, features = ["macos_fsevent"] } num-integer = "0.1" num-traits = "0.2" otlp-embedded = { workspace = true } From e24e19b9ba7c9f133eec21e3889242e31b48b937 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue, 29 Oct 2024 09:42:04 +0800 Subject: [PATCH 6/9] chore(deps): Bump elasticsearch from 8.5.0-alpha.1 to 8.15.0-alpha.1 (#18908) Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- Cargo.lock | 21 +++++++-------------- src/connector/Cargo.toml | 2 +- 2 files changed, 8 insertions(+), 15 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e4fa7107f874b..1122170df03e4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1949,12 +1949,6 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4c7f02d4ea65f2c1853089ffd8d2787bdbc63de2f0d29dedbcf8ccdfa0ccd4cf" -[[package]] -name = "base64" -version = "0.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b41b7ea54a0c9d92199de89e20e58d49f02f8e699814ef3fdf266f6f748d15c7" - [[package]] name = "base64" version = "0.13.1" @@ -4512,20 +4506,21 @@ dependencies = [ [[package]] name = "elasticsearch" -version = "8.5.0-alpha.1" +version = "8.15.0-alpha.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "40d9bd57d914cc66ce878f098f63ed7b5d5b64c30644a5adb950b008f874a6c6" +checksum = "d99147dd351d320432ec103a20804cbb593d3d08fd2eed0ee86f21a0f5137a3c" dependencies = [ - "base64 0.11.0", + "base64 0.22.0", "bytes", "dyn-clone", "lazy_static", "percent-encoding", - "reqwest 0.11.20", - "rustc_version 0.2.3", + "reqwest 0.12.4", + "rustc_version 0.4.0", "serde", "serde_json", - "serde_with 1.14.0", + "serde_with 3.8.1", + "tokio", "url", "void", ] @@ -10188,7 +10183,6 @@ version = "0.11.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3e9ad3fe7488d7e34558a2033d45a0c90b72d97b4f80705666fea71472e2e6a1" dependencies = [ - "async-compression", "base64 0.21.7", "bytes", "encoding_rs", @@ -10216,7 +10210,6 @@ dependencies = [ "tokio", "tokio-native-tls", "tokio-rustls 0.24.1", - "tokio-util", "tower-service", "url", "wasm-bindgen", diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index 79764508e22c4..d8733c9e505ea 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -50,7 +50,7 @@ csv = "1.3" deltalake = { workspace = true } duration-str = "0.11.2" easy-ext = "1" -elasticsearch = { version = "8.5.0-alpha.1", features = ["rustls-tls"] } +elasticsearch = { version = "8.15.0-alpha.1", features = ["rustls-tls"] } enum-as-inner = "0.6" futures = { version = "0.3", default-features = false, features = ["alloc"] } futures-async-stream = { workspace = true } From d39ac9c085ec60cc202902322e2e200bcbc5e635 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue, 29 Oct 2024 09:50:56 +0800 Subject: [PATCH 7/9] chore(deps): Bump simd-json from 0.13.3 to 0.14.2 (#19118) Signed-off-by: dependabot[bot] Signed-off-by: xxchan Co-authored-by: dependabot[bot] 
<49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: xxchan --- Cargo.lock | 22 +++++++++++++++------- src/connector/Cargo.toml | 2 +- src/connector/src/parser/common.rs | 2 +- src/connector/src/parser/unified/json.rs | 2 +- 4 files changed, 18 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1122170df03e4..db421dd9ac45f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5053,6 +5053,15 @@ dependencies = [ "num-traits", ] +[[package]] +name = "float-cmp" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b09cf3155332e944990140d967ff5eceb70df778b34f77d8075db46e4704e6d8" +dependencies = [ + "num-traits", +] + [[package]] name = "flume" version = "0.11.0" @@ -9259,7 +9268,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "59230a63c37f3e18569bdb90e4a89cbf5bf8b06fea0b84e65ea10cc4df47addd" dependencies = [ "difflib", - "float-cmp", + "float-cmp 0.9.0", "itertools 0.10.5", "normalize-line-endings", "predicates-core", @@ -13456,13 +13465,12 @@ dependencies = [ [[package]] name = "simd-json" -version = "0.13.3" +version = "0.14.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d123f285a3635e423ec2ef5b67e0168dcf86c0d62fffbcea88fcd1c926e47413" +checksum = "b1df0290e9bfe79ddd5ff8798ca887cd107b75353d2957efe9777296e17f26b5" dependencies = [ "getrandom", "halfbrown", - "lexical-core", "ref-cast", "serde", "serde_json", @@ -15375,11 +15383,11 @@ checksum = "5a84c137d37ab0142f0f2ddfe332651fdbf252e7b7dbb4e67b6c1f1b2e925101" [[package]] name = "value-trait" -version = "0.8.0" +version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ea87257cfcbedcb9444eda79c59fdfea71217e6305afee8ee33f500375c2ac97" +checksum = "9170e001f458781e92711d2ad666110f153e4e50bfd5cbd02db6547625714187" dependencies = [ - "float-cmp", + "float-cmp 0.10.0", "halfbrown", "itoa", "ryu", diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index d8733c9e505ea..9fd49fea88c7b 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -139,7 +139,7 @@ serde = { version = "1", features = ["derive", "rc"] } serde_derive = "1" serde_json = "1" serde_with = { version = "3", features = ["json"] } -simd-json = { version = "0.13.3", features = ["hints"] } +simd-json = { version = "0.14.2", features = ["hints"] } sqlx = { workspace = true } strum = "0.26" strum_macros = "0.26" diff --git a/src/connector/src/parser/common.rs b/src/connector/src/parser/common.rs index 102014966db42..68779080f0c8c 100644 --- a/src/connector/src/parser/common.rs +++ b/src/connector/src/parser/common.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use simd_json::prelude::ValueAsContainer; +use simd_json::prelude::ValueAsObject; use simd_json::BorrowedValue; /// Get a value from a json object by key, case insensitive. 
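These two import swaps track a trait reshuffle in the simd-json 0.14 prelude: code that used the catch-all `ValueAsContainer` now imports the object-side `ValueAsObject` or the array-side `ValueAsArray` it actually relies on. A minimal sketch of the object-side pattern as used in `common.rs` — the helper body here is illustrative only, not the actual implementation:

    use simd_json::prelude::ValueAsObject; // was `ValueAsContainer` before the bump
    use simd_json::BorrowedValue;

    /// Case-insensitive lookup of a key in a JSON object (illustrative).
    fn get_ignore_case<'a>(
        v: &'a BorrowedValue<'a>,
        key: &str,
    ) -> Option<&'a BorrowedValue<'a>> {
        // `as_object()` is provided by `ValueAsObject` after the upgrade.
        v.as_object()?
            .iter()
            .find(|(k, _)| k.eq_ignore_ascii_case(key))
            .map(|(_, value)| value)
    }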
diff --git a/src/connector/src/parser/unified/json.rs b/src/connector/src/parser/unified/json.rs index 8ee8f9fe9386f..3584e233d74a1 100644 --- a/src/connector/src/parser/unified/json.rs +++ b/src/connector/src/parser/unified/json.rs @@ -28,7 +28,7 @@ use risingwave_common::types::{ use risingwave_common::util::iter_util::ZipEqFast; use risingwave_connector_codec::decoder::utils::extract_decimal; use simd_json::prelude::{ - TypedValue, ValueAsContainer, ValueAsScalar, ValueObjectAccess, ValueTryAsScalar, + TypedValue, ValueAsArray, ValueAsScalar, ValueObjectAccess, ValueTryAsScalar, }; use simd_json::{BorrowedValue, ValueType}; use thiserror_ext::AsReport; From 31a650521dae6bbb83c87c37db7795c60e2a0a0a Mon Sep 17 00:00:00 2001 From: Noel Kwan <47273164+kwannoel@users.noreply.github.com> Date: Tue, 29 Oct 2024 11:26:01 +0800 Subject: [PATCH 8/9] chore(dashboard): remove unused function (#19158) --- dashboard/lib/layout.ts | 30 ------------------------------ 1 file changed, 30 deletions(-) diff --git a/dashboard/lib/layout.ts b/dashboard/lib/layout.ts index e6549d4b1e979..714ed9fe0f0db 100644 --- a/dashboard/lib/layout.ts +++ b/dashboard/lib/layout.ts @@ -516,33 +516,3 @@ export function generateFragmentEdges( } return links } - -export function generateRelationBackPressureEdges( - layoutMap: RelationBoxPosition[] -): Edge[] { - const links = [] - const relationMap = new Map() - for (const x of layoutMap) { - relationMap.set(x.id, x) - } - for (const relation of layoutMap) { - for (const parentId of relation.parentIds) { - const parentRelation = relationMap.get(parentId)! - links.push({ - points: [ - { - x: relation.x + relation.width / 2, - y: relation.y + relation.height / 2, - }, - { - x: parentRelation.x + parentRelation.width / 2, - y: parentRelation.y + parentRelation.height / 2, - }, - ], - source: relation.id, - target: parentId, - }) - } - } - return links -} From 193e93fd8d9f9dbae717fe6a5b411e7f33382f27 Mon Sep 17 00:00:00 2001 From: xxchan Date: Tue, 29 Oct 2024 13:07:00 +0800 Subject: [PATCH 9/9] ci: fix scout in docker pipeline (#19164) Signed-off-by: xxchan --- ci/workflows/docker.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/ci/workflows/docker.yml b/ci/workflows/docker.yml index 6fda1ea24138e..603376ace665a 100644 --- a/ci/workflows/docker.yml +++ b/ci/workflows/docker.yml @@ -98,6 +98,7 @@ steps: retry: *auto-retry - label: "generate notification step" + if: build.env("ENABLE_DOCKER_SCOUT") == "true" depends_on: - "docker-scout" command: ci/scripts/docker-scout-notify.sh