refactor: unify to subscribe mutation via barrier sender (#18255)
wenym1 authored Sep 18, 2024
1 parent 06d5cde commit a3c6e48
Showing 22 changed files with 621 additions and 687 deletions.
10 changes: 0 additions & 10 deletions proto/stream_service.proto
@@ -15,16 +15,6 @@ message InjectBarrierRequest {
repeated uint32 actor_ids_to_collect = 4;
repeated uint32 table_ids_to_sync = 5;
uint32 partial_graph_id = 6;
// Actors in the partial graphs of the creating jobs to which the barrier mutation
// needs to be pre-synced.
//
// This is required because in snapshot backfill, the snapshot backfill executor receives
// barriers from both the local barrier manager and the upstream. If we don't pre-sync the
// barrier mutations, then when an input executor of a snapshot backfill actor receives a
// barrier, it will block while trying to fetch the mutation of this upstream barrier.
// It blocks because snapshot backfill makes slower progress and therefore has not yet been
// synced with the mutations of upstream barriers. To avoid this blocking, we specify the
// set of snapshot backfill actors that need to be pre-synced with the upstream barrier
// mutation, so that the input executors won't block waiting for the mutations of upstream
// barriers.
repeated uint32 actor_ids_to_pre_sync_barrier_mutation = 7;

repeated common.ActorInfo broadcast_info = 8;
repeated stream_plan.StreamActor actors_to_build = 9;
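For context: with the field above removed, snapshot backfill actors no longer receive pre-synced mutations over RPC. Instead, each actor obtains the mutation-carrying barrier from the local barrier manager through a barrier receiver registered via the barrier sender. A minimal sketch of that receiving pattern, modeled on the executor changes further down (the function shape and channel type are simplified assumptions, not the actual RisingWave signatures):

// Sketch only: simplified types, not the real RisingWave API.
// The upstream barrier arrives with its mutation already erased; the
// locally injected barrier, delivered through the barrier sender, is
// the one that carries the mutation.
async fn align_with_local_barrier(
    upstream_barrier: DispatcherBarrier, // no mutation attached
    barrier_rx: &mut tokio::sync::mpsc::UnboundedReceiver<Barrier>,
) -> StreamExecutorResult<Barrier> {
    let recv_barrier = receive_next_barrier(barrier_rx).await?;
    // The upstream stream and the local barrier stream must stay
    // aligned on the same epoch.
    assert_eq!(upstream_barrier.epoch, recv_barrier.epoch);
    Ok(recv_barrier)
}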
35 changes: 1 addition & 34 deletions src/meta/src/barrier/creating_job/mod.rs
@@ -16,7 +16,7 @@ mod barrier_control;
mod status;

use std::cmp::max;
use std::collections::{HashMap, HashSet};
use std::collections::HashMap;
use std::mem::take;
use std::sync::Arc;
use std::time::Duration;
@@ -43,7 +43,6 @@ use crate::barrier::progress::CreateMviewProgressTracker;
use crate::barrier::rpc::ControlStreamManager;
use crate::barrier::{Command, CreateStreamingJobCommandInfo, SnapshotBackfillInfo};
use crate::manager::WorkerId;
use crate::model::ActorId;
use crate::rpc::metrics::MetaMetrics;
use crate::MetaResult;

@@ -77,18 +76,6 @@ impl CreatingStreamingJobControl {
let mut create_mview_tracker = CreateMviewProgressTracker::default();
create_mview_tracker.update_tracking_jobs(Some((&info, None)), [], version_stat);
let fragment_info: HashMap<_, _> = info.new_fragment_info().collect();
let snapshot_backfill_actors_set = info.table_fragments.snapshot_backfill_actor_ids();
let mut snapshot_backfill_actors: HashMap<_, HashSet<_>> = HashMap::new();
for fragment in fragment_info.values() {
for (actor_id, worker_node) in &fragment.actors {
if snapshot_backfill_actors_set.contains(actor_id) {
snapshot_backfill_actors
.entry(*worker_node)
.or_default()
.insert(*actor_id);
}
}
}

let table_id = info.table_fragments.table_id();
let table_id_str = format!("{}", table_id.table_id);
@@ -108,7 +95,6 @@ impl CreatingStreamingJobControl {
graph_info: InflightGraphInfo::new(fragment_info),
backfill_epoch,
pending_non_checkpoint_barriers: vec![],
snapshot_backfill_actors,
initial_barrier_info: Some((actors_to_create, initial_mutation)),
},
upstream_lag: metrics
@@ -139,22 +125,6 @@ impl CreatingStreamingJobControl {
}
}

pub(super) fn actors_to_pre_sync_barrier(
&self,
) -> impl Iterator<Item = (&WorkerId, &HashSet<ActorId>)> + '_ {
if let CreatingStreamingJobStatus::ConsumingSnapshot {
snapshot_backfill_actors,
..
} = &self.status
{
Some(snapshot_backfill_actors)
} else {
None
}
.into_iter()
.flat_map(|actors| actors.iter())
}

pub(super) fn gen_ddl_progress(&self) -> DdlProgress {
let progress = match &self.status {
CreatingStreamingJobStatus::ConsumingSnapshot {
@@ -278,7 +248,6 @@ impl CreatingStreamingJobControl {
&kind,
graph_info,
Some(graph_info),
HashMap::new(),
new_actors,
vec![],
vec![],
@@ -345,7 +314,6 @@ impl CreatingStreamingJobControl {
&command_ctx.kind,
graph_info,
Some(graph_info),
HashMap::new(),
None,
vec![],
vec![],
@@ -394,7 +362,6 @@ impl CreatingStreamingJobControl {
} else {
Some(graph_info)
},
HashMap::new(),
None,
vec![],
vec![],
4 changes: 1 addition & 3 deletions src/meta/src/barrier/creating_job/status.rs
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashMap, HashSet};
use std::collections::HashMap;
use std::mem::take;
use std::sync::Arc;

@@ -27,7 +27,6 @@ use crate::barrier::info::InflightGraphInfo;
use crate::barrier::progress::CreateMviewProgressTracker;
use crate::barrier::{BarrierKind, TracedEpoch};
use crate::manager::WorkerId;
use crate::model::ActorId;

#[derive(Debug)]
pub(super) enum CreatingStreamingJobStatus {
@@ -40,7 +39,6 @@ pub(super) enum CreatingStreamingJobStatus {
backfill_epoch: u64,
/// The `prev_epoch` of pending non checkpoint barriers
pending_non_checkpoint_barriers: Vec<u64>,
snapshot_backfill_actors: HashMap<WorkerId, HashSet<ActorId>>,
/// Info of the first barrier: (`actors_to_create`, `mutation`)
/// Take the mutation out when injecting the first barrier
initial_barrier_info: Option<(HashMap<WorkerId, Vec<StreamActor>>, Mutation)>,
8 changes: 0 additions & 8 deletions src/meta/src/barrier/mod.rs
@@ -1046,17 +1046,10 @@ impl GlobalBarrierManager {

let table_ids_to_commit: HashSet<_> = pre_applied_graph_info.existing_table_ids().collect();

let mut actors_to_pre_sync_barrier: HashMap<_, Vec<_>> = HashMap::new();
let mut jobs_to_wait = HashSet::new();

for (table_id, creating_job) in &mut self.checkpoint_control.creating_streaming_job_controls
{
for (worker_id, actors) in creating_job.actors_to_pre_sync_barrier() {
actors_to_pre_sync_barrier
.entry(*worker_id)
.or_default()
.extend(actors.iter().cloned())
}
if let Some(wait_job) =
creating_job.on_new_command(&mut self.control_stream_manager, &command_ctx)?
{
@@ -1071,7 +1064,6 @@ impl GlobalBarrierManager {
&command_ctx,
&pre_applied_graph_info,
Some(&self.state.inflight_graph_info),
actors_to_pre_sync_barrier,
) {
Ok(node_to_collect) => node_to_collect,
Err(err) => {
1 change: 0 additions & 1 deletion src/meta/src/barrier/recovery.rs
@@ -398,7 +398,6 @@ impl GlobalBarrierManager {
&BarrierKind::Initial,
&info,
Some(&info),
HashMap::new(),
Some(node_actors),
vec![],
vec![],
17 changes: 1 addition & 16 deletions src/meta/src/barrier/rpc.rs
@@ -23,7 +23,6 @@ use futures::stream::{BoxStream, FuturesUnordered};
use futures::StreamExt;
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::ActorId;
use risingwave_common::util::tracing::TracingContext;
use risingwave_hummock_sdk::HummockVersionId;
use risingwave_pb::common::{ActorInfo, WorkerNode};
@@ -275,7 +274,6 @@ impl ControlStreamManager {
command_ctx: &CommandContext,
pre_applied_graph_info: &InflightGraphInfo,
applied_graph_info: Option<&InflightGraphInfo>,
actor_ids_to_pre_sync_mutation: HashMap<WorkerId, Vec<ActorId>>,
) -> MetaResult<HashSet<WorkerId>> {
let mutation = command_ctx.to_mutation();
let subscriptions_to_add = if let Some(Mutation::Add(add)) = &mutation {
@@ -295,14 +293,12 @@ impl ControlStreamManager {
&command_ctx.kind,
pre_applied_graph_info,
applied_graph_info,
actor_ids_to_pre_sync_mutation,
command_ctx.command.actors_to_create(),
subscriptions_to_add,
subscriptions_to_remove,
)
}

#[expect(clippy::too_many_arguments)]
pub(super) fn inject_barrier(
&mut self,
creating_table_id: Option<TableId>,
@@ -311,7 +307,6 @@ impl ControlStreamManager {
kind: &BarrierKind,
pre_applied_graph_info: &InflightGraphInfo,
applied_graph_info: Option<&InflightGraphInfo>,
actor_ids_to_pre_sync_mutation: HashMap<WorkerId, Vec<ActorId>>,
mut new_actors: Option<HashMap<WorkerId, Vec<StreamActor>>>,
subscriptions_to_add: Vec<SubscriptionUpstreamInfo>,
subscriptions_to_remove: Vec<SubscriptionUpstreamInfo>,
@@ -321,10 +316,7 @@ impl ControlStreamManager {
));

let partial_graph_id = creating_table_id
.map(|table_id| {
assert!(actor_ids_to_pre_sync_mutation.is_empty());
table_id.table_id
})
.map(|table_id| table_id.table_id)
.unwrap_or(u32::MAX);

for worker_id in pre_applied_graph_info
@@ -401,13 +393,6 @@ impl ControlStreamManager {
actor_ids_to_collect,
table_ids_to_sync,
partial_graph_id,
actor_ids_to_pre_sync_barrier_mutation:
actor_ids_to_pre_sync_mutation
.get(node_id)
.into_iter()
.flatten()
.cloned()
.collect(),
broadcast_info: new_actors_location_to_broadcast.clone(),
actors_to_build: new_actors
.as_mut()
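With the pre-sync assertion gone, the `partial_graph_id` computation above reduces to a plain sentinel encoding, roughly equivalent to this hypothetical standalone helper (for illustration only, not a function in the codebase):

// u32::MAX marks the global streaming graph; any other value is the
// table id of a creating job's partial graph.
fn to_partial_graph_id(creating_table_id: Option<u32>) -> u32 {
    creating_table_id.unwrap_or(u32::MAX)
}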
63 changes: 18 additions & 45 deletions src/stream/src/executor/backfill/snapshot_backfill.rs
@@ -40,7 +40,7 @@ use crate::executor::monitor::StreamingMetrics;
use crate::executor::prelude::{try_stream, StreamExt};
use crate::executor::{
expect_first_barrier, ActorContextRef, BackfillExecutor, Barrier, BoxedMessageStream,
DispatcherBarrier, DispatcherMessage, Execute, Executor, Message, Mutation,
DispatcherBarrier, DispatcherMessage, Execute, MergeExecutorInput, Message, Mutation,
StreamExecutorError, StreamExecutorResult,
};
use crate::task::CreateMviewProgressReporter;
@@ -50,7 +50,7 @@ pub struct SnapshotBackfillExecutor<S: StateStore> {
upstream_table: StorageTable<S>,

/// Upstream with the same schema with the upstream table.
upstream: Executor,
upstream: MergeExecutorInput,

/// The column indices need to be forwarded to the downstream from the upstream and table scan.
output_indices: Vec<usize>,
@@ -68,9 +68,9 @@ pub struct SnapshotBackfillExecutor<S: StateStore> {

impl<S: StateStore> SnapshotBackfillExecutor<S> {
#[expect(clippy::too_many_arguments)]
pub fn new(
pub(crate) fn new(
upstream_table: StorageTable<S>,
upstream: Executor,
upstream: MergeExecutorInput,
output_indices: Vec<usize>,
actor_ctx: ActorContextRef,
progress: CreateMviewProgressReporter,
@@ -101,15 +101,14 @@ impl<S: StateStore> SnapshotBackfillExecutor<S> {
#[try_stream(ok = Message, error = StreamExecutorError)]
async fn execute_inner(mut self) {
debug!("snapshot backfill executor start");
let mut upstream = erase_upstream_mutation(self.upstream.execute());
let upstream_table_id = self.upstream_table.table_id();
let first_barrier = expect_first_barrier(&mut upstream).await?;
let first_barrier = expect_first_barrier(&mut self.upstream).await?;
debug!(epoch = ?first_barrier.epoch, "get first upstream barrier");
let first_recv_barrier = receive_next_barrier(&mut self.barrier_rx).await?;
debug!(epoch = ?first_recv_barrier.epoch, "get first inject barrier");
let should_backfill = first_barrier.epoch != first_recv_barrier.epoch;

{
let mut barrier_epoch = {
if should_backfill {
let subscriber_ids = first_recv_barrier
.added_subscriber_on_mv_table(upstream_table_id)
@@ -140,7 +139,7 @@ impl<S: StateStore> SnapshotBackfillExecutor<S> {
.with_guarded_label_values(&[&table_id_str, &actor_id_str, "consume_upstream"]);

let mut upstream_buffer = UpstreamBuffer::new(
&mut upstream,
&mut self.upstream,
upstream_table_id,
snapshot_backfill_table_fragment_id,
consume_upstream_row_count,
@@ -250,6 +249,7 @@ impl<S: StateStore> SnapshotBackfillExecutor<S> {
table_id = self.upstream_table.table_id().table_id,
"finish consuming log store"
);
barrier_epoch
} else {
info!(
table_id = self.upstream_table.table_id().table_id,
@@ -258,19 +258,17 @@ impl<S: StateStore> SnapshotBackfillExecutor<S> {
let first_recv_barrier = receive_next_barrier(&mut self.barrier_rx).await?;
assert_eq!(first_barrier.epoch, first_recv_barrier.epoch);
yield Message::Barrier(first_recv_barrier);
first_barrier.epoch
}
}
};
let mut upstream = self.upstream.into_executor(self.barrier_rx).execute();
// Phase 3: consume upstream
while let Some(msg) = upstream.try_next().await? {
yield match msg {
DispatcherMessage::Chunk(chunk) => Message::Chunk(chunk),
DispatcherMessage::Watermark(watermark) => Message::Watermark(watermark),
DispatcherMessage::Barrier(barrier) => {
let recv_barrier = receive_next_barrier(&mut self.barrier_rx).await?;
assert_eq!(barrier.epoch, recv_barrier.epoch);
Message::Barrier(recv_barrier)
}
};
if let Message::Barrier(barrier) = &msg {
assert_eq!(barrier.epoch.prev, barrier_epoch.curr);
barrier_epoch = barrier.epoch;
}
yield msg;
}
}
}
@@ -404,33 +402,8 @@ impl<'a> UpstreamBufferState for StateOfConsumingLogStore<'a> {
}
}

mod erase_upstream_mutation {
use futures::TryStreamExt;

use crate::executor::prelude::Stream;
use crate::executor::{BoxedMessageStream, DispatcherMessageStreamItem};

pub(super) fn erase_upstream_mutation(upstream: BoxedMessageStream) -> UpstreamStream {
upstream.map_ok(|msg| {
msg.map_mutation(|mutation| {
if let Some(mutation) = mutation {
// TODO: assert none mutation after we explicitly erase mutation
warn!(
?mutation,
"receive non-empty mutation from upstream. ignored"
);
};
})
})
}

pub(super) type UpstreamStream = impl Stream<Item = DispatcherMessageStreamItem> + Unpin;
}

use erase_upstream_mutation::*;

struct UpstreamBuffer<'a, S> {
upstream: &'a mut UpstreamStream,
upstream: &'a mut MergeExecutorInput,
state: S,
consume_upstream_row_count: LabelGuardedIntCounter<3>,
upstream_table_id: TableId,
@@ -439,7 +412,7 @@ struct UpstreamBuffer<'a, S> {

impl<'a> UpstreamBuffer<'a, StateOfConsumingSnapshot> {
fn new(
upstream: &'a mut UpstreamStream,
upstream: &'a mut MergeExecutorInput,
upstream_table_id: TableId,
current_subscriber_id: u32,
consume_upstream_row_count: LabelGuardedIntCounter<3>,
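The new `barrier_epoch` bookkeeping in `execute_inner` enforces that, once the executor converts its upstream into a full executor stream via `into_executor(self.barrier_rx)`, barriers form an unbroken epoch chain: each barrier's `prev` epoch must equal the previous barrier's `curr` epoch. A self-contained sketch of that invariant, using a simplified stand-in for the real epoch pair type:

// Simplified stand-in for the executor's epoch pair; for illustration only.
#[derive(Clone, Copy, Debug)]
struct EpochPair {
    prev: u64,
    curr: u64,
}

// Panics if the barrier epochs do not chain, mirroring the assert_eq!
// added in execute_inner above.
fn check_epoch_chain(mut last: EpochPair, barriers: impl IntoIterator<Item = EpochPair>) {
    for epoch in barriers {
        assert_eq!(epoch.prev, last.curr, "barrier epoch chain broken");
        last = epoch;
    }
}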
9 changes: 5 additions & 4 deletions src/stream/src/executor/dispatch.rs
@@ -1180,10 +1180,6 @@ mod tests {
let actor_id = 233;
let fragment_id = 666;
let barrier_test_env = LocalBarrierTestEnv::for_test().await;
let input = Executor::new(
Default::default(),
ReceiverExecutor::for_test(233, rx, barrier_test_env.shared_context.clone()).boxed(),
);
let ctx = Arc::new(SharedContext::for_test());
let metrics = Arc::new(StreamingMetrics::unused());

@@ -1252,6 +1248,11 @@ mod tests {
.flush_all_events()
.await;

let input = Executor::new(
Default::default(),
ReceiverExecutor::for_test(actor_id, rx, barrier_test_env.shared_context.clone())
.boxed(),
);
let executor = Box::new(DispatchExecutor::new(
input,
vec![broadcast_dispatcher, simple_dispatcher],
(Diffs for the remaining 14 changed files are not shown.)
