Skip to content

Commit

Permalink
feat(meta): inject and collect barrier in bidi stream
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Jan 31, 2024
1 parent e05015a commit c017c75
Show file tree
Hide file tree
Showing 7 changed files with 294 additions and 290 deletions.
22 changes: 19 additions & 3 deletions proto/stream_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -103,15 +103,31 @@ message WaitEpochCommitResponse {
common.Status status = 1;
}

message StreamingControlStreamRequest {
message InitRequest {}

oneof request {
InitRequest init = 1;
InjectBarrierRequest inject_barrier = 2;
}
}

message StreamingControlStreamResponse {
message InitResponse {}

oneof response {
InitResponse init = 1;
BarrierCompleteResponse complete_barrier = 2;
}
}

service StreamService {
rpc UpdateActors(UpdateActorsRequest) returns (UpdateActorsResponse);
rpc BuildActors(BuildActorsRequest) returns (BuildActorsResponse);
rpc BroadcastActorInfoTable(BroadcastActorInfoTableRequest) returns (BroadcastActorInfoTableResponse);
rpc DropActors(DropActorsRequest) returns (DropActorsResponse);
rpc ForceStopActors(ForceStopActorsRequest) returns (ForceStopActorsResponse);
rpc InjectBarrier(InjectBarrierRequest) returns (InjectBarrierResponse);
rpc BarrierComplete(BarrierCompleteRequest) returns (BarrierCompleteResponse);
rpc WaitEpochCommit(WaitEpochCommitRequest) returns (WaitEpochCommitResponse);
rpc StreamingControlStream(stream StreamingControlStreamRequest) returns (stream StreamingControlStreamResponse);
}

// TODO: Lifecycle management for actors.
121 changes: 34 additions & 87 deletions src/compute/src/rpc/service/stream_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,17 @@
// limitations under the License.

use await_tree::InstrumentAwait;
use itertools::Itertools;
use risingwave_hummock_sdk::table_stats::to_prost_table_stats_map;
use risingwave_hummock_sdk::LocalSstableInfo;
use risingwave_pb::stream_service::barrier_complete_response::GroupedSstableInfo;
use futures::{Stream, StreamExt, TryStreamExt};
use risingwave_pb::stream_service::stream_service_server::StreamService;
use risingwave_pb::stream_service::streaming_control_stream_request::InitRequest;
use risingwave_pb::stream_service::*;
use risingwave_storage::dispatch_state_store;
use risingwave_stream::error::StreamError;
use risingwave_stream::executor::Barrier;
use risingwave_stream::task::{BarrierCompleteResult, LocalStreamManager, StreamEnvironment};
use risingwave_stream::task::{LocalStreamManager, StreamEnvironment};
use thiserror_ext::AsReport;
use tonic::{Request, Response, Status};
use tokio::sync::mpsc::unbounded_channel;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tonic::{Request, Response, Status, Streaming};

