give up abort_handles, and use a single consumer for all partitions
xxchan committed Mar 1, 2024
1 parent 4d62bb2 commit 7932c49
Showing 3 changed files with 40 additions and 58 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
@@ -240,7 +240,8 @@ lto = 'off'
[profile.release]
debug = "full"
split-debuginfo = "packed"
lto = "thin"
# lto = "thin"
lto = 'off'

# The profile used for CI in main branch.
# This profile inherits from the release profile, but turns on some checks and assertions for us to
93 changes: 37 additions & 56 deletions src/stream/src/executor/source/kafka_backfill_executor.rs
@@ -12,14 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.

-use std::assert_matches::assert_matches;
use std::cmp::Ordering;
+use std::collections::hash_map::Entry;
use std::fmt::Formatter;
use std::time::Instant;

use anyhow::anyhow;
use either::Either;
-use futures::stream::{select_with_strategy, AbortHandle, Abortable, PollNext};
+use futures::stream::{select_with_strategy, PollNext};
use futures::StreamExt;
use futures_async_stream::try_stream;
use kafka_backfill_executor::source_executor::WAIT_BARRIER_MULTIPLE_TIMES;
@@ -63,12 +63,7 @@ impl BackfillState {
}

/// Returns whether the row from upstream `SourceExecutor` is visible.
-fn handle_upstream_row(
-&mut self,
-split: &str,
-offset: &str,
-abort_handles: &HashMap<SplitId, AbortHandle>,
-) -> bool {
+fn handle_upstream_row(&mut self, offset: &str) -> bool {
let mut vis = false;
match self {
BackfillState::Backfilling(None) => {
@@ -82,13 +77,11 @@
Ordering::Equal => {
// backfilling for this split is finished just right.
*self = BackfillState::Finished;
-abort_handles.get(split).unwrap().abort();
}
Ordering::Greater => {
// backfilling for this split produced more data than current source's progress.
// We should stop backfilling, and filter out rows from upstream with offset <= backfill_offset.
*self = BackfillState::SourceCachingUp(backfill_offset.clone());
-abort_handles.get(split).unwrap().abort();
}
}
}
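
For context, here is a self-contained sketch of the state machine that `handle_upstream_row` drives once the abort handles are gone. Only the `Equal` and `Greater` transitions are taken from the hunk above; the `Backfilling(None)`, `Less`, `SourceCachingUp`, and `Finished` arm bodies are cut off in this diff, so their versions below are illustrative guesses.

```rust
use std::cmp::Ordering;

/// Simplified stand-in for the executor's per-split state.
enum BackfillState {
    Backfilling(Option<String>),
    SourceCachingUp(String),
    Finished,
}

impl BackfillState {
    /// Returns whether a row from the upstream `SourceExecutor` is visible.
    fn handle_upstream_row(&mut self, offset: &str) -> bool {
        match self {
            // Backfill has made no progress yet: keep upstream rows hidden.
            BackfillState::Backfilling(None) => false,
            BackfillState::Backfilling(Some(backfill_offset)) => {
                match offset.cmp(backfill_offset.as_str()) {
                    // Upstream is still behind the backfill progress: hide the row.
                    Ordering::Less => false,
                    Ordering::Equal => {
                        // Backfilling for this split finished just right.
                        *self = BackfillState::Finished;
                        false
                    }
                    Ordering::Greater => {
                        // Backfill produced more data than upstream consumed:
                        // stop backfilling and hide upstream rows until they
                        // pass backfill_offset.
                        *self = BackfillState::SourceCachingUp(backfill_offset.clone());
                        false
                    }
                }
            }
            // Upstream is replaying rows the backfill already emitted.
            BackfillState::SourceCachingUp(backfill_offset) => offset > backfill_offset.as_str(),
            // Backfill is done: every upstream row is visible.
            BackfillState::Finished => true,
        }
    }
}

fn main() {
    let mut state = BackfillState::Backfilling(Some("42".to_string()));
    assert!(!state.handle_upstream_row("42")); // duplicate of the last backfilled row
    assert!(matches!(state, BackfillState::Finished));
    assert!(state.handle_upstream_row("43")); // upstream rows now pass through
}
```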
@@ -151,7 +144,6 @@ pub struct KafkaBackfillExecutorInner<S: StateStore> {
/// Local variables used in the backfill stage.
struct BackfillStage {
// stream: Option<EitherStream<'a>>,
-abort_handles: HashMap<SplitId, AbortHandle>,
states: BackfillStates,
/// Note: the offsets are not updated. Should use `state`'s offset to update before using it.
unfinished_splits: Vec<SplitImpl>,
@@ -181,14 +173,11 @@ impl<S: StateStore> KafkaBackfillExecutorInner<S> {
}
}

-/// Unlike `SourceExecutor`, which creates a `stream_reader` with all splits,
-/// we create a separate `stream_reader` for each split here, because we
-/// want to abort early for each split after the split's backfilling is finished.
async fn build_stream_source_reader(
&self,
source_desc: &SourceDesc,
splits: Vec<SplitImpl>,
-) -> StreamExecutorResult<(BoxChunkSourceStream, HashMap<SplitId, AbortHandle>)> {
+) -> StreamExecutorResult<BoxChunkSourceStream> {
let column_ids = source_desc
.columns
.iter()
@@ -205,23 +194,11 @@ impl<S: StateStore> KafkaBackfillExecutorInner<S> {
source_desc.source.config.clone(),
self.stream_source_core.source_name.clone(),
);
-let source_ctx = Arc::new(source_ctx);
-
-let mut abort_handles = HashMap::new();
-let mut streams = vec![];
-for split in splits {
-let split_id = split.id();
-let reader = source_desc
-.source
-.to_stream(Some(vec![split]), column_ids.clone(), source_ctx.clone())
-.await
-.map_err(StreamExecutorError::connector_error)?;
-let (abort_handle, abort_registration) = AbortHandle::new_pair();
-let stream = Abortable::new(reader, abort_registration);
-abort_handles.insert(split_id, abort_handle);
-streams.push(stream);
-}
-Ok((futures::stream::select_all(streams).boxed(), abort_handles))
+source_desc
+.source
+.to_stream(Some(splits), column_ids, Arc::new(source_ctx))
+.await
+.map_err(StreamExecutorError::connector_error)
}

/// `source_id | source_name | actor_id | fragment_id`
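
For reference, a minimal standalone demo of the `futures` abort machinery that the removed per-split readers relied on (the stream contents are made up; this is not RisingWave code):

```rust
use futures::stream::{self, AbortHandle, Abortable, StreamExt};

// The pattern this commit removes: each split's reader was wrapped in
// `Abortable` so it could be cancelled as soon as that split finished
// backfilling.
fn main() {
    futures::executor::block_on(async {
        let (abort_handle, abort_registration) = AbortHandle::new_pair();
        let split_reader = stream::iter(1..); // stand-in for one split's reader
        let mut stream = Abortable::new(split_reader, abort_registration);

        assert_eq!(stream.next().await, Some(1));
        // e.g. called once this split's backfill caught up with upstream
        abort_handle.abort();
        assert_eq!(stream.next().await, None); // the stream ends immediately
    });
}
```

With a single consumer for all splits, there is no per-split stream left to abort; the executor instead relies on the per-split `BackfillState` to filter rows after a split finishes.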
@@ -302,17 +279,20 @@ impl<S: StateStore> KafkaBackfillExecutorInner<S> {
// Return the ownership of `stream_source_core` to the source executor.
self.stream_source_core = core;

-let (source_chunk_reader, abort_handles) = self
+let source_chunk_reader = self
.build_stream_source_reader(&source_desc, unfinished_splits.clone())
.instrument_await("source_build_reader")
.await?;

fn select_strategy(_: &mut ()) -> PollNext {
futures::stream::PollNext::Left
}
-// XXX:
-// - What's the best poll strategy? We should prefer backfill, but also consider barrier from input.
-// - Should we also add a barrier stream for backfill executor?
+
+// We choose the "preferring upstream" strategy here, because:
+// - When the upstream source's workload is high (i.e., Kafka has new incoming data), it just makes backfilling slower.
+//   Chunks from upstream are simply dropped, so there's not much overhead.
+//   This likely also affects other running jobs less.
+// - When the upstream source becomes less busy, SourceBackfill can begin to catch up.
let mut backfill_stream = select_with_strategy(
input.by_ref().map(Either::Left),
source_chunk_reader.map(Either::Right),
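
As a minimal standalone demo of what this merge does (the item values are made up; this is not the executor itself), `select_with_strategy` with a constant `PollNext::Left` polls the left stream first and falls back to the right one only when the left is pending or exhausted:

```rust
use futures::stream::{self, select_with_strategy, PollNext, StreamExt};

fn main() {
    futures::executor::block_on(async {
        let upstream = stream::iter(["up-1", "up-2"]);
        let backfill = stream::iter(["bf-1", "bf-2"]);
        // Always poll the left (upstream) side first, like the executor's
        // `select_strategy` above.
        let merged = select_with_strategy(upstream, backfill, |_: &mut ()| PollNext::Left);
        let order: Vec<_> = merged.collect().await;
        // Both sides are always ready here, so the left side drains first.
        assert_eq!(order, ["up-1", "up-2", "bf-1", "bf-2"]);
    });
}
```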
@@ -343,7 +323,6 @@ impl<S: StateStore> KafkaBackfillExecutorInner<S> {
yield Message::Barrier(barrier);

let mut backfill_stage = BackfillStage {
-abort_handles,
states: backfill_states,
unfinished_splits,
};
@@ -378,13 +357,12 @@ impl<S: StateStore> KafkaBackfillExecutorInner<S> {
core.source_id.to_string(),
]);

-let (reader, new_abort_handles) = self
+let reader = self
.build_stream_source_reader(
&source_desc,
backfill_stage.unfinished_splits.clone(),
)
.await?;
-backfill_stage.abort_handles = new_abort_handles;

backfill_stream = select_with_strategy(
input.by_ref().map(Either::Left),
@@ -475,13 +453,12 @@ impl<S: StateStore> KafkaBackfillExecutorInner<S> {
);

// Replace the source reader with a new one of the new state.
-let (reader, new_abort_handles) = self
+let reader = self
.build_stream_source_reader(
&source_desc,
backfill_stage.unfinished_splits.clone(),
)
.await?;
-backfill_stage.abort_handles = new_abort_handles;

backfill_stream = select_with_strategy(
input.by_ref().map(Either::Left),
@@ -514,11 +491,7 @@ impl<S: StateStore> KafkaBackfillExecutorInner<S> {
let offset = row.datum_at(offset_idx).unwrap().into_utf8();
let backfill_state =
backfill_stage.states.get_mut(split).unwrap();
-let vis = backfill_state.handle_upstream_row(
-split,
-offset,
-&backfill_stage.abort_handles,
-);
+let vis = backfill_state.handle_upstream_row(offset);
new_vis.set(i, vis);
}
// emit chunk if vis is not empty. i.e., some splits finished backfilling.
@@ -564,16 +537,24 @@ impl<S: StateStore> KafkaBackfillExecutorInner<S> {
}
split_offset_mapping.iter().for_each(|(split_id, offset)| {
// update backfill progress
-let prev_state = backfill_stage.states.insert(
-split_id.clone(),
-BackfillState::Backfilling(Some(offset.to_string())),
-);
-// abort_handles should prevent the other cases from happening
-assert_matches!(
-prev_state,
-Some(BackfillState::Backfilling(_)),
-"Unexpected backfilling state, split_id: {split_id}"
-);
+match backfill_stage.states.entry(split_id.clone()) {
+Entry::Occupied(mut entry) => {
+let state = entry.get_mut();
+match state {
+BackfillState::Backfilling(_) => {
+*state =
+BackfillState::Backfilling(Some(offset.clone()));
+}
+BackfillState::SourceCachingUp(_)
+| BackfillState::Finished => {
+// backfilling stopped. ignore
+}
+}
+}
+Entry::Vacant(entry) => {
+entry.insert(BackfillState::Backfilling(Some(offset.clone())));
+}
+}
});
self.metrics
.source_backfill_row_count
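
A condensed sketch of why the old `assert_matches!` no longer holds: with a single shared consumer, rows for a split that already stopped backfilling keep arriving, so the progress update must silently ignore them. The enum and helper below are simplified stand-ins, not the executor's actual types.

```rust
use std::collections::hash_map::Entry;
use std::collections::HashMap;

#[derive(Debug, PartialEq)]
enum BackfillState {
    Backfilling(Option<String>),
    SourceCachingUp(String),
    Finished,
}

// Tolerant progress update: late rows for a stopped split are ignored
// instead of tripping an assertion.
fn update_progress(states: &mut HashMap<String, BackfillState>, split_id: &str, offset: &str) {
    match states.entry(split_id.to_string()) {
        Entry::Occupied(mut entry) => match entry.get_mut() {
            state @ BackfillState::Backfilling(_) => {
                *state = BackfillState::Backfilling(Some(offset.to_string()));
            }
            // Backfilling already stopped for this split: drop the update.
            BackfillState::SourceCachingUp(_) | BackfillState::Finished => {}
        },
        Entry::Vacant(entry) => {
            entry.insert(BackfillState::Backfilling(Some(offset.to_string())));
        }
    }
}

fn main() {
    let mut states = HashMap::new();
    states.insert("split-0".to_string(), BackfillState::Finished);
    // A chunk from the shared consumer still carries rows for split-0.
    update_progress(&mut states, "split-0", "100");
    assert_eq!(states["split-0"], BackfillState::Finished); // untouched
}
```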
2 changes: 1 addition & 1 deletion src/stream/src/executor/source/source_executor.rs
@@ -85,7 +85,7 @@ impl<S: StateStore> SourceExecutor<S> {
}
}

-async fn build_stream_source_reader(
+pub async fn build_stream_source_reader(
&self,
source_desc: &SourceDesc,
state: ConnectorState,
