Commit 5db21ec
feat: handle streaming control rpc in worker loop (#14737)
wenym1 authored Jan 30, 2024
1 parent a0574b7 commit 5db21ec
Showing 10 changed files with 443 additions and 272 deletions.
4 changes: 2 additions & 2 deletions src/compute/src/rpc/service/config_service.rs
@@ -23,7 +23,7 @@ use tonic::{Code, Request, Response, Status};
 
 pub struct ConfigServiceImpl {
     batch_mgr: Arc<BatchManager>,
-    stream_mgr: Arc<LocalStreamManager>,
+    stream_mgr: LocalStreamManager,
 }
 
 #[async_trait::async_trait]
@@ -46,7 +46,7 @@ impl ConfigService for ConfigServiceImpl {
 }
 
 impl ConfigServiceImpl {
-    pub fn new(batch_mgr: Arc<BatchManager>, stream_mgr: Arc<LocalStreamManager>) -> Self {
+    pub fn new(batch_mgr: Arc<BatchManager>, stream_mgr: LocalStreamManager) -> Self {
         Self {
             batch_mgr,
             stream_mgr,
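The change above in config_service.rs repeats in exchange_service.rs, monitor_service.rs, and stream_service.rs below: `Arc<LocalStreamManager>` becomes a plain `LocalStreamManager`. This suggests the manager is now a cheaply clonable handle that owns a channel to the new worker loop, so callers no longer need to share it through an outer `Arc`. A minimal sketch of that pattern, assuming a tokio mpsc channel (the field and type names besides `LocalStreamManager` are hypothetical, not taken from this commit):

use std::sync::Arc;

use tokio::sync::mpsc;

// Hypothetical control event carried from the handle to the worker loop.
enum LocalStreamEvent {
    DropActors { actor_ids: Vec<u32> },
}

// Hypothetical state shared by every clone of the handle.
struct SharedContext {
    worker_id: u32,
}

// The handle only holds a channel sender and an inner `Arc`, so it can
// derive `Clone` and be passed by value, which is why the service structs
// in this commit drop their `Arc<LocalStreamManager>` wrappers.
#[derive(Clone)]
pub struct LocalStreamManager {
    to_worker: mpsc::UnboundedSender<LocalStreamEvent>,
    shared: Arc<SharedContext>,
}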
4 changes: 2 additions & 2 deletions src/compute/src/rpc/service/exchange_service.rs
@@ -38,7 +38,7 @@ const BATCH_EXCHANGE_BUFFER_SIZE: usize = 1024;
 
 #[derive(Clone)]
 pub struct ExchangeServiceImpl {
     batch_mgr: Arc<BatchManager>,
-    stream_mgr: Arc<LocalStreamManager>,
+    stream_mgr: LocalStreamManager,
     metrics: Arc<ExchangeServiceMetrics>,
 }
 
@@ -128,7 +128,7 @@ impl ExchangeService for ExchangeServiceImpl {
 impl ExchangeServiceImpl {
     pub fn new(
         mgr: Arc<BatchManager>,
-        stream_mgr: Arc<LocalStreamManager>,
+        stream_mgr: LocalStreamManager,
         metrics: Arc<ExchangeServiceMetrics>,
     ) -> Self {
         ExchangeServiceImpl {
5 changes: 2 additions & 3 deletions src/compute/src/rpc/service/monitor_service.rs
@@ -15,7 +15,6 @@
 use std::ffi::CString;
 use std::fs;
 use std::path::Path;
-use std::sync::Arc;
 use std::time::Duration;
 
 use itertools::Itertools;
@@ -37,14 +36,14 @@ use tonic::{Code, Request, Response, Status};
 
 #[derive(Clone)]
 pub struct MonitorServiceImpl {
-    stream_mgr: Arc<LocalStreamManager>,
+    stream_mgr: LocalStreamManager,
     grpc_await_tree_reg: Option<AwaitTreeRegistryRef>,
     server_config: ServerConfig,
 }
 
 impl MonitorServiceImpl {
     pub fn new(
-        stream_mgr: Arc<LocalStreamManager>,
+        stream_mgr: LocalStreamManager,
         grpc_await_tree_reg: Option<AwaitTreeRegistryRef>,
         server_config: ServerConfig,
     ) -> Self {
42 changes: 7 additions & 35 deletions src/compute/src/rpc/service/stream_service.rs
@@ -12,8 +12,6 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::sync::Arc;
-
 use await_tree::InstrumentAwait;
 use itertools::Itertools;
 use risingwave_hummock_sdk::table_stats::to_prost_table_stats_map;
@@ -26,16 +24,16 @@ use risingwave_stream::error::StreamError;
 use risingwave_stream::executor::Barrier;
 use risingwave_stream::task::{BarrierCompleteResult, LocalStreamManager, StreamEnvironment};
 use thiserror_ext::AsReport;
-use tonic::{Code, Request, Response, Status};
+use tonic::{Request, Response, Status};
 
 #[derive(Clone)]
 pub struct StreamServiceImpl {
-    mgr: Arc<LocalStreamManager>,
+    mgr: LocalStreamManager,
     env: StreamEnvironment,
 }
 
 impl StreamServiceImpl {
-    pub fn new(mgr: Arc<LocalStreamManager>, env: StreamEnvironment) -> Self {
+    pub fn new(mgr: LocalStreamManager, env: StreamEnvironment) -> Self {
         StreamServiceImpl { mgr, env }
     }
 }
@@ -48,7 +46,7 @@ impl StreamService for StreamServiceImpl {
         request: Request<UpdateActorsRequest>,
     ) -> std::result::Result<Response<UpdateActorsResponse>, Status> {
         let req = request.into_inner();
-        let res = self.mgr.update_actors(&req.actors);
+        let res = self.mgr.update_actors(req.actors).await;
         match res {
             Err(e) => {
                 error!(error = %e.as_report(), "failed to update stream actor");
@@ -66,10 +64,7 @@
         let req = request.into_inner();
 
         let actor_id = req.actor_id;
-        let res = self
-            .mgr
-            .build_actors(actor_id.as_slice(), self.env.clone())
-            .await;
+        let res = self.mgr.build_actors(actor_id).await;
         match res {
             Err(e) => {
                 error!(error = %e.as_report(), "failed to build actors");
@@ -108,7 +103,7 @@ impl StreamService for StreamServiceImpl
     ) -> std::result::Result<Response<DropActorsResponse>, Status> {
         let req = request.into_inner();
         let actors = req.actor_ids;
-        self.mgr.drop_actors(&actors)?;
+        self.mgr.drop_actors(actors).await?;
         Ok(Response::new(DropActorsResponse {
             request_id: req.request_id,
             status: None,
@@ -121,8 +116,7 @@
         request: Request<ForceStopActorsRequest>,
     ) -> std::result::Result<Response<ForceStopActorsResponse>, Status> {
         let req = request.into_inner();
-        self.mgr.stop_all_actors().await?;
-        self.env.dml_manager_ref().clear();
+        self.mgr.reset().await;
         Ok(Response::new(ForceStopActorsResponse {
             request_id: req.request_id,
             status: None,
@@ -138,28 +132,6 @@
         let barrier =
             Barrier::from_protobuf(req.get_barrier().unwrap()).map_err(StreamError::from)?;
 
-        // The barrier might be outdated and been injected after recovery in some certain extreme
-        // scenarios. So some newly creating actors in the barrier are possibly not rebuilt during
-        // recovery. Check it here and return an error here if some actors are not found to
-        // avoid collection hang. We need some refine in meta side to remove this workaround since
-        // it will cause another round of unnecessary recovery.
-        let actor_ids = self.mgr.all_actor_ids();
-        let missing_actor_ids = req
-            .actor_ids_to_collect
-            .iter()
-            .filter(|id| !actor_ids.contains(id))
-            .collect_vec();
-        if !missing_actor_ids.is_empty() {
-            tracing::warn!(
-                "to collect actors not found, they should be cleaned when recovering: {:?}",
-                missing_actor_ids
-            );
-            return Err(Status::new(
-                Code::InvalidArgument,
-                "to collect actors not found",
-            ));
-        }
-
         self.mgr
             .send_barrier(barrier, req.actor_ids_to_send, req.actor_ids_to_collect)
             .await?;
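The stream_service.rs hunks carry the core of the commit: control RPCs such as `update_actors`, `build_actors`, and `drop_actors` now go through the streaming worker loop and are awaited, which is why each call gained an `.await`, and why `force_stop_actors` collapses into a single `self.mgr.reset().await` covering cleanup that was previously done by hand (`stop_all_actors()` plus `dml_manager_ref().clear()`). With one loop serializing control events against actor and barrier state, the defensive missing-actor check in `inject_barrier` could presumably be dropped as well. A sketch of how such a request/response round trip through a worker loop is typically structured, using tokio mpsc and oneshot channels (all names here are hypothetical, not the commit's actual types):

use tokio::sync::{mpsc, oneshot};

// Hypothetical control event: each request carries a oneshot sender the
// worker loop uses to report completion back to the awaiting RPC handler.
enum ControlEvent {
    DropActors {
        actor_ids: Vec<u32>,
        result_tx: oneshot::Sender<Result<(), String>>,
    },
}

#[derive(Clone)]
pub struct Handle {
    to_worker: mpsc::UnboundedSender<ControlEvent>,
}

impl Handle {
    // The gRPC handler awaits this, so it only returns once the worker
    // loop has actually applied the request.
    pub async fn drop_actors(&self, actor_ids: Vec<u32>) -> Result<(), String> {
        let (result_tx, result_rx) = oneshot::channel();
        self.to_worker
            .send(ControlEvent::DropActors { actor_ids, result_tx })
            .map_err(|e| e.to_string())?;
        result_rx.await.map_err(|e| e.to_string())?
    }
}

// The worker loop owns all mutable actor state, so control requests are
// applied one at a time without locks in the RPC handlers.
async fn worker_loop(mut from_handles: mpsc::UnboundedReceiver<ControlEvent>) {
    while let Some(event) = from_handles.recv().await {
        match event {
            ControlEvent::DropActors { actor_ids, result_tx } => {
                let _ = actor_ids; // ...drop the actors from loop-owned state...
                let _ = result_tx.send(Ok(()));
            }
        }
    }
}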
21 changes: 10 additions & 11 deletions src/compute/src/server.rs
@@ -298,17 +298,6 @@ pub async fn compute_node_serve(
     // Run a background heap profiler
     heap_profiler.start();
 
-    let stream_mgr = Arc::new(LocalStreamManager::new(
-        advertise_addr.clone(),
-        state_store.clone(),
-        streaming_metrics.clone(),
-        config.streaming.clone(),
-        await_tree_config.clone(),
-        memory_mgr.get_watermark_epoch(),
-    ));
-
-    let grpc_await_tree_reg = await_tree_config
-        .map(|config| AwaitTreeRegistryRef::new(await_tree::Registry::new(config).into()));
     let dml_mgr = Arc::new(DmlManager::new(
         worker_id,
         config.streaming.developer.dml_channel_initial_permits,
@@ -363,6 +352,16 @@
         meta_client.clone(),
     );
 
+    let stream_mgr = LocalStreamManager::new(
+        stream_env.clone(),
+        streaming_metrics.clone(),
+        await_tree_config.clone(),
+        memory_mgr.get_watermark_epoch(),
+    );
+
+    let grpc_await_tree_reg = await_tree_config
+        .map(|config| AwaitTreeRegistryRef::new(await_tree::Registry::new(config).into()));
+
     // Generally, one may use `risedev ctl trace` to manually get the trace reports. However, if
     // this is not the case, we can use the following command to get it printed into the logs
     // periodically.
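On the wiring side, server.rs now builds the manager from the `StreamEnvironment` rather than from individual pieces (advertise address, state store, streaming config), so its construction moves to after `stream_env` exists, and each gRPC service receives a by-value clone of the handle. A minimal sketch of the resulting wiring, with stub types standing in for the real ones (only the constructor shapes follow the diffs above):

use std::sync::Arc;

// Stub types; the real ones live in the risingwave_stream and compute crates.
#[derive(Clone)]
pub struct StreamEnvironment;
#[derive(Clone)]
pub struct LocalStreamManager;
pub struct BatchManager;

pub struct StreamServiceImpl {
    mgr: LocalStreamManager,
    env: StreamEnvironment,
}

pub struct ConfigServiceImpl {
    batch_mgr: Arc<BatchManager>,
    stream_mgr: LocalStreamManager,
}

impl StreamServiceImpl {
    pub fn new(mgr: LocalStreamManager, env: StreamEnvironment) -> Self {
        StreamServiceImpl { mgr, env }
    }
}

impl ConfigServiceImpl {
    pub fn new(batch_mgr: Arc<BatchManager>, stream_mgr: LocalStreamManager) -> Self {
        Self { batch_mgr, stream_mgr }
    }
}

// The environment is created first, the manager is built from it, and the
// services each take their own clone; no outer Arc is needed anymore.
fn wire(stream_env: StreamEnvironment, stream_mgr: LocalStreamManager) {
    let batch_mgr = Arc::new(BatchManager);
    let _stream = StreamServiceImpl::new(stream_mgr.clone(), stream_env);
    let _config = ConfigServiceImpl::new(batch_mgr, stream_mgr);
}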
1 change: 0 additions & 1 deletion src/meta/src/stream/stream_manager.rs
@@ -369,7 +369,6 @@ impl GlobalStreamManager {
         res
     }
 
-    /// First broadcasts the actor info to `WorkerNodes`, and then let them build actors and channels.
     async fn build_actors(
         &self,
         table_fragments: &TableFragments,
[Diffs for the remaining 4 changed files not shown.]
