refactor: unify to subscribe mutation via barrier sender #18255

Merged · 16 commits · Sep 18, 2024 · Showing changes from all commits
10 changes: 0 additions & 10 deletions proto/stream_service.proto
@@ -9,22 +9,12 @@
option java_package = "com.risingwave.proto";
option optimize_for = SPEED;

message InjectBarrierRequest {

[Check failure on line 12 in proto/stream_service.proto · GitHub Actions / Check breaking changes in Protobuf files]
Previously present field "7" with name "actor_ids_to_pre_sync_barrier_mutation" on message "InjectBarrierRequest" was deleted without reserving the name "actor_ids_to_pre_sync_barrier_mutation".
Previously present field "7" with name "actor_ids_to_pre_sync_barrier_mutation" on message "InjectBarrierRequest" was deleted without reserving the number "7".
string request_id = 1;
stream_plan.Barrier barrier = 2;
repeated uint32 actor_ids_to_collect = 4;
repeated uint32 table_ids_to_sync = 5;
uint32 partial_graph_id = 6;
// Actors in the partial graphs of the creating jobs that the barrier mutation needs to be pre-synced to.
//
// This is required because in snapshot backfill, the snapshot backfill executor receives barriers from
// both the local barrier manager and the upstream. If we don't pre-sync the barrier mutations, then when
// an input executor of a snapshot backfill actor receives a barrier, it will block while trying to fetch
// the mutation of this upstream barrier. The blocking happens because snapshot backfill makes slower
// progress and therefore won't have synced the mutation of the upstream barrier yet. To avoid this, we
// specify the set of snapshot backfill actors that need to be pre-synced with the upstream barrier
// mutation, so that the input executor won't block waiting for the mutation of upstream barriers.
repeated uint32 actor_ids_to_pre_sync_barrier_mutation = 7;

repeated common.ActorInfo broadcast_info = 8;
repeated stream_plan.StreamActor actors_to_build = 9;
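The two check failures above are the standard Protobuf breaking-change complaints: a field was deleted without being reserved. If the deletion is intentional, the usual remedy is to add `reserved 7;` and `reserved "actor_ids_to_pre_sync_barrier_mutation";` to `InjectBarrierRequest` so the tag and the name can never be reused with a different meaning.

The comment deleted above describes the deadlock the field used to work around: a snapshot backfill actor's input executor would block fetching the mutation of an upstream barrier that the slower backfill graph had not yet synced. This PR makes pre-syncing unnecessary by delivering the mutation together with the barrier that each executor receives from its local barrier sender. A minimal sketch of that flow, using stand-in types rather than the actual RisingWave ones:

```rust
use std::sync::mpsc;

// Stand-ins for `stream_plan::Mutation` and the runtime barrier type.
#[derive(Debug, Clone)]
struct Mutation;

#[derive(Debug)]
struct Barrier {
    epoch: u64,
    mutation: Option<Mutation>,
}

fn main() {
    // The local barrier worker owns the sending side of a per-actor channel...
    let (barrier_tx, barrier_rx) = mpsc::channel::<Barrier>();
    barrier_tx
        .send(Barrier { epoch: 1, mutation: Some(Mutation) })
        .unwrap();

    // ...so an executor obtains the mutation together with the barrier itself,
    // instead of fetching it separately for a bare upstream barrier.
    let barrier = barrier_rx.recv().unwrap();
    assert_eq!(barrier.epoch, 1);
    assert!(barrier.mutation.is_some());
}
```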
35 changes: 1 addition & 34 deletions src/meta/src/barrier/creating_job/mod.rs
@@ -16,7 +16,7 @@ mod barrier_control;
mod status;

use std::cmp::max;
use std::collections::{HashMap, HashSet};
use std::collections::HashMap;
use std::mem::take;
use std::sync::Arc;
use std::time::Duration;
@@ -43,7 +43,6 @@ use crate::barrier::progress::CreateMviewProgressTracker;
use crate::barrier::rpc::ControlStreamManager;
use crate::barrier::{Command, CreateStreamingJobCommandInfo, SnapshotBackfillInfo};
use crate::manager::WorkerId;
use crate::model::ActorId;
use crate::rpc::metrics::MetaMetrics;
use crate::MetaResult;

@@ -77,18 +76,6 @@ impl CreatingStreamingJobControl {
let mut create_mview_tracker = CreateMviewProgressTracker::default();
create_mview_tracker.update_tracking_jobs(Some((&info, None)), [], version_stat);
let fragment_info: HashMap<_, _> = info.new_fragment_info().collect();
let snapshot_backfill_actors_set = info.table_fragments.snapshot_backfill_actor_ids();
let mut snapshot_backfill_actors: HashMap<_, HashSet<_>> = HashMap::new();
for fragment in fragment_info.values() {
for (actor_id, worker_node) in &fragment.actors {
if snapshot_backfill_actors_set.contains(actor_id) {
snapshot_backfill_actors
.entry(*worker_node)
.or_default()
.insert(*actor_id);
}
}
}

let table_id = info.table_fragments.table_id();
let table_id_str = format!("{}", table_id.table_id);
@@ -108,7 +95,6 @@ impl CreatingStreamingJobControl {
graph_info: InflightGraphInfo::new(fragment_info),
backfill_epoch,
pending_non_checkpoint_barriers: vec![],
snapshot_backfill_actors,
initial_barrier_info: Some((actors_to_create, initial_mutation)),
},
upstream_lag: metrics
@@ -139,22 +125,6 @@
}
}

pub(super) fn actors_to_pre_sync_barrier(
&self,
) -> impl Iterator<Item = (&WorkerId, &HashSet<ActorId>)> + '_ {
if let CreatingStreamingJobStatus::ConsumingSnapshot {
snapshot_backfill_actors,
..
} = &self.status
{
Some(snapshot_backfill_actors)
} else {
None
}
.into_iter()
.flat_map(|actors| actors.iter())
}

pub(super) fn gen_ddl_progress(&self) -> DdlProgress {
let progress = match &self.status {
CreatingStreamingJobStatus::ConsumingSnapshot {
@@ -278,7 +248,6 @@ impl CreatingStreamingJobControl {
&kind,
graph_info,
Some(graph_info),
HashMap::new(),
new_actors,
vec![],
vec![],
@@ -345,7 +314,6 @@ impl CreatingStreamingJobControl {
&command_ctx.kind,
graph_info,
Some(graph_info),
HashMap::new(),
None,
vec![],
vec![],
@@ -394,7 +362,6 @@ impl CreatingStreamingJobControl {
} else {
Some(graph_info)
},
HashMap::new(),
None,
vec![],
vec![],
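This file loses the per-worker bookkeeping: `snapshot_backfill_actors` grouped a creating job's snapshot backfill actors by the worker hosting them, and `actors_to_pre_sync_barrier()` exposed that map so meta could tell each worker which actors to pre-sync. A sketch of the deleted grouping pattern, with plain integers standing in for the real id types:

```rust
use std::collections::{HashMap, HashSet};

type WorkerId = u32;
type ActorId = u32;

fn main() {
    // (actor, hosting worker) pairs, as found in the fragment infos.
    let actors: [(ActorId, WorkerId); 3] = [(1, 10), (2, 10), (3, 11)];
    // The subset of actors doing snapshot backfill.
    let backfill_set: HashSet<ActorId> = HashSet::from([1, 3]);

    // The removed code built this worker -> actors map so that the barrier
    // mutation could be pre-synced to exactly the right workers.
    let mut by_worker: HashMap<WorkerId, HashSet<ActorId>> = HashMap::new();
    for (actor_id, worker_id) in actors {
        if backfill_set.contains(&actor_id) {
            by_worker.entry(worker_id).or_default().insert(actor_id);
        }
    }

    assert_eq!(by_worker[&10], HashSet::from([1]));
    assert_eq!(by_worker[&11], HashSet::from([3]));
}
```

With mutations now flowing through the barrier sender, none of this bookkeeping is needed, which is why every `inject_barrier` call site in this file simply drops its `HashMap::new()` argument.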
4 changes: 1 addition & 3 deletions src/meta/src/barrier/creating_job/status.rs
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashMap, HashSet};
use std::collections::HashMap;
use std::mem::take;
use std::sync::Arc;

@@ -27,7 +27,6 @@ use crate::barrier::info::InflightGraphInfo;
use crate::barrier::progress::CreateMviewProgressTracker;
use crate::barrier::{BarrierKind, TracedEpoch};
use crate::manager::WorkerId;
use crate::model::ActorId;

#[derive(Debug)]
pub(super) enum CreatingStreamingJobStatus {
@@ -40,7 +39,6 @@ pub(super) enum CreatingStreamingJobStatus {
backfill_epoch: u64,
/// The `prev_epoch` of pending non checkpoint barriers
pending_non_checkpoint_barriers: Vec<u64>,
snapshot_backfill_actors: HashMap<WorkerId, HashSet<ActorId>>,
/// Info of the first barrier: (`actors_to_create`, `mutation`)
/// Take the mutation out when injecting the first barrier
initial_barrier_info: Option<(HashMap<WorkerId, Vec<StreamActor>>, Mutation)>,
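The `ConsumingSnapshot` variant keeps `initial_barrier_info`, whose doc comment says the mutation is taken out when the first barrier is injected. That is the usual one-shot `Option::take` pattern, sketched here with stand-in payload types:

```rust
// Stand-in for the status struct; the real field holds
// `Option<(HashMap<WorkerId, Vec<StreamActor>>, Mutation)>`.
struct Status {
    initial_barrier_info: Option<(Vec<u32>, String)>,
}

fn main() {
    let mut status = Status {
        initial_barrier_info: Some((vec![1, 2], "add-mutation".to_string())),
    };

    // Injecting the first barrier consumes the info exactly once...
    let first = status.initial_barrier_info.take();
    assert!(first.is_some());

    // ...and later barriers find nothing left to inject.
    assert!(status.initial_barrier_info.take().is_none());
}
```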
8 changes: 0 additions & 8 deletions src/meta/src/barrier/mod.rs
@@ -1046,17 +1046,10 @@ impl GlobalBarrierManager {

let table_ids_to_commit: HashSet<_> = pre_applied_graph_info.existing_table_ids().collect();

let mut actors_to_pre_sync_barrier: HashMap<_, Vec<_>> = HashMap::new();
let mut jobs_to_wait = HashSet::new();

for (table_id, creating_job) in &mut self.checkpoint_control.creating_streaming_job_controls
{
for (worker_id, actors) in creating_job.actors_to_pre_sync_barrier() {
actors_to_pre_sync_barrier
.entry(*worker_id)
.or_default()
.extend(actors.iter().cloned())
}
if let Some(wait_job) =
creating_job.on_new_command(&mut self.control_stream_manager, &command_ctx)?
{
@@ -1071,7 +1064,6 @@
&command_ctx,
&pre_applied_graph_info,
Some(&self.state.inflight_graph_info),
actors_to_pre_sync_barrier,
) {
Ok(node_to_collect) => node_to_collect,
Err(err) => {
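With the pre-sync aggregation gone, the loop over creating jobs only has to collect which jobs the new command must wait for. A simplified sketch of that remaining control flow, with a toy `on_new_command`:

```rust
use std::collections::HashSet;

// Toy stand-in: pretend only even table ids report a job to wait on.
fn on_new_command(table_id: u32) -> Option<u32> {
    (table_id % 2 == 0).then_some(table_id)
}

fn main() {
    let creating_jobs = [1u32, 2, 3, 4];

    let mut jobs_to_wait = HashSet::new();
    for table_id in creating_jobs {
        if let Some(wait_job) = on_new_command(table_id) {
            jobs_to_wait.insert(wait_job);
        }
    }

    assert_eq!(jobs_to_wait, HashSet::from([2, 4]));
}
```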
1 change: 0 additions & 1 deletion src/meta/src/barrier/recovery.rs
@@ -398,7 +398,6 @@ impl GlobalBarrierManager {
&BarrierKind::Initial,
&info,
Some(&info),
HashMap::new(),
Some(node_actors),
vec![],
vec![],
17 changes: 1 addition & 16 deletions src/meta/src/barrier/rpc.rs
@@ -23,7 +23,6 @@ use futures::stream::{BoxStream, FuturesUnordered};
use futures::StreamExt;
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::ActorId;
use risingwave_common::util::tracing::TracingContext;
use risingwave_hummock_sdk::HummockVersionId;
use risingwave_pb::common::{ActorInfo, WorkerNode};
@@ -275,7 +274,6 @@ impl ControlStreamManager {
command_ctx: &CommandContext,
pre_applied_graph_info: &InflightGraphInfo,
applied_graph_info: Option<&InflightGraphInfo>,
actor_ids_to_pre_sync_mutation: HashMap<WorkerId, Vec<ActorId>>,
) -> MetaResult<HashSet<WorkerId>> {
let mutation = command_ctx.to_mutation();
let subscriptions_to_add = if let Some(Mutation::Add(add)) = &mutation {
@@ -295,14 +293,12 @@
&command_ctx.kind,
pre_applied_graph_info,
applied_graph_info,
actor_ids_to_pre_sync_mutation,
command_ctx.command.actors_to_create(),
subscriptions_to_add,
subscriptions_to_remove,
)
}

#[expect(clippy::too_many_arguments)]
pub(super) fn inject_barrier(
&mut self,
creating_table_id: Option<TableId>,
@@ -311,7 +307,6 @@
kind: &BarrierKind,
pre_applied_graph_info: &InflightGraphInfo,
applied_graph_info: Option<&InflightGraphInfo>,
actor_ids_to_pre_sync_mutation: HashMap<WorkerId, Vec<ActorId>>,
mut new_actors: Option<HashMap<WorkerId, Vec<StreamActor>>>,
subscriptions_to_add: Vec<SubscriptionUpstreamInfo>,
subscriptions_to_remove: Vec<SubscriptionUpstreamInfo>,
@@ -321,10 +316,7 @@
));

let partial_graph_id = creating_table_id
.map(|table_id| {
assert!(actor_ids_to_pre_sync_mutation.is_empty());
table_id.table_id
})
.map(|table_id| table_id.table_id)
.unwrap_or(u32::MAX);

for worker_id in pre_applied_graph_info
Expand Down Expand Up @@ -401,13 +393,6 @@ impl ControlStreamManager {
actor_ids_to_collect,
table_ids_to_sync,
partial_graph_id,
actor_ids_to_pre_sync_barrier_mutation:
actor_ids_to_pre_sync_mutation
.get(node_id)
.into_iter()
.flatten()
.cloned()
.collect(),
broadcast_info: new_actors_location_to_broadcast.clone(),
actors_to_build: new_actors
.as_mut()
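Dropping `actor_ids_to_pre_sync_mutation` brings `inject_barrier` below clippy's argument-count threshold, so its `#[expect(clippy::too_many_arguments)]` attribute goes away, and the assertion that tied an empty pre-sync map to creating jobs becomes moot. What remains is the partial-graph id convention visible in the diff: a creating job's table id doubles as its partial graph id, with `u32::MAX` marking the global graph. A sketch of just that mapping:

```rust
/// A creating job's table id doubles as its partial graph id; `u32::MAX`
/// is reserved for the global (non-creating) graph.
fn partial_graph_id(creating_table_id: Option<u32>) -> u32 {
    creating_table_id.unwrap_or(u32::MAX)
}

fn main() {
    assert_eq!(partial_graph_id(Some(42)), 42);
    assert_eq!(partial_graph_id(None), u32::MAX);
}
```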
63 changes: 18 additions & 45 deletions src/stream/src/executor/backfill/snapshot_backfill.rs
@@ -40,7 +40,7 @@ use crate::executor::monitor::StreamingMetrics;
use crate::executor::prelude::{try_stream, StreamExt};
use crate::executor::{
expect_first_barrier, ActorContextRef, BackfillExecutor, Barrier, BoxedMessageStream,
DispatcherBarrier, DispatcherMessage, Execute, Executor, Message, Mutation,
DispatcherBarrier, DispatcherMessage, Execute, MergeExecutorInput, Message, Mutation,
StreamExecutorError, StreamExecutorResult,
};
use crate::task::CreateMviewProgressReporter;
@@ -50,7 +50,7 @@ pub struct SnapshotBackfillExecutor<S: StateStore> {
upstream_table: StorageTable<S>,

/// Upstream with the same schema with the upstream table.
upstream: Executor,
upstream: MergeExecutorInput,

/// The column indices need to be forwarded to the downstream from the upstream and table scan.
output_indices: Vec<usize>,
@@ -68,9 +68,9 @@

impl<S: StateStore> SnapshotBackfillExecutor<S> {
#[expect(clippy::too_many_arguments)]
pub fn new(
pub(crate) fn new(
upstream_table: StorageTable<S>,
upstream: Executor,
upstream: MergeExecutorInput,
output_indices: Vec<usize>,
actor_ctx: ActorContextRef,
progress: CreateMviewProgressReporter,
@@ -101,15 +101,14 @@ impl<S: StateStore> SnapshotBackfillExecutor<S> {
#[try_stream(ok = Message, error = StreamExecutorError)]
async fn execute_inner(mut self) {
debug!("snapshot backfill executor start");
let mut upstream = erase_upstream_mutation(self.upstream.execute());
let upstream_table_id = self.upstream_table.table_id();
let first_barrier = expect_first_barrier(&mut upstream).await?;
let first_barrier = expect_first_barrier(&mut self.upstream).await?;
debug!(epoch = ?first_barrier.epoch, "get first upstream barrier");
let first_recv_barrier = receive_next_barrier(&mut self.barrier_rx).await?;
debug!(epoch = ?first_recv_barrier.epoch, "get first inject barrier");
let should_backfill = first_barrier.epoch != first_recv_barrier.epoch;

{
let mut barrier_epoch = {
if should_backfill {
let subscriber_ids = first_recv_barrier
.added_subscriber_on_mv_table(upstream_table_id)
@@ -140,7 +139,7 @@ impl<S: StateStore> SnapshotBackfillExecutor<S> {
.with_guarded_label_values(&[&table_id_str, &actor_id_str, "consume_upstream"]);

let mut upstream_buffer = UpstreamBuffer::new(
&mut upstream,
&mut self.upstream,
upstream_table_id,
snapshot_backfill_table_fragment_id,
consume_upstream_row_count,
@@ -250,6 +249,7 @@ impl<S: StateStore> SnapshotBackfillExecutor<S> {
table_id = self.upstream_table.table_id().table_id,
"finish consuming log store"
);
barrier_epoch
} else {
info!(
table_id = self.upstream_table.table_id().table_id,
@@ -258,19 +258,17 @@
let first_recv_barrier = receive_next_barrier(&mut self.barrier_rx).await?;
assert_eq!(first_barrier.epoch, first_recv_barrier.epoch);
yield Message::Barrier(first_recv_barrier);
first_barrier.epoch
}
}
};
let mut upstream = self.upstream.into_executor(self.barrier_rx).execute();
// Phase 3: consume upstream
while let Some(msg) = upstream.try_next().await? {
yield match msg {
DispatcherMessage::Chunk(chunk) => Message::Chunk(chunk),
DispatcherMessage::Watermark(watermark) => Message::Watermark(watermark),
DispatcherMessage::Barrier(barrier) => {
let recv_barrier = receive_next_barrier(&mut self.barrier_rx).await?;
assert_eq!(barrier.epoch, recv_barrier.epoch);
Message::Barrier(recv_barrier)
}
};
if let Message::Barrier(barrier) = &msg {
assert_eq!(barrier.epoch.prev, barrier_epoch.curr);
barrier_epoch = barrier.epoch;
}
yield msg;
}
}
}
@@ -404,33 +402,8 @@ impl<'a> UpstreamBufferState for StateOfConsumingLogStore<'a> {
}
}

mod erase_upstream_mutation {
use futures::TryStreamExt;

use crate::executor::prelude::Stream;
use crate::executor::{BoxedMessageStream, DispatcherMessageStreamItem};

pub(super) fn erase_upstream_mutation(upstream: BoxedMessageStream) -> UpstreamStream {
upstream.map_ok(|msg| {
msg.map_mutation(|mutation| {
if let Some(mutation) = mutation {
// TODO: assert none mutation after we explicitly erase mutation
warn!(
?mutation,
"receive non-empty mutation from upstream. ignored"
);
};
})
})
}

pub(super) type UpstreamStream = impl Stream<Item = DispatcherMessageStreamItem> + Unpin;
}

use erase_upstream_mutation::*;

struct UpstreamBuffer<'a, S> {
upstream: &'a mut UpstreamStream,
upstream: &'a mut MergeExecutorInput,
state: S,
consume_upstream_row_count: LabelGuardedIntCounter<3>,
upstream_table_id: TableId,
@@ -439,7 +412,7 @@

impl<'a> UpstreamBuffer<'a, StateOfConsumingSnapshot> {
fn new(
upstream: &'a mut UpstreamStream,
upstream: &'a mut MergeExecutorInput,
upstream_table_id: TableId,
current_subscriber_id: u32,
consume_upstream_row_count: LabelGuardedIntCounter<3>,
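The executor now takes a `MergeExecutorInput` instead of a plain `Executor`: its stream carries `DispatcherMessage`s whose barriers come without mutations by construction, so the whole `erase_upstream_mutation` wrapper and its "ignored non-empty mutation" warning disappear. Once backfill finishes, `into_executor(self.barrier_rx)` hands the barrier receiver to the merge input, letting the resulting executor pair upstream barriers with locally injected ones internally; the phase-3 loop then only checks epoch continuity. A sketch of that invariant with a stand-in epoch pair:

```rust
/// Stand-in for the epoch pair carried by every barrier.
#[derive(Clone, Copy, Debug, PartialEq)]
struct EpochPair {
    prev: u64,
    curr: u64,
}

fn main() {
    // The epoch the executor last saw, e.g. where log-store consumption ended.
    let mut barrier_epoch = EpochPair { prev: 0, curr: 100 };

    // Each barrier must continue exactly from the previous one, mirroring
    // `assert_eq!(barrier.epoch.prev, barrier_epoch.curr)` in the diff.
    for barrier in [
        EpochPair { prev: 100, curr: 101 },
        EpochPair { prev: 101, curr: 102 },
    ] {
        assert_eq!(barrier.prev, barrier_epoch.curr);
        barrier_epoch = barrier;
    }
}
```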
9 changes: 5 additions & 4 deletions src/stream/src/executor/dispatch.rs
@@ -1180,10 +1180,6 @@ mod tests {
let actor_id = 233;
let fragment_id = 666;
let barrier_test_env = LocalBarrierTestEnv::for_test().await;
let input = Executor::new(
Default::default(),
ReceiverExecutor::for_test(233, rx, barrier_test_env.shared_context.clone()).boxed(),
);
let ctx = Arc::new(SharedContext::for_test());
let metrics = Arc::new(StreamingMetrics::unused());

@@ -1252,6 +1248,11 @@
.flush_all_events()
.await;

let input = Executor::new(
Default::default(),
ReceiverExecutor::for_test(actor_id, rx, barrier_test_env.shared_context.clone())
.boxed(),
);
let executor = Box::new(DispatchExecutor::new(
input,
vec![broadcast_dispatcher, simple_dispatcher],