#[derive(Clone)]
pub struct StreamServiceImpl {
Expand All @@ -40,6 +39,9 @@ impl StreamServiceImpl {

#[async_trait::async_trait]
impl StreamService for StreamServiceImpl {
type StreamingControlStreamStream =
impl Stream<Item = std::result::Result<StreamingControlStreamResponse, tonic::Status>>;

#[cfg_attr(coverage, coverage(off))]
async fn update_actors(
&self,
Expand Down Expand Up @@ -110,86 +112,6 @@ impl StreamService for StreamServiceImpl {
}))
}

#[cfg_attr(coverage, coverage(off))]
async fn force_stop_actors(
&self,
request: Request<ForceStopActorsRequest>,
) -> std::result::Result<Response<ForceStopActorsResponse>, Status> {
let req = request.into_inner();
self.mgr.reset().await;
Ok(Response::new(ForceStopActorsResponse {
request_id: req.request_id,
status: None,
}))
}

#[cfg_attr(coverage, coverage(off))]
async fn inject_barrier(
&self,
request: Request<InjectBarrierRequest>,
) -> Result<Response<InjectBarrierResponse>, Status> {
let req = request.into_inner();
let barrier =
Barrier::from_protobuf(req.get_barrier().unwrap()).map_err(StreamError::from)?;

self.mgr
.send_barrier(barrier, req.actor_ids_to_send, req.actor_ids_to_collect)
.await?;

Ok(Response::new(InjectBarrierResponse {
request_id: req.request_id,
status: None,
}))
}

#[cfg_attr(coverage, coverage(off))]
async fn barrier_complete(
&self,
request: Request<BarrierCompleteRequest>,
) -> Result<Response<BarrierCompleteResponse>, Status> {
let req = request.into_inner();
let BarrierCompleteResult {
create_mview_progress,
sync_result,
} = self
.mgr
.collect_barrier(req.prev_epoch)
.instrument_await(format!("collect_barrier (epoch {})", req.prev_epoch))
.await
.inspect_err(
|err| tracing::error!(error = %err.as_report(), "failed to collect barrier"),
)?;

let (synced_sstables, table_watermarks) = sync_result
.map(|sync_result| (sync_result.uncommitted_ssts, sync_result.table_watermarks))
.unwrap_or_default();

Ok(Response::new(BarrierCompleteResponse {
request_id: req.request_id,
status: None,
create_mview_progress,
synced_sstables: synced_sstables
.into_iter()
.map(
|LocalSstableInfo {
compaction_group_id,
sst_info,
table_stats,
}| GroupedSstableInfo {
compaction_group_id,
sst: Some(sst_info),
table_stats_map: to_prost_table_stats_map(table_stats),
},
)
.collect_vec(),
worker_id: self.env.worker_id(),
table_watermarks: table_watermarks
.into_iter()
.map(|(key, value)| (key.table_id, value.to_protobuf()))
.collect(),
}))
}

#[cfg_attr(coverage, coverage(off))]
async fn wait_epoch_commit(
&self,
Expand All @@ -210,4 +132,29 @@ impl StreamService for StreamServiceImpl {

Ok(Response::new(WaitEpochCommitResponse { status: None }))
}

async fn streaming_control_stream(
&self,
request: Request<Streaming<StreamingControlStreamRequest>>,
) -> Result<Response<Self::StreamingControlStreamStream>, Status> {
let mut stream = request.into_inner().boxed();
let first_request = stream
.try_next()
.await?
.ok_or_else(|| Status::invalid_argument(format!("failed to receive first request")))?;
match first_request {
StreamingControlStreamRequest {
request: Some(streaming_control_stream_request::Request::Init(InitRequest {})),
} => {}
other => {
return Err(Status::invalid_argument(format!(
"unexpected first request: {:?}",
other
)));
}
};
let (tx, rx) = unbounded_channel();
self.mgr.handle_new_control_stream(tx, stream);
Ok(Response::new(UnboundedReceiverStream::new(rx)))
}
}
34 changes: 19 additions & 15 deletions src/meta/src/stream/stream_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -812,6 +812,8 @@ mod tests {
use std::sync::{Arc, Mutex};
use std::time::Duration;

use futures::stream::BoxStream;
use futures::Stream;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::ParallelUnitMapping;
use risingwave_common::system_param::reader::SystemParamsRead;
Expand All @@ -833,7 +835,7 @@ mod tests {
use tokio::sync::Notify;
use tokio::task::JoinHandle;
use tokio::time::sleep;
use tonic::{Request, Response, Status};
use tonic::{Request, Response, Status, Streaming};

use super::*;
use crate::barrier::GlobalBarrierManager;
Expand Down Expand Up @@ -861,6 +863,9 @@ mod tests {

#[async_trait::async_trait]
impl StreamService for FakeStreamService {
type StreamingControlStreamStream =
impl Stream<Item = std::result::Result<StreamingControlStreamResponse, tonic::Status>>;

async fn update_actors(
&self,
request: Request<UpdateActorsRequest>,
Expand Down Expand Up @@ -923,26 +928,25 @@ mod tests {
Ok(Response::new(ForceStopActorsResponse::default()))
}

async fn inject_barrier(
&self,
_request: Request<InjectBarrierRequest>,
) -> std::result::Result<Response<InjectBarrierResponse>, Status> {
Ok(Response::new(InjectBarrierResponse::default()))
}

async fn barrier_complete(
&self,
_request: Request<BarrierCompleteRequest>,
) -> std::result::Result<Response<BarrierCompleteResponse>, Status> {
Ok(Response::new(BarrierCompleteResponse::default()))
}

async fn wait_epoch_commit(
&self,
_request: Request<WaitEpochCommitRequest>,
) -> std::result::Result<Response<WaitEpochCommitResponse>, Status> {
Ok(Response::new(WaitEpochCommitResponse::default()))
}

async fn streaming_control_stream(
&self,
request: Request<Streaming<StreamingControlStreamRequest>>,
) -> Result<Response<Self::StreamingControlStreamStream>, Status> {
Result::<
_,
BoxStream<
'static,
std::result::Result<StreamingControlStreamResponse, tonic::Status>,
>,
>::Err(Status::unimplemented("not implemented"))
}
}

struct MockServices {
Expand Down
43 changes: 38 additions & 5 deletions src/rpc_client/src/stream_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,22 @@
use std::sync::Arc;
use std::time::Duration;

use anyhow::anyhow;
use async_trait::async_trait;
use futures::TryStreamExt;
use risingwave_common::config::MAX_CONNECTION_WINDOW_SIZE;
use risingwave_common::monitor::connection::{EndpointExt, TcpConfig};
use risingwave_common::util::addr::HostAddr;
use risingwave_pb::stream_service::stream_service_client::StreamServiceClient;
use risingwave_pb::stream_service::streaming_control_stream_request::InitRequest;
use risingwave_pb::stream_service::streaming_control_stream_response::InitResponse;
use risingwave_pb::stream_service::*;
use tokio_stream::wrappers::ReceiverStream;
use tonic::transport::Endpoint;

use crate::error::Result;
use crate::error::{Result, RpcError};
use crate::tracing::{Channel, TracingInjectedChannelExt};
use crate::{rpc_client_method_impl, RpcClient, RpcClientPool};
use crate::{rpc_client_method_impl, BidiStreamHandle, RpcClient, RpcClientPool};

#[derive(Clone)]
pub struct StreamClient(StreamServiceClient<Channel>);
Expand Down Expand Up @@ -68,9 +73,6 @@ macro_rules! for_all_stream_rpc {
,{ 0, build_actors, BuildActorsRequest, BuildActorsResponse }
,{ 0, broadcast_actor_info_table, BroadcastActorInfoTableRequest, BroadcastActorInfoTableResponse }
,{ 0, drop_actors, DropActorsRequest, DropActorsResponse }
,{ 0, force_stop_actors, ForceStopActorsRequest, ForceStopActorsResponse}
,{ 0, inject_barrier, InjectBarrierRequest, InjectBarrierResponse }
,{ 0, barrier_complete, BarrierCompleteRequest, BarrierCompleteResponse }
,{ 0, wait_epoch_commit, WaitEpochCommitRequest, WaitEpochCommitResponse }
}
};
Expand All @@ -79,3 +81,34 @@ macro_rules! for_all_stream_rpc {
impl StreamClient {
for_all_stream_rpc! { rpc_client_method_impl }
}

pub type StreamingControlHandle =
BidiStreamHandle<StreamingControlStreamRequest, StreamingControlStreamResponse>;

impl StreamClient {
pub async fn start_streaming_control(&self) -> Result<StreamingControlHandle> {
let first_request = StreamingControlStreamRequest {
request: Some(streaming_control_stream_request::Request::Init(
InitRequest {},
)),
};
let mut client = self.0.to_owned();
let (handle, first_rsp) = BidiStreamHandle::initialize(first_request, |rx| async move {
client
.streaming_control_stream(ReceiverStream::new(rx))
.await
.map(|response| response.into_inner().map_err(RpcError::from))
.map_err(RpcError::from)
})
.await?;
match first_rsp {
StreamingControlStreamResponse {
response: Some(streaming_control_stream_response::Response::Init(InitResponse {})),
} => {}
other => {
return Err(anyhow!("expect InitResponse but get {:?}", other).into());
}
};
Ok(handle)
}
}
Loading

0 comments on commit c017c75

Please sign in to comment.