refactor(meta): build actors in inject barrier #18270

Merged · 5 commits · Aug 29, 2024
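In short, the PR collapses three actor-bootstrap RPCs (`UpdateActors`, `BuildActors`, `BroadcastActorInfoTable`) into barrier injection: `InjectBarrierRequest` gains `broadcast_info` (field 8) and `actors_to_build` (field 9), so new actors are shipped on the barrier that starts them. A minimal, self-contained sketch of the idea in Rust; the structs are simplified stand-ins, not the real prost-generated types:

```rust
// Simplified stand-ins for the prost-generated message types (hypothetical).
#[derive(Default)]
struct ActorInfo {
    actor_id: u32,
    host: String, // where the actor runs; used to set up peer channels
}

#[derive(Default)]
struct BuildActorInfo {
    actor_id: u32, // the real message wraps a full StreamActor plan
}

#[derive(Default)]
struct InjectBarrierRequest {
    request_id: String,
    // ...existing barrier fields elided...
    broadcast_info: Vec<ActorInfo>,       // field 8: replaces BroadcastActorInfoTable
    actors_to_build: Vec<BuildActorInfo>, // field 9: replaces UpdateActors + BuildActors
}

// Instead of three unary RPCs before the first barrier, the meta node now
// attaches the bootstrap info to the barrier itself.
fn first_barrier_for_new_job(
    infos: Vec<ActorInfo>,
    actors: Vec<BuildActorInfo>,
) -> InjectBarrierRequest {
    InjectBarrierRequest {
        request_id: "<generated>".to_owned(),
        broadcast_info: infos,
        actors_to_build: actors,
    }
}
```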
36 changes: 3 additions & 33 deletions proto/stream_service.proto
@@ -17,31 +17,6 @@ message BuildActorInfo {
map<uint32, SubscriptionIds> related_subscriptions = 2;
}

// Describe the fragments which will be running on this node
message UpdateActorsRequest {
string request_id = 1;
repeated BuildActorInfo actors = 2;
}

message UpdateActorsResponse {
common.Status status = 1;
}

message BroadcastActorInfoTableRequest {
repeated common.ActorInfo info = 1;
}

// Create channels and gRPC connections for a fragment
message BuildActorsRequest {
string request_id = 1;
repeated uint32 actor_id = 2;
}

message BuildActorsResponse {
string request_id = 1;
common.Status status = 2;
}

message DropActorsRequest {
string request_id = 1;
repeated uint32 actor_ids = 2;
@@ -68,6 +43,9 @@ message InjectBarrierRequest {
// we specify the set of snapshot backfill actors that need to be pre-synced with the upstream barrier mutation,
// so that the input executor won't be blocked waiting for the mutation of upstream barriers.
repeated uint32 actor_ids_to_pre_sync_barrier_mutation = 7;

repeated common.ActorInfo broadcast_info = 8;
repeated BuildActorInfo actors_to_build = 9;
}

message BarrierCompleteResponse {
@@ -95,11 +73,6 @@ message BarrierCompleteResponse {
uint64 epoch = 9;
}

// Before starting streaming, the leader node broadcast the actor-host table to needed workers.
message BroadcastActorInfoTableResponse {
common.Status status = 1;
}

message WaitEpochCommitRequest {
uint64 epoch = 1;
}
@@ -136,9 +109,6 @@ message StreamingControlStreamResponse {
}

service StreamService {
rpc UpdateActors(UpdateActorsRequest) returns (UpdateActorsResponse);
rpc BuildActors(BuildActorsRequest) returns (BuildActorsResponse);
rpc BroadcastActorInfoTable(BroadcastActorInfoTableRequest) returns (BroadcastActorInfoTableResponse);
rpc DropActors(DropActorsRequest) returns (DropActorsResponse);
rpc WaitEpochCommit(WaitEpochCommitRequest) returns (WaitEpochCommitResponse);
rpc StreamingControlStream(stream StreamingControlStreamRequest) returns (stream StreamingControlStreamResponse);
57 changes: 0 additions & 57 deletions src/compute/src/rpc/service/stream_service.rs
@@ -19,7 +19,6 @@ use risingwave_pb::stream_service::*;
use risingwave_storage::dispatch_state_store;
use risingwave_stream::error::StreamError;
use risingwave_stream::task::{LocalStreamManager, StreamEnvironment};
use thiserror_ext::AsReport;
use tokio::sync::mpsc::unbounded_channel;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tonic::{Request, Response, Status, Streaming};
@@ -41,62 +40,6 @@ impl StreamService for StreamServiceImpl {
type StreamingControlStreamStream =
impl Stream<Item = std::result::Result<StreamingControlStreamResponse, tonic::Status>>;

#[cfg_attr(coverage, coverage(off))]
async fn update_actors(
&self,
request: Request<UpdateActorsRequest>,
) -> std::result::Result<Response<UpdateActorsResponse>, Status> {
let req = request.into_inner();
let res = self.mgr.update_actors(req.actors).await;
match res {
Err(e) => {
error!(error = %e.as_report(), "failed to update stream actor");
Err(e.into())
}
Ok(()) => Ok(Response::new(UpdateActorsResponse { status: None })),
}
}

#[cfg_attr(coverage, coverage(off))]
async fn build_actors(
&self,
request: Request<BuildActorsRequest>,
) -> std::result::Result<Response<BuildActorsResponse>, Status> {
let req = request.into_inner();

let actor_id = req.actor_id;
let res = self.mgr.build_actors(actor_id).await;
match res {
Err(e) => {
error!(error = %e.as_report(), "failed to build actors");
Err(e.into())
}
Ok(()) => Ok(Response::new(BuildActorsResponse {
request_id: req.request_id,
status: None,
})),
}
}

#[cfg_attr(coverage, coverage(off))]
async fn broadcast_actor_info_table(
&self,
request: Request<BroadcastActorInfoTableRequest>,
) -> std::result::Result<Response<BroadcastActorInfoTableResponse>, Status> {
let req = request.into_inner();

let res = self.mgr.update_actor_info(req.info).await;
match res {
Err(e) => {
error!(error = %e.as_report(), "failed to update actor info table actor");
Err(e.into())
}
Ok(()) => Ok(Response::new(BroadcastActorInfoTableResponse {
status: None,
})),
}
}

#[cfg_attr(coverage, coverage(off))]
async fn drop_actors(
&self,
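For context on what replaces the three deleted unary handlers: the equivalent work presumably now happens when the compute node receives an `InjectBarrierRequest` over the control stream. A hedged, self-contained sketch of that ordering; `StreamManager` and its methods are simplified stand-ins, not the actual `LocalStreamManager` API:

```rust
// All types here are hypothetical stand-ins for the real risingwave ones.
struct ActorInfo;
struct BuildActorInfo;
struct InjectBarrierRequest {
    broadcast_info: Vec<ActorInfo>,
    actors_to_build: Vec<BuildActorInfo>,
}

struct StreamManager;

impl StreamManager {
    fn update_actor_info(&self, _infos: &[ActorInfo]) { /* was BroadcastActorInfoTable */ }
    fn update_and_build_actors(&self, _actors: &[BuildActorInfo]) { /* was UpdateActors + BuildActors */ }
    fn send_barrier(&self, _req: &InjectBarrierRequest) { /* pre-existing injection path */ }

    // Hypothetical ordering: register peer locations first, then build the
    // new actors, then let the barrier flow through them.
    fn handle_inject_barrier(&self, req: InjectBarrierRequest) {
        self.update_actor_info(&req.broadcast_info);
        self.update_and_build_actors(&req.actors_to_build);
        self.send_barrier(&req);
    }
}
```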
36 changes: 36 additions & 0 deletions src/meta/src/barrier/command.rs
@@ -844,6 +844,42 @@ impl CommandContext {
mutation
}

pub fn actors_to_create(&self) -> Option<HashMap<WorkerId, Vec<StreamActor>>> {
match &self.command {
Command::CreateStreamingJob { info, job_type } => {
let mut map = match job_type {
CreateStreamingJobType::Normal => HashMap::new(),
CreateStreamingJobType::SinkIntoTable(replace_table) => {
replace_table.new_table_fragments.actors_to_create()
Review comment (Member): Great job!! Looks like #14904 can be resolved naturally!

}
CreateStreamingJobType::SnapshotBackfill(_) => {
// for snapshot backfill, the actors to create are handled separately
return None;
}
};
for (worker_id, new_actors) in info.table_fragments.actors_to_create() {
map.entry(worker_id).or_default().extend(new_actors)
}
Some(map)
}
Command::RescheduleFragment { reschedules, .. } => {
let mut map: HashMap<WorkerId, Vec<_>> = HashMap::new();
for (actor, status) in reschedules
.values()
.flat_map(|reschedule| reschedule.newly_created_actors.iter())
{
let worker_id = status.location.as_ref().unwrap().worker_node_id;
map.entry(worker_id).or_default().push(actor.clone());
}
Some(map)
}
Command::ReplaceTable(replace_table) => {
Some(replace_table.new_table_fragments.actors_to_create())
}
_ => None,
}
}

fn generate_update_mutation_for_replace_table(
old_table_fragments: &TableFragments,
merge_updates: &[MergeUpdate],
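A hedged sketch of how `actors_to_create()` is presumably consumed on the injection path: each worker's new `StreamActor`s get wrapped into `BuildActorInfo` and attached to that worker's `InjectBarrierRequest`. Types are simplified; the real conversion lives in the control-stream manager:

```rust
use std::collections::HashMap;

type WorkerId = u32;

struct StreamActor {
    actor_id: u32,
}

struct BuildActorInfo {
    actor: Option<StreamActor>,
    // related_subscriptions omitted for brevity
}

// `None` (a command that creates nothing) simply yields an empty map.
fn to_build_actor_infos(
    actors: Option<HashMap<WorkerId, Vec<StreamActor>>>,
) -> HashMap<WorkerId, Vec<BuildActorInfo>> {
    actors
        .unwrap_or_default()
        .into_iter()
        .map(|(worker_id, actors)| {
            let infos: Vec<BuildActorInfo> = actors
                .into_iter()
                .map(|actor| BuildActorInfo { actor: Some(actor) })
                .collect();
            (worker_id, infos)
        })
        .collect()
}
```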
37 changes: 34 additions & 3 deletions src/meta/src/barrier/creating_job/mod.rs
@@ -21,20 +21,23 @@ use std::mem::take;
use std::sync::Arc;
use std::time::Duration;

use itertools::Itertools;
use prometheus::HistogramTimer;
use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntGauge};
use risingwave_common::util::epoch::Epoch;
use risingwave_pb::common::WorkerNode;
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_service::BarrierCompleteResponse;
use risingwave_pb::stream_service::{BarrierCompleteResponse, BuildActorInfo};
use tracing::{debug, info};

use crate::barrier::command::CommandContext;
use crate::barrier::creating_job::barrier_control::{
CreatingStreamingJobBarrierControl, CreatingStreamingJobBarrierType,
};
use crate::barrier::creating_job::status::CreatingStreamingJobStatus;
use crate::barrier::creating_job::status::{
CreatingJobInjectBarrierInfo, CreatingStreamingJobStatus,
};
use crate::barrier::info::InflightGraphInfo;
use crate::barrier::progress::CreateMviewProgressTracker;
use crate::barrier::rpc::ControlStreamManager;
@@ -89,6 +92,8 @@ impl CreatingStreamingJobControl {
let table_id = info.table_fragments.table_id();
let table_id_str = format!("{}", table_id.table_id);

let actors_to_create = info.table_fragments.actors_to_create();

Self {
info,
snapshot_backfill_info,
@@ -103,6 +108,23 @@
backfill_epoch,
pending_non_checkpoint_barriers: vec![],
snapshot_backfill_actors,
actors_to_create: Some(
actors_to_create
.into_iter()
.map(|(worker_id, actors)| {
(
worker_id,
actors
.into_iter()
.map(|actor| BuildActorInfo {
actor: Some(actor),
related_subscriptions: Default::default(),
})
.collect_vec(),
)
})
.collect(),
),
},
upstream_lag: metrics
.snapshot_backfill_lag
@@ -256,7 +278,13 @@
.active_graph_info()
.expect("must exist when having barriers to inject");
let table_id = self.info.table_fragments.table_id();
for (curr_epoch, prev_epoch, kind) in barriers_to_inject {
for CreatingJobInjectBarrierInfo {
curr_epoch,
prev_epoch,
kind,
new_actors,
} in barriers_to_inject
{
let node_to_collect = control_stream_manager.inject_barrier(
Some(table_id),
None,
@@ -265,6 +293,7 @@
graph_info,
Some(graph_info),
HashMap::new(),
new_actors,
)?;
self.barrier_control.enqueue_epoch(
prev_epoch.value().0,
@@ -329,6 +358,7 @@
graph_info,
Some(graph_info),
HashMap::new(),
None,
)?;
self.barrier_control.enqueue_epoch(
command_ctx.prev_epoch.value().0,
@@ -375,6 +405,7 @@
Some(graph_info)
},
HashMap::new(),
None,
)?;
let prev_epoch = command_ctx.prev_epoch.value().0;
self.barrier_control.enqueue_epoch(
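The three `inject_barrier` call sites above make the new parameter's contract visible: only the fake-barrier path passes `Some(new_actors)`, while the log-store and command paths pass `None`. A heavily trimmed, hypothetical view of the extended signature (the real method takes several more arguments):

```rust
use std::collections::HashMap;

type WorkerId = u32;
struct BuildActorInfo;
struct ControlStreamManager;

impl ControlStreamManager {
    // Hypothetical trimmed signature: the PR appends `new_actors` to the
    // existing parameter list; most call sites simply pass `None`.
    fn inject_barrier(&self, new_actors: Option<HashMap<WorkerId, Vec<BuildActorInfo>>>) {
        // In the real code, the map is split per worker and attached to each
        // worker's InjectBarrierRequest before it is sent on the control stream.
        let _ = new_actors;
    }
}
```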
58 changes: 38 additions & 20 deletions src/meta/src/barrier/creating_job/status.rs
@@ -19,6 +19,7 @@ use risingwave_common::util::epoch::Epoch;
use risingwave_common::util::epoch::Epoch;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_service::barrier_complete_response::CreateMviewProgress;
use risingwave_pb::stream_service::BuildActorInfo;

use crate::barrier::command::CommandContext;
use crate::barrier::info::InflightGraphInfo;
@@ -39,6 +40,7 @@ pub(super) enum CreatingStreamingJobStatus {
/// The `prev_epoch` of pending non checkpoint barriers
pending_non_checkpoint_barriers: Vec<u64>,
snapshot_backfill_actors: HashMap<WorkerId, HashSet<ActorId>>,
actors_to_create: Option<HashMap<WorkerId, Vec<BuildActorInfo>>>,
},
ConsumingLogStore {
graph_info: InflightGraphInfo,
@@ -53,6 +55,13 @@
},
}

pub(super) struct CreatingJobInjectBarrierInfo {
pub curr_epoch: TracedEpoch,
pub prev_epoch: TracedEpoch,
pub kind: BarrierKind,
pub new_actors: Option<HashMap<WorkerId, Vec<BuildActorInfo>>>,
}

impl CreatingStreamingJobStatus {
pub(super) fn active_graph_info(&self) -> Option<&InflightGraphInfo> {
match self {
@@ -84,42 +93,43 @@
/// return
/// - Some(vec of `CreatingJobInjectBarrierInfo`) for the barriers to newly inject
/// - Some(`graph_info`) when the status should transition to `ConsumingLogStore`
#[expect(clippy::type_complexity)]
pub(super) fn may_inject_fake_barrier(
&mut self,
is_checkpoint: bool,
) -> Option<(
Vec<(TracedEpoch, TracedEpoch, BarrierKind)>,
Option<InflightGraphInfo>,
)> {
) -> Option<(Vec<CreatingJobInjectBarrierInfo>, Option<InflightGraphInfo>)> {
if let CreatingStreamingJobStatus::ConsumingSnapshot {
prev_epoch_fake_physical_time,
pending_commands,
create_mview_tracker,
graph_info,
pending_non_checkpoint_barriers,
ref backfill_epoch,
actors_to_create,
..
} = self
{
if create_mview_tracker.has_pending_finished_jobs() {
assert!(actors_to_create.is_none());
pending_non_checkpoint_barriers.push(*backfill_epoch);

let prev_epoch = Epoch::from_physical_time(*prev_epoch_fake_physical_time);
let barriers_to_inject = [(
TracedEpoch::new(Epoch(*backfill_epoch)),
TracedEpoch::new(prev_epoch),
BarrierKind::Checkpoint(take(pending_non_checkpoint_barriers)),
)]
.into_iter()
.chain(pending_commands.drain(..).map(|command_ctx| {
(
command_ctx.curr_epoch.clone(),
command_ctx.prev_epoch.clone(),
command_ctx.kind.clone(),
)
}))
.collect();
let barriers_to_inject =
[CreatingJobInjectBarrierInfo {
curr_epoch: TracedEpoch::new(Epoch(*backfill_epoch)),
prev_epoch: TracedEpoch::new(prev_epoch),
kind: BarrierKind::Checkpoint(take(pending_non_checkpoint_barriers)),
new_actors: None,
}]
.into_iter()
.chain(pending_commands.drain(..).map(|command_ctx| {
CreatingJobInjectBarrierInfo {
curr_epoch: command_ctx.curr_epoch.clone(),
prev_epoch: command_ctx.prev_epoch.clone(),
kind: command_ctx.kind.clone(),
new_actors: None,
}
}))
.collect();

let graph_info = take(graph_info);
Some((barriers_to_inject, Some(graph_info)))
@@ -135,7 +145,15 @@
} else {
BarrierKind::Barrier
};
Some((vec![(curr_epoch, prev_epoch, kind)], None))
Some((
vec![CreatingJobInjectBarrierInfo {
curr_epoch,
prev_epoch,
kind,
new_actors: actors_to_create.take(),
}],
None,
))
}
} else {
None
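One detail worth noting in the hunk above: `actors_to_create.take()` guarantees the build payload rides on exactly one fake barrier. `Option::take` moves the value out and leaves `None` behind, so every later barrier injected during `ConsumingSnapshot` carries `new_actors: None`. A minimal illustration:

```rust
fn main() {
    // Stand-in for the per-job `actors_to_create` field.
    let mut actors_to_create = Some(vec!["actor-1", "actor-2"]);

    // First fake barrier: takes ownership of the payload.
    let first = actors_to_create.take();
    // Every subsequent barrier: nothing left to attach.
    let second = actors_to_create.take();

    assert_eq!(first, Some(vec!["actor-1", "actor-2"]));
    assert_eq!(second, None);
}
```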