Skip to content

Commit

Permalink
refactor: reduce indent level for source_executor (#15089)
Browse files (browse the repository at this point in the history)
  • Loading branch information
xxchan authored Feb 13, 2024
1 parent aeff596 commit 8a26cff
Showing 1 changed file with 144 additions and 169 deletions.
313 changes: 144 additions & 169 deletions src/stream/src/executor/source/source_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -391,8 +391,8 @@ impl<S: StateStore> SourceExecutor<S> {
};

let mut boot_state = Vec::default();
if let Some(mutation) = barrier.mutation.as_ref() {
match mutation.as_ref() {
if let Some(mutation) = barrier.mutation.as_deref() {
match mutation {
Mutation::Add(AddMutation { splits, .. })
| Mutation::Update(UpdateMutation {
actor_splits: splits,
Expand Down Expand Up @@ -460,179 +460,154 @@ impl<S: StateStore> SourceExecutor<S> {
let mut metric_row_per_barrier: u64 = 0;

while let Some(msg) = stream.next().await {
let Ok(msg) = msg else {
tokio::time::sleep(Duration::from_millis(1000)).await;
self.rebuild_stream_reader_from_error(
&source_desc,
&mut stream,
&mut latest_split_info,
msg.unwrap_err(),
)
.await?;
continue;
};

match msg {
Err(e) => {
tokio::time::sleep(Duration::from_millis(1000)).await;
self.rebuild_stream_reader_from_error(
&source_desc,
&mut stream,
&mut latest_split_info,
e,
)
.await?;
}
Ok(msg) => {
match msg {
// This branch will be preferred.
Either::Left(msg) => match &msg {
Message::Barrier(barrier) => {
last_barrier_time = Instant::now();

if self_paused {
stream.resume_stream();
self_paused = false;
}

let epoch = barrier.epoch;

let mut target_state = None;
let mut should_trim_state = false;

if let Some(ref mutation) = barrier.mutation.as_deref() {
match mutation {
Mutation::Pause => stream.pause_stream(),
Mutation::Resume => stream.resume_stream(),
Mutation::SourceChangeSplit(actor_splits) => {
tracing::info!(
actor_id = self.actor_ctx.id,
actor_splits = ?actor_splits,
"source change split received"
);

target_state = self
.apply_split_change(
&source_desc,
&mut stream,
actor_splits,
)
.await?;
should_trim_state = true;
}

Mutation::Update(UpdateMutation {
actor_splits, ..
}) => {
target_state = self
.apply_split_change(
&source_desc,
&mut stream,
actor_splits,
)
.await?;
}
_ => {}
}
}

if let Some(target_state) = &target_state {
latest_split_info = target_state.clone();
}

self.persist_state_and_clear_cache(
epoch,
target_state,
should_trim_state,
)
.await?;

self.metrics
.source_row_per_barrier
.with_label_values(&[
self.actor_ctx.id.to_string().as_str(),
self.stream_source_core
.as_ref()
.unwrap()
.source_id
.to_string()
.as_ref(),
self.actor_ctx.fragment_id.to_string().as_str(),
])
.inc_by(metric_row_per_barrier);
metric_row_per_barrier = 0;

yield msg;
}
_ => {
// For the source executor, any message received from this arm
// should always be a barrier message.
unreachable!();
}
},

Either::Right(chunk) => {
// TODO: confirm when split_offset_mapping is None
let split_offset_mapping =
get_split_offset_mapping_from_chunk(&chunk, split_idx, offset_idx);
if last_barrier_time.elapsed().as_millis() > max_wait_barrier_time_ms {
// The max wait barrier time has been exceeded, so the source will be
// paused. Currently we can guarantee that the source is not already
// paused, since it has just received stream chunks.
self_paused = true;
tracing::warn!(
"source {} paused, wait barrier for {:?}",
self.info.identity,
last_barrier_time.elapsed()
// This branch will be preferred.
Either::Left(Message::Barrier(barrier)) => {
last_barrier_time = Instant::now();

if self_paused {
stream.resume_stream();
self_paused = false;
}

let epoch = barrier.epoch;

let mut target_state = None;
let mut should_trim_state = false;

if let Some(mutation) = barrier.mutation.as_deref() {
match mutation {
Mutation::Pause => stream.pause_stream(),
Mutation::Resume => stream.resume_stream(),
Mutation::SourceChangeSplit(actor_splits) => {
tracing::info!(
actor_id = self.actor_ctx.id,
actor_splits = ?actor_splits,
"source change split received"
);
stream.pause_stream();

// Only update `max_wait_barrier_time_ms` to capture
// `barrier_interval_ms`
// changes here to avoid frequently accessing the shared
// `system_params`.
max_wait_barrier_time_ms =
self.system_params.load().barrier_interval_ms() as u128
* WAIT_BARRIER_MULTIPLE_TIMES;

target_state = self
.apply_split_change(&source_desc, &mut stream, actor_splits)
.await?;
should_trim_state = true;
}
if let Some(mapping) = split_offset_mapping {
let state: HashMap<_, _> = mapping
.iter()
.flat_map(|(split_id, offset)| {
let origin_split_impl = self
.stream_source_core
.as_mut()
.unwrap()
.stream_source_splits
.get_mut(split_id);

origin_split_impl.map(|split_impl| {
split_impl.update_in_place(offset.clone())?;
Ok::<_, anyhow::Error>((
split_id.clone(),
split_impl.clone(),
))
})
})
.try_collect()?;

self.stream_source_core
.as_mut()
.unwrap()
.state_cache
.extend(state);

Mutation::Update(UpdateMutation { actor_splits, .. }) => {
target_state = self
.apply_split_change(&source_desc, &mut stream, actor_splits)
.await?;
}
metric_row_per_barrier += chunk.cardinality() as u64;

self.metrics
.source_output_row_count
.with_label_values(
&self
.get_metric_labels()
.iter()
.map(AsRef::as_ref)
.collect::<Vec<&str>>(),
)
.inc_by(chunk.cardinality() as u64);
let chunk = prune_additional_cols(
&chunk,
split_idx,
offset_idx,
&source_desc.columns,
);
yield Message::Chunk(chunk);
self.try_flush_data().await?;
_ => {}
}
}

if let Some(target_state) = &target_state {
latest_split_info = target_state.clone();
}

self.persist_state_and_clear_cache(epoch, target_state, should_trim_state)
.await?;

self.metrics
.source_row_per_barrier
.with_label_values(&[
self.actor_ctx.id.to_string().as_str(),
self.stream_source_core
.as_ref()
.unwrap()
.source_id
.to_string()
.as_ref(),
self.actor_ctx.fragment_id.to_string().as_str(),
])
.inc_by(metric_row_per_barrier);
metric_row_per_barrier = 0;

yield Message::Barrier(barrier);
}
Either::Left(_) => {
// For the source executor, any message received from this arm
// should always be a barrier message.
unreachable!();
}

Either::Right(chunk) => {
// TODO: confirm when split_offset_mapping is None
let split_offset_mapping =
get_split_offset_mapping_from_chunk(&chunk, split_idx, offset_idx);
if last_barrier_time.elapsed().as_millis() > max_wait_barrier_time_ms {
// The max wait barrier time has been exceeded, so the source will be
// paused. Currently we can guarantee that the source is not already
// paused, since it has just received stream chunks.
self_paused = true;
tracing::warn!(
"source {} paused, wait barrier for {:?}",
self.info.identity,
last_barrier_time.elapsed()
);
stream.pause_stream();

// Only update `max_wait_barrier_time_ms` to capture
// `barrier_interval_ms`
// changes here to avoid frequently accessing the shared
// `system_params`.
max_wait_barrier_time_ms = self.system_params.load().barrier_interval_ms()
as u128
* WAIT_BARRIER_MULTIPLE_TIMES;
}
if let Some(mapping) = split_offset_mapping {
let state: HashMap<_, _> = mapping
.iter()
.flat_map(|(split_id, offset)| {
let origin_split_impl = self
.stream_source_core
.as_mut()
.unwrap()
.stream_source_splits
.get_mut(split_id);

origin_split_impl.map(|split_impl| {
split_impl.update_in_place(offset.clone())?;
Ok::<_, anyhow::Error>((split_id.clone(), split_impl.clone()))
})
})
.try_collect()?;

self.stream_source_core
.as_mut()
.unwrap()
.state_cache
.extend(state);
}
metric_row_per_barrier += chunk.cardinality() as u64;

self.metrics
.source_output_row_count
.with_label_values(
&self
.get_metric_labels()
.iter()
.map(AsRef::as_ref)
.collect::<Vec<&str>>(),
)
.inc_by(chunk.cardinality() as u64);
let chunk =
prune_additional_cols(&chunk, split_idx, offset_idx, &source_desc.columns);
yield Message::Chunk(chunk);
self.try_flush_data().await?;
}
}
}
Expand Down

0 comments on commit 8a26cff

Please sign in to comment.