diff --git a/proto/connector_service.proto b/proto/connector_service.proto
index 876faa574ca13..bd793cef5fe9c 100644
--- a/proto/connector_service.proto
+++ b/proto/connector_service.proto
@@ -3,6 +3,7 @@ syntax = "proto3";
 package connector_service;

 import "catalog.proto";
+import "common.proto";
 import "data.proto";
 import "plan_common.proto";

@@ -183,3 +184,39 @@ service ConnectorService {
   rpc GetEventStream(GetEventStreamRequest) returns (stream GetEventStreamResponse);
   rpc ValidateSource(ValidateSourceRequest) returns (ValidateSourceResponse);
 }
+
+message CoordinateRequest {
+  // The first request that starts a coordination between sink writer and coordinator.
+  // The service will respond after sink writers of all vnodes have sent the request.
+  message StartCoordinationRequest {
+    common.Buffer vnode_bitmap = 1;
+    SinkParam param = 2;
+  }
+
+  message CommitRequest {
+    uint64 epoch = 1;
+    SinkMetadata metadata = 2;
+  }
+
+  oneof msg {
+    StartCoordinationRequest start_request = 1;
+    CommitRequest commit_request = 2;
+  }
+}
+
+message CoordinateResponse {
+  message StartCoordinationResponse {}
+
+  message CommitResponse {
+    uint64 epoch = 1;
+  }
+
+  oneof msg {
+    StartCoordinationResponse start_response = 1;
+    CommitResponse commit_response = 2;
+  }
+}
+
+service SinkCoordinationService {
+  rpc Coordinate(stream CoordinateRequest) returns (stream CoordinateResponse);
+}
diff --git a/src/compute/src/server.rs b/src/compute/src/server.rs
index 2ec1574217f2c..2f440a14c25ab 100644
--- a/src/compute/src/server.rs
+++ b/src/compute/src/server.rs
@@ -351,6 +351,7 @@ pub async fn compute_node_serve(
         dml_mgr,
         system_params_manager.clone(),
         source_metrics,
+        meta_client.clone(),
     );

     // Generally, one may use `risedev ctl trace` to manually get the trace reports. However, if
diff --git a/src/connector/src/lib.rs b/src/connector/src/lib.rs
index c3d7d803a2123..d9fbad7b9bfb5 100644
--- a/src/connector/src/lib.rs
+++ b/src/connector/src/lib.rs
@@ -28,6 +28,7 @@
 #![feature(type_alias_impl_trait)]
 #![feature(return_position_impl_trait_in_trait)]
 #![feature(async_fn_in_trait)]
+#![feature(associated_type_defaults)]

 use std::time::Duration;

diff --git a/src/connector/src/sink/coordinate.rs b/src/connector/src/sink/coordinate.rs
new file mode 100644
index 0000000000000..b8a4c2771e132
--- /dev/null
+++ b/src/connector/src/sink/coordinate.rs
@@ -0,0 +1,87 @@
+// Copyright 2023 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
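The `Coordinate` RPC defined above in `proto/connector_service.proto` is a bidirectional stream with a two-phase protocol: each sink writer opens the stream with a `StartCoordinationRequest` carrying its vnode bitmap and the sink's parameters, the coordinator answers with `StartCoordinationResponse` only once the registered bitmaps cover every virtual node, and after that each checkpoint produces one `CommitRequest` (epoch plus per-writer `SinkMetadata`) from every writer and a `CommitResponse` broadcast back. As a rough sketch of the writer side (mirroring what `CoordinatorStreamHandle` later in this patch sends; `param` and `vnode_bitmap` are assumed to be supplied by the writer), the two request variants are built like this:

```rust
use risingwave_common::buffer::Bitmap;
use risingwave_pb::connector_service::coordinate_request::{
    CommitRequest, Msg, StartCoordinationRequest,
};
use risingwave_pb::connector_service::{CoordinateRequest, PbSinkParam, SinkMetadata};

/// First message on the stream: announce which vnodes this writer owns.
fn start_request(param: PbSinkParam, vnode_bitmap: &Bitmap) -> CoordinateRequest {
    CoordinateRequest {
        msg: Some(Msg::StartRequest(StartCoordinationRequest {
            vnode_bitmap: Some(vnode_bitmap.to_protobuf()),
            param: Some(param),
        })),
    }
}

/// One message per checkpoint barrier, carrying this writer's commit metadata.
fn commit_request(epoch: u64, metadata: SinkMetadata) -> CoordinateRequest {
    CoordinateRequest {
        msg: Some(Msg::CommitRequest(CommitRequest {
            epoch,
            metadata: Some(metadata),
        })),
    }
}
```

The coordinator-side counterpart, `CoordinatorWorker::wait_for_writers` further below, holds back the `StartCoordinationResponse` until the union of all received bitmaps covers `VirtualNode::COUNT` vnodes with no overlap.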
+ +use anyhow::anyhow; +use risingwave_common::array::StreamChunk; +use risingwave_common::buffer::Bitmap; +use risingwave_pb::connector_service::SinkMetadata; +use risingwave_rpc_client::{CoordinatorStreamHandle, SinkCoordinationRpcClient}; +use tracing::warn; + +use crate::sink::{Result, SinkError, SinkParam, SinkWriter}; + +pub struct CoordinatedSinkWriter>> { + epoch: u64, + coordinator_stream_handle: CoordinatorStreamHandle, + inner: W, +} + +impl>> CoordinatedSinkWriter { + pub async fn new( + client: SinkCoordinationRpcClient, + param: SinkParam, + vnode_bitmap: Bitmap, + inner: W, + ) -> Result { + Ok(Self { + epoch: 0, + coordinator_stream_handle: CoordinatorStreamHandle::new( + client, + param.to_proto(), + vnode_bitmap, + ) + .await?, + inner, + }) + } +} + +#[async_trait::async_trait] +impl>> SinkWriter for CoordinatedSinkWriter { + async fn begin_epoch(&mut self, epoch: u64) -> Result<()> { + self.epoch = epoch; + self.inner.begin_epoch(epoch).await + } + + async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> { + self.inner.write_batch(chunk).await + } + + async fn barrier(&mut self, is_checkpoint: bool) -> Result { + let metadata = self.inner.barrier(is_checkpoint).await?; + if is_checkpoint { + let metadata = metadata.ok_or(SinkError::Coordinator(anyhow!( + "should get metadata on checkpoint barrier" + )))?; + // TODO: add metrics to measure time to commit + self.coordinator_stream_handle + .commit(self.epoch, metadata) + .await?; + Ok(()) + } else { + if metadata.is_some() { + warn!("get metadata on non-checkpoint barrier"); + } + Ok(()) + } + } + + async fn abort(&mut self) -> Result<()> { + self.inner.abort().await + } + + async fn update_vnode_bitmap(&mut self, vnode_bitmap: Bitmap) -> Result<()> { + self.inner.update_vnode_bitmap(vnode_bitmap).await + } +} diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index 2479d7900df61..9f25ebde470d9 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -14,6 +14,7 @@ pub mod catalog; pub mod clickhouse; +pub mod coordinate; pub mod iceberg; pub mod kafka; pub mod kinesis; @@ -34,7 +35,7 @@ use risingwave_common::error::{anyhow_error, ErrorCode, RwError}; use risingwave_pb::catalog::PbSinkType; use risingwave_pb::connector_service::{PbSinkParam, SinkMetadata, TableSchema}; use risingwave_rpc_client::error::RpcError; -use risingwave_rpc_client::ConnectorClient; +use risingwave_rpc_client::{ConnectorClient, MetaClient}; use thiserror::Error; pub use tracing; @@ -57,7 +58,7 @@ pub const SINK_TYPE_DEBEZIUM: &str = "debezium"; pub const SINK_TYPE_UPSERT: &str = "upsert"; pub const SINK_USER_FORCE_APPEND_ONLY_OPTION: &str = "force_append_only"; -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq, Eq)] pub struct SinkParam { pub sink_id: SinkId, pub properties: HashMap, @@ -119,16 +120,17 @@ impl From for SinkParam { } } -#[derive(Clone)] +#[derive(Clone, Default)] pub struct SinkWriterParam { pub connector_params: ConnectorParams, pub executor_id: u64, pub vnode_bitmap: Option, + pub meta_client: Option, } #[async_trait] pub trait Sink { - type Writer: SinkWriter; + type Writer: SinkWriter; type Coordinator: SinkCommitCoordinator; async fn validate(&self, client: Option) -> Result<()>; @@ -142,7 +144,8 @@ pub trait Sink { } #[async_trait] -pub trait SinkWriter: Send { +pub trait SinkWriter: Send + 'static { + type CommitMetadata: Send = (); /// Begin a new epoch async fn begin_epoch(&mut self, epoch: u64) -> Result<()>; @@ -151,7 +154,7 @@ pub trait SinkWriter: 
Send { /// Receive a barrier and mark the end of current epoch. When `is_checkpoint` is true, the sink /// writer should commit the current epoch. - async fn barrier(&mut self, is_checkpoint: bool) -> Result<()>; + async fn barrier(&mut self, is_checkpoint: bool) -> Result; /// Clean up async fn abort(&mut self) -> Result<()>; @@ -162,7 +165,7 @@ pub trait SinkWriter: Send { #[async_trait] // An old version of SinkWriter for backward compatibility -pub trait SinkWriterV1: Send { +pub trait SinkWriterV1: Send + 'static { async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()>; // the following interface is for transactions, if not supported, return Ok(()) diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs index cc68fcaf991cd..862b4d1c1d59a 100644 --- a/src/connector/src/sink/remote.rs +++ b/src/connector/src/sink/remote.rs @@ -222,7 +222,7 @@ impl RemoteSinkWriter { use futures::StreamExt; use tokio_stream::wrappers::UnboundedReceiverStream; - let stream_handle = SinkWriterStreamHandle::new( + let stream_handle = SinkWriterStreamHandle::for_test( request_sender, UnboundedReceiverStream::new(response_receiver).boxed(), ); diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index b21f0c51e5b2a..f8ee3b3433c72 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -50,6 +50,7 @@ use self::progress::TrackingCommand; use crate::barrier::progress::CreateMviewProgressTracker; use crate::barrier::BarrierEpochState::{Completed, InFlight}; use crate::hummock::HummockManagerRef; +use crate::manager::sink_coordination::SinkCoordinatorManager; use crate::manager::{ CatalogManagerRef, ClusterManagerRef, FragmentManagerRef, LocalNotification, MetaSrvEnv, WorkerId, @@ -144,6 +145,8 @@ pub struct GlobalBarrierManager { source_manager: SourceManagerRef, + sink_manager: SinkCoordinatorManager, + metrics: Arc, pub(crate) env: MetaSrvEnv, @@ -493,6 +496,7 @@ where fragment_manager: FragmentManagerRef, hummock_manager: HummockManagerRef, source_manager: SourceManagerRef, + sink_manager: SinkCoordinatorManager, metrics: Arc, ) -> Self { let enable_recovery = env.opts.enable_recovery; @@ -509,6 +513,7 @@ where fragment_manager, hummock_manager, source_manager, + sink_manager, metrics, env, tracker: Mutex::new(tracker), diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs index 0950fd1e39692..5853cf563fd4e 100644 --- a/src/meta/src/barrier/recovery.rs +++ b/src/meta/src/barrier/recovery.rs @@ -117,6 +117,7 @@ where self.clean_dirty_fragments() .await .expect("clean dirty fragments"); + self.sink_manager.reset().await; let retry_strategy = Self::get_retry_strategy(); // We take retry into consideration because this is the latency user sees for a cluster to diff --git a/src/meta/src/manager/mod.rs b/src/meta/src/manager/mod.rs index 5e9cd46b5b49f..6f787dba23d09 100644 --- a/src/meta/src/manager/mod.rs +++ b/src/meta/src/manager/mod.rs @@ -18,6 +18,7 @@ mod env; mod id; mod idle; mod notification; +pub(crate) mod sink_coordination; mod streaming_job; mod system_param; diff --git a/src/meta/src/manager/sink_coordination/coordinator_worker.rs b/src/meta/src/manager/sink_coordination/coordinator_worker.rs new file mode 100644 index 0000000000000..1a7c42c108661 --- /dev/null +++ b/src/meta/src/manager/sink_coordination/coordinator_worker.rs @@ -0,0 +1,322 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance 
with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashSet; +use std::pin::pin; + +use anyhow::anyhow; +use futures::future::{select, Either}; +use futures::stream::FuturesUnordered; +use futures::{StreamExt, TryStreamExt}; +use risingwave_common::buffer::Bitmap; +use risingwave_common::hash::{VirtualNode, VnodeBitmapExt}; +use risingwave_connector::dispatch_sink; +use risingwave_connector::sink::{build_sink, Sink, SinkCommitCoordinator, SinkParam}; +use risingwave_pb::connector_service::coordinate_request::CommitRequest; +use risingwave_pb::connector_service::coordinate_response::{ + CommitResponse, StartCoordinationResponse, +}; +use risingwave_pb::connector_service::{ + coordinate_request, coordinate_response, CoordinateRequest, CoordinateResponse, SinkMetadata, +}; +use risingwave_rpc_client::ConnectorClient; +use tokio::sync::mpsc::UnboundedReceiver; +use tonic::Status; +use tracing::{error, warn}; + +use crate::manager::sink_coordination::{ + NewSinkWriterRequest, SinkCoordinatorResponseSender, SinkWriterRequestStream, +}; + +macro_rules! send_await_with_err_check { + ($tx:expr, $msg:expr) => { + if $tx.send($msg).await.is_err() { + error!("unable to send msg"); + } + }; +} + +pub(crate) struct CoordinatorWorker { + param: SinkParam, + request_streams: Vec, + response_senders: Vec, + request_rx: UnboundedReceiver, +} + +impl CoordinatorWorker { + pub(crate) async fn run( + first_writer_request: NewSinkWriterRequest, + request_rx: UnboundedReceiver, + connector_client: Option, + ) { + let sink = match build_sink(first_writer_request.param.clone()) { + Ok(sink) => sink, + Err(e) => { + error!( + "unable to build sink with param {:?}: {:?}", + first_writer_request.param, e + ); + send_await_with_err_check!( + first_writer_request.response_tx, + Err(Status::invalid_argument("failed to build sink")) + ); + return; + } + }; + dispatch_sink!(sink, sink, { + let coordinator = match sink.new_coordinator(connector_client).await { + Ok(coordinator) => coordinator, + Err(e) => { + error!( + "unable to build coordinator with param {:?}: {:?}", + first_writer_request.param, e + ); + send_await_with_err_check!( + first_writer_request.response_tx, + Err(Status::invalid_argument("failed to build coordinator")) + ); + return; + } + }; + Self::execute_coordinator(first_writer_request, request_rx, coordinator).await + }); + } + + pub(crate) async fn execute_coordinator( + first_writer_request: NewSinkWriterRequest, + request_rx: UnboundedReceiver, + coordinator: impl SinkCommitCoordinator, + ) { + let mut worker = CoordinatorWorker { + param: first_writer_request.param, + request_streams: vec![first_writer_request.request_stream], + response_senders: vec![first_writer_request.response_tx], + request_rx, + }; + + if let Err(e) = worker + .wait_for_writers(first_writer_request.vnode_bitmap) + .await + { + error!("failed to wait for all writers: {:?}", e); + worker + .send_to_all_sink_writers(|| { + Err(Status::cancelled("failed to wait for all writers")) + }) + .await; + } + + worker.start_coordination(coordinator).await; + } + + async fn send_to_all_sink_writers( + &mut self, + new_msg: impl 
Fn() -> Result, + ) { + for sender in &self.response_senders { + send_await_with_err_check!(sender, new_msg()); + } + } + + async fn next_new_writer(&mut self) -> anyhow::Result { + // TODO: add timeout log + match select( + pin!(self.request_rx.recv()), + pin!(FuturesUnordered::from_iter( + self.request_streams + .iter_mut() + .map(|stream| stream.try_next()), + ) + .next()), + ) + .await + { + Either::Left((Some(req), _)) => Ok(req), + Either::Left((None, _)) => Err(anyhow!("manager request stream reaches the end")), + Either::Right((Some(Ok(Some(request))), _)) => Err(anyhow!( + "get new request from sink writer before initialize: {:?}", + request + )), + Either::Right((Some(Ok(None)), _)) => Err(anyhow!( + "one sink writer stream reaches the end before initialize" + )), + Either::Right((Some(Err(e)), _)) => Err(anyhow!( + "unable to poll from one sink writer stream: {:?}", + e + )), + Either::Right((None, _)) => unreachable!("request_streams must not be empty"), + } + } + + async fn wait_for_writers(&mut self, first_vnode_bitmap: Bitmap) -> anyhow::Result<()> { + let mut remaining_count = VirtualNode::COUNT; + let mut registered_vnode = HashSet::with_capacity(VirtualNode::COUNT); + + for vnode in first_vnode_bitmap.iter_vnodes() { + remaining_count -= 1; + registered_vnode.insert(vnode); + } + + loop { + let new_writer_request = self.next_new_writer().await?; + if self.param != new_writer_request.param { + // TODO: may return error. + warn!( + "get different param {:?} while current param {:?}", + new_writer_request.param, self.param + ); + } + self.request_streams.push(new_writer_request.request_stream); + self.response_senders.push(new_writer_request.response_tx); + + for vnode in new_writer_request.vnode_bitmap.iter_vnodes() { + if registered_vnode.contains(&vnode) { + return Err(anyhow!( + "get overlapped vnode: {}, current vnode {:?}", + vnode, + registered_vnode + )); + } + registered_vnode.insert(vnode); + remaining_count -= 1; + } + + if remaining_count == 0 { + break; + } + } + + self.send_to_all_sink_writers(|| { + Ok(CoordinateResponse { + msg: Some(coordinate_response::Msg::StartResponse( + StartCoordinationResponse {}, + )), + }) + }) + .await; + Ok(()) + } + + async fn collect_all_metadata(&mut self) -> anyhow::Result<(u64, Vec)> { + let mut epoch = None; + let mut metadata_list = Vec::with_capacity(self.request_streams.len()); + let mut uncollected_futures = FuturesUnordered::from_iter( + self.request_streams + .iter_mut() + .map(|stream| stream.try_next()), + ); + + loop { + match select( + pin!(self.request_rx.recv()), + pin!(uncollected_futures.next()), + ) + .await + { + Either::Left((Some(new_request), _)) => { + warn!("get new writer request while collecting metadata"); + send_await_with_err_check!( + new_request.response_tx, + Err(Status::already_exists( + "coordinator already running, should not get new request" + )) + ); + continue; + } + Either::Left((None, _)) => { + return Err(anyhow!( + "coordinator get notified to stop while collecting metadata" + )); + } + Either::Right((Some(next_result), _)) => match next_result { + Ok(Some(CoordinateRequest { + msg: + Some(coordinate_request::Msg::CommitRequest(CommitRequest { + epoch: request_epoch, + metadata: Some(metadata), + })), + })) => { + match &epoch { + Some(epoch) => { + if *epoch != request_epoch { + warn!( + "current epoch is {} but get request from {}", + epoch, request_epoch + ); + } + } + None => { + epoch = Some(request_epoch); + } + } + metadata_list.push(metadata); + } + Ok(Some(req)) => { + return 
Err(anyhow!("expect commit request but get {:?}", req)); + } + Ok(None) => { + return Err(anyhow!( + "sink writer input reaches the end while collecting metadata" + )); + } + Err(e) => { + return Err(anyhow!( + "failed to poll from one of the writer request streams: {:?}", + e + )); + } + }, + Either::Right((None, _)) => { + break; + } + } + } + Ok(( + epoch.expect("should not be empty when have at least one writer"), + metadata_list, + )) + } + + async fn start_coordination(&mut self, mut coordinator: impl SinkCommitCoordinator) { + let result: Result<(), ()> = try { + coordinator.init().await.map_err(|e| { + error!("failed to initialize coordinator: {:?}", e); + })?; + loop { + let (epoch, metadata_list) = self.collect_all_metadata().await.map_err(|e| { + error!("failed to collect all metadata: {:?}", e); + })?; + // TODO: measure commit time + coordinator + .commit(epoch, metadata_list) + .await + .map_err(|e| error!("failed to commit metadata of epoch {}: {:?}", epoch, e))?; + + self.send_to_all_sink_writers(|| { + Ok(CoordinateResponse { + msg: Some(coordinate_response::Msg::CommitResponse(CommitResponse { + epoch, + })), + }) + }) + .await; + } + }; + + if result.is_err() { + self.send_to_all_sink_writers(|| Err(Status::aborted("failed to run coordination"))) + .await; + } + } +} diff --git a/src/meta/src/manager/sink_coordination/manager.rs b/src/meta/src/manager/sink_coordination/manager.rs new file mode 100644 index 0000000000000..dfb511f08849a --- /dev/null +++ b/src/meta/src/manager/sink_coordination/manager.rs @@ -0,0 +1,796 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::hash_map::Entry; +use std::collections::HashMap; +use std::pin::pin; + +use futures::future::{select, BoxFuture, Either}; +use futures::stream::FuturesUnordered; +use futures::{FutureExt, Stream, StreamExt, TryStreamExt}; +use risingwave_common::buffer::Bitmap; +use risingwave_common::util::pending_on_none; +use risingwave_connector::sink::catalog::SinkId; +use risingwave_connector::sink::SinkParam; +use risingwave_pb::connector_service::coordinate_request::Msg; +use risingwave_pb::connector_service::{coordinate_request, CoordinateRequest, CoordinateResponse}; +use risingwave_rpc_client::ConnectorClient; +use tokio::sync::mpsc; +use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; +use tokio::sync::oneshot::{channel, Receiver, Sender}; +use tokio::task::{JoinError, JoinHandle}; +use tokio_stream::wrappers::ReceiverStream; +use tonic::Status; +use tracing::{debug, error, info, warn}; + +use crate::manager::sink_coordination::coordinator_worker::CoordinatorWorker; +use crate::manager::sink_coordination::{NewSinkWriterRequest, SinkWriterRequestStream}; + +macro_rules! send_with_err_check { + ($tx:expr, $msg:expr) => { + if $tx.send($msg).is_err() { + error!("unable to send msg"); + } + }; +} + +macro_rules! 
send_await_with_err_check { + ($tx:expr, $msg:expr) => { + if $tx.send($msg).await.is_err() { + error!("unable to send msg"); + } + }; +} + +const BOUNDED_CHANNEL_SIZE: usize = 16; + +enum ManagerRequest { + NewSinkWriter(NewSinkWriterRequest), + StopCoordinator { + finish_notifier: Sender<()>, + /// sink id to stop. When `None`, stop all sink coordinator + sink_id: Option, + }, +} + +#[derive(Clone)] +pub struct SinkCoordinatorManager { + request_tx: mpsc::Sender, +} + +impl SinkCoordinatorManager { + pub(crate) fn start_worker( + connector_client: Option, + ) -> (Self, (JoinHandle<()>, Sender<()>)) { + Self::start_worker_with_spawn_worker( + connector_client, + |writer_request, manager_request_stream, connector_client| { + tokio::spawn(CoordinatorWorker::run( + writer_request, + manager_request_stream, + connector_client, + )) + }, + ) + } + + fn start_worker_with_spawn_worker( + connector_client: Option, + spawn_coordinator_worker: impl SpawnCoordinatorFn, + ) -> (Self, (JoinHandle<()>, Sender<()>)) { + let (request_tx, request_rx) = mpsc::channel(BOUNDED_CHANNEL_SIZE); + let (shutdown_tx, shutdown_rx) = channel(); + let worker = ManagerWorker::new(request_rx, shutdown_rx, connector_client); + let join_handle = tokio::spawn(worker.execute(spawn_coordinator_worker)); + ( + SinkCoordinatorManager { request_tx }, + (join_handle, shutdown_tx), + ) + } + + pub(crate) async fn handle_new_request( + &self, + mut request_stream: SinkWriterRequestStream, + ) -> Result>, Status> { + let (param, vnode_bitmap) = match request_stream.try_next().await? { + Some(CoordinateRequest { + msg: + Some(Msg::StartRequest(coordinate_request::StartCoordinationRequest { + param: Some(param), + vnode_bitmap: Some(vnode_bitmap), + })), + }) => (SinkParam::from_proto(param), Bitmap::from(&vnode_bitmap)), + msg => { + return Err(Status::invalid_argument(format!( + "expected CoordinateRequest::StartRequest in the first request, get {:?}", + msg + ))); + } + }; + let (response_tx, response_rx) = mpsc::channel(BOUNDED_CHANNEL_SIZE); + self.request_tx + .send(ManagerRequest::NewSinkWriter(NewSinkWriterRequest { + request_stream, + response_tx, + param, + vnode_bitmap, + })) + .await + .map_err(|_| { + Status::unavailable( + "unable to send to sink manager worker. The worker may have stopped", + ) + })?; + + Ok(ReceiverStream::new(response_rx)) + } + + async fn stop_coordinator(&self, sink_id: Option) { + let (tx, rx) = channel(); + send_await_with_err_check!( + self.request_tx, + ManagerRequest::StopCoordinator { + finish_notifier: tx, + sink_id, + } + ); + if rx.await.is_err() { + error!("fail to wait for resetting sink manager worker"); + } + info!("successfully stop coordinator: {:?}", sink_id); + } + + pub(crate) async fn reset(&self) { + self.stop_coordinator(None).await; + } + + pub(crate) async fn stop_sink_coordinator(&self, sink_id: SinkId) { + self.stop_coordinator(Some(sink_id)).await; + } +} + +struct CoordinatorWorkerHandle { + /// Sender to coordinator worker. 
Drop the sender as a stop signal + request_sender: Option>, + /// Notify when the coordinator worker stops + finish_notifiers: Vec>, +} + +struct ManagerWorker { + connector_client: Option, + request_rx: mpsc::Receiver, + // Make it option so that it can be polled with &mut SinkManagerWorker + shutdown_rx: Option>, + + running_coordinator_worker_join_handles: + FuturesUnordered)>>, + running_coordinator_worker: HashMap, +} + +enum ManagerEvent { + NewRequest(ManagerRequest), + CoordinatorWorkerFinished { + sink_id: SinkId, + join_result: Result<(), JoinError>, + }, +} + +trait SpawnCoordinatorFn = FnMut( + NewSinkWriterRequest, + UnboundedReceiver, + Option, + ) -> JoinHandle<()> + + Send + + 'static; + +impl ManagerWorker { + fn new( + request_rx: mpsc::Receiver, + shutdown_rx: Receiver<()>, + connector_client: Option, + ) -> Self { + ManagerWorker { + request_rx, + shutdown_rx: Some(shutdown_rx), + running_coordinator_worker_join_handles: Default::default(), + running_coordinator_worker: Default::default(), + connector_client, + } + } + + async fn execute(mut self, mut spawn_coordinator_worker: impl SpawnCoordinatorFn) { + while let Some(event) = self.next_event().await { + match event { + ManagerEvent::NewRequest(request) => match request { + ManagerRequest::NewSinkWriter(request) => { + self.handle_new_sink_writer(request, &mut spawn_coordinator_worker) + } + ManagerRequest::StopCoordinator { + finish_notifier, + sink_id, + } => { + if let Some(sink_id) = sink_id { + if let Some(worker_handle) = + self.running_coordinator_worker.get_mut(&sink_id) + { + if let Some(sender) = worker_handle.request_sender.take() { + // drop the sender as a signal to notify the coordinator worker + // to stop + drop(sender); + } + worker_handle.finish_notifiers.push(finish_notifier); + } else { + debug!( + "sink coordinator of {} is not running. 
Notify finish directly", + sink_id.sink_id + ); + send_with_err_check!(finish_notifier, ()); + } + } else { + self.clean_up().await; + send_with_err_check!(finish_notifier, ()); + } + } + }, + ManagerEvent::CoordinatorWorkerFinished { + sink_id, + join_result, + } => self.handle_coordinator_finished(sink_id, join_result), + } + } + self.clean_up().await; + info!("sink manager worker exited"); + } + + async fn next_event(&mut self) -> Option { + let shutdown_rx = self.shutdown_rx.take().expect("should not be empty"); + match select( + select( + pin!(self.request_rx.recv()), + pin!(pending_on_none( + self.running_coordinator_worker_join_handles.next() + )), + ), + shutdown_rx, + ) + .await + { + Either::Left((either, shutdown_rx)) => { + self.shutdown_rx = Some(shutdown_rx); + match either { + Either::Left((Some(request), _)) => Some(ManagerEvent::NewRequest(request)), + Either::Left((None, _)) => None, + Either::Right(((sink_id, join_result), _)) => { + Some(ManagerEvent::CoordinatorWorkerFinished { + sink_id, + join_result, + }) + } + } + } + Either::Right(_) => None, + } + } + + async fn clean_up(&mut self) { + info!("sink manager worker start cleaning up"); + for worker_handle in self.running_coordinator_worker.values_mut() { + if let Some(sender) = worker_handle.request_sender.take() { + // drop the sender to notify the coordinator worker to stop + drop(sender); + } + } + while let Some((sink_id, join_result)) = + self.running_coordinator_worker_join_handles.next().await + { + self.handle_coordinator_finished(sink_id, join_result); + } + info!("sink manager worker finished cleaning up"); + } + + fn handle_coordinator_finished(&mut self, sink_id: SinkId, join_result: Result<(), JoinError>) { + let worker_handle = self + .running_coordinator_worker + .remove(&sink_id) + .expect("finished coordinator should have an associated worker handle"); + for finish_notifier in worker_handle.finish_notifiers { + send_with_err_check!(finish_notifier, ()); + } + match join_result { + Ok(()) => { + info!( + "sink coordinator of {} has gracefully finished", + sink_id.sink_id + ); + } + Err(err) => { + error!( + "sink coordinator of {} finished with error {:?}", + sink_id.sink_id, err + ); + } + } + } + + fn handle_new_sink_writer( + &mut self, + request: NewSinkWriterRequest, + spawn_coordinator_worker: &mut impl SpawnCoordinatorFn, + ) { + let param = &request.param; + let sink_id = param.sink_id; + + // Launch the coordinator worker task if it is the first + match self.running_coordinator_worker.entry(param.sink_id) { + Entry::Occupied(mut entry) => { + if let Some(sender) = entry.get_mut().request_sender.as_mut() { + send_with_err_check!(sender, request); + } else { + warn!( + "handle a new request while the sink coordinator is being stopped: {:?}", + param + ); + drop(request.response_tx); + } + } + Entry::Vacant(entry) => { + let (request_tx, request_rx) = unbounded_channel(); + let connector_client = self.connector_client.clone(); + let join_handle = spawn_coordinator_worker(request, request_rx, connector_client); + self.running_coordinator_worker_join_handles.push( + join_handle + .map(move |join_result| (sink_id, join_result)) + .boxed(), + ); + entry.insert(CoordinatorWorkerHandle { + request_sender: Some(request_tx), + finish_notifiers: Vec::new(), + }); + } + }; + } +} + +#[cfg(test)] +mod tests { + use std::future::{poll_fn, Future}; + use std::pin::pin; + use std::task::Poll; + + use anyhow::anyhow; + use async_trait::async_trait; + use futures::future::join; + use futures::{FutureExt, 
StreamExt}; + use itertools::Itertools; + use rand::seq::SliceRandom; + use risingwave_common::buffer::{Bitmap, BitmapBuilder}; + use risingwave_common::hash::VirtualNode; + use risingwave_connector::sink::catalog::{SinkId, SinkType}; + use risingwave_connector::sink::{SinkCommitCoordinator, SinkError, SinkParam}; + use risingwave_pb::connector_service::sink_metadata::{Metadata, SerializedMetadata}; + use risingwave_pb::connector_service::SinkMetadata; + use risingwave_rpc_client::CoordinatorStreamHandle; + + use crate::manager::sink_coordination::coordinator_worker::CoordinatorWorker; + use crate::manager::sink_coordination::{NewSinkWriterRequest, SinkCoordinatorManager}; + + struct MockCoordinator, &mut C) -> Result<(), SinkError>> { + context: C, + f: F, + } + + impl, &mut C) -> Result<(), SinkError>> MockCoordinator { + fn new(context: C, f: F) -> Self { + MockCoordinator { context, f } + } + } + + #[async_trait] + impl, &mut C) -> Result<(), SinkError> + Send> + SinkCommitCoordinator for MockCoordinator + { + async fn init(&mut self) -> risingwave_connector::sink::Result<()> { + Ok(()) + } + + async fn commit( + &mut self, + epoch: u64, + metadata: Vec, + ) -> risingwave_connector::sink::Result<()> { + (self.f)(epoch, metadata, &mut self.context) + } + } + + #[tokio::test] + async fn test_basic() { + let sink_id = SinkId::from(1); + let param = SinkParam { + sink_id, + properties: Default::default(), + columns: vec![], + pk_indices: vec![], + sink_type: SinkType::AppendOnly, + }; + + let epoch1 = 233; + let epoch2 = 234; + + let mut all_vnode = (0..VirtualNode::COUNT).collect_vec(); + all_vnode.shuffle(&mut rand::thread_rng()); + let (first, second) = all_vnode.split_at(VirtualNode::COUNT / 2); + let build_bitmap = |indexes: &[usize]| { + let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT); + for i in indexes.iter() { + builder.set(*i, true); + } + builder.finish() + }; + let vnode1 = build_bitmap(first); + let vnode2 = build_bitmap(second); + + let metadata = [ + [vec![1u8, 2u8], vec![3u8, 4u8]], + [vec![5u8, 6u8], vec![7u8, 8u8]], + ]; + + let (manager, (_join_handle, _stop_tx)) = + SinkCoordinatorManager::start_worker_with_spawn_worker(None, { + let param = param.clone(); + let metadata = metadata.clone(); + move |first_request: NewSinkWriterRequest, new_writer_rx, _| { + let param = param.clone(); + let metadata = metadata.clone(); + tokio::spawn(async move { + // validate the start request + assert_eq!(first_request.param, param); + CoordinatorWorker::execute_coordinator( + first_request, + new_writer_rx, + MockCoordinator::new(0, |epoch, metadata_list, count: &mut usize| { + *count += 1; + let mut metadata_list = metadata_list + .into_iter() + .map(|metadata| match metadata { + SinkMetadata { + metadata: + Some(Metadata::Serialized(SerializedMetadata { + metadata, + })), + } => metadata, + _ => unreachable!(), + }) + .collect_vec(); + metadata_list.sort(); + match *count { + 1 => { + assert_eq!(epoch, epoch1); + assert_eq!(2, metadata_list.len()); + assert_eq!(metadata[0][0], metadata_list[0]); + assert_eq!(metadata[0][1], metadata_list[1]); + } + 2 => { + assert_eq!(epoch, epoch2); + assert_eq!(2, metadata_list.len()); + assert_eq!(metadata[1][0], metadata_list[0]); + assert_eq!(metadata[1][1], metadata_list[1]); + } + _ => unreachable!(), + } + Ok(()) + }), + ) + .await; + }) + } + }); + + let build_client = |vnode| async { + CoordinatorStreamHandle::new_with_init_stream( + param.to_proto(), + vnode, + |stream_req| async { + Ok(tonic::Response::new( + manager + 
.handle_new_request(stream_req.into_inner().map(Ok).boxed()) + .await + .unwrap() + .boxed(), + )) + }, + ) + .await + .unwrap() + }; + + let mut build_client_future1 = pin!(build_client(vnode1)); + assert!( + poll_fn(|cx| Poll::Ready(build_client_future1.as_mut().poll(cx))) + .await + .is_pending() + ); + let (mut client1, mut client2) = + join(build_client_future1, pin!(build_client(vnode2))).await; + + { + // commit epoch1 + let mut commit_future = pin!(client2 + .commit( + epoch1, + SinkMetadata { + metadata: Some(Metadata::Serialized(SerializedMetadata { + metadata: metadata[0][1].clone(), + })), + }, + ) + .map(|result| result.unwrap())); + assert!(poll_fn(|cx| Poll::Ready(commit_future.as_mut().poll(cx))) + .await + .is_pending()); + join( + commit_future, + client1 + .commit( + epoch1, + SinkMetadata { + metadata: Some(Metadata::Serialized(SerializedMetadata { + metadata: metadata[0][0].clone(), + })), + }, + ) + .map(|result| result.unwrap()), + ) + .await; + } + + // commit epoch2 + let mut commit_future = pin!(client1 + .commit( + epoch2, + SinkMetadata { + metadata: Some(Metadata::Serialized(SerializedMetadata { + metadata: metadata[1][0].clone(), + })), + }, + ) + .map(|result| result.unwrap())); + assert!(poll_fn(|cx| Poll::Ready(commit_future.as_mut().poll(cx))) + .await + .is_pending()); + join( + commit_future, + client2 + .commit( + epoch2, + SinkMetadata { + metadata: Some(Metadata::Serialized(SerializedMetadata { + metadata: metadata[1][1].clone(), + })), + }, + ) + .map(|result| result.unwrap()), + ) + .await; + } + + #[tokio::test] + async fn test_drop_sink_while_init() { + let sink_id = SinkId::from(1); + let param = SinkParam { + sink_id, + properties: Default::default(), + columns: vec![], + pk_indices: vec![], + sink_type: SinkType::AppendOnly, + }; + + let (manager, (_join_handle, _stop_tx)) = SinkCoordinatorManager::start_worker(None); + + let mut build_client_future1 = pin!(CoordinatorStreamHandle::new_with_init_stream( + param.to_proto(), + Bitmap::zeros(VirtualNode::COUNT), + |stream_req| async { + Ok(tonic::Response::new( + manager + .handle_new_request(stream_req.into_inner().map(Ok).boxed()) + .await + .unwrap() + .boxed(), + )) + }, + )); + assert!( + poll_fn(|cx| Poll::Ready(build_client_future1.as_mut().poll(cx))) + .await + .is_pending() + ); + manager.stop_sink_coordinator(sink_id).await; + + assert!(build_client_future1.await.is_err()); + } + + #[tokio::test] + async fn test_partial_commit() { + let sink_id = SinkId::from(1); + let param = SinkParam { + sink_id, + properties: Default::default(), + columns: vec![], + pk_indices: vec![], + sink_type: SinkType::AppendOnly, + }; + + let epoch = 233; + + let mut all_vnode = (0..VirtualNode::COUNT).collect_vec(); + all_vnode.shuffle(&mut rand::thread_rng()); + let (first, second) = all_vnode.split_at(VirtualNode::COUNT / 2); + let build_bitmap = |indexes: &[usize]| { + let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT); + for i in indexes.iter() { + builder.set(*i, true); + } + builder.finish() + }; + let vnode1 = build_bitmap(first); + let vnode2 = build_bitmap(second); + + let (manager, (_join_handle, _stop_tx)) = + SinkCoordinatorManager::start_worker_with_spawn_worker(None, { + let param = param.clone(); + move |first_request: NewSinkWriterRequest, new_writer_rx, _| { + let param = param.clone(); + tokio::spawn(async move { + // validate the start request + assert_eq!(first_request.param, param); + CoordinatorWorker::execute_coordinator( + first_request, + new_writer_rx, + 
MockCoordinator::new((), |_, _, _| unreachable!()), + ) + .await; + }) + } + }); + + let build_client = |vnode| async { + CoordinatorStreamHandle::new_with_init_stream( + param.to_proto(), + vnode, + |stream_req| async { + Ok(tonic::Response::new( + manager + .handle_new_request(stream_req.into_inner().map(Ok).boxed()) + .await + .unwrap() + .boxed(), + )) + }, + ) + .await + .unwrap() + }; + + let (mut client1, client2) = join(build_client(vnode1), build_client(vnode2)).await; + + // commit epoch + let mut commit_future = pin!(client1.commit( + epoch, + SinkMetadata { + metadata: Some(Metadata::Serialized(SerializedMetadata { + metadata: vec![], + })), + }, + )); + assert!(poll_fn(|cx| Poll::Ready(commit_future.as_mut().poll(cx))) + .await + .is_pending()); + drop(client2); + assert!(commit_future.await.is_err()); + } + + #[tokio::test] + async fn test_fail_commit() { + let sink_id = SinkId::from(1); + let param = SinkParam { + sink_id, + properties: Default::default(), + columns: vec![], + pk_indices: vec![], + sink_type: SinkType::AppendOnly, + }; + + let epoch = 233; + + let mut all_vnode = (0..VirtualNode::COUNT).collect_vec(); + all_vnode.shuffle(&mut rand::thread_rng()); + let (first, second) = all_vnode.split_at(VirtualNode::COUNT / 2); + let build_bitmap = |indexes: &[usize]| { + let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT); + for i in indexes.iter() { + builder.set(*i, true); + } + builder.finish() + }; + let vnode1 = build_bitmap(first); + let vnode2 = build_bitmap(second); + + let (manager, (_join_handle, _stop_tx)) = + SinkCoordinatorManager::start_worker_with_spawn_worker(None, { + let param = param.clone(); + move |first_request: NewSinkWriterRequest, new_writer_rx, _| { + let param = param.clone(); + tokio::spawn(async move { + // validate the start request + assert_eq!(first_request.param, param); + CoordinatorWorker::execute_coordinator( + first_request, + new_writer_rx, + MockCoordinator::new((), |_, _, _| { + Err(SinkError::Coordinator(anyhow!("failed to commit"))) + }), + ) + .await; + }) + } + }); + + let build_client = |vnode| async { + CoordinatorStreamHandle::new_with_init_stream( + param.to_proto(), + vnode, + |stream_req| async { + Ok(tonic::Response::new( + manager + .handle_new_request(stream_req.into_inner().map(Ok).boxed()) + .await + .unwrap() + .boxed(), + )) + }, + ) + .await + .unwrap() + }; + + let (mut client1, mut client2) = join(build_client(vnode1), build_client(vnode2)).await; + + // commit epoch + let mut commit_future = pin!(client1.commit( + epoch, + SinkMetadata { + metadata: Some(Metadata::Serialized(SerializedMetadata { + metadata: vec![], + })), + }, + )); + assert!(poll_fn(|cx| Poll::Ready(commit_future.as_mut().poll(cx))) + .await + .is_pending()); + let (result1, result2) = join( + commit_future, + client2.commit( + epoch, + SinkMetadata { + metadata: Some(Metadata::Serialized(SerializedMetadata { + metadata: vec![], + })), + }, + ), + ) + .await; + assert!(result1.is_err()); + assert!(result2.is_err()); + } +} diff --git a/src/meta/src/manager/sink_coordination/mod.rs b/src/meta/src/manager/sink_coordination/mod.rs new file mode 100644 index 0000000000000..30786c8721e97 --- /dev/null +++ b/src/meta/src/manager/sink_coordination/mod.rs @@ -0,0 +1,34 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod coordinator_worker; +mod manager; + +use futures::stream::BoxStream; +pub(crate) use manager::SinkCoordinatorManager; +use risingwave_common::buffer::Bitmap; +use risingwave_connector::sink::SinkParam; +use risingwave_pb::connector_service::{CoordinateRequest, CoordinateResponse}; +use tokio::sync::mpsc::Sender; +use tonic::Status; + +pub(crate) type SinkWriterRequestStream = BoxStream<'static, Result>; +pub(crate) type SinkCoordinatorResponseSender = Sender>; + +pub(crate) struct NewSinkWriterRequest { + pub(crate) request_stream: SinkWriterRequestStream, + pub(crate) response_tx: SinkCoordinatorResponseSender, + pub(crate) param: SinkParam, + pub(crate) vnode_bitmap: Bitmap, +} diff --git a/src/meta/src/rpc/server.rs b/src/meta/src/rpc/server.rs index 3f5c1f6a420c9..dd7d8740a6547 100644 --- a/src/meta/src/rpc/server.rs +++ b/src/meta/src/rpc/server.rs @@ -30,6 +30,7 @@ use risingwave_common_service::metrics_manager::MetricsManager; use risingwave_common_service::tracing::TracingExtractLayer; use risingwave_pb::backup_service::backup_service_server::BackupServiceServer; use risingwave_pb::cloud_service::cloud_service_server::CloudServiceServer; +use risingwave_pb::connector_service::sink_coordination_service_server::SinkCoordinationServiceServer; use risingwave_pb::ddl_service::ddl_service_server::DdlServiceServer; use risingwave_pb::health::health_server::HealthServer; use risingwave_pb::hummock::hummock_manager_service_server::HummockManagerServiceServer; @@ -59,6 +60,7 @@ use super::DdlServiceImpl; use crate::backup_restore::BackupManager; use crate::barrier::{BarrierScheduler, GlobalBarrierManager}; use crate::hummock::HummockManager; +use crate::manager::sink_coordination::SinkCoordinatorManager; use crate::manager::{ CatalogManager, ClusterManager, FragmentManager, IdleManager, MetaOpts, MetaSrvEnv, SystemParamsManager, @@ -72,6 +74,7 @@ use crate::rpc::service::cluster_service::ClusterServiceImpl; use crate::rpc::service::heartbeat_service::HeartbeatServiceImpl; use crate::rpc::service::hummock_service::HummockServiceImpl; use crate::rpc::service::meta_member_service::MetaMemberServiceImpl; +use crate::rpc::service::sink_coordination_service::SinkCoordinationServiceImpl; use crate::rpc::service::stream_service::StreamServiceImpl; use crate::rpc::service::system_params_service::SystemParamsServiceImpl; use crate::rpc::service::telemetry_service::TelemetryInfoServiceImpl; @@ -438,6 +441,10 @@ pub async fn start_service_as_election_leader( .unwrap(), ); + let (sink_manager, shutdown_handle) = + SinkCoordinatorManager::start_worker(env.connector_client()); + let mut sub_tasks = vec![shutdown_handle]; + let barrier_manager = Arc::new(GlobalBarrierManager::new( scheduled_barriers, env.clone(), @@ -446,6 +453,7 @@ pub async fn start_service_as_election_leader( fragment_manager.clone(), hummock_manager.clone(), source_manager.clone(), + sink_manager.clone(), meta_metrics.clone(), )); @@ -512,6 +520,7 @@ pub async fn start_service_as_election_leader( cluster_manager.clone(), fragment_manager.clone(), barrier_manager.clone(), + sink_manager.clone(), ); let user_srv = 
UserServiceImpl::::new(env.clone(), catalog_manager.clone()); @@ -533,6 +542,7 @@ pub async fn start_service_as_election_leader( catalog_manager.clone(), fragment_manager.clone(), ); + let sink_coordination_srv = SinkCoordinationServiceImpl::new(sink_manager); let hummock_srv = HummockServiceImpl::new( hummock_manager.clone(), vacuum_manager.clone(), @@ -563,12 +573,12 @@ pub async fn start_service_as_election_leader( } // sub_tasks executed concurrently. Can be shutdown via shutdown_all - let mut sub_tasks = hummock::start_hummock_workers( + sub_tasks.extend(hummock::start_hummock_workers( hummock_manager.clone(), vacuum_manager, // compaction_scheduler, &env.opts, - ); + )); sub_tasks.push( start_worker_info_monitor( cluster_manager.clone(), @@ -703,6 +713,7 @@ pub async fn start_service_as_election_leader( .add_service(TelemetryInfoServiceServer::new(telemetry_srv)) .add_service(ServingServiceServer::new(serving_srv)) .add_service(CloudServiceServer::new(cloud_srv)) + .add_service(SinkCoordinationServiceServer::new(sink_coordination_srv)) .serve_with_shutdown(address_info.listen_addr, async move { tokio::select! { res = svc_shutdown_rx.changed() => { diff --git a/src/meta/src/rpc/service/ddl_service.rs b/src/meta/src/rpc/service/ddl_service.rs index 1a316edc29196..059eab76c5dcf 100644 --- a/src/meta/src/rpc/service/ddl_service.rs +++ b/src/meta/src/rpc/service/ddl_service.rs @@ -18,6 +18,7 @@ use std::sync::Arc; use anyhow::anyhow; use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_common::util::stream_graph_visitor::visit_fragment; +use risingwave_connector::sink::catalog::SinkId; use risingwave_pb::catalog::connection::private_link_service::{ PbPrivateLinkProvider, PrivateLinkProvider, }; @@ -32,6 +33,7 @@ use risingwave_pb::stream_plan::stream_node::NodeBody; use tonic::{Request, Response, Status}; use crate::barrier::BarrierManagerRef; +use crate::manager::sink_coordination::SinkCoordinatorManager; use crate::manager::{ CatalogManagerRef, ClusterManagerRef, ConnectionId, FragmentManagerRef, IdCategory, IdCategoryType, MetaSrvEnv, StreamingJob, @@ -47,6 +49,7 @@ pub struct DdlServiceImpl { env: MetaSrvEnv, catalog_manager: CatalogManagerRef, + sink_manager: SinkCoordinatorManager, ddl_controller: DdlController, aws_client: Arc>, } @@ -65,6 +68,7 @@ where cluster_manager: ClusterManagerRef, fragment_manager: FragmentManagerRef, barrier_manager: BarrierManagerRef, + sink_manager: SinkCoordinatorManager, ) -> Self { let aws_cli_ref = Arc::new(aws_client); let ddl_controller = DdlController::new( @@ -82,6 +86,7 @@ where catalog_manager, ddl_controller, aws_client: aws_cli_ref, + sink_manager, } } } @@ -254,6 +259,10 @@ where )) .await?; + self.sink_manager + .stop_sink_coordinator(SinkId::from(sink_id)) + .await; + Ok(Response::new(DropSinkResponse { status: None, version, diff --git a/src/meta/src/rpc/service/mod.rs b/src/meta/src/rpc/service/mod.rs index 2fdf4daa066b9..4484a8ca68a88 100644 --- a/src/meta/src/rpc/service/mod.rs +++ b/src/meta/src/rpc/service/mod.rs @@ -23,6 +23,7 @@ pub mod meta_member_service; pub mod notification_service; pub mod scale_service; pub mod serving_service; +pub mod sink_coordination_service; pub mod stream_service; pub mod system_params_service; pub mod telemetry_service; diff --git a/src/meta/src/rpc/service/sink_coordination_service.rs b/src/meta/src/rpc/service/sink_coordination_service.rs new file mode 100644 index 0000000000000..f7d56af9c063f --- /dev/null +++ b/src/meta/src/rpc/service/sink_coordination_service.rs @@ 
-0,0 +1,46 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use futures::{Stream, StreamExt}; +use risingwave_pb::connector_service::sink_coordination_service_server::SinkCoordinationService; +use risingwave_pb::connector_service::{CoordinateRequest, CoordinateResponse}; +use tonic::{Request, Response, Status, Streaming}; + +use crate::manager::sink_coordination::SinkCoordinatorManager; + +#[derive(Clone)] +pub(crate) struct SinkCoordinationServiceImpl { + sink_manager: SinkCoordinatorManager, +} + +impl SinkCoordinationServiceImpl { + pub(crate) fn new(sink_manager: SinkCoordinatorManager) -> Self { + Self { sink_manager } + } +} + +#[async_trait::async_trait] +impl SinkCoordinationService for SinkCoordinationServiceImpl { + type CoordinateStream = impl Stream>; + + async fn coordinate( + &self, + request: Request>, + ) -> Result, Status> { + let stream = request.into_inner(); + Ok(Response::new( + self.sink_manager.handle_new_request(stream.boxed()).await?, + )) + } +} diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index c82da3643c6f0..a32810bec3513 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -578,6 +578,7 @@ mod tests { use super::*; use crate::barrier::GlobalBarrierManager; use crate::hummock::{CompactorManager, HummockManager}; + use crate::manager::sink_coordination::SinkCoordinatorManager; use crate::manager::{ CatalogManager, CatalogManagerRef, ClusterManager, FragmentManager, MetaSrvEnv, RelationIdEnum, StreamingClusterInfo, @@ -778,6 +779,8 @@ mod tests { .await?, ); + let (sink_manager, _) = SinkCoordinatorManager::start_worker(None); + let barrier_manager = Arc::new(GlobalBarrierManager::new( scheduled_barriers, env.clone(), @@ -786,6 +789,7 @@ mod tests { fragment_manager.clone(), hummock_manager.clone(), source_manager.clone(), + sink_manager, meta_metrics.clone(), )); diff --git a/src/rpc_client/src/lib.rs b/src/rpc_client/src/lib.rs index a3473479e252f..cf695862f7fc0 100644 --- a/src/rpc_client/src/lib.rs +++ b/src/rpc_client/src/lib.rs @@ -39,7 +39,7 @@ use anyhow::anyhow; use async_trait::async_trait; use futures::future::try_join_all; use futures::stream::BoxStream; -use futures::StreamExt; +use futures::{Stream, StreamExt}; #[cfg(not(madsim))] use moka::future::Cache; use rand::prelude::SliceRandom; @@ -50,16 +50,16 @@ use tokio::sync::mpsc::{channel, Sender}; #[cfg(madsim)] use tokio::sync::Mutex; use tokio_stream::wrappers::ReceiverStream; -use tonic::{Request, Response, Status, Streaming}; +use tonic::{Request, Response, Status}; pub mod error; use error::{Result, RpcError}; +mod compactor_client; mod compute_client; mod connector_client; mod hummock_meta_client; mod meta_client; -// mod sink_client; -mod compactor_client; +mod sink_coordinate_client; mod stream_client; mod tracing; @@ -67,7 +67,8 @@ pub use compactor_client::CompactorClient; pub use compute_client::{ComputeClient, ComputeClientPool, ComputeClientPoolRef}; 
pub use connector_client::{ConnectorClient, SinkCoordinatorStreamHandle, SinkWriterStreamHandle}; pub use hummock_meta_client::{CompactionEventItem, HummockMetaClient}; -pub use meta_client::MetaClient; +pub use meta_client::{MetaClient, SinkCoordinationRpcClient}; +pub use sink_coordinate_client::CoordinatorStreamHandle; pub use stream_client::{StreamClient, StreamClientPool, StreamClientPoolRef}; #[async_trait] @@ -207,7 +208,7 @@ impl Debug for BidiStreamHandle { } impl BidiStreamHandle { - pub fn new( + pub fn for_test( request_sender: Sender, response_stream: BoxStream<'static, std::result::Result>, ) -> Self { @@ -219,7 +220,8 @@ impl BidiStreamHandle { pub async fn initialize< F: FnOnce(Request>) -> Fut, - Fut: Future>, Status>> + Send, + St: Stream> + Send + Unpin + 'static, + Fut: Future, Status>> + Send, >( first_request: REQ, init_stream_fn: F, @@ -246,7 +248,10 @@ impl BidiStreamHandle { )))??; Ok(( - Self::new(request_sender, response_stream.boxed()), + Self { + request_sender, + response_stream: response_stream.boxed(), + }, first_response, )) } diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index d67e5144f0979..77b74d0685690 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -45,6 +45,7 @@ use risingwave_pb::catalog::{ use risingwave_pb::cloud_service::cloud_service_client::CloudServiceClient; use risingwave_pb::cloud_service::*; use risingwave_pb::common::{HostAddress, WorkerNode, WorkerType}; +use risingwave_pb::connector_service::sink_coordination_service_client::SinkCoordinationServiceClient; use risingwave_pb::ddl_service::alter_relation_name_request::Relation; use risingwave_pb::ddl_service::ddl_service_client::DdlServiceClient; use risingwave_pb::ddl_service::drop_table_request::SourceId; @@ -70,7 +71,7 @@ use risingwave_pb::meta::stream_manager_service_client::StreamManagerServiceClie use risingwave_pb::meta::system_params_service_client::SystemParamsServiceClient; use risingwave_pb::meta::telemetry_info_service_client::TelemetryInfoServiceClient; use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability; -use risingwave_pb::meta::{PbReschedule, *}; +use risingwave_pb::meta::*; use risingwave_pb::stream_plan::StreamFragmentGraph; use risingwave_pb::user::update_user_request::UpdateField; use risingwave_pb::user::user_service_client::UserServiceClient; @@ -1052,6 +1053,10 @@ impl MetaClient { let resp = self.inner.rw_cloud_validate_source(req).await?; Ok(resp) } + + pub async fn sink_coordinate_client(&self) -> SinkCoordinationRpcClient { + self.inner.core.read().await.sink_coordinate_client.clone() + } } #[async_trait] @@ -1213,6 +1218,8 @@ impl TelemetryInfoFetcher for MetaClient { } } +pub type SinkCoordinationRpcClient = SinkCoordinationServiceClient; + #[derive(Debug, Clone)] struct GrpcMetaClientCore { cluster_client: ClusterServiceClient, @@ -1229,6 +1236,7 @@ struct GrpcMetaClientCore { system_params_client: SystemParamsServiceClient, serving_client: ServingServiceClient, cloud_client: CloudServiceClient, + sink_coordinate_client: SinkCoordinationRpcClient, } impl GrpcMetaClientCore { @@ -1248,7 +1256,8 @@ impl GrpcMetaClientCore { let telemetry_client = TelemetryInfoServiceClient::new(channel.clone()); let system_params_client = SystemParamsServiceClient::new(channel.clone()); let serving_client = ServingServiceClient::new(channel.clone()); - let cloud_client = CloudServiceClient::new(channel); + let cloud_client = CloudServiceClient::new(channel.clone()); + 
let sink_coordinate_client = SinkCoordinationServiceClient::new(channel); GrpcMetaClientCore { cluster_client, @@ -1265,6 +1274,7 @@ impl GrpcMetaClientCore { system_params_client, serving_client, cloud_client, + sink_coordinate_client, } } } diff --git a/src/rpc_client/src/sink_coordinate_client.rs b/src/rpc_client/src/sink_coordinate_client.rs new file mode 100644 index 0000000000000..2afa878e2cd34 --- /dev/null +++ b/src/rpc_client/src/sink_coordinate_client.rs @@ -0,0 +1,91 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::future::Future; + +use anyhow::anyhow; +use futures::Stream; +use risingwave_common::buffer::Bitmap; +use risingwave_pb::connector_service::coordinate_request::{ + CommitRequest, StartCoordinationRequest, +}; +use risingwave_pb::connector_service::{ + coordinate_request, coordinate_response, CoordinateRequest, CoordinateResponse, PbSinkParam, + SinkMetadata, +}; +use tokio_stream::wrappers::ReceiverStream; +use tonic::{Request, Response, Status}; + +use crate::{BidiStreamHandle, SinkCoordinationRpcClient}; + +pub type CoordinatorStreamHandle = BidiStreamHandle; + +impl CoordinatorStreamHandle { + pub async fn new( + mut client: SinkCoordinationRpcClient, + param: PbSinkParam, + vnode_bitmap: Bitmap, + ) -> anyhow::Result { + Self::new_with_init_stream(param, vnode_bitmap, |req_stream| async move { + client.coordinate(req_stream).await + }) + .await + } + + pub async fn new_with_init_stream( + param: PbSinkParam, + vnode_bitmap: Bitmap, + init_stream: F, + ) -> anyhow::Result + where + F: FnOnce(Request>) -> Fut, + St: Stream> + Send + Unpin + 'static, + Fut: Future, Status>> + Send, + { + let (stream_handle, first_response) = BidiStreamHandle::initialize( + CoordinateRequest { + msg: Some(coordinate_request::Msg::StartRequest( + StartCoordinationRequest { + vnode_bitmap: Some(vnode_bitmap.to_protobuf()), + param: Some(param), + }, + )), + }, + init_stream, + ) + .await?; + match first_response { + CoordinateResponse { + msg: Some(coordinate_response::Msg::StartResponse(_)), + } => Ok(stream_handle), + msg => Err(anyhow!("should get start response but get {:?}", msg)), + } + } + + pub async fn commit(&mut self, epoch: u64, metadata: SinkMetadata) -> anyhow::Result<()> { + self.send_request(CoordinateRequest { + msg: Some(coordinate_request::Msg::CommitRequest(CommitRequest { + epoch, + metadata: Some(metadata), + })), + }) + .await?; + match self.next_response().await? 
{ + CoordinateResponse { + msg: Some(coordinate_response::Msg::CommitResponse(_)), + } => Ok(()), + msg => Err(anyhow!("should get commit response but get {:?}", msg)), + } + } +} diff --git a/src/stream/src/executor/sink.rs b/src/stream/src/executor/sink.rs index 8a1853f698bd0..19d7e2cb819ee 100644 --- a/src/stream/src/executor/sink.rs +++ b/src/stream/src/executor/sink.rs @@ -388,11 +388,7 @@ mod test { let sink_executor = SinkExecutor::new( Box::new(mock), Arc::new(StreamingMetrics::unused()), - SinkWriterParam { - connector_params: Default::default(), - executor_id: 0, - vnode_bitmap: None, - }, + SinkWriterParam::default(), columns.clone(), properties, pk.clone(), @@ -478,11 +474,7 @@ mod test { let sink_executor = SinkExecutor::new( Box::new(mock), Arc::new(StreamingMetrics::unused()), - SinkWriterParam { - connector_params: Default::default(), - executor_id: 0, - vnode_bitmap: None, - }, + SinkWriterParam::default(), columns, properties, pk.clone(), diff --git a/src/stream/src/from_proto/sink.rs b/src/stream/src/from_proto/sink.rs index 6a91aa36b688a..d3f133822769f 100644 --- a/src/stream/src/from_proto/sink.rs +++ b/src/stream/src/from_proto/sink.rs @@ -68,6 +68,7 @@ impl ExecutorBuilder for SinkExecutorBuilder { connector_params: params.env.connector_params(), executor_id: params.executor_id, vnode_bitmap: params.vnode_bitmap, + meta_client: params.env.meta_client(), }, columns, properties, @@ -97,6 +98,7 @@ impl ExecutorBuilder for SinkExecutorBuilder { connector_params: params.env.connector_params(), executor_id: params.executor_id, vnode_bitmap: params.vnode_bitmap, + meta_client: params.env.meta_client(), }, columns, properties, diff --git a/src/stream/src/task/env.rs b/src/stream/src/task/env.rs index cd9b066163aca..75b074e477f37 100644 --- a/src/stream/src/task/env.rs +++ b/src/stream/src/task/env.rs @@ -22,6 +22,7 @@ use risingwave_connector::source::monitor::SourceMetrics; use risingwave_connector::ConnectorParams; #[cfg(test)] use risingwave_pb::connector_service::SinkPayloadFormat; +use risingwave_rpc_client::MetaClient; use risingwave_source::dml_manager::DmlManagerRef; use risingwave_storage::StateStoreImpl; @@ -57,6 +58,9 @@ pub struct StreamEnvironment { /// Total memory usage in stream. total_mem_val: Arc>, + + /// Meta client. Use `None` for test only + meta_client: Option, } impl StreamEnvironment { @@ -70,6 +74,7 @@ impl StreamEnvironment { dml_manager: DmlManagerRef, system_params_manager: LocalSystemParamsManagerRef, source_metrics: Arc, + meta_client: MetaClient, ) -> Self { StreamEnvironment { server_addr, @@ -81,6 +86,7 @@ impl StreamEnvironment { system_params_manager, source_metrics, total_mem_val: Arc::new(TrAdder::new()), + meta_client: Some(meta_client), } } @@ -102,6 +108,7 @@ impl StreamEnvironment { system_params_manager: Arc::new(LocalSystemParamsManager::for_test()), source_metrics: Arc::new(SourceMetrics::default()), total_mem_val: Arc::new(TrAdder::new()), + meta_client: None, } } @@ -140,4 +147,8 @@ impl StreamEnvironment { pub fn total_mem_usage(&self) -> Arc> { self.total_mem_val.clone() } + + pub fn meta_client(&self) -> Option { + self.meta_client.clone() + } }
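Taken together, the compute-node wiring above means a sink that wants coordinated commits has everything it needs in `SinkWriterParam`: the executor builder now forwards `params.env.meta_client()` and the vnode bitmap, and `CoordinatedSinkWriter::new` opens the `Coordinate` stream and waits for the coordinator to acknowledge the start request. Below is a minimal, hypothetical helper showing how a sink implementation might wrap its inner writer; the helper name is illustrative and not part of this patch, but every type and call it uses is introduced above.

```rust
use anyhow::anyhow;
use risingwave_connector::sink::coordinate::CoordinatedSinkWriter;
use risingwave_connector::sink::{Result, SinkError, SinkParam, SinkWriter, SinkWriterParam};
use risingwave_pb::connector_service::SinkMetadata;

/// Illustrative helper: wrap an inner writer so that its per-checkpoint metadata
/// is committed through the meta node's SinkCoordinationService.
async fn new_coordinated_writer<W>(
    param: SinkParam,
    writer_param: SinkWriterParam,
    inner: W,
) -> Result<CoordinatedSinkWriter<W>>
where
    W: SinkWriter<CommitMetadata = Option<SinkMetadata>>,
{
    // Both fields are `Option`s: `SinkWriterParam::default()` (used in tests) leaves
    // them unset, so surface a coordinator error instead of unwrapping.
    let meta_client = writer_param
        .meta_client
        .ok_or_else(|| SinkError::Coordinator(anyhow!("missing meta client")))?;
    let vnode_bitmap = writer_param
        .vnode_bitmap
        .ok_or_else(|| SinkError::Coordinator(anyhow!("missing vnode bitmap")))?;

    // Opens the bidirectional `Coordinate` stream and waits for the
    // `StartCoordinationResponse` before handing the writer back.
    CoordinatedSinkWriter::new(
        meta_client.sink_coordinate_client().await,
        param,
        vnode_bitmap,
        inner,
    )
    .await
}
```

On `DROP SINK`, `DdlServiceImpl` now calls `stop_sink_coordinator`, and barrier recovery calls `SinkCoordinatorManager::reset`, so a writer built this way can assume its coordinator is torn down together with the sink.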