Skip to content

Commit

Permalink
feat(meta): inject and collect barrier in bidi stream (#14887)
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 authored Mar 12, 2024
1 parent 5a8a6a2 commit 72d37ef
Show file tree
Hide file tree
Showing 15 changed files with 924 additions and 684 deletions.
44 changes: 21 additions & 23 deletions proto/stream_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -44,33 +44,13 @@ message DropActorsResponse {
common.Status status = 2;
}

message ForceStopActorsRequest {
string request_id = 1;
uint64 prev_epoch = 2;
}

message ForceStopActorsResponse {
string request_id = 1;
common.Status status = 2;
}

message InjectBarrierRequest {
string request_id = 1;
stream_plan.Barrier barrier = 2;
repeated uint32 actor_ids_to_send = 3;
repeated uint32 actor_ids_to_collect = 4;
}

message InjectBarrierResponse {
string request_id = 1;
common.Status status = 2;
}

message BarrierCompleteRequest {
string request_id = 1;
uint64 prev_epoch = 2;
map<string, string> tracing_context = 3;
}
message BarrierCompleteResponse {
message CreateMviewProgress {
uint32 backfill_actor_id = 1;
Expand Down Expand Up @@ -104,15 +84,33 @@ message WaitEpochCommitResponse {
common.Status status = 1;
}

message StreamingControlStreamRequest {
message InitRequest {
uint64 prev_epoch = 2;
}

oneof request {
InitRequest init = 1;
InjectBarrierRequest inject_barrier = 2;
}
}

message StreamingControlStreamResponse {
message InitResponse {}

oneof response {
InitResponse init = 1;
BarrierCompleteResponse complete_barrier = 2;
}
}

service StreamService {
rpc UpdateActors(UpdateActorsRequest) returns (UpdateActorsResponse);
rpc BuildActors(BuildActorsRequest) returns (BuildActorsResponse);
rpc BroadcastActorInfoTable(BroadcastActorInfoTableRequest) returns (BroadcastActorInfoTableResponse);
rpc DropActors(DropActorsRequest) returns (DropActorsResponse);
rpc ForceStopActors(ForceStopActorsRequest) returns (ForceStopActorsResponse);
rpc InjectBarrier(InjectBarrierRequest) returns (InjectBarrierResponse);
rpc BarrierComplete(BarrierCompleteRequest) returns (BarrierCompleteResponse);
rpc WaitEpochCommit(WaitEpochCommitRequest) returns (WaitEpochCommitResponse);
rpc StreamingControlStream(stream StreamingControlStreamRequest) returns (stream StreamingControlStreamResponse);
}

// TODO: Lifecycle management for actors.
115 changes: 28 additions & 87 deletions src/compute/src/rpc/service/stream_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,16 @@
// limitations under the License.

use await_tree::InstrumentAwait;
use itertools::Itertools;
use risingwave_hummock_sdk::table_stats::to_prost_table_stats_map;
use risingwave_hummock_sdk::LocalSstableInfo;
use risingwave_pb::stream_service::barrier_complete_response::GroupedSstableInfo;
use futures::{Stream, StreamExt, TryStreamExt};
use risingwave_pb::stream_service::stream_service_server::StreamService;
use risingwave_pb::stream_service::*;
use risingwave_storage::dispatch_state_store;
use risingwave_stream::error::StreamError;
use risingwave_stream::executor::Barrier;
use risingwave_stream::task::{BarrierCompleteResult, LocalStreamManager, StreamEnvironment};
use risingwave_stream::task::{LocalStreamManager, StreamEnvironment};
use thiserror_ext::AsReport;
use tonic::{Request, Response, Status};
use tokio::sync::mpsc::unbounded_channel;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tonic::{Request, Response, Status, Streaming};

#[derive(Clone)]
pub struct StreamServiceImpl {
Expand All @@ -40,6 +38,9 @@ impl StreamServiceImpl {

#[async_trait::async_trait]
impl StreamService for StreamServiceImpl {
type StreamingControlStreamStream =
impl Stream<Item = std::result::Result<StreamingControlStreamResponse, tonic::Status>>;

#[cfg_attr(coverage, coverage(off))]
async fn update_actors(
&self,
Expand Down Expand Up @@ -110,86 +111,6 @@ impl StreamService for StreamServiceImpl {
}))
}

#[cfg_attr(coverage, coverage(off))]
async fn force_stop_actors(
&self,
request: Request<ForceStopActorsRequest>,
) -> std::result::Result<Response<ForceStopActorsResponse>, Status> {
let req = request.into_inner();
self.mgr.reset(req.prev_epoch).await;
Ok(Response::new(ForceStopActorsResponse {
request_id: req.request_id,
status: None,
}))
}

#[cfg_attr(coverage, coverage(off))]
async fn inject_barrier(
&self,
request: Request<InjectBarrierRequest>,
) -> Result<Response<InjectBarrierResponse>, Status> {
let req = request.into_inner();
let barrier =
Barrier::from_protobuf(req.get_barrier().unwrap()).map_err(StreamError::from)?;

self.mgr
.send_barrier(barrier, req.actor_ids_to_send, req.actor_ids_to_collect)
.await?;

Ok(Response::new(InjectBarrierResponse {
request_id: req.request_id,
status: None,
}))
}

#[cfg_attr(coverage, coverage(off))]
async fn barrier_complete(
&self,
request: Request<BarrierCompleteRequest>,
) -> Result<Response<BarrierCompleteResponse>, Status> {
let req = request.into_inner();
let BarrierCompleteResult {
create_mview_progress,
sync_result,
} = self
.mgr
.collect_barrier(req.prev_epoch)
.instrument_await(format!("collect_barrier (epoch {})", req.prev_epoch))
.await
.inspect_err(
|err| tracing::error!(error = %err.as_report(), "failed to collect barrier"),
)?;

let (synced_sstables, table_watermarks) = sync_result
.map(|sync_result| (sync_result.uncommitted_ssts, sync_result.table_watermarks))
.unwrap_or_default();

Ok(Response::new(BarrierCompleteResponse {
request_id: req.request_id,
status: None,
create_mview_progress,
synced_sstables: synced_sstables
.into_iter()
.map(
|LocalSstableInfo {
compaction_group_id,
sst_info,
table_stats,
}| GroupedSstableInfo {
compaction_group_id,
sst: Some(sst_info),
table_stats_map: to_prost_table_stats_map(table_stats),
},
)
.collect_vec(),
worker_id: self.env.worker_id(),
table_watermarks: table_watermarks
.into_iter()
.map(|(key, value)| (key.table_id, value.to_protobuf()))
.collect(),
}))
}

#[cfg_attr(coverage, coverage(off))]
async fn wait_epoch_commit(
&self,
Expand All @@ -210,4 +131,24 @@ impl StreamService for StreamServiceImpl {

Ok(Response::new(WaitEpochCommitResponse { status: None }))
}

async fn streaming_control_stream(
&self,
request: Request<Streaming<StreamingControlStreamRequest>>,
) -> Result<Response<Self::StreamingControlStreamStream>, Status> {
let mut stream = request.into_inner().boxed();
let first_request = stream.try_next().await?;
let Some(StreamingControlStreamRequest {
request: Some(streaming_control_stream_request::Request::Init(init_request)),
}) = first_request
else {
return Err(Status::invalid_argument(format!(
"unexpected first request: {:?}",
first_request
)));
};
let (tx, rx) = unbounded_channel();
self.mgr.handle_new_control_stream(tx, stream, init_request);
Ok(Response::new(UnboundedReceiverStream::new(rx)))
}
}
31 changes: 18 additions & 13 deletions src/meta/src/barrier/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,8 @@ pub enum Command {
RescheduleFragment {
reschedules: HashMap<FragmentId, Reschedule>,
table_parallelism: HashMap<TableId, TableParallelism>,
// should contain the actor ids in upstream and downstream fragment of `reschedules`
fragment_actors: HashMap<FragmentId, HashSet<ActorId>>,
},

/// `ReplaceTable` command generates a `Update` barrier with the given `merge_updates`. This is
Expand Down Expand Up @@ -351,7 +353,7 @@ impl CommandContext {

impl CommandContext {
/// Generate a mutation for the given command.
pub async fn to_mutation(&self) -> MetaResult<Option<Mutation>> {
pub fn to_mutation(&self) -> Option<Mutation> {
let mutation =
match &self.command {
Command::Plain(mutation) => mutation.clone(),
Expand Down Expand Up @@ -479,21 +481,23 @@ impl CommandContext {
init_split_assignment,
),

Command::RescheduleFragment { reschedules, .. } => {
let metadata_manager = &self.barrier_manager_context.metadata_manager;

Command::RescheduleFragment {
reschedules,
fragment_actors,
..
} => {
let mut dispatcher_update = HashMap::new();
for reschedule in reschedules.values() {
for &(upstream_fragment_id, dispatcher_id) in
&reschedule.upstream_fragment_dispatcher_ids
{
// Find the actors of the upstream fragment.
let upstream_actor_ids = metadata_manager
.get_running_actors_of_fragment(upstream_fragment_id)
.await?;
let upstream_actor_ids = fragment_actors
.get(&upstream_fragment_id)
.expect("should contain");

// Record updates for all actors.
for actor_id in upstream_actor_ids {
for &actor_id in upstream_actor_ids {
// Index with the dispatcher id to check duplicates.
dispatcher_update
.try_insert(
Expand Down Expand Up @@ -526,9 +530,9 @@ impl CommandContext {
for (&fragment_id, reschedule) in reschedules {
for &downstream_fragment_id in &reschedule.downstream_fragment_ids {
// Find the actors of the downstream fragment.
let downstream_actor_ids = metadata_manager
.get_running_actors_of_fragment(downstream_fragment_id)
.await?;
let downstream_actor_ids = fragment_actors
.get(&downstream_fragment_id)
.expect("should contain");

// Downstream removed actors should be skipped
// Newly created actors of the current fragment will not dispatch Update
Expand All @@ -545,7 +549,7 @@ impl CommandContext {
.unwrap_or_default();

// Record updates for all actors.
for actor_id in downstream_actor_ids {
for &actor_id in downstream_actor_ids {
if downstream_removed_actors.contains(&actor_id) {
continue;
}
Expand Down Expand Up @@ -620,7 +624,7 @@ impl CommandContext {
}
};

Ok(mutation)
mutation
}

fn generate_update_mutation_for_replace_table(
Expand Down Expand Up @@ -962,6 +966,7 @@ impl CommandContext {
Command::RescheduleFragment {
reschedules,
table_parallelism,
..
} => {
let removed_actors = reschedules
.values()
Expand Down
Loading

0 comments on commit 72d37ef

Please sign in to comment.