Skip to content

Commit

Permalink
fix lifetime issue
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan committed Feb 19, 2024
1 parent 9626588 commit 39b4ff6
Showing 1 changed file with 63 additions and 95 deletions.
158 changes: 63 additions & 95 deletions src/stream/src/executor/source/kafka_backfill_executor.rs
Original file line number Diff line number Diff line change
@@ -19,13 +19,14 @@ use std::fmt::Formatter;

use anyhow::anyhow;
use either::Either;
use futures::stream::{select_with_strategy, AbortHandle, Abortable};
use futures::stream::{select_with_strategy, AbortHandle, Abortable, PollNext};
use futures::StreamExt;
use futures_async_stream::try_stream;
use risingwave_common::buffer::BitmapBuilder;
use risingwave_common::row::{Row, RowExt};
use risingwave_common::system_param::local_manager::SystemParamsReaderRef;
use risingwave_common::types::JsonbVal;
use risingwave_connector::source::cdc::split;
use risingwave_connector::source::reader::desc::{SourceDesc, SourceDescBuilder};
use risingwave_connector::source::{
BoxChunkSourceStream, ConnectorState, SourceContext, SourceCtrlOpts, SplitId, SplitMetaData,
@@ -156,37 +157,6 @@ struct BackfillStage {
unfinished_splits: Vec<SplitImpl>,
}

mod stream {
//! Helpers for merging the upstream message stream and the backfill chunk stream
//! into one stream whose items are tagged with [`Either`].
use either::Either;
use futures::stream::{select_with_strategy, BoxStream, PollNext, SelectWithStrategy};
use futures::{Stream, StreamExt};
use risingwave_common::array::StreamChunk;
use risingwave_connector::source::BoxChunkSourceStream;

use super::{BoxedMessageStream, MessageStreamItem};

/// Item of the combined stream: `Left` carries an upstream message item,
/// `Right` carries a (fallible) chunk read from the backfill source reader.
type EitherItem = Either<MessageStreamItem, anyhow::Result<StreamChunk>>;

/// Concrete type of the combined stream returned by [`build_combined_stream`].
///
/// The two stream types and the strategy closure are opaque (`impl Trait` in a type alias);
/// their hidden types are determined by the body of [`build_combined_stream`].
/// The trailing `()` is the strategy state, matching the `&mut ()` closure argument.
///
/// NOTE(review): `'a` is meant to tie the stream to the `&'a mut` borrow of the upstream
/// taken in `build_combined_stream`, but it is not named on the right-hand side of this
/// alias. Confirm the opaque types really capture it.
pub type EitherStream<'a> = SelectWithStrategy<
impl Stream<Item = EitherItem>,
impl Stream<Item = EitherItem>,
impl FnMut(&mut ()) -> PollNext,
(),
>;

/// Combines `upstream` and `backfill` into a single stream.
///
/// `upstream` is only borrowed (`by_ref`), so the caller keeps ownership of it;
/// `backfill` is consumed. Upstream items are wrapped in `Either::Left`,
/// backfill items in `Either::Right`.
pub fn build_combined_stream<'a>(
upstream: &'a mut BoxedMessageStream,
backfill: BoxChunkSourceStream,
) -> EitherStream<'a> {
select_with_strategy(
upstream.by_ref().map(Either::Left),
backfill.map(Either::Right),
// The strategy unconditionally answers `PollNext::Left`, i.e. it always asks for
// the left (upstream) side to be polled first.
|_: &mut ()| futures::stream::PollNext::Left,
)
}
}
use stream::{build_combined_stream, EitherStream};

