diff --git a/src/stream/src/executor/source/source_executor.rs b/src/stream/src/executor/source/source_executor.rs index f47919299957..07903a2c7b34 100644 --- a/src/stream/src/executor/source/source_executor.rs +++ b/src/stream/src/executor/source/source_executor.rs @@ -391,8 +391,8 @@ impl SourceExecutor { }; let mut boot_state = Vec::default(); - if let Some(mutation) = barrier.mutation.as_ref() { - match mutation.as_ref() { + if let Some(mutation) = barrier.mutation.as_deref() { + match mutation { Mutation::Add(AddMutation { splits, .. }) | Mutation::Update(UpdateMutation { actor_splits: splits, @@ -460,179 +460,154 @@ impl SourceExecutor { let mut metric_row_per_barrier: u64 = 0; while let Some(msg) = stream.next().await { + let Ok(msg) = msg else { + tokio::time::sleep(Duration::from_millis(1000)).await; + self.rebuild_stream_reader_from_error( + &source_desc, + &mut stream, + &mut latest_split_info, + msg.unwrap_err(), + ) + .await?; + continue; + }; + match msg { - Err(e) => { - tokio::time::sleep(Duration::from_millis(1000)).await; - self.rebuild_stream_reader_from_error( - &source_desc, - &mut stream, - &mut latest_split_info, - e, - ) - .await?; - } - Ok(msg) => { - match msg { - // This branch will be preferred. - Either::Left(msg) => match &msg { - Message::Barrier(barrier) => { - last_barrier_time = Instant::now(); - - if self_paused { - stream.resume_stream(); - self_paused = false; - } - - let epoch = barrier.epoch; - - let mut target_state = None; - let mut should_trim_state = false; - - if let Some(ref mutation) = barrier.mutation.as_deref() { - match mutation { - Mutation::Pause => stream.pause_stream(), - Mutation::Resume => stream.resume_stream(), - Mutation::SourceChangeSplit(actor_splits) => { - tracing::info!( - actor_id = self.actor_ctx.id, - actor_splits = ?actor_splits, - "source change split received" - ); - - target_state = self - .apply_split_change( - &source_desc, - &mut stream, - actor_splits, - ) - .await?; - should_trim_state = true; - } - - Mutation::Update(UpdateMutation { - actor_splits, .. - }) => { - target_state = self - .apply_split_change( - &source_desc, - &mut stream, - actor_splits, - ) - .await?; - } - _ => {} - } - } - - if let Some(target_state) = &target_state { - latest_split_info = target_state.clone(); - } - - self.persist_state_and_clear_cache( - epoch, - target_state, - should_trim_state, - ) - .await?; - - self.metrics - .source_row_per_barrier - .with_label_values(&[ - self.actor_ctx.id.to_string().as_str(), - self.stream_source_core - .as_ref() - .unwrap() - .source_id - .to_string() - .as_ref(), - self.actor_ctx.fragment_id.to_string().as_str(), - ]) - .inc_by(metric_row_per_barrier); - metric_row_per_barrier = 0; - - yield msg; - } - _ => { - // For the source executor, the message we receive from this arm - // should always be barrier message. - unreachable!(); - } - }, - - Either::Right(chunk) => { - // TODO: confirm when split_offset_mapping is None - let split_offset_mapping = - get_split_offset_mapping_from_chunk(&chunk, split_idx, offset_idx); - if last_barrier_time.elapsed().as_millis() > max_wait_barrier_time_ms { - // Exceeds the max wait barrier time, the source will be paused. - // Currently we can guarantee the - // source is not paused since it received stream - // chunks. - self_paused = true; - tracing::warn!( - "source {} paused, wait barrier for {:?}", - self.info.identity, - last_barrier_time.elapsed() + // This branch will be preferred. + Either::Left(Message::Barrier(barrier)) => { + last_barrier_time = Instant::now(); + + if self_paused { + stream.resume_stream(); + self_paused = false; + } + + let epoch = barrier.epoch; + + let mut target_state = None; + let mut should_trim_state = false; + + if let Some(mutation) = barrier.mutation.as_deref() { + match mutation { + Mutation::Pause => stream.pause_stream(), + Mutation::Resume => stream.resume_stream(), + Mutation::SourceChangeSplit(actor_splits) => { + tracing::info!( + actor_id = self.actor_ctx.id, + actor_splits = ?actor_splits, + "source change split received" ); - stream.pause_stream(); - - // Only update `max_wait_barrier_time_ms` to capture - // `barrier_interval_ms` - // changes here to avoid frequently accessing the shared - // `system_params`. - max_wait_barrier_time_ms = - self.system_params.load().barrier_interval_ms() as u128 - * WAIT_BARRIER_MULTIPLE_TIMES; + + target_state = self + .apply_split_change(&source_desc, &mut stream, actor_splits) + .await?; + should_trim_state = true; } - if let Some(mapping) = split_offset_mapping { - let state: HashMap<_, _> = mapping - .iter() - .flat_map(|(split_id, offset)| { - let origin_split_impl = self - .stream_source_core - .as_mut() - .unwrap() - .stream_source_splits - .get_mut(split_id); - - origin_split_impl.map(|split_impl| { - split_impl.update_in_place(offset.clone())?; - Ok::<_, anyhow::Error>(( - split_id.clone(), - split_impl.clone(), - )) - }) - }) - .try_collect()?; - - self.stream_source_core - .as_mut() - .unwrap() - .state_cache - .extend(state); + + Mutation::Update(UpdateMutation { actor_splits, .. }) => { + target_state = self + .apply_split_change(&source_desc, &mut stream, actor_splits) + .await?; } - metric_row_per_barrier += chunk.cardinality() as u64; - - self.metrics - .source_output_row_count - .with_label_values( - &self - .get_metric_labels() - .iter() - .map(AsRef::as_ref) - .collect::>(), - ) - .inc_by(chunk.cardinality() as u64); - let chunk = prune_additional_cols( - &chunk, - split_idx, - offset_idx, - &source_desc.columns, - ); - yield Message::Chunk(chunk); - self.try_flush_data().await?; + _ => {} } } + + if let Some(target_state) = &target_state { + latest_split_info = target_state.clone(); + } + + self.persist_state_and_clear_cache(epoch, target_state, should_trim_state) + .await?; + + self.metrics + .source_row_per_barrier + .with_label_values(&[ + self.actor_ctx.id.to_string().as_str(), + self.stream_source_core + .as_ref() + .unwrap() + .source_id + .to_string() + .as_ref(), + self.actor_ctx.fragment_id.to_string().as_str(), + ]) + .inc_by(metric_row_per_barrier); + metric_row_per_barrier = 0; + + yield Message::Barrier(barrier); + } + Either::Left(_) => { + // For the source executor, the message we receive from this arm + // should always be barrier message. + unreachable!(); + } + + Either::Right(chunk) => { + // TODO: confirm when split_offset_mapping is None + let split_offset_mapping = + get_split_offset_mapping_from_chunk(&chunk, split_idx, offset_idx); + if last_barrier_time.elapsed().as_millis() > max_wait_barrier_time_ms { + // Exceeds the max wait barrier time, the source will be paused. + // Currently we can guarantee the + // source is not paused since it received stream + // chunks. + self_paused = true; + tracing::warn!( + "source {} paused, wait barrier for {:?}", + self.info.identity, + last_barrier_time.elapsed() + ); + stream.pause_stream(); + + // Only update `max_wait_barrier_time_ms` to capture + // `barrier_interval_ms` + // changes here to avoid frequently accessing the shared + // `system_params`. + max_wait_barrier_time_ms = self.system_params.load().barrier_interval_ms() + as u128 + * WAIT_BARRIER_MULTIPLE_TIMES; + } + if let Some(mapping) = split_offset_mapping { + let state: HashMap<_, _> = mapping + .iter() + .flat_map(|(split_id, offset)| { + let origin_split_impl = self + .stream_source_core + .as_mut() + .unwrap() + .stream_source_splits + .get_mut(split_id); + + origin_split_impl.map(|split_impl| { + split_impl.update_in_place(offset.clone())?; + Ok::<_, anyhow::Error>((split_id.clone(), split_impl.clone())) + }) + }) + .try_collect()?; + + self.stream_source_core + .as_mut() + .unwrap() + .state_cache + .extend(state); + } + metric_row_per_barrier += chunk.cardinality() as u64; + + self.metrics + .source_output_row_count + .with_label_values( + &self + .get_metric_labels() + .iter() + .map(AsRef::as_ref) + .collect::>(), + ) + .inc_by(chunk.cardinality() as u64); + let chunk = + prune_additional_cols(&chunk, split_idx, offset_idx, &source_desc.columns); + yield Message::Chunk(chunk); + self.try_flush_data().await?; } } }