feat(meta): inject and collect barrier in bidi stream #14887

Merged: 5 commits, Mar 12, 2024
44 changes: 21 additions & 23 deletions proto/stream_service.proto
@@ -44,33 +44,13 @@ message DropActorsResponse {
common.Status status = 2;
}

message ForceStopActorsRequest {
string request_id = 1;
uint64 prev_epoch = 2;
}

message ForceStopActorsResponse {
string request_id = 1;
common.Status status = 2;
}

message InjectBarrierRequest {
string request_id = 1;
stream_plan.Barrier barrier = 2;
repeated uint32 actor_ids_to_send = 3;
repeated uint32 actor_ids_to_collect = 4;
}

message InjectBarrierResponse {
string request_id = 1;
common.Status status = 2;
}

message BarrierCompleteRequest {
string request_id = 1;
uint64 prev_epoch = 2;
map<string, string> tracing_context = 3;
}
message BarrierCompleteResponse {
message CreateMviewProgress {
uint32 backfill_actor_id = 1;
@@ -104,15 +84,33 @@ message WaitEpochCommitResponse {
common.Status status = 1;
}

message StreamingControlStreamRequest {
message InitRequest {
uint64 prev_epoch = 2;
}

oneof request {
InitRequest init = 1;
InjectBarrierRequest inject_barrier = 2;
}
}

message StreamingControlStreamResponse {
message InitResponse {}

oneof response {
InitResponse init = 1;
BarrierCompleteResponse complete_barrier = 2;
}
}

service StreamService {
rpc UpdateActors(UpdateActorsRequest) returns (UpdateActorsResponse);
rpc BuildActors(BuildActorsRequest) returns (BuildActorsResponse);
rpc BroadcastActorInfoTable(BroadcastActorInfoTableRequest) returns (BroadcastActorInfoTableResponse);
rpc DropActors(DropActorsRequest) returns (DropActorsResponse);
rpc ForceStopActors(ForceStopActorsRequest) returns (ForceStopActorsResponse);
rpc InjectBarrier(InjectBarrierRequest) returns (InjectBarrierResponse);
rpc BarrierComplete(BarrierCompleteRequest) returns (BarrierCompleteResponse);
rpc WaitEpochCommit(WaitEpochCommitRequest) returns (WaitEpochCommitResponse);
rpc StreamingControlStream(stream StreamingControlStreamRequest) returns (stream StreamingControlStreamResponse);
}

// TODO: Lifecycle management for actors.
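
This PR replaces the ForceStopActors, InjectBarrier, and BarrierComplete unary RPCs with a single bidirectional StreamingControlStream: the meta node opens the stream, first sends an init request carrying prev_epoch, then pushes inject_barrier requests, and the compute node answers on the same stream with an init ack followed by complete_barrier responses. Below is a minimal, hypothetical Rust sketch of a client driving the new stream with tonic. The endpoint address, epoch, actor ids, request id, and the assumption that InjectBarrierRequest keeps its existing fields are illustrative only and not taken from this PR.

```rust
use risingwave_pb::stream_service::stream_service_client::StreamServiceClient;
use risingwave_pb::stream_service::streaming_control_stream_request::{
    InitRequest, Request as PbRequest,
};
use risingwave_pb::stream_service::streaming_control_stream_response::Response as PbResponse;
use risingwave_pb::stream_service::{InjectBarrierRequest, StreamingControlStreamRequest};
use tokio::sync::mpsc::unbounded_channel;
use tokio_stream::wrappers::UnboundedReceiverStream;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Placeholder endpoint; a real meta node would dial the compute node's address.
    let mut client = StreamServiceClient::connect("http://compute-node:5688").await?;

    // Requests are pushed through a channel whose receiver is sent as the request stream.
    let (tx, rx) = unbounded_channel();
    let mut responses = client
        .streaming_control_stream(UnboundedReceiverStream::new(rx))
        .await?
        .into_inner();

    // The first request must be Init, carrying the epoch to reset to.
    tx.send(StreamingControlStreamRequest {
        request: Some(PbRequest::Init(InitRequest { prev_epoch: 233 })),
    })
    .expect("receiver alive");
    // The server acks with an Init response before any barrier is injected.
    let ack = responses.message().await?.and_then(|r| r.response);
    assert!(matches!(ack, Some(PbResponse::Init(_))));

    // Later requests inject barriers; completions come back on the same stream.
    tx.send(StreamingControlStreamRequest {
        request: Some(PbRequest::InjectBarrier(InjectBarrierRequest {
            request_id: "barrier-1".into(),
            actor_ids_to_send: vec![1, 2],
            actor_ids_to_collect: vec![1, 2],
            ..Default::default()
        })),
    })
    .expect("receiver alive");
    if let Some(PbResponse::CompleteBarrier(resp)) =
        responses.message().await?.and_then(|r| r.response)
    {
        println!("barrier collected on worker {}", resp.worker_id);
    }
    Ok(())
}
```

Compared with the old unary InjectBarrier/BarrierComplete pair, requests and completions now ride one long-lived stream, so ordering is preserved and a broken connection is noticed promptly on either side.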
115 changes: 28 additions & 87 deletions src/compute/src/rpc/service/stream_service.rs
@@ -13,18 +13,16 @@
// limitations under the License.

use await_tree::InstrumentAwait;
use itertools::Itertools;
use risingwave_hummock_sdk::table_stats::to_prost_table_stats_map;
use risingwave_hummock_sdk::LocalSstableInfo;
use risingwave_pb::stream_service::barrier_complete_response::GroupedSstableInfo;
use futures::{Stream, StreamExt, TryStreamExt};
use risingwave_pb::stream_service::stream_service_server::StreamService;
use risingwave_pb::stream_service::*;
use risingwave_storage::dispatch_state_store;
use risingwave_stream::error::StreamError;
use risingwave_stream::executor::Barrier;
use risingwave_stream::task::{BarrierCompleteResult, LocalStreamManager, StreamEnvironment};
use risingwave_stream::task::{LocalStreamManager, StreamEnvironment};
use thiserror_ext::AsReport;
use tonic::{Request, Response, Status};
use tokio::sync::mpsc::unbounded_channel;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tonic::{Request, Response, Status, Streaming};

#[derive(Clone)]
pub struct StreamServiceImpl {
@@ -40,6 +38,9 @@ impl StreamServiceImpl {

#[async_trait::async_trait]
impl StreamService for StreamServiceImpl {
type StreamingControlStreamStream =
impl Stream<Item = std::result::Result<StreamingControlStreamResponse, tonic::Status>>;

#[cfg_attr(coverage, coverage(off))]
async fn update_actors(
&self,
@@ -110,86 +111,6 @@ impl StreamService for StreamServiceImpl {
}))
}

#[cfg_attr(coverage, coverage(off))]
async fn force_stop_actors(
&self,
request: Request<ForceStopActorsRequest>,
) -> std::result::Result<Response<ForceStopActorsResponse>, Status> {
let req = request.into_inner();
self.mgr.reset(req.prev_epoch).await;
Ok(Response::new(ForceStopActorsResponse {
request_id: req.request_id,
status: None,
}))
}

#[cfg_attr(coverage, coverage(off))]
async fn inject_barrier(
&self,
request: Request<InjectBarrierRequest>,
) -> Result<Response<InjectBarrierResponse>, Status> {
let req = request.into_inner();
let barrier =
Barrier::from_protobuf(req.get_barrier().unwrap()).map_err(StreamError::from)?;

self.mgr
.send_barrier(barrier, req.actor_ids_to_send, req.actor_ids_to_collect)
.await?;

Ok(Response::new(InjectBarrierResponse {
request_id: req.request_id,
status: None,
}))
}

#[cfg_attr(coverage, coverage(off))]
async fn barrier_complete(
&self,
request: Request<BarrierCompleteRequest>,
) -> Result<Response<BarrierCompleteResponse>, Status> {
let req = request.into_inner();
let BarrierCompleteResult {
create_mview_progress,
sync_result,
} = self
.mgr
.collect_barrier(req.prev_epoch)
.instrument_await(format!("collect_barrier (epoch {})", req.prev_epoch))
.await
.inspect_err(
|err| tracing::error!(error = %err.as_report(), "failed to collect barrier"),
)?;

let (synced_sstables, table_watermarks) = sync_result
.map(|sync_result| (sync_result.uncommitted_ssts, sync_result.table_watermarks))
.unwrap_or_default();

Ok(Response::new(BarrierCompleteResponse {
request_id: req.request_id,
status: None,
create_mview_progress,
synced_sstables: synced_sstables
.into_iter()
.map(
|LocalSstableInfo {
compaction_group_id,
sst_info,
table_stats,
}| GroupedSstableInfo {
compaction_group_id,
sst: Some(sst_info),
table_stats_map: to_prost_table_stats_map(table_stats),
},
)
.collect_vec(),
worker_id: self.env.worker_id(),
table_watermarks: table_watermarks
.into_iter()
.map(|(key, value)| (key.table_id, value.to_protobuf()))
.collect(),
}))
}

#[cfg_attr(coverage, coverage(off))]
async fn wait_epoch_commit(
&self,
@@ -210,4 +131,24 @@

Ok(Response::new(WaitEpochCommitResponse { status: None }))
}

async fn streaming_control_stream(
&self,
request: Request<Streaming<StreamingControlStreamRequest>>,
) -> Result<Response<Self::StreamingControlStreamStream>, Status> {
let mut stream = request.into_inner().boxed();
let first_request = stream.try_next().await?;
let Some(StreamingControlStreamRequest {
request: Some(streaming_control_stream_request::Request::Init(init_request)),
}) = first_request
else {
return Err(Status::invalid_argument(format!(
"unexpected first request: {:?}",
first_request
)));
};
let (tx, rx) = unbounded_channel();
self.mgr.handle_new_control_stream(tx, stream, init_request);
Ok(Response::new(UnboundedReceiverStream::new(rx)))
}
}
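
The handler above only validates the Init handshake; everything else is delegated to LocalStreamManager::handle_new_control_stream, which receives the sender half of the response channel, the remaining request stream, and the init request. That manager code is not part of this file, but a rough, hypothetical sketch of its first step (reset actors to prev_epoch, which is what the removed force_stop_actors RPC used to do, then ack with an Init response) could look like the following; ack_init and reset_to_epoch are placeholder names, not the PR's actual functions.

```rust
use risingwave_pb::stream_service::streaming_control_stream_response::{
    InitResponse, Response as PbResponse,
};
use risingwave_pb::stream_service::StreamingControlStreamResponse;
use tokio::sync::mpsc::UnboundedSender;
use tonic::Status;

// Hypothetical sketch of the handshake the stream manager is expected to perform after
// `handle_new_control_stream`: reset local actors to `prev_epoch`, then ack over `tx`
// so the meta side knows the stream is ready.
async fn ack_init(
    tx: &UnboundedSender<Result<StreamingControlStreamResponse, Status>>,
    prev_epoch: u64,
) {
    reset_to_epoch(prev_epoch).await;
    // If the send fails, the meta node has already dropped the response stream.
    let _ = tx.send(Ok(StreamingControlStreamResponse {
        response: Some(PbResponse::Init(InitResponse {})),
    }));
}

async fn reset_to_epoch(_prev_epoch: u64) {
    // Placeholder; the real reset logic lives in LocalStreamManager.
}
```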
31 changes: 18 additions & 13 deletions src/meta/src/barrier/command.rs
@@ -175,6 +175,8 @@ pub enum Command {
RescheduleFragment {
reschedules: HashMap<FragmentId, Reschedule>,
table_parallelism: HashMap<TableId, TableParallelism>,
// Should contain the actor ids of the upstream and downstream fragments of `reschedules`.
fragment_actors: HashMap<FragmentId, HashSet<ActorId>>,
Review thread on fragment_actors:

Contributor: Minor comment, I suppose RescheduleFragment is actually RescheduleFragments? Hence we have a hashmap here, rather than just the actor ids of the upstream and downstream fragments of reschedules.

Contributor: Also, why do we refactor and add the field fragment_actors here?

Collaborator: I guess it is to make to_mutation non-async.

Contributor (Author): Yes. It's for making it non-async.

(A sketch of the caller-side pre-collection appears at the end of this diff.)

},

/// `ReplaceTable` command generates a `Update` barrier with the given `merge_updates`. This is
@@ -351,7 +353,7 @@

impl CommandContext {
/// Generate a mutation for the given command.
pub async fn to_mutation(&self) -> MetaResult<Option<Mutation>> {
pub fn to_mutation(&self) -> Option<Mutation> {
let mutation =
match &self.command {
Command::Plain(mutation) => mutation.clone(),
@@ -479,21 +481,23 @@
init_split_assignment,
),

Command::RescheduleFragment { reschedules, .. } => {
let metadata_manager = &self.barrier_manager_context.metadata_manager;

Command::RescheduleFragment {
reschedules,
fragment_actors,
..
} => {
let mut dispatcher_update = HashMap::new();
for reschedule in reschedules.values() {
for &(upstream_fragment_id, dispatcher_id) in
&reschedule.upstream_fragment_dispatcher_ids
{
// Find the actors of the upstream fragment.
let upstream_actor_ids = metadata_manager
.get_running_actors_of_fragment(upstream_fragment_id)
.await?;
let upstream_actor_ids = fragment_actors
.get(&upstream_fragment_id)
.expect("should contain");

// Record updates for all actors.
for actor_id in upstream_actor_ids {
for &actor_id in upstream_actor_ids {
// Index with the dispatcher id to check duplicates.
dispatcher_update
.try_insert(
@@ -526,9 +530,9 @@
for (&fragment_id, reschedule) in reschedules {
for &downstream_fragment_id in &reschedule.downstream_fragment_ids {
// Find the actors of the downstream fragment.
let downstream_actor_ids = metadata_manager
.get_running_actors_of_fragment(downstream_fragment_id)
.await?;
let downstream_actor_ids = fragment_actors
.get(&downstream_fragment_id)
.expect("should contain");

// Downstream removed actors should be skipped
// Newly created actors of the current fragment will not dispatch Update
@@ -545,7 +549,7 @@
.unwrap_or_default();

// Record updates for all actors.
for actor_id in downstream_actor_ids {
for &actor_id in downstream_actor_ids {
if downstream_removed_actors.contains(&actor_id) {
continue;
}
Expand Down Expand Up @@ -620,7 +624,7 @@ impl CommandContext {
}
};

Ok(mutation)
mutation
}

fn generate_update_mutation_for_replace_table(
Expand Down Expand Up @@ -962,6 +966,7 @@ impl CommandContext {
Command::RescheduleFragment {
reschedules,
table_parallelism,
..
} => {
let removed_actors = reschedules
.values()
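
As the review thread above notes, fragment_actors exists so that to_mutation can stay synchronous: the upstream and downstream actor sets are resolved before Command::RescheduleFragment is built, instead of being fetched from the metadata manager inside to_mutation. A hedged sketch of that caller-side pre-collection follows; Reschedule is trimmed to the two fields used here, the dispatcher id is assumed to be a u64, and the async metadata lookup is replaced by a plain closure.

```rust
use std::collections::{HashMap, HashSet};

type FragmentId = u32;
type ActorId = u32;

// Trimmed-down stand-in for the real `Reschedule` struct.
struct Reschedule {
    upstream_fragment_dispatcher_ids: Vec<(FragmentId, u64)>,
    downstream_fragment_ids: Vec<FragmentId>,
}

// Resolve the running actors of every upstream/downstream fragment up front, so the
// now-synchronous `to_mutation` can read them from a plain map later.
fn collect_fragment_actors(
    reschedules: &HashMap<FragmentId, Reschedule>,
    get_running_actors: impl Fn(FragmentId) -> Vec<ActorId>,
) -> HashMap<FragmentId, HashSet<ActorId>> {
    let mut fragment_actors: HashMap<FragmentId, HashSet<ActorId>> = HashMap::new();
    for reschedule in reschedules.values() {
        // Actors of every upstream fragment that dispatches into the rescheduled one.
        for &(upstream_id, _dispatcher_id) in &reschedule.upstream_fragment_dispatcher_ids {
            fragment_actors
                .entry(upstream_id)
                .or_insert_with(|| get_running_actors(upstream_id).into_iter().collect());
        }
        // Actors of every downstream fragment that receives from the rescheduled one.
        for &downstream_id in &reschedule.downstream_fragment_ids {
            fragment_actors
                .entry(downstream_id)
                .or_insert_with(|| get_running_actors(downstream_id).into_iter().collect());
        }
    }
    fragment_actors
}
```

The resulting map would then be passed as the fragment_actors field when the reschedule command is constructed.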