Skip to content

Commit

Permalink
build actor in spawn
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Aug 20, 2024
1 parent 167b8d1 commit 81658b5
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 116 deletions.
68 changes: 3 additions & 65 deletions src/stream/src/task/barrier_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,15 @@ use std::future::pending;
use std::sync::Arc;
use std::time::Duration;

use anyhow::{anyhow, Context};
use futures::stream::{BoxStream, FuturesUnordered};
use anyhow::anyhow;
use futures::stream::BoxStream;
use futures::StreamExt;
use itertools::Itertools;
use risingwave_common::error::tonic::extra::Score;
use risingwave_pb::stream_service::barrier_complete_response::{
GroupedSstableInfo, PbCreateMviewProgress,
};
use risingwave_rpc_client::error::{ToTonicStatus, TonicStatusWrapper};
use rw_futures_util::{pending_on_none, AttachedFuture};
use thiserror_ext::AsReport;
use tokio::select;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
Expand Down Expand Up @@ -66,10 +65,7 @@ use risingwave_pb::stream_service::{

use crate::executor::exchange::permit::Receiver;
use crate::executor::monitor::StreamingMetrics;
use crate::executor::{
Actor, Barrier, BarrierInner, DispatchExecutor, DispatcherBarrier, Mutation,
StreamExecutorError,
};
use crate::executor::{Barrier, BarrierInner, DispatcherBarrier, Mutation, StreamExecutorError};
use crate::task::barrier_manager::managed_state::ManagedBarrierStateDebugInfo;
use crate::task::barrier_manager::progress::BackfillState;

Expand Down Expand Up @@ -184,9 +180,6 @@ impl ControlStreamHandle {
}
}

/// Context value handed through the executor-building call chain when an actor is created.
///
/// The struct declares no fields, so a value carries no data of its own.
#[derive(Clone, Default)]
pub struct CreateActorContext {}

/// Pair of a `u64` and an optional, shared [`Mutation`].
// NOTE(review): the `u64` is presumably the epoch the mutation belongs to — confirm against the
// code that produces these items.
pub(crate) type SubscribeMutationItem = (u64, Option<Arc<Mutation>>);

pub(super) enum LocalBarrierEvent {
Expand Down Expand Up @@ -248,10 +241,6 @@ pub(super) enum LocalActorOperation {
},
}

/// Output of a finished actor-building task.
pub(super) struct CreateActorOutput {
/// The built actors; `handle_actor_created` passes them on to `spawn_actors`.
pub(super) actors: Vec<Actor<DispatchExecutor>>,
}

pub(crate) struct StreamActorManagerState {
/// Each processor runs in a future. Upon receiving a `Terminate` message, they will exit.
/// `handles` store join handles of these futures, and therefore we could wait their
Expand All @@ -263,14 +252,6 @@ pub(crate) struct StreamActorManagerState {

/// Stores all actor tokio runtime monitoring tasks.
pub(super) actor_monitor_tasks: HashMap<ActorId, ActorHandle>,

#[expect(clippy::type_complexity)]
pub(super) creating_actors: FuturesUnordered<
AttachedFuture<
JoinHandle<StreamResult<CreateActorOutput>>,
(BTreeSet<ActorId>, oneshot::Sender<StreamResult<()>>),
>,
>,
}

impl StreamActorManagerState {
Expand All @@ -279,22 +260,8 @@ impl StreamActorManagerState {
handles: HashMap::new(),
actors: HashMap::new(),
actor_monitor_tasks: HashMap::new(),
creating_actors: FuturesUnordered::new(),
}
}

/// Waits for the next entry of `creating_actors` to finish.
///
/// Returns the `oneshot` sender attached to that entry together with the outcome of the
/// actor-building task. A failure to join the task is reported as an error carrying the context
/// "failed to join creating actors futures".
async fn next_created_actors(
    &mut self,
) -> (
    oneshot::Sender<StreamResult<()>>,
    StreamResult<CreateActorOutput>,
) {
    let next_finished = pending_on_none(self.creating_actors.next());
    let (joined, (_, result_sender)) = next_finished.await;

    // The outer layer is the join result of the spawned task and the inner layer is the task's
    // own result; both are flattened into a single `StreamResult`.
    let created: StreamResult<CreateActorOutput> =
        try { joined.context("failed to join creating actors futures")?? };

    (result_sender, created)
}
}

pub(crate) struct StreamActorManager {
Expand All @@ -313,7 +280,6 @@ pub(crate) struct StreamActorManager {

/// Snapshot of a `LocalBarrierWorker`'s state for debugging, as assembled by `to_debug_info`.
pub(super) struct LocalBarrierWorkerDebugInfo<'a> {
/// Ids of the actors that have an entry in the actor manager state's `handles` map.
running_actors: BTreeSet<ActorId>,
/// One set of actor ids per entry of the actor manager state's `creating_actors`.
creating_actors: Vec<BTreeSet<ActorId>>,
/// Debug view obtained from the managed barrier state.
managed_barrier_state: ManagedBarrierStateDebugInfo<'a>,
/// Whether the control stream handle reports itself as connected.
has_control_stream_connected: bool,
}
Expand All @@ -325,13 +291,6 @@ impl Display for LocalBarrierWorkerDebugInfo<'_> {
write!(f, "{}, ", actor_id)?;
}

