refactor(meta): build actors in inject barrier (#18270)

wenym1 authored Aug 29, 2024
1 parent a137e30 commit fac2904
Showing 13 changed files with 260 additions and 669 deletions.
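This commit removes the standalone `UpdateActors`, `BuildActors`, and `BroadcastActorInfoTable` RPCs from `StreamService` and carries their payloads on barrier injection instead: `InjectBarrierRequest` gains `broadcast_info` and `actors_to_build` fields, so the meta node broadcasts actor locations and builds new actors as part of injecting the first barrier that flows through them.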
36 changes: 3 additions & 33 deletions proto/stream_service.proto
@@ -17,31 +17,6 @@ message BuildActorInfo {
map<uint32, SubscriptionIds> related_subscriptions = 2;
}

// Describe the fragments which will be running on this node
message UpdateActorsRequest {
string request_id = 1;
repeated BuildActorInfo actors = 2;
}

message UpdateActorsResponse {
common.Status status = 1;
}

message BroadcastActorInfoTableRequest {
repeated common.ActorInfo info = 1;
}

// Create channels and gRPC connections for a fragment
message BuildActorsRequest {
string request_id = 1;
repeated uint32 actor_id = 2;
}

message BuildActorsResponse {
string request_id = 1;
common.Status status = 2;
}

message DropActorsRequest {
string request_id = 1;
repeated uint32 actor_ids = 2;
@@ -68,6 +43,9 @@ message InjectBarrierRequest {
// we specify the set of snapshot backfill actors that need to be pre-synced with the upstream barrier mutation,
// so that the input executor won't be blocked waiting for the mutation of upstream barriers.
repeated uint32 actor_ids_to_pre_sync_barrier_mutation = 7;

repeated common.ActorInfo broadcast_info = 8;
repeated BuildActorInfo actors_to_build = 9;
}

message BarrierCompleteResponse {
@@ -95,11 +73,6 @@ message BarrierCompleteResponse {
uint64 epoch = 9;
}

// Before starting streaming, the leader node broadcasts the actor-host table to the workers that need it.
message BroadcastActorInfoTableResponse {
common.Status status = 1;
}

message WaitEpochCommitRequest {
uint64 epoch = 1;
}
@@ -136,9 +109,6 @@ message StreamingControlStreamResponse {
}

service StreamService {
rpc UpdateActors(UpdateActorsRequest) returns (UpdateActorsResponse);
rpc BuildActors(BuildActorsRequest) returns (BuildActorsResponse);
rpc BroadcastActorInfoTable(BroadcastActorInfoTableRequest) returns (BroadcastActorInfoTableResponse);
rpc DropActors(DropActorsRequest) returns (DropActorsResponse);
rpc WaitEpochCommit(WaitEpochCommitRequest) returns (WaitEpochCommitResponse);
rpc StreamingControlStream(stream StreamingControlStreamRequest) returns (stream StreamingControlStreamResponse);
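With those three unary RPCs gone, actor-build information can only reach a compute node on the barrier itself. Below is a minimal sketch of how the meta side might populate the two new fields when fanning a barrier out to one worker. The helper `attach_new_actors` is hypothetical; `InjectBarrierRequest`, `BuildActorInfo`, and `ActorInfo` are the prost types generated from the proto above.

```rust
use std::collections::HashMap;

use risingwave_pb::common::ActorInfo;
use risingwave_pb::stream_service::{BuildActorInfo, InjectBarrierRequest};

/// Hypothetical helper: fill in the two fields added by this commit
/// (`broadcast_info = 8`, `actors_to_build = 9`). The full actor-host table
/// is broadcast to every worker, while each worker receives only its own
/// actors to build.
fn attach_new_actors(
    request: &mut InjectBarrierRequest,
    broadcast_info: &[ActorInfo],
    new_actors: &mut HashMap<u32, Vec<BuildActorInfo>>,
    worker_id: u32,
) {
    request.broadcast_info = broadcast_info.to_vec();
    request.actors_to_build = new_actors.remove(&worker_id).unwrap_or_default();
}
```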
57 changes: 0 additions & 57 deletions src/compute/src/rpc/service/stream_service.rs
@@ -19,7 +19,6 @@ use risingwave_pb::stream_service::*;
use risingwave_storage::dispatch_state_store;
use risingwave_stream::error::StreamError;
use risingwave_stream::task::{LocalStreamManager, StreamEnvironment};
use thiserror_ext::AsReport;
use tokio::sync::mpsc::unbounded_channel;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tonic::{Request, Response, Status, Streaming};
@@ -41,62 +40,6 @@ impl StreamService for StreamServiceImpl {
type StreamingControlStreamStream =
impl Stream<Item = std::result::Result<StreamingControlStreamResponse, tonic::Status>>;

#[cfg_attr(coverage, coverage(off))]
async fn update_actors(
&self,
request: Request<UpdateActorsRequest>,
) -> std::result::Result<Response<UpdateActorsResponse>, Status> {
let req = request.into_inner();
let res = self.mgr.update_actors(req.actors).await;
match res {
Err(e) => {
error!(error = %e.as_report(), "failed to update stream actor");
Err(e.into())
}
Ok(()) => Ok(Response::new(UpdateActorsResponse { status: None })),
}
}

#[cfg_attr(coverage, coverage(off))]
async fn build_actors(
&self,
request: Request<BuildActorsRequest>,
) -> std::result::Result<Response<BuildActorsResponse>, Status> {
let req = request.into_inner();

let actor_id = req.actor_id;
let res = self.mgr.build_actors(actor_id).await;
match res {
Err(e) => {
error!(error = %e.as_report(), "failed to build actors");
Err(e.into())
}
Ok(()) => Ok(Response::new(BuildActorsResponse {
request_id: req.request_id,
status: None,
})),
}
}

#[cfg_attr(coverage, coverage(off))]
async fn broadcast_actor_info_table(
&self,
request: Request<BroadcastActorInfoTableRequest>,
) -> std::result::Result<Response<BroadcastActorInfoTableResponse>, Status> {
let req = request.into_inner();

let res = self.mgr.update_actor_info(req.info).await;
match res {
Err(e) => {
error!(error = %e.as_report(), "failed to update actor info table actor");
Err(e.into())
}
Ok(()) => Ok(Response::new(BroadcastActorInfoTableResponse {
status: None,
})),
}
}

#[cfg_attr(coverage, coverage(off))]
async fn drop_actors(
&self,
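With these unary handlers deleted, the only remaining entry points into the compute node's `LocalStreamManager` are the RPCs still listed in the proto: `DropActors`, `WaitEpochCommit`, and the bidirectional `StreamingControlStream`, which now carries actor updates, builds, and info broadcasts inside `InjectBarrierRequest`.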
36 changes: 36 additions & 0 deletions src/meta/src/barrier/command.rs
@@ -844,6 +844,42 @@ impl CommandContext {
mutation
}

pub fn actors_to_create(&self) -> Option<HashMap<WorkerId, Vec<StreamActor>>> {
match &self.command {
Command::CreateStreamingJob { info, job_type } => {
let mut map = match job_type {
CreateStreamingJobType::Normal => HashMap::new(),
CreateStreamingJobType::SinkIntoTable(replace_table) => {
replace_table.new_table_fragments.actors_to_create()
}
CreateStreamingJobType::SnapshotBackfill(_) => {
// for snapshot backfill, the actors to create are handled separately
return None;
}
};
for (worker_id, new_actors) in info.table_fragments.actors_to_create() {
map.entry(worker_id).or_default().extend(new_actors)
}
Some(map)
}
Command::RescheduleFragment { reschedules, .. } => {
let mut map: HashMap<WorkerId, Vec<_>> = HashMap::new();
for (actor, status) in reschedules
.values()
.flat_map(|reschedule| reschedule.newly_created_actors.iter())
{
let worker_id = status.location.as_ref().unwrap().worker_node_id;
map.entry(worker_id).or_default().push(actor.clone());
}
Some(map)
}
Command::ReplaceTable(replace_table) => {
Some(replace_table.new_table_fragments.actors_to_create())
}
_ => None,
}
}

fn generate_update_mutation_for_replace_table(
old_table_fragments: &TableFragments,
merge_updates: &[MergeUpdate],
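`actors_to_create` returns plain `StreamActor`s grouped by worker; before they can ride on a barrier they still need to be wrapped into the wire-level `BuildActorInfo`. The conversion below mirrors what `creating_job/mod.rs` does further down this page, extracted as a standalone sketch (the function name is illustrative, not the commit's actual call site):

```rust
use std::collections::HashMap;

use risingwave_pb::stream_plan::StreamActor;
use risingwave_pb::stream_service::BuildActorInfo;

/// Illustrative conversion: wrap each actor in a BuildActorInfo, leaving
/// `related_subscriptions` empty, exactly as the creating-job path does.
fn to_build_actor_info(
    actors: HashMap<u32, Vec<StreamActor>>,
) -> HashMap<u32, Vec<BuildActorInfo>> {
    actors
        .into_iter()
        .map(|(worker_id, actors)| {
            let infos = actors
                .into_iter()
                .map(|actor| BuildActorInfo {
                    actor: Some(actor),
                    related_subscriptions: Default::default(),
                })
                .collect();
            (worker_id, infos)
        })
        .collect()
}
```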
37 changes: 34 additions & 3 deletions src/meta/src/barrier/creating_job/mod.rs
@@ -21,20 +21,23 @@ use std::mem::take;
use std::sync::Arc;
use std::time::Duration;

use itertools::Itertools;
use prometheus::HistogramTimer;
use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntGauge};
use risingwave_common::util::epoch::Epoch;
use risingwave_pb::common::WorkerNode;
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_service::BarrierCompleteResponse;
use risingwave_pb::stream_service::{BarrierCompleteResponse, BuildActorInfo};
use tracing::{debug, info};

use crate::barrier::command::CommandContext;
use crate::barrier::creating_job::barrier_control::{
CreatingStreamingJobBarrierControl, CreatingStreamingJobBarrierType,
};
use crate::barrier::creating_job::status::CreatingStreamingJobStatus;
use crate::barrier::creating_job::status::{
CreatingJobInjectBarrierInfo, CreatingStreamingJobStatus,
};
use crate::barrier::info::InflightGraphInfo;
use crate::barrier::progress::CreateMviewProgressTracker;
use crate::barrier::rpc::ControlStreamManager;
@@ -89,6 +92,8 @@ impl CreatingStreamingJobControl {
let table_id = info.table_fragments.table_id();
let table_id_str = format!("{}", table_id.table_id);

let actors_to_create = info.table_fragments.actors_to_create();

Self {
info,
snapshot_backfill_info,
@@ -103,6 +108,23 @@
backfill_epoch,
pending_non_checkpoint_barriers: vec![],
snapshot_backfill_actors,
actors_to_create: Some(
actors_to_create
.into_iter()
.map(|(worker_id, actors)| {
(
worker_id,
actors
.into_iter()
.map(|actor| BuildActorInfo {
actor: Some(actor),
related_subscriptions: Default::default(),
})
.collect_vec(),
)
})
.collect(),
),
},
upstream_lag: metrics
.snapshot_backfill_lag
@@ -256,7 +278,13 @@
.active_graph_info()
.expect("must exist when having barriers to inject");
let table_id = self.info.table_fragments.table_id();
for (curr_epoch, prev_epoch, kind) in barriers_to_inject {
for CreatingJobInjectBarrierInfo {
curr_epoch,
prev_epoch,
kind,
new_actors,
} in barriers_to_inject
{
let node_to_collect = control_stream_manager.inject_barrier(
Some(table_id),
None,
Expand All @@ -265,6 +293,7 @@ impl CreatingStreamingJobControl {
graph_info,
Some(graph_info),
HashMap::new(),
new_actors,
)?;
self.barrier_control.enqueue_epoch(
prev_epoch.value().0,
@@ -329,6 +358,7 @@
graph_info,
Some(graph_info),
HashMap::new(),
None,
)?;
self.barrier_control.enqueue_epoch(
command_ctx.prev_epoch.value().0,
@@ -375,6 +405,7 @@
Some(graph_info)
},
HashMap::new(),
None,
)?;
let prev_epoch = command_ctx.prev_epoch.value().0;
self.barrier_control.enqueue_epoch(
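Note the asymmetry across the three `inject_barrier` call sites above: only the barriers injected while consuming the snapshot can carry `new_actors`; the log-store and finishing paths always pass `None`, since by that point the job's actors have already been built.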
58 changes: 38 additions & 20 deletions src/meta/src/barrier/creating_job/status.rs
@@ -19,6 +19,7 @@ use std::sync::Arc;
use risingwave_common::util::epoch::Epoch;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_service::barrier_complete_response::CreateMviewProgress;
use risingwave_pb::stream_service::BuildActorInfo;

use crate::barrier::command::CommandContext;
use crate::barrier::info::InflightGraphInfo;
@@ -39,6 +40,7 @@ pub(super) enum CreatingStreamingJobStatus {
/// The `prev_epoch` of pending non checkpoint barriers
pending_non_checkpoint_barriers: Vec<u64>,
snapshot_backfill_actors: HashMap<WorkerId, HashSet<ActorId>>,
actors_to_create: Option<HashMap<WorkerId, Vec<BuildActorInfo>>>,
},
ConsumingLogStore {
graph_info: InflightGraphInfo,
@@ -53,6 +55,13 @@
},
}

pub(super) struct CreatingJobInjectBarrierInfo {
pub curr_epoch: TracedEpoch,
pub prev_epoch: TracedEpoch,
pub kind: BarrierKind,
pub new_actors: Option<HashMap<WorkerId, Vec<BuildActorInfo>>>,
}

impl CreatingStreamingJobStatus {
pub(super) fn active_graph_info(&self) -> Option<&InflightGraphInfo> {
match self {
@@ -84,42 +93,43 @@ impl CreatingStreamingJobStatus {
/// return
/// - Some(vec of `CreatingJobInjectBarrierInfo`) of barriers to newly inject
/// - Some(`graph_info`) when the status should transition to `ConsumingLogStore`
#[expect(clippy::type_complexity)]
pub(super) fn may_inject_fake_barrier(
&mut self,
is_checkpoint: bool,
) -> Option<(
Vec<(TracedEpoch, TracedEpoch, BarrierKind)>,
Option<InflightGraphInfo>,
)> {
) -> Option<(Vec<CreatingJobInjectBarrierInfo>, Option<InflightGraphInfo>)> {
if let CreatingStreamingJobStatus::ConsumingSnapshot {
prev_epoch_fake_physical_time,
pending_commands,
create_mview_tracker,
graph_info,
pending_non_checkpoint_barriers,
ref backfill_epoch,
actors_to_create,
..
} = self
{
if create_mview_tracker.has_pending_finished_jobs() {
assert!(actors_to_create.is_none());
pending_non_checkpoint_barriers.push(*backfill_epoch);

let prev_epoch = Epoch::from_physical_time(*prev_epoch_fake_physical_time);
let barriers_to_inject = [(
TracedEpoch::new(Epoch(*backfill_epoch)),
TracedEpoch::new(prev_epoch),
BarrierKind::Checkpoint(take(pending_non_checkpoint_barriers)),
)]
.into_iter()
.chain(pending_commands.drain(..).map(|command_ctx| {
(
command_ctx.curr_epoch.clone(),
command_ctx.prev_epoch.clone(),
command_ctx.kind.clone(),
)
}))
.collect();
let barriers_to_inject =
[CreatingJobInjectBarrierInfo {
curr_epoch: TracedEpoch::new(Epoch(*backfill_epoch)),
prev_epoch: TracedEpoch::new(prev_epoch),
kind: BarrierKind::Checkpoint(take(pending_non_checkpoint_barriers)),
new_actors: None,
}]
.into_iter()
.chain(pending_commands.drain(..).map(|command_ctx| {
CreatingJobInjectBarrierInfo {
curr_epoch: command_ctx.curr_epoch.clone(),
prev_epoch: command_ctx.prev_epoch.clone(),
kind: command_ctx.kind.clone(),
new_actors: None,
}
}))
.collect();

let graph_info = take(graph_info);
Some((barriers_to_inject, Some(graph_info)))
@@ -135,7 +145,15 @@
} else {
BarrierKind::Barrier
};
Some((vec![(curr_epoch, prev_epoch, kind)], None))
Some((
vec![CreatingJobInjectBarrierInfo {
curr_epoch,
prev_epoch,
kind,
new_actors: actors_to_create.take(),
}],
None,
))
}
} else {
None
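The `actors_to_create.take()` in the `ConsumingSnapshot` arm gives first-barrier-only semantics: the first fake barrier injected for the job carries the full set of actors to build, and every subsequent barrier carries `None` (the `assert!(actors_to_create.is_none())` in the finished-jobs branch checks exactly this). A minimal self-contained illustration of the pattern, using toy stand-in types rather than the meta-node's own:

```rust
use std::collections::HashMap;

// Toy stand-ins so the sketch compiles on its own.
type WorkerId = u32;
type BuildActorInfo = String;

struct ConsumingSnapshot {
    actors_to_create: Option<HashMap<WorkerId, Vec<BuildActorInfo>>>,
}

impl ConsumingSnapshot {
    /// Models injecting one fake barrier: `take` moves the actors out of
    /// the Option, so only the first call returns Some.
    fn next_barrier_actors(&mut self) -> Option<HashMap<WorkerId, Vec<BuildActorInfo>>> {
        self.actors_to_create.take()
    }
}

fn main() {
    let mut status = ConsumingSnapshot {
        actors_to_create: Some(HashMap::from([(1, vec!["actor-42".to_owned()])])),
    };
    assert!(status.next_barrier_actors().is_some()); // first barrier builds the actors
    assert!(status.next_barrier_actors().is_none()); // later barriers carry none
}
```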
(Diffs for the remaining 8 changed files are not shown.)
