From 7932c495dccae10c1fdfcb87ec78a85bdd80b5bc Mon Sep 17 00:00:00 2001 From: xxchan Date: Thu, 29 Feb 2024 17:15:48 +0800 Subject: [PATCH] give up `abort_handles`, and use single consumer for all partitions --- Cargo.toml | 3 +- .../source/kafka_backfill_executor.rs | 93 ++++++++----------- .../src/executor/source/source_executor.rs | 2 +- 3 files changed, 40 insertions(+), 58 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 262c5ff423eb3..90a4fb6243d75 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -240,7 +240,8 @@ lto = 'off' [profile.release] debug = "full" split-debuginfo = "packed" -lto = "thin" +# lto = "thin" +lto = 'off' # The profile used for CI in main branch. # This profile inherits from the release profile, but turns on some checks and assertions for us to diff --git a/src/stream/src/executor/source/kafka_backfill_executor.rs b/src/stream/src/executor/source/kafka_backfill_executor.rs index f319d276b7a06..24c1cc2bf7573 100644 --- a/src/stream/src/executor/source/kafka_backfill_executor.rs +++ b/src/stream/src/executor/source/kafka_backfill_executor.rs @@ -12,14 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::assert_matches::assert_matches; use std::cmp::Ordering; +use std::collections::hash_map::Entry; use std::fmt::Formatter; use std::time::Instant; use anyhow::anyhow; use either::Either; -use futures::stream::{select_with_strategy, AbortHandle, Abortable, PollNext}; +use futures::stream::{select_with_strategy, PollNext}; use futures::StreamExt; use futures_async_stream::try_stream; use kafka_backfill_executor::source_executor::WAIT_BARRIER_MULTIPLE_TIMES; @@ -63,12 +63,7 @@ impl BackfillState { } /// Returns whether the row from upstream `SourceExecutor` is visible. 
- fn handle_upstream_row( - &mut self, - split: &str, - offset: &str, - abort_handles: &HashMap, - ) -> bool { + fn handle_upstream_row(&mut self, offset: &str) -> bool { let mut vis = false; match self { BackfillState::Backfilling(None) => { @@ -82,13 +77,11 @@ impl BackfillState { Ordering::Equal => { // backfilling for this split is finished just right. *self = BackfillState::Finished; - abort_handles.get(split).unwrap().abort(); } Ordering::Greater => { // backfilling for this split produced more data than current source's progress. // We should stop backfilling, and filter out rows from upstream with offset <= backfill_offset. *self = BackfillState::SourceCachingUp(backfill_offset.clone()); - abort_handles.get(split).unwrap().abort(); } } } @@ -151,7 +144,6 @@ pub struct KafkaBackfillExecutorInner { /// Local variables used in the backfill stage. struct BackfillStage { // stream: Option>, - abort_handles: HashMap, states: BackfillStates, /// Note: the offsets are not updated. Should use `state`'s offset to update before using it. unfinished_splits: Vec, @@ -181,14 +173,11 @@ impl KafkaBackfillExecutorInner { } } - /// Unlike `SourceExecutor`, which creates a `stream_reader` with all splits, - /// we create a separate `stream_reader` for each split here, because we - /// want to abort early for each split after the split's backfilling is finished. 
async fn build_stream_source_reader( &self, source_desc: &SourceDesc, splits: Vec, - ) -> StreamExecutorResult<(BoxChunkSourceStream, HashMap)> { + ) -> StreamExecutorResult { let column_ids = source_desc .columns .iter() @@ -205,23 +194,11 @@ impl KafkaBackfillExecutorInner { source_desc.source.config.clone(), self.stream_source_core.source_name.clone(), ); - let source_ctx = Arc::new(source_ctx); - - let mut abort_handles = HashMap::new(); - let mut streams = vec![]; - for split in splits { - let split_id = split.id(); - let reader = source_desc - .source - .to_stream(Some(vec![split]), column_ids.clone(), source_ctx.clone()) - .await - .map_err(StreamExecutorError::connector_error)?; - let (abort_handle, abort_registration) = AbortHandle::new_pair(); - let stream = Abortable::new(reader, abort_registration); - abort_handles.insert(split_id, abort_handle); - streams.push(stream); - } - Ok((futures::stream::select_all(streams).boxed(), abort_handles)) + source_desc + .source + .to_stream(Some(splits), column_ids, Arc::new(source_ctx)) + .await + .map_err(StreamExecutorError::connector_error) } /// `source_id | source_name | actor_id | fragment_id` @@ -302,7 +279,7 @@ impl KafkaBackfillExecutorInner { // Return the ownership of `stream_source_core` to the source executor. self.stream_source_core = core; - let (source_chunk_reader, abort_handles) = self + let source_chunk_reader = self .build_stream_source_reader(&source_desc, unfinished_splits.clone()) .instrument_await("source_build_reader") .await?; @@ -310,9 +287,12 @@ impl KafkaBackfillExecutorInner { fn select_strategy(_: &mut ()) -> PollNext { futures::stream::PollNext::Left } - // XXX: - // - What's the best poll strategy? We should prefer backfill, but also consider barrier from input. - // - Should we also add a barrier stream for backfill executor? 
+ + // We choose "preferring upstream" strategy here, because: + // - When the upstream source's workload is high (i.e., Kafka has new incoming data), it just makes backfilling slower. + // For chunks from upstream, they are simply dropped, so there's not much overhead. + // So possibly this can also affect other running jobs less. + // - When the upstream Source becomes less busy, SourceBackfill can begin to catch up. let mut backfill_stream = select_with_strategy( input.by_ref().map(Either::Left), source_chunk_reader.map(Either::Right), @@ -343,7 +323,6 @@ impl KafkaBackfillExecutorInner { yield Message::Barrier(barrier); let mut backfill_stage = BackfillStage { - abort_handles, states: backfill_states, unfinished_splits, }; @@ -378,13 +357,12 @@ impl KafkaBackfillExecutorInner { core.source_id.to_string(), ]); - let (reader, new_abort_handles) = self + let reader = self .build_stream_source_reader( &source_desc, backfill_stage.unfinished_splits.clone(), ) .await?; - backfill_stage.abort_handles = new_abort_handles; backfill_stream = select_with_strategy( input.by_ref().map(Either::Left), @@ -475,13 +453,12 @@ impl KafkaBackfillExecutorInner { ); // Replace the source reader with a new one of the new state. - let (reader, new_abort_handles) = self + let reader = self .build_stream_source_reader( &source_desc, backfill_stage.unfinished_splits.clone(), ) .await?; - backfill_stage.abort_handles = new_abort_handles; backfill_stream = select_with_strategy( input.by_ref().map(Either::Left), @@ -514,11 +491,7 @@ impl KafkaBackfillExecutorInner { let offset = row.datum_at(offset_idx).unwrap().into_utf8(); let backfill_state = backfill_stage.states.get_mut(split).unwrap(); - let vis = backfill_state.handle_upstream_row( - split, - offset, - &backfill_stage.abort_handles, - ); + let vis = backfill_state.handle_upstream_row(offset); new_vis.set(i, vis); } // emit chunk if vis is not empty. i.e., some splits finished backfilling. 
@@ -564,16 +537,24 @@ impl KafkaBackfillExecutorInner { } split_offset_mapping.iter().for_each(|(split_id, offset)| { // update backfill progress - let prev_state = backfill_stage.states.insert( - split_id.clone(), - BackfillState::Backfilling(Some(offset.to_string())), - ); - // abort_handles should prevents other cases happening - assert_matches!( - prev_state, - Some(BackfillState::Backfilling(_)), - "Unexpected backfilling state, split_id: {split_id}" - ); + match backfill_stage.states.entry(split_id.clone()) { + Entry::Occupied(mut entry) => { + let state = entry.get_mut(); + match state { + BackfillState::Backfilling(_) => { + *state = + BackfillState::Backfilling(Some(offset.clone())); + } + BackfillState::SourceCachingUp(_) + | BackfillState::Finished => { + // backfilling stopped. ignore + } + } + } + Entry::Vacant(entry) => { + entry.insert(BackfillState::Backfilling(Some(offset.clone()))); + } + } }); self.metrics .source_backfill_row_count diff --git a/src/stream/src/executor/source/source_executor.rs b/src/stream/src/executor/source/source_executor.rs index 50edd09597632..a51e383304a4c 100644 --- a/src/stream/src/executor/source/source_executor.rs +++ b/src/stream/src/executor/source/source_executor.rs @@ -85,7 +85,7 @@ impl SourceExecutor { } } - async fn build_stream_source_reader( + pub async fn build_stream_source_reader( &self, source_desc: &SourceDesc, state: ConnectorState,