Skip to content

Commit

Permalink
add max_wait_barrier_time_ms
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan committed Feb 28, 2024
1 parent 2d9c787 commit d3f4864
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 3 deletions.
47 changes: 45 additions & 2 deletions src/stream/src/executor/source/kafka_backfill_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,19 @@
use std::assert_matches::assert_matches;
use std::cmp::Ordering;
use std::fmt::Formatter;
use std::time::Instant;

use anyhow::anyhow;
use either::Either;
use futures::stream::{select_with_strategy, AbortHandle, Abortable, PollNext};
use futures::StreamExt;
use futures_async_stream::try_stream;
use kafka_backfill_executor::source_executor::WAIT_BARRIER_MULTIPLE_TIMES;
use risingwave_common::buffer::BitmapBuilder;
use risingwave_common::metrics::GLOBAL_ERROR_METRICS;
use risingwave_common::row::Row;
use risingwave_common::system_param::local_manager::SystemParamsReaderRef;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::types::JsonbVal;
use risingwave_connector::source::reader::desc::{SourceDesc, SourceDescBuilder};
use risingwave_connector::source::{
Expand Down Expand Up @@ -136,7 +139,7 @@ pub struct KafkaBackfillExecutorInner<S: StateStore> {
// /// Receiver of barrier channel.
// barrier_receiver: Option<UnboundedReceiver<Barrier>>,
/// System parameter reader to read barrier interval
_system_params: SystemParamsReaderRef,
system_params: SystemParamsReaderRef,

// control options for connector level
source_ctrl_opts: SourceCtrlOpts,
Expand Down Expand Up @@ -172,7 +175,7 @@ impl<S: StateStore> KafkaBackfillExecutorInner<S> {
stream_source_core,
backfill_state_store,
metrics,
_system_params: system_params,
system_params,
source_ctrl_opts,
connector_params,
}
Expand Down Expand Up @@ -335,6 +338,14 @@ impl<S: StateStore> KafkaBackfillExecutorInner<S> {
};

if !self.backfill_finished(&backfill_stage.states).await? {
// We allow data to flow for `WAIT_BARRIER_MULTIPLE_TIMES` * `barrier_interval_ms`
// milliseconds, to account for other latencies such as network delay and processing cost in Meta.
let mut max_wait_barrier_time_ms = self.system_params.load().barrier_interval_ms()
as u128
* WAIT_BARRIER_MULTIPLE_TIMES;
let mut last_barrier_time = Instant::now();
let mut self_paused = false;

'backfill_loop: while let Some(either) = backfill_stream.next().await {
match either {
// Upstream
Expand Down Expand Up @@ -373,6 +384,17 @@ impl<S: StateStore> KafkaBackfillExecutorInner<S> {
};
match msg {
Message::Barrier(barrier) => {
last_barrier_time = Instant::now();

if self_paused {
backfill_stream = select_with_strategy(
input.by_ref().map(Either::Left),
paused_reader.take().expect("no paused reader to resume"),
select_strategy,
);
self_paused = false;
}

let mut split_changed = false;
if let Some(ref mutation) = barrier.mutation.as_deref() {
match mutation {
Expand Down Expand Up @@ -508,6 +530,27 @@ impl<S: StateStore> KafkaBackfillExecutorInner<S> {
let split_offset_mapping =
get_split_offset_mapping_from_chunk(&chunk, split_idx, offset_idx)
.unwrap();
if last_barrier_time.elapsed().as_millis() > max_wait_barrier_time_ms {
// The max wait barrier time has been exceeded, so the source will be paused.
// At this point the source is guaranteed not to be paused already, since we
// have just received a stream chunk from it.
self_paused = true;
tracing::warn!(
"source {} paused, wait barrier for {:?}",
self.info.identity,
last_barrier_time.elapsed()
);
pause_reader!();

// Refresh `max_wait_barrier_time_ms` only here, when pausing, to pick up
// changes to `barrier_interval_ms` without frequently accessing the shared
// `system_params`.
max_wait_barrier_time_ms =
self.system_params.load().barrier_interval_ms() as u128
* WAIT_BARRIER_MULTIPLE_TIMES;
}
split_offset_mapping.iter().for_each(|(split_id, offset)| {
// update backfill progress
let prev_state = backfill_stage.states.insert(
Expand Down
2 changes: 1 addition & 1 deletion src/stream/src/executor/source/source_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use crate::executor::*;

/// A multiplier used when calculating the maximum time to wait for a barrier. The extra slack
/// accounts for network latency and processing cost in meta.
const WAIT_BARRIER_MULTIPLE_TIMES: u128 = 5;
pub const WAIT_BARRIER_MULTIPLE_TIMES: u128 = 5;

pub struct SourceExecutor<S: StateStore> {
actor_ctx: ActorContextRef,
Expand Down

0 comments on commit d3f4864

Please sign in to comment.