write!(f, "\ncreating_actors: ")?;
for actors in &self.creating_actors {
for actor_id in actors {
write!(f, "{}, ", actor_id)?;
}
}

writeln!(
f,
"\nhas_control_stream_connected: {}",
Expand Down Expand Up @@ -400,12 +359,6 @@ impl LocalBarrierWorker {
fn to_debug_info(&self) -> LocalBarrierWorkerDebugInfo<'_> {
LocalBarrierWorkerDebugInfo {
running_actors: self.actor_manager_state.handles.keys().cloned().collect(),
creating_actors: self
.actor_manager_state
.creating_actors
.iter()
.map(|fut| fut.item().0.clone())
.collect(),
managed_barrier_state: self.state.to_debug_info(),
has_control_stream_connected: self.control_stream_handle.connected(),
}
Expand All @@ -415,9 +368,6 @@ impl LocalBarrierWorker {
loop {
select! {
biased;
(sender, create_actors_result) = self.actor_manager_state.next_created_actors() => {
self.handle_actor_created(sender, create_actors_result);
}
(partial_graph_id, completed_epoch) = self.state.next_completed_epoch() => {
let result = self.on_epoch_completed(partial_graph_id, completed_epoch);
if let Err(err) = result {
Expand Down Expand Up @@ -474,18 +424,6 @@ impl LocalBarrierWorker {
}
}

fn handle_actor_created(
&mut self,
sender: oneshot::Sender<StreamResult<()>>,
create_actor_result: StreamResult<CreateActorOutput>,
) {
let result = create_actor_result.map(|output| {
self.spawn_actors(output.actors);
});

let _ = sender.send(result);
}

fn handle_streaming_control_request(
&mut self,
request: StreamingControlStreamRequest,
Expand Down
84 changes: 33 additions & 51 deletions src/stream/src/task/stream_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use core::time::Duration;
use std::collections::HashSet;
use std::fmt::Debug;
use std::mem::take;
use std::future::Future;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use std::time::Instant;
Expand All @@ -24,7 +24,7 @@ use anyhow::anyhow;
use async_recursion::async_recursion;
use await_tree::InstrumentAwait;
use futures::stream::BoxStream;
use futures::FutureExt;
use futures::{FutureExt, TryFutureExt};
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::bitmap::Bitmap;
Expand All @@ -41,7 +41,6 @@ use risingwave_pb::stream_service::{
};
use risingwave_storage::monitor::HummockTraceFutureExt;
use risingwave_storage::{dispatch_state_store, StateStore};
use rw_futures_util::AttachedFuture;
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
use tokio::sync::oneshot;
Expand All @@ -59,11 +58,11 @@ use crate::executor::{
};
use crate::from_proto::create_executor;
use crate::task::barrier_manager::{
ControlStreamHandle, CreateActorOutput, EventSender, LocalActorOperation, LocalBarrierWorker,
ControlStreamHandle, EventSender, LocalActorOperation, LocalBarrierWorker,
};
use crate::task::{
ActorId, CreateActorContext, FragmentId, LocalBarrierManager, SharedContext,
StreamActorManager, StreamActorManagerState, StreamEnvironment, UpDownActorIds,
ActorId, FragmentId, LocalBarrierManager, SharedContext, StreamActorManager,
StreamActorManagerState, StreamEnvironment, UpDownActorIds,
};

#[cfg(test)]
Expand Down Expand Up @@ -151,8 +150,6 @@ pub struct ExecutorParams {
pub shared_context: Arc<SharedContext>,

pub local_barrier_manager: LocalBarrierManager,

pub create_actor_context: CreateActorContext,
}

impl Debug for ExecutorParams {
Expand Down Expand Up @@ -309,15 +306,6 @@ impl LocalBarrierWorker {
let result = handle.await;
assert!(result.is_ok() || result.unwrap_err().is_cancelled());
}
// Clear the join handle of creating actors
for handle in take(&mut self.actor_manager_state.creating_actors)
.into_iter()
.map(|attached_future| attached_future.into_inner().0)
{
handle.abort();
let result = handle.await;
assert!(result.is_ok() || result.err().unwrap().is_cancelled());
}
self.actor_manager_state.clear_state();
if let Some(m) = self.actor_manager.await_tree_reg.as_ref() {
m.clear();
Expand Down Expand Up @@ -362,19 +350,21 @@ impl LocalBarrierWorker {
}
}
};
let actor_ids = actors
.iter()
.map(|actor| actor.actor.as_ref().unwrap().actor_id)
.collect();
let actor_manager = self.actor_manager.clone();
let create_actors_fut = crate::CONFIG.scope(
self.actor_manager.env.config().clone(),
actor_manager.create_actors(actors, self.current_shared_context.clone()),
);
let join_handle = self.actor_manager.runtime.spawn(create_actors_fut);
self.actor_manager_state
.creating_actors
.push(AttachedFuture::new(join_handle, (actor_ids, result_sender)));
let shared_context = self.current_shared_context.clone();
self.spawn_actors(actors.into_iter().map(|actor| {
let stream_actor = actor.actor.as_ref().unwrap();
let (actor_id, mview_definition) =
(stream_actor.actor_id, stream_actor.mview_definition.clone());
(
actor_id,
mview_definition,
actor_manager
.clone()
.create_actor(actor, shared_context.clone()),
)
}));
let _ = result_sender.send(Ok(()));
}
}

Expand Down Expand Up @@ -419,7 +409,6 @@ impl StreamActorManager {
has_stateful: bool,
subtasks: &mut Vec<SubtaskHandle>,
shared_context: &Arc<SharedContext>,
create_actor_context: &CreateActorContext,
) -> StreamResult<Executor> {
// The "stateful" here means that the executor may issue read operations to the state store
// massively and continuously. Used to decide whether to apply the optimization of subtasks.
Expand Down Expand Up @@ -453,7 +442,6 @@ impl StreamActorManager {
has_stateful || is_stateful,
subtasks,
shared_context,
create_actor_context,
)
.await?,
);
Expand Down Expand Up @@ -501,7 +489,6 @@ impl StreamActorManager {
watermark_epoch: self.watermark_epoch.clone(),
shared_context: shared_context.clone(),
local_barrier_manager: shared_context.local_barrier_manager.clone(),
create_actor_context: create_actor_context.clone(),
};

let executor = create_executor(executor_params, node, store).await?;
Expand Down Expand Up @@ -531,7 +518,6 @@ impl StreamActorManager {
}

/// Create a chain(tree) of nodes and return the head executor.
#[expect(clippy::too_many_arguments)]
async fn create_nodes(
&self,
fragment_id: FragmentId,
Expand All @@ -540,7 +526,6 @@ impl StreamActorManager {
actor_context: &ActorContextRef,
vnode_bitmap: Option<Bitmap>,
shared_context: &Arc<SharedContext>,
create_actor_context: &CreateActorContext,
) -> StreamResult<(Executor, Vec<SubtaskHandle>)> {
let mut subtasks = vec![];

Expand All @@ -555,22 +540,19 @@ impl StreamActorManager {
false,
&mut subtasks,
shared_context,
create_actor_context,
)
.await
})?;

Ok((executor, subtasks))
}

async fn create_actors(
async fn create_actor(
self: Arc<Self>,
actors: Vec<BuildActorInfo>,
actor: BuildActorInfo,
shared_context: Arc<SharedContext>,
) -> StreamResult<CreateActorOutput> {
let mut ret = Vec::with_capacity(actors.len());
let create_actor_context = CreateActorContext::default();
for actor in actors {
) -> StreamResult<Actor<DispatchExecutor>> {
{
let BuildActorInfo {
actor,
related_subscriptions,
Expand Down Expand Up @@ -603,7 +585,6 @@ impl StreamActorManager {
&actor_context,
vnode_bitmap,
&shared_context,
&create_actor_context,
)
// If hummock tracing is not enabled, it directly returns wrapped future.
.may_trace_hummock()
Expand All @@ -625,24 +606,25 @@ impl StreamActorManager {
expr_context,
shared_context.local_barrier_manager.clone(),
);

ret.push(actor);
Ok(actor)
}
Ok(CreateActorOutput { actors: ret })
}
}

impl LocalBarrierWorker {
pub(super) fn spawn_actors(&mut self, actors: Vec<Actor<DispatchExecutor>>) {
for actor in actors {
pub(super) fn spawn_actors<
F: Future<Output = StreamResult<Actor<DispatchExecutor>>> + Send + 'static,
>(
&mut self,
actors: impl Iterator<Item = (ActorId, String, F)>,
) {
for (actor_id, mview_definition, actor) in actors {
let monitor = tokio_metrics::TaskMonitor::new();
let actor_context = actor.actor_context.clone();
let actor_id = actor_context.id;

let handle = {
let trace_span = format!("Actor {actor_id}: `{}`", actor_context.mview_definition);
let trace_span = format!("Actor {actor_id}: `{}`", mview_definition);
let barrier_manager = self.current_shared_context.local_barrier_manager.clone();
let actor = actor.run().map(move |result| {
let actor = actor.and_then(|actor| actor.run()).map(move |result| {
if let Err(err) = result {
// TODO: check error type and panic if it's unexpected.
// Intentionally use `?` on the report to also include the backtrace.
Expand Down

0 comments on commit 81658b5

Please sign in to comment.