impl<S: StateStore> KafkaBackfillExecutorInner<S> {
#[allow(clippy::too_many_arguments)]
pub fn new(
@@ -325,8 +295,14 @@ impl<S: StateStore> KafkaBackfillExecutorInner<S> {
// XXX:
// - What's the best poll strategy?
// - Should we also add a barrier stream for backfill executor?
let mut backfill_stream =
Some(build_combined_stream(&mut input, source_chunk_reader));
fn closure() -> impl FnMut(&mut ()) -> PollNext {
|_: &mut ()| futures::stream::PollNext::Left
}
let mut backfill_stream = Some(select_with_strategy(
input.by_ref().map(Either::Left),
source_chunk_reader.map(Either::Right),
closure(),
));
let mut backfill_stage = BackfillStage {
abort_handles,
states: backfill_states,
@@ -343,6 +319,7 @@ impl<S: StateStore> KafkaBackfillExecutorInner<S> {
};
match msg {
Message::Barrier(barrier) => {
let mut split_changed = false;
if let Some(ref mutation) = barrier.mutation.as_deref() {
match mutation {
Mutation::Pause => { // TODO:
@@ -356,35 +333,68 @@ impl<S: StateStore> KafkaBackfillExecutorInner<S> {
"source change split received"
);

_ = backfill_stream.take();
self.apply_split_change(
&source_desc,
actor_splits,
&mut backfill_stage,
&mut backfill_stream,
&mut input,
true,
)
.await?;
split_changed = self
.apply_split_change(
actor_splits,
&mut backfill_stage,
true,
)
.await?;
}
Mutation::Update(UpdateMutation {
actor_splits, ..
}) => {
_ = backfill_stream.take();
let _ = self
split_changed = self
.apply_split_change(
&source_desc,
actor_splits,
&mut backfill_stage,
&mut backfill_stream,
&mut input,
false,
)
.await?;
}
_ => {}
}
}
if split_changed {
// rebuild backfill_stream
// Note: we don't put this part in a method, due to some complex lifetime issues.
let mut unfinished_splits = Vec::new();
for split in &mut backfill_stage.unfinished_splits {
let state =
backfill_stage.states.get(split.id().as_ref()).unwrap();
match state {
BackfillState::Backfilling(Some(offset)) => {
split.update_in_place(offset.clone())?;
unfinished_splits.push(split.clone());
}
BackfillState::Backfilling(None)
| BackfillState::SourceCachingUp(_)
| BackfillState::Finished => {}
}
}
backfill_stage.unfinished_splits = unfinished_splits;

tracing::info!(
"actor {:?} apply source split change to {:?}",
self.actor_ctx.id,
backfill_stage.unfinished_splits
);

// Replace the source reader with a new one of the new state.
let (reader, new_abort_handles) = self
.build_stream_source_reader(
&source_desc,
backfill_stage.unfinished_splits.clone(),
)
.await?;
backfill_stage.abort_handles = new_abort_handles;

backfill_stream = Some(select_with_strategy(
input.by_ref().map(Either::Left),
reader.map(Either::Right),
closure(),
));
}

self.backfill_state_store
.set_states(backfill_stage.states.clone())
@@ -486,24 +496,21 @@ impl<S: StateStore> KafkaBackfillExecutorInner<S> {
/// For newly added splits, we do not need to backfill and can directly forward from upstream.
async fn apply_split_change<'input>(
&mut self,
source_desc: &SourceDesc,
split_assignment: &HashMap<ActorId, Vec<SplitImpl>>,
stage: &mut BackfillStage,
stream: &mut Option<EitherStream<'input>>,
input: &'input mut BoxedMessageStream,
should_trim_state: bool,
) -> StreamExecutorResult<()> {
) -> StreamExecutorResult<bool> {
if let Some(target_splits) = split_assignment.get(&self.actor_ctx.id).cloned() {
if self
.update_state_if_changed(target_splits, stage, should_trim_state)
.await?
{
self.rebuild_stream_reader(source_desc, stage, stream, input)
.await?;
// Note: we don't rebuild backfill_stream here, due to some complex lifetime issues.
return Ok(true);
}
}

Ok(())
Ok(false)
}

/// Returns `true` if split changed. Otherwise `false`.
@@ -576,45 +583,6 @@ impl<S: StateStore> KafkaBackfillExecutorInner<S> {

Ok(split_changed)
}

/// Rebuilds the backfill source reader from the current backfill states and installs a
/// freshly combined stream into `stream`.
///
/// Only splits whose state is `BackfillState::Backfilling(Some(offset))` are kept in
/// `stage.unfinished_splits`; each kept split is first updated in place with that offset.
/// Splits in any other state are dropped from the unfinished list.
///
/// # Errors
/// Propagates failures from `update_in_place` and `build_stream_source_reader`.
///
/// # Panics
/// Panics if an unfinished split has no entry in `stage.states`.
async fn rebuild_stream_reader<'input>(
    &mut self,
    source_desc: &SourceDesc,
    stage: &mut BackfillStage,
    stream: &mut Option<EitherStream<'input>>,
    input: &'input mut BoxedMessageStream,
) -> StreamExecutorResult<()> {
    // Collect the splits that still have a known offset to resume backfilling from.
    let mut still_backfilling = Vec::new();
    for split in &mut stage.unfinished_splits {
        let current = stage.states.get(split.id().as_ref()).unwrap();
        if let BackfillState::Backfilling(Some(offset)) = current {
            split.update_in_place(offset.clone())?;
            still_backfilling.push(split.clone());
        }
    }
    stage.unfinished_splits = still_backfilling;

    tracing::info!(
        "actor {:?} apply source split change to {:?}",
        self.actor_ctx.id,
        stage.unfinished_splits
    );

    // Build a new source reader over the remaining splits and remember its abort handles.
    let remaining = stage.unfinished_splits.clone();
    let (new_reader, abort_handles) = self
        .build_stream_source_reader(source_desc, remaining)
        .await?;
    stage.abort_handles = abort_handles;

    // Swap in the combined stream that reads from the new reader.
    *stream = Some(build_combined_stream(input, new_reader));

    Ok(())
}
}

fn compare_kafka_offset(a: Option<&String>, b: &str) -> Ordering {

0 comments on commit 39b4ff6

Please sign in to comment.