From 6b0e60b30af48364536ed7a2f33666c7a9cf91e3 Mon Sep 17 00:00:00 2001 From: William Wen Date: Sun, 29 Oct 2023 19:16:15 +0800 Subject: [PATCH 01/14] refactor(sink): reimplement remote sink without writer --- src/connector/src/sink/remote.rs | 436 +++++++++++++++---------------- 1 file changed, 210 insertions(+), 226 deletions(-) diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs index 3c52cb720dbd4..e46f93da80e24 100644 --- a/src/connector/src/sink/remote.rs +++ b/src/connector/src/sink/remote.rs @@ -17,10 +17,12 @@ use std::fmt::Formatter; use std::future::Future; use std::marker::PhantomData; use std::ops::Deref; +use std::pin::pin; use std::time::Instant; use anyhow::anyhow; use async_trait::async_trait; +use futures::future::select; use futures::stream::Peekable; use futures::{StreamExt, TryFutureExt, TryStreamExt}; use itertools::Itertools; @@ -34,9 +36,8 @@ use risingwave_jni_core::jvm_runtime::JVM; use risingwave_pb::connector_service::sink_coordinator_stream_request::{ CommitMetadata, StartCoordinator, }; -use risingwave_pb::connector_service::sink_writer_stream_request::write_batch::json_payload::RowOp; use risingwave_pb::connector_service::sink_writer_stream_request::write_batch::{ - JsonPayload, Payload, StreamChunkPayload, + Payload, StreamChunkPayload, }; use risingwave_pb::connector_service::sink_writer_stream_request::{ Barrier, BeginEpoch, Request as SinkRequest, StartSink, WriteBatch, @@ -44,17 +45,15 @@ use risingwave_pb::connector_service::sink_writer_stream_request::{ use risingwave_pb::connector_service::sink_writer_stream_response::CommitResponse; use risingwave_pb::connector_service::{ sink_coordinator_stream_request, sink_coordinator_stream_response, sink_writer_stream_response, - SinkCoordinatorStreamRequest, SinkCoordinatorStreamResponse, SinkMetadata, SinkPayloadFormat, + PbSinkPayloadFormat, SinkCoordinatorStreamRequest, SinkCoordinatorStreamResponse, SinkMetadata, SinkWriterStreamRequest, SinkWriterStreamResponse, ValidateSinkRequest, ValidateSinkResponse, }; use tokio::sync::mpsc; -use tokio::sync::mpsc::{Receiver, Sender}; +use tokio::sync::mpsc::{unbounded_channel, Receiver, Sender}; use tokio_stream::wrappers::ReceiverStream; use tracing::warn; -use super::encoder::{JsonEncoder, RowEncoder}; use crate::sink::coordinate::CoordinatedSinkWriter; -use crate::sink::encoder::TimestampHandlingMode; use crate::sink::log_store::{LogReader, LogStoreReadItem, TruncateOffset}; use crate::sink::writer::{LogSinkerOf, SinkWriter, SinkWriterExt}; use crate::sink::{ @@ -109,12 +108,12 @@ impl TryFrom for RemoteSink { impl Sink for RemoteSink { type Coordinator = DummySinkCommitCoordinator; - type LogSinker = RemoteLogSinker; + type LogSinker = RemoteLogSinker; const SINK_NAME: &'static str = R::SINK_NAME; async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result { - RemoteLogSinker::new(self.param.clone(), writer_param).await + RemoteLogSinker::new::(self.param.clone(), writer_param).await } async fn validate(&self) -> Result<()> { @@ -194,22 +193,26 @@ impl Sink for RemoteSink { } } -pub struct RemoteLogSinker { - writer: RemoteSinkWriter, +pub struct RemoteLogSinker { + request_tx: SinkWriterStreamJniSender, + response_rx: SinkWriterStreamJniReceiver, sink_metrics: SinkMetrics, } -impl RemoteLogSinker { - async fn new(sink_param: SinkParam, writer_param: SinkWriterParam) -> Result { - let writer = RemoteSinkWriter::new( - sink_param, - writer_param.connector_params, - writer_param.sink_metrics.clone(), - ) - .await?; +impl RemoteLogSinker { + async fn new( + sink_param: SinkParam, + writer_param: SinkWriterParam, + ) -> Result { + let SinkWriterStreamJniHandle { + request_tx, + response_rx, + } = SinkWriterStreamJniHandle::new::(&sink_param).await?; + let sink_metrics = writer_param.sink_metrics; Ok(RemoteLogSinker { - writer, + request_tx, + response_rx, sink_metrics, }) } @@ -228,103 +231,155 @@ async fn await_future_with_monitor_receiver_err> } #[async_trait] -impl LogSinker for RemoteLogSinker { +impl LogSinker for RemoteLogSinker { async fn consume_log_and_sink(self, mut log_reader: impl LogReader) -> Result<()> { - // Note: this is a total copy of the implementation of LogSinkerOf, - // except that we monitor the future of `log_reader.next_item` with await_future_with_monitor_receiver_err - // to monitor the error in the response stream. - - let mut sink_writer = self.writer; + let mut request_tx = self.request_tx; + let mut response_err_stream_rx = self.response_rx; let sink_metrics = self.sink_metrics; - #[derive(Debug)] - enum LogConsumerState { - /// Mark that the log consumer is not initialized yet - Uninitialized, - /// Mark that a new epoch has begun. - EpochBegun { curr_epoch: u64 }, + let (response_tx, mut response_rx) = unbounded_channel(); - /// Mark that the consumer has just received a barrier - BarrierReceived { prev_epoch: u64 }, - } + let poll_response_stream = pin!(async move { + let result = response_err_stream_rx.response_stream.try_next().await; + match result { + Ok(Some(response)) => response_tx.send(response).map_err(|err| { + SinkError::Remote(anyhow!("unable to send response: {:?}", err.0)) + }), + Ok(None) => Err(SinkError::Remote(anyhow!("end of response stream"))), + Err(e) => Err(SinkError::Remote(e)), + } + }); - let mut state = LogConsumerState::Uninitialized; + let poll_consume_log_and_sink = pin!(async move { + #[derive(Debug)] + enum LogConsumerState { + /// Mark that the log consumer is not initialized yet + Uninitialized, - log_reader.init().await?; + /// Mark that a new epoch has begun. + EpochBegun { curr_epoch: u64, next_batch_id: u64 }, - loop { - let (epoch, item): (u64, LogStoreReadItem) = await_future_with_monitor_receiver_err( - &mut sink_writer.stream_handle.response_rx, - log_reader.next_item().map_err(SinkError::Internal), - ) - .await?; - if let LogStoreReadItem::UpdateVnodeBitmap(_) = &item { - match &state { - LogConsumerState::BarrierReceived { .. } => {} - _ => unreachable!( - "update vnode bitmap can be accepted only right after \ - barrier, but current state is {:?}", - state - ), - } + /// Mark that the consumer has just received a barrier + BarrierReceived { prev_epoch: u64 }, } - // begin_epoch when not previously began - state = match state { - LogConsumerState::Uninitialized => { - sink_writer.begin_epoch(epoch).await?; - LogConsumerState::EpochBegun { curr_epoch: epoch } - } - LogConsumerState::EpochBegun { curr_epoch } => { - assert!( - epoch >= curr_epoch, - "new epoch {} should not be below the current epoch {}", - epoch, - curr_epoch - ); - LogConsumerState::EpochBegun { curr_epoch: epoch } - } - LogConsumerState::BarrierReceived { prev_epoch } => { - assert!( - epoch > prev_epoch, - "new epoch {} should be greater than prev epoch {}", - epoch, - prev_epoch - ); - sink_writer.begin_epoch(epoch).await?; - LogConsumerState::EpochBegun { curr_epoch: epoch } - } - }; - match item { - LogStoreReadItem::StreamChunk { chunk, .. } => { - if let Err(e) = sink_writer.write_batch(chunk).await { - sink_writer.abort().await?; - return Err(e); + + let mut state = LogConsumerState::Uninitialized; + + log_reader.init().await?; + + loop { + let (epoch, item): (u64, LogStoreReadItem) = + log_reader.next_item().map_err(SinkError::Internal).await?; + + if let LogStoreReadItem::UpdateVnodeBitmap(_) = &item { + match &state { + LogConsumerState::BarrierReceived { .. } => {} + _ => unreachable!( + "update vnode bitmap can be accepted only right after \ + barrier, but current state is {:?}", + state + ), } } - LogStoreReadItem::Barrier { is_checkpoint } => { - let prev_epoch = match state { - LogConsumerState::EpochBegun { curr_epoch } => curr_epoch, - _ => unreachable!("epoch must have begun before handling barrier"), - }; - if is_checkpoint { - let start_time = Instant::now(); - sink_writer.barrier(true).await?; + // begin_epoch when not previously began + state = match state { + LogConsumerState::Uninitialized => { + request_tx.start_epoch(epoch).await?; + LogConsumerState::EpochBegun { + curr_epoch: epoch, + next_batch_id: 0, + } + } + LogConsumerState::EpochBegun { + curr_epoch, + next_batch_id, + } => { + assert!( + epoch >= curr_epoch, + "new epoch {} should not be below the current epoch {}", + epoch, + curr_epoch + ); + LogConsumerState::EpochBegun { + curr_epoch: epoch, + next_batch_id, + } + } + LogConsumerState::BarrierReceived { prev_epoch } => { + assert!( + epoch > prev_epoch, + "new epoch {} should be greater than prev epoch {}", + epoch, + prev_epoch + ); + request_tx.start_epoch(epoch).await?; + LogConsumerState::EpochBegun { + curr_epoch: epoch, + next_batch_id: 0, + } + } + }; + match item { + LogStoreReadItem::StreamChunk { chunk, .. } => { + let cardinality = chunk.cardinality(); sink_metrics - .sink_commit_duration_metrics - .observe(start_time.elapsed().as_millis() as f64); - log_reader - .truncate(TruncateOffset::Barrier { epoch }) - .await?; - } else { - sink_writer.barrier(false).await?; + .connector_sink_rows_received + .inc_by(cardinality as _); + let (epoch, next_batch_id) = match &mut state { + LogConsumerState::EpochBegun { + curr_epoch, + next_batch_id, + } => (*curr_epoch, next_batch_id), + _ => unreachable!("epoch must have begun before handling stream chunk"), + }; + + let batch_id = *next_batch_id; + *next_batch_id += 1; + + let payload = build_chunk_payload(chunk); + request_tx.write_batch(epoch, batch_id, payload).await?; } - state = LogConsumerState::BarrierReceived { prev_epoch } - } - LogStoreReadItem::UpdateVnodeBitmap(vnode_bitmap) => { - sink_writer.update_vnode_bitmap(vnode_bitmap).await?; + LogStoreReadItem::Barrier { is_checkpoint } => { + let prev_epoch = match state { + LogConsumerState::EpochBegun { curr_epoch, .. } => curr_epoch, + _ => unreachable!("epoch must have begun before handling barrier"), + }; + if is_checkpoint { + let start_time = Instant::now(); + request_tx.barrier(epoch, true).await?; + match response_rx.recv().await.ok_or_else(|| { + SinkError::Remote(anyhow!("end of response stream")) + })? { + SinkWriterStreamResponse { + response: Some(sink_writer_stream_response::Response::Commit(_)), + } => {} + response => { + return Err(SinkError::Remote(anyhow!( + "expected commit response, but get {:?}", + response + ))); + } + }; + sink_metrics + .sink_commit_duration_metrics + .observe(start_time.elapsed().as_millis() as f64); + log_reader + .truncate(TruncateOffset::Barrier { epoch }) + .await?; + } else { + request_tx.barrier(epoch, false).await?; + } + state = LogConsumerState::BarrierReceived { prev_epoch } + } + LogStoreReadItem::UpdateVnodeBitmap(_) => {} } } - } + }); + + select(poll_response_stream, poll_consume_log_and_sink) + .await + .factor_first() + .0 } } @@ -340,8 +395,8 @@ impl TryFrom for CoordinatedRemoteSink { } impl Sink for CoordinatedRemoteSink { - type Coordinator = RemoteCoordinator; - type LogSinker = LogSinkerOf>>; + type Coordinator = RemoteCoordinator; + type LogSinker = LogSinkerOf>; const SINK_NAME: &'static str = R::SINK_NAME; @@ -362,7 +417,7 @@ impl Sink for CoordinatedRemoteSink { "sink needs coordination should not have singleton input" )) })?, - CoordinatedRemoteSinkWriter::new( + CoordinatedRemoteSinkWriter::new::( self.0.param.clone(), writer_param.connector_params, writer_param.sink_metrics.clone(), @@ -374,7 +429,7 @@ impl Sink for CoordinatedRemoteSink { } async fn new_coordinator(&self) -> Result { - RemoteCoordinator::new(self.0.param.clone()).await + RemoteCoordinator::new::(self.0.param.clone()).await } } @@ -527,26 +582,8 @@ impl SinkWriterStreamJniHandle { } } -pub type RemoteSinkWriter = RemoteSinkWriterInner<(), R>; -pub type CoordinatedRemoteSinkWriter = RemoteSinkWriterInner, R>; - -pub struct RemoteSinkWriterInner { - properties: HashMap, - epoch: Option, - batch_id: u64, - payload_format: SinkPayloadFormat, - stream_handle: SinkWriterStreamJniHandle, - json_encoder: JsonEncoder, - sink_metrics: SinkMetrics, - _phantom: PhantomData<(SM, R)>, -} - -impl RemoteSinkWriterInner { - pub async fn new( - param: SinkParam, - connector_params: ConnectorParams, - sink_metrics: SinkMetrics, - ) -> Result { +impl SinkWriterStreamJniHandle { + async fn new(param: &SinkParam) -> Result { let (request_tx, request_rx) = mpsc::channel(DEFAULT_CHANNEL_SIZE); let (response_tx, response_rx) = mpsc::channel(DEFAULT_CHANNEL_SIZE); @@ -581,7 +618,7 @@ impl RemoteSinkWriterInner { let sink_writer_stream_request = SinkWriterStreamRequest { request: Some(SinkRequest::Start(StartSink { sink_param: Some(param.to_proto()), - format: connector_params.sink_payload_format as i32, + format: PbSinkPayloadFormat::StreamChunk as i32, })), }; @@ -618,20 +655,38 @@ impl RemoteSinkWriterInner { ¶m.properties ); - let schema = param.schema(); - - let stream_handle = SinkWriterStreamJniHandle { + Ok(SinkWriterStreamJniHandle { request_tx: SinkWriterStreamJniSender { request_tx }, response_rx: SinkWriterStreamJniReceiver { response_stream }, - }; + }) + } +} + +pub type RemoteSinkWriter = RemoteSinkWriterInner<()>; +pub type CoordinatedRemoteSinkWriter = RemoteSinkWriterInner>; + +pub struct RemoteSinkWriterInner { + properties: HashMap, + epoch: Option, + batch_id: u64, + stream_handle: SinkWriterStreamJniHandle, + sink_metrics: SinkMetrics, + _phantom: PhantomData, +} + +impl RemoteSinkWriterInner { + pub async fn new( + param: SinkParam, + _connector_params: ConnectorParams, + sink_metrics: SinkMetrics, + ) -> Result { + let stream_handle = SinkWriterStreamJniHandle::new::(¶m).await?; Ok(Self { properties: param.properties, epoch: None, batch_id: 0, stream_handle, - payload_format: connector_params.sink_payload_format, - json_encoder: JsonEncoder::new(schema, None, TimestampHandlingMode::String), sink_metrics, _phantom: PhantomData, }) @@ -641,25 +696,9 @@ impl RemoteSinkWriterInner { fn for_test( response_receiver: Receiver>, request_sender: Sender, - ) -> RemoteSinkWriter { - use risingwave_common::catalog::{Field, Schema}; + ) -> RemoteSinkWriter { let properties = HashMap::from([("output.path".to_string(), "/tmp/rw".to_string())]); - let schema = Schema::new(vec![ - Field { - data_type: DataType::Int32, - name: "id".into(), - sub_fields: vec![], - type_name: "".into(), - }, - Field { - data_type: DataType::Varchar, - name: "name".into(), - sub_fields: vec![], - type_name: "".into(), - }, - ]); - let stream_handle = SinkWriterStreamJniHandle { request_tx: SinkWriterStreamJniSender { request_tx: request_sender, @@ -673,9 +712,7 @@ impl RemoteSinkWriterInner { properties, epoch: None, batch_id: 0, - json_encoder: JsonEncoder::new(schema, None, TimestampHandlingMode::String), stream_handle, - payload_format: SinkPayloadFormat::Json, sink_metrics: SinkMetrics::for_test(), _phantom: PhantomData, } @@ -688,7 +725,7 @@ trait HandleBarrierResponse { fn non_checkpoint_return_value() -> Self::SinkMetadata; } -impl HandleBarrierResponse for RemoteSinkWriter { +impl HandleBarrierResponse for RemoteSinkWriter { type SinkMetadata = (); fn handle_commit_response(rsp: CommitResponse) -> Result { @@ -701,7 +738,7 @@ impl HandleBarrierResponse for RemoteSinkWriter { fn non_checkpoint_return_value() -> Self::SinkMetadata {} } -impl HandleBarrierResponse for CoordinatedRemoteSinkWriter { +impl HandleBarrierResponse for CoordinatedRemoteSinkWriter { type SinkMetadata = Option; fn handle_commit_response(rsp: CommitResponse) -> Result { @@ -719,8 +756,14 @@ impl HandleBarrierResponse for CoordinatedRemoteSinkWriter Payload { + let prost_stream_chunk = chunk.to_protobuf(); + let binary_data = Message::encode_to_vec(&prost_stream_chunk); + Payload::StreamChunkPayload(StreamChunkPayload { binary_data }) +} + #[async_trait] -impl SinkWriter for RemoteSinkWriterInner +impl SinkWriter for RemoteSinkWriterInner where Self: HandleBarrierResponse, { @@ -731,30 +774,8 @@ where self.sink_metrics .connector_sink_rows_received .inc_by(cardinality as _); - let payload = match self.payload_format { - SinkPayloadFormat::Json => { - let mut row_ops = Vec::with_capacity(cardinality); - for (op, row_ref) in chunk.rows() { - let map = self.json_encoder.encode(row_ref)?; - let row_op = RowOp { - op_type: op.to_protobuf() as i32, - line: serde_json::to_string(&map) - .map_err(|e| SinkError::Remote(anyhow_error!("{:?}", e)))?, - }; - - row_ops.push(row_op); - } - Payload::JsonPayload(JsonPayload { row_ops }) - } - SinkPayloadFormat::StreamChunk => { - let prost_stream_chunk = chunk.to_protobuf(); - let binary_data = Message::encode_to_vec(&prost_stream_chunk); - Payload::StreamChunkPayload(StreamChunkPayload { binary_data }) - } - SinkPayloadFormat::FormatUnspecified => { - unreachable!("should specify sink payload format") - } - }; + + let payload = build_chunk_payload(chunk); let epoch = self.epoch.ok_or_else(|| { SinkError::Remote(anyhow_error!( @@ -794,13 +815,12 @@ where } } -pub struct RemoteCoordinator { +pub struct RemoteCoordinator { stream_handle: SinkCoordinatorStreamJniHandle, - _phantom: PhantomData, } -impl RemoteCoordinator { - pub async fn new(param: SinkParam) -> Result { +impl RemoteCoordinator { + pub async fn new(param: SinkParam) -> Result { let (request_tx, request_rx) = mpsc::channel(DEFAULT_CHANNEL_SIZE); let (response_tx, response_rx) = mpsc::channel(DEFAULT_CHANNEL_SIZE); @@ -877,15 +897,12 @@ impl RemoteCoordinator { ¶m.properties ); - Ok(RemoteCoordinator { - stream_handle, - _phantom: PhantomData, - }) + Ok(RemoteCoordinator { stream_handle }) } } #[async_trait] -impl SinkCommitCoordinator for RemoteCoordinator { +impl SinkCommitCoordinator for RemoteCoordinator { async fn init(&mut self) -> Result<()> { Ok(()) } @@ -901,27 +918,20 @@ mod test { use risingwave_common::array::StreamChunk; use risingwave_common::test_prelude::StreamChunkTestExt; - use risingwave_pb::connector_service::sink_writer_stream_request::write_batch::Payload; use risingwave_pb::connector_service::sink_writer_stream_request::{Barrier, Request}; use risingwave_pb::connector_service::sink_writer_stream_response::{CommitResponse, Response}; use risingwave_pb::connector_service::{SinkWriterStreamRequest, SinkWriterStreamResponse}; - use risingwave_pb::data; use tokio::sync::mpsc; - use crate::sink::remote::{RemoteSinkTrait, RemoteSinkWriter}; + use crate::sink::remote::{build_chunk_payload, RemoteSinkWriter}; use crate::sink::SinkWriter; - struct TestRemote; - impl RemoteSinkTrait for TestRemote { - const SINK_NAME: &'static str = "test-remote"; - } - #[tokio::test] async fn test_epoch_check() { let (request_sender, mut request_recv) = mpsc::channel(16); let (_, resp_recv) = mpsc::channel(16); - let mut sink = >::for_test(resp_recv, request_sender); + let mut sink = RemoteSinkWriter::for_test(resp_recv, request_sender); let chunk = StreamChunk::from_pretty( " i T + 1 Ripper @@ -958,7 +968,7 @@ mod test { async fn test_remote_sink() { let (request_sender, mut request_receiver) = mpsc::channel(16); let (response_sender, response_receiver) = mpsc::channel(16); - let mut sink = >::for_test(response_receiver, request_sender); + let mut sink = RemoteSinkWriter::for_test(response_receiver, request_sender); let chunk_a = StreamChunk::from_pretty( " i T @@ -993,20 +1003,7 @@ mod test { }) => { assert_eq!(write.epoch, 2022); assert_eq!(write.batch_id, 0); - match write.payload.unwrap() { - Payload::JsonPayload(json) => { - let row_0 = json.row_ops.get(0).unwrap(); - assert_eq!(row_0.line, "{\"id\":1,\"name\":\"Alice\"}"); - assert_eq!(row_0.op_type, data::Op::Insert as i32); - let row_1 = json.row_ops.get(1).unwrap(); - assert_eq!(row_1.line, "{\"id\":2,\"name\":\"Bob\"}"); - assert_eq!(row_1.op_type, data::Op::Insert as i32); - let row_2 = json.row_ops.get(2).unwrap(); - assert_eq!(row_2.line, "{\"id\":3,\"name\":\"Clare\"}"); - assert_eq!(row_2.op_type, data::Op::Insert as i32); - } - _ => unreachable!("should be json payload"), - } + assert_eq!(write.payload.unwrap(), build_chunk_payload(chunk_a)) } _ => panic!("test failed: failed to construct write request"), } @@ -1049,20 +1046,7 @@ mod test { }) => { assert_eq!(write.epoch, 2023); assert_eq!(write.batch_id, 1); - match write.payload.unwrap() { - Payload::JsonPayload(json) => { - let row_0 = json.row_ops.get(0).unwrap(); - assert_eq!(row_0.line, "{\"id\":4,\"name\":\"David\"}"); - assert_eq!(row_0.op_type, data::Op::Insert as i32); - let row_1 = json.row_ops.get(1).unwrap(); - assert_eq!(row_1.line, "{\"id\":5,\"name\":\"Eve\"}"); - assert_eq!(row_1.op_type, data::Op::Insert as i32); - let row_2 = json.row_ops.get(2).unwrap(); - assert_eq!(row_2.line, "{\"id\":6,\"name\":\"Frank\"}"); - assert_eq!(row_2.op_type, data::Op::Insert as i32); - } - _ => unreachable!("should be json payload"), - } + assert_eq!(write.payload.unwrap(), build_chunk_payload(chunk_b)); } _ => panic!("test failed: failed to construct write request"), } From b2e62a4c4873817d8220e691d879e3c609bde896 Mon Sep 17 00:00:00 2001 From: William Wen Date: Mon, 30 Oct 2023 19:16:25 +0800 Subject: [PATCH 02/14] keep polling response stream --- src/connector/src/sink/remote.rs | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs index e46f93da80e24..10024e850d851 100644 --- a/src/connector/src/sink/remote.rs +++ b/src/connector/src/sink/remote.rs @@ -240,13 +240,17 @@ impl LogSinker for RemoteLogSinker { let (response_tx, mut response_rx) = unbounded_channel(); let poll_response_stream = pin!(async move { - let result = response_err_stream_rx.response_stream.try_next().await; - match result { - Ok(Some(response)) => response_tx.send(response).map_err(|err| { - SinkError::Remote(anyhow!("unable to send response: {:?}", err.0)) - }), - Ok(None) => Err(SinkError::Remote(anyhow!("end of response stream"))), - Err(e) => Err(SinkError::Remote(e)), + loop { + let result = response_err_stream_rx.response_stream.try_next().await; + match result { + Ok(Some(response)) => { + response_tx.send(response).map_err(|err| { + SinkError::Remote(anyhow!("unable to send response: {:?}", err.0)) + })?; + } + Ok(None) => return Err(SinkError::Remote(anyhow!("end of response stream"))), + Err(e) => return Err(SinkError::Remote(e)), + } } }); From b74e04b27520bad46f1538dc4fad64d6210d15f9 Mon Sep 17 00:00:00 2001 From: William Wen Date: Tue, 31 Oct 2023 14:41:54 +0800 Subject: [PATCH 03/14] use offset as the consuming state and reset batch id to null in java --- .../connector/SinkWriterStreamObserver.java | 5 +- src/connector/src/sink/log_store.rs | 12 +++ src/connector/src/sink/remote.rs | 93 +++---------------- 3 files changed, 29 insertions(+), 81 deletions(-) diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkWriterStreamObserver.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkWriterStreamObserver.java index 1323133519165..5d5bf482384a3 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkWriterStreamObserver.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkWriterStreamObserver.java @@ -156,15 +156,14 @@ public void onNext(ConnectorServiceProto.SinkWriterStreamRequest sinkTask) { boolean isCheckpoint = sinkTask.getBarrier().getIsCheckpoint(); Optional metadata = sink.barrier(isCheckpoint); currentEpoch = sinkTask.getBarrier().getEpoch(); + currentBatchId = null; LOG.debug("Epoch {} barrier {}", currentEpoch, isCheckpoint); if (isCheckpoint) { ConnectorServiceProto.SinkWriterStreamResponse.CommitResponse.Builder builder = ConnectorServiceProto.SinkWriterStreamResponse.CommitResponse .newBuilder() .setEpoch(currentEpoch); - if (metadata.isPresent()) { - builder.setMetadata(metadata.get()); - } + metadata.ifPresent(builder::setMetadata); responseObserver.onNext( ConnectorServiceProto.SinkWriterStreamResponse.newBuilder() .setCommit(builder) diff --git a/src/connector/src/sink/log_store.rs b/src/connector/src/sink/log_store.rs index f7d99141139f5..c3356fce23d65 100644 --- a/src/connector/src/sink/log_store.rs +++ b/src/connector/src/sink/log_store.rs @@ -62,6 +62,18 @@ impl TruncateOffset { } } + pub fn check_next_offset(&self, next_offset: TruncateOffset) -> anyhow::Result<()> { + if *self >= next_offset { + Err(anyhow!( + "next offset {:?} should be later than current offset {:?}", + next_offset, + self + )) + } else { + Ok(()) + } + } + pub fn check_next_item_epoch(&self, epoch: u64) -> LogStoreResult<()> { match self { TruncateOffset::Chunk { diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs index 10024e850d851..e6d03f9ee9c47 100644 --- a/src/connector/src/sink/remote.rs +++ b/src/connector/src/sink/remote.rs @@ -255,19 +255,7 @@ impl LogSinker for RemoteLogSinker { }); let poll_consume_log_and_sink = pin!(async move { - #[derive(Debug)] - enum LogConsumerState { - /// Mark that the log consumer is not initialized yet - Uninitialized, - - /// Mark that a new epoch has begun. - EpochBegun { curr_epoch: u64, next_batch_id: u64 }, - - /// Mark that the consumer has just received a barrier - BarrierReceived { prev_epoch: u64 }, - } - - let mut state = LogConsumerState::Uninitialized; + let mut prev_offset: Option = None; log_reader.init().await?; @@ -275,79 +263,28 @@ impl LogSinker for RemoteLogSinker { let (epoch, item): (u64, LogStoreReadItem) = log_reader.next_item().map_err(SinkError::Internal).await?; - if let LogStoreReadItem::UpdateVnodeBitmap(_) = &item { - match &state { - LogConsumerState::BarrierReceived { .. } => {} - _ => unreachable!( - "update vnode bitmap can be accepted only right after \ - barrier, but current state is {:?}", - state - ), - } - } - // begin_epoch when not previously began - state = match state { - LogConsumerState::Uninitialized => { - request_tx.start_epoch(epoch).await?; - LogConsumerState::EpochBegun { - curr_epoch: epoch, - next_batch_id: 0, - } - } - LogConsumerState::EpochBegun { - curr_epoch, - next_batch_id, - } => { - assert!( - epoch >= curr_epoch, - "new epoch {} should not be below the current epoch {}", - epoch, - curr_epoch - ); - LogConsumerState::EpochBegun { - curr_epoch: epoch, - next_batch_id, - } - } - LogConsumerState::BarrierReceived { prev_epoch } => { - assert!( - epoch > prev_epoch, - "new epoch {} should be greater than prev epoch {}", - epoch, - prev_epoch - ); - request_tx.start_epoch(epoch).await?; - LogConsumerState::EpochBegun { - curr_epoch: epoch, - next_batch_id: 0, - } - } - }; match item { - LogStoreReadItem::StreamChunk { chunk, .. } => { + LogStoreReadItem::StreamChunk { chunk, chunk_id } => { + let offset = TruncateOffset::Chunk { epoch, chunk_id }; + if let Some(prev_offset) = &prev_offset { + prev_offset.check_next_offset(offset)?; + } let cardinality = chunk.cardinality(); sink_metrics .connector_sink_rows_received .inc_by(cardinality as _); - let (epoch, next_batch_id) = match &mut state { - LogConsumerState::EpochBegun { - curr_epoch, - next_batch_id, - } => (*curr_epoch, next_batch_id), - _ => unreachable!("epoch must have begun before handling stream chunk"), - }; - - let batch_id = *next_batch_id; - *next_batch_id += 1; let payload = build_chunk_payload(chunk); - request_tx.write_batch(epoch, batch_id, payload).await?; + request_tx + .write_batch(epoch, chunk_id as u64, payload) + .await?; + prev_offset = Some(offset); } LogStoreReadItem::Barrier { is_checkpoint } => { - let prev_epoch = match state { - LogConsumerState::EpochBegun { curr_epoch, .. } => curr_epoch, - _ => unreachable!("epoch must have begun before handling barrier"), - }; + let offset = TruncateOffset::Barrier { epoch }; + if let Some(prev_offset) = &prev_offset { + prev_offset.check_next_offset(offset)?; + } if is_checkpoint { let start_time = Instant::now(); request_tx.barrier(epoch, true).await?; @@ -373,7 +310,7 @@ impl LogSinker for RemoteLogSinker { } else { request_tx.barrier(epoch, false).await?; } - state = LogConsumerState::BarrierReceived { prev_epoch } + prev_offset = Some(offset); } LogStoreReadItem::UpdateVnodeBitmap(_) => {} } From 4535b9b3c7d3563812cd1ee6b9f602c92e8d2b95 Mon Sep 17 00:00:00 2001 From: William Wen Date: Tue, 31 Oct 2023 17:32:03 +0800 Subject: [PATCH 04/14] start epoch --- src/connector/src/sink/remote.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs index e6d03f9ee9c47..43d6cbd45960d 100644 --- a/src/connector/src/sink/remote.rs +++ b/src/connector/src/sink/remote.rs @@ -268,6 +268,8 @@ impl LogSinker for RemoteLogSinker { let offset = TruncateOffset::Chunk { epoch, chunk_id }; if let Some(prev_offset) = &prev_offset { prev_offset.check_next_offset(offset)?; + } else { + request_tx.start_epoch(epoch).await?; } let cardinality = chunk.cardinality(); sink_metrics @@ -284,6 +286,9 @@ impl LogSinker for RemoteLogSinker { let offset = TruncateOffset::Barrier { epoch }; if let Some(prev_offset) = &prev_offset { prev_offset.check_next_offset(offset)?; + } else { + // TODO: this start epoch is actually unnecessary + request_tx.start_epoch(epoch).await?; } if is_checkpoint { let start_time = Instant::now(); From 37615402a4f5a45ff3ec455fb737fe234740430e Mon Sep 17 00:00:00 2001 From: William Wen Date: Wed, 1 Nov 2023 00:54:16 +0800 Subject: [PATCH 05/14] remove RemoteSinkWriter --- src/connector/src/sink/remote.rs | 202 +++++++++++++------------------ 1 file changed, 85 insertions(+), 117 deletions(-) diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs index 3ba258ae98635..bfeb08cc19946 100644 --- a/src/connector/src/sink/remote.rs +++ b/src/connector/src/sink/remote.rs @@ -41,7 +41,6 @@ use risingwave_pb::connector_service::sink_writer_stream_request::write_batch::{ use risingwave_pb::connector_service::sink_writer_stream_request::{ Request as SinkRequest, StartSink, }; -use risingwave_pb::connector_service::sink_writer_stream_response::CommitResponse; use risingwave_pb::connector_service::{ sink_coordinator_stream_request, sink_coordinator_stream_response, sink_writer_stream_response, SinkCoordinatorStreamRequest, SinkCoordinatorStreamResponse, SinkMetadata, SinkPayloadFormat, @@ -55,7 +54,6 @@ use risingwave_rpc_client::{ use tokio::sync::mpsc; use tokio::sync::mpsc::{unbounded_channel, Receiver, Sender}; use tokio_stream::wrappers::ReceiverStream; -use tracing::warn; use crate::sink::coordinate::CoordinatedSinkWriter; use crate::sink::log_store::{LogReader, LogStoreReadItem, TruncateOffset}; @@ -121,9 +119,15 @@ impl Sink for RemoteSink { } async fn validate(&self) -> Result<()> { - // FIXME: support struct and array in stream sink - self.param.columns.iter().map(|col| { - if matches!( + validate_remote_sink(&self.param).await + } +} + +#[expect(clippy::unused_async)] +async fn validate_remote_sink(param: &SinkParam) -> Result<()> { + // FIXME: support struct and array in stream sink + param.columns.iter().map(|col| { + if matches!( col.data_type, DataType::Int16 | DataType::Int32 @@ -142,58 +146,58 @@ impl Sink for RemoteSink { | DataType::Bytea | DataType::List(_) ) { - Ok(()) - } else { - Err(SinkError::Remote(anyhow_error!( + Ok(()) + } else { + Err(SinkError::Remote(anyhow_error!( "remote sink supports Int16, Int32, Int64, Float32, Float64, Boolean, Decimal, Time, Date, Interval, Jsonb, Timestamp, Timestamptz, List, Bytea and Varchar, got {:?}: {:?}", col.name, col.data_type, ))) - } - }).try_collect()?; - - let mut env = JVM - .get_or_init()? - .attach_current_thread() - .map_err(|err| SinkError::Internal(err.into()))?; - let validate_sink_request = ValidateSinkRequest { - sink_param: Some(self.param.to_proto()), - }; - let validate_sink_request_bytes = env - .byte_array_from_slice(&Message::encode_to_vec(&validate_sink_request)) - .map_err(|err| SinkError::Internal(err.into()))?; - - let response = env - .call_static_method( - "com/risingwave/connector/JniSinkValidationHandler", - "validate", - "([B)[B", - &[JValue::Object(&validate_sink_request_bytes)], - ) - .map_err(|err| SinkError::Internal(err.into()))?; + } + }).try_collect()?; - let validate_sink_response_bytes = match response { - JValueOwned::Object(o) => unsafe { JByteArray::from_raw(o.into_raw()) }, - _ => unreachable!(), - }; + // TODO: use spawn_blocking to avoid blocking the tokio worker thread + let mut env = JVM + .get_or_init()? + .attach_current_thread() + .map_err(|err| SinkError::Internal(err.into()))?; + let validate_sink_request = ValidateSinkRequest { + sink_param: Some(param.to_proto()), + }; + let validate_sink_request_bytes = env + .byte_array_from_slice(&Message::encode_to_vec(&validate_sink_request)) + .map_err(|err| SinkError::Internal(err.into()))?; - let validate_sink_response: ValidateSinkResponse = Message::decode( - risingwave_jni_core::to_guarded_slice(&validate_sink_response_bytes, &mut env) - .map_err(|err| SinkError::Internal(err.into()))? - .deref(), + let response = env + .call_static_method( + "com/risingwave/connector/JniSinkValidationHandler", + "validate", + "([B)[B", + &[JValue::Object(&validate_sink_request_bytes)], ) .map_err(|err| SinkError::Internal(err.into()))?; - validate_sink_response.error.map_or_else( - || Ok(()), // If there is no error message, return Ok here. - |err| { - Err(SinkError::Remote(anyhow!(format!( - "sink cannot pass validation: {}", - err.error_message - )))) - }, - ) - } + let validate_sink_response_bytes = match response { + JValueOwned::Object(o) => unsafe { JByteArray::from_raw(o.into_raw()) }, + _ => unreachable!(), + }; + + let validate_sink_response: ValidateSinkResponse = Message::decode( + risingwave_jni_core::to_guarded_slice(&validate_sink_response_bytes, &mut env) + .map_err(|err| SinkError::Internal(err.into()))? + .deref(), + ) + .map_err(|err| SinkError::Internal(err.into()))?; + + validate_sink_response.error.map_or_else( + || Ok(()), // If there is no error message, return Ok here. + |err| { + Err(SinkError::Remote(anyhow!(format!( + "sink cannot pass validation: {}", + err.error_message + )))) + }, + ) } pub struct RemoteLogSinker { @@ -332,13 +336,19 @@ impl LogSinker for RemoteLogSinker { } #[derive(Debug)] -pub struct CoordinatedRemoteSink(pub RemoteSink); +pub struct CoordinatedRemoteSink { + param: SinkParam, + _phantom: PhantomData, +} impl TryFrom for CoordinatedRemoteSink { type Error = SinkError; fn try_from(param: SinkParam) -> std::result::Result { - RemoteSink::try_from(param).map(Self) + Ok(Self { + param, + _phantom: PhantomData, + }) } } @@ -349,7 +359,7 @@ impl Sink for CoordinatedRemoteSink { const SINK_NAME: &'static str = R::SINK_NAME; async fn validate(&self) -> Result<()> { - self.0.validate().await + validate_remote_sink(&self.param).await } async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result { @@ -359,14 +369,14 @@ impl Sink for CoordinatedRemoteSink { .expect("should have meta client") .sink_coordinate_client() .await, - self.0.param.clone(), + self.param.clone(), writer_param.vnode_bitmap.ok_or_else(|| { SinkError::Remote(anyhow_error!( "sink needs coordination should not have singleton input" )) })?, CoordinatedRemoteSinkWriter::new( - self.0.param.clone(), + self.param.clone(), writer_param.connector_params, writer_param.sink_metrics.clone(), ) @@ -377,23 +387,19 @@ impl Sink for CoordinatedRemoteSink { } async fn new_coordinator(&self) -> Result { - RemoteCoordinator::new::(self.0.param.clone()).await + RemoteCoordinator::new::(self.param.clone()).await } } -pub type RemoteSinkWriter = RemoteSinkWriterInner<()>; -pub type CoordinatedRemoteSinkWriter = RemoteSinkWriterInner>; - -pub struct RemoteSinkWriterInner { +pub struct CoordinatedRemoteSinkWriter { properties: HashMap, epoch: Option, batch_id: u64, stream_handle: SinkWriterStreamHandle, sink_metrics: SinkMetrics, - _phantom: PhantomData, } -impl RemoteSinkWriterInner { +impl CoordinatedRemoteSinkWriter { pub async fn new( param: SinkParam, connector_params: ConnectorParams, @@ -409,14 +415,13 @@ impl RemoteSinkWriterInner { batch_id: 0, stream_handle, sink_metrics, - _phantom: PhantomData, }) } fn for_test( response_receiver: Receiver>, request_sender: Sender, - ) -> RemoteSinkWriter { + ) -> CoordinatedRemoteSinkWriter { let properties = HashMap::from([("output.path".to_string(), "/tmp/rw".to_string())]); let stream_handle = SinkWriterStreamHandle::for_test( @@ -426,51 +431,13 @@ impl RemoteSinkWriterInner { .boxed(), ); - RemoteSinkWriter { + CoordinatedRemoteSinkWriter { properties, epoch: None, batch_id: 0, stream_handle, sink_metrics: SinkMetrics::for_test(), - _phantom: PhantomData, - } - } -} - -trait HandleBarrierResponse { - type SinkMetadata: Send; - fn handle_commit_response(rsp: CommitResponse) -> Result; - fn non_checkpoint_return_value() -> Self::SinkMetadata; -} - -impl HandleBarrierResponse for RemoteSinkWriter { - type SinkMetadata = (); - - fn handle_commit_response(rsp: CommitResponse) -> Result { - if rsp.metadata.is_some() { - warn!("get metadata in commit response for non-coordinated remote sink writer"); } - Ok(()) - } - - fn non_checkpoint_return_value() -> Self::SinkMetadata {} -} - -impl HandleBarrierResponse for CoordinatedRemoteSinkWriter { - type SinkMetadata = Option; - - fn handle_commit_response(rsp: CommitResponse) -> Result { - rsp.metadata - .ok_or_else(|| { - SinkError::Remote(anyhow_error!( - "get none metadata in commit response for coordinated sink writer" - )) - }) - .map(Some) - } - - fn non_checkpoint_return_value() -> Self::SinkMetadata { - None } } @@ -481,11 +448,8 @@ fn build_chunk_payload(chunk: StreamChunk) -> Payload { } #[async_trait] -impl SinkWriter for RemoteSinkWriterInner -where - Self: HandleBarrierResponse, -{ - type CommitMetadata = SM; +impl SinkWriter for CoordinatedRemoteSinkWriter { + type CommitMetadata = Option; async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> { let cardinality = chunk.cardinality(); @@ -514,7 +478,7 @@ where Ok(()) } - async fn barrier(&mut self, is_checkpoint: bool) -> Result { + async fn barrier(&mut self, is_checkpoint: bool) -> Result> { let epoch = self.epoch.ok_or_else(|| { SinkError::Remote(anyhow_error!( "epoch has not been initialize, call `begin_epoch`" @@ -523,12 +487,16 @@ where if is_checkpoint { // TODO: add metrics to measure commit time let rsp = self.stream_handle.commit(epoch).await?; - Ok(::handle_commit_response( - rsp, - )?) + rsp.metadata + .ok_or_else(|| { + SinkError::Remote(anyhow_error!( + "get none metadata in commit response for coordinated sink writer" + )) + }) + .map(Some) } else { self.stream_handle.barrier(epoch).await?; - Ok(::non_checkpoint_return_value()) + Ok(None) } } } @@ -608,7 +576,7 @@ impl EmbeddedConnectorClient { } } - pub async fn start_sink_coordinator_stream( + async fn start_sink_coordinator_stream( &self, param: SinkParam, ) -> Result { @@ -696,7 +664,7 @@ mod test { use risingwave_pb::connector_service::{SinkWriterStreamRequest, SinkWriterStreamResponse}; use tokio::sync::mpsc; - use crate::sink::remote::{build_chunk_payload, RemoteSinkWriter}; + use crate::sink::remote::{build_chunk_payload, CoordinatedRemoteSinkWriter}; use crate::sink::SinkWriter; #[tokio::test] @@ -704,7 +672,7 @@ mod test { let (request_sender, mut request_recv) = mpsc::channel(16); let (_, resp_recv) = mpsc::channel(16); - let mut sink = RemoteSinkWriter::for_test(resp_recv, request_sender); + let mut sink = CoordinatedRemoteSinkWriter::for_test(resp_recv, request_sender); let chunk = StreamChunk::from_pretty( " i T + 1 Ripper @@ -741,7 +709,7 @@ mod test { async fn test_remote_sink() { let (request_sender, mut request_receiver) = mpsc::channel(16); let (response_sender, response_receiver) = mpsc::channel(16); - let mut sink = RemoteSinkWriter::for_test(response_receiver, request_sender); + let mut sink = CoordinatedRemoteSinkWriter::for_test(response_receiver, request_sender); let chunk_a = StreamChunk::from_pretty( " i T @@ -791,12 +759,12 @@ mod test { })) .await .expect("test failed: failed to sync epoch"); - sink.barrier(true).await.unwrap(); + sink.barrier(false).await.unwrap(); let commit_request = request_receiver.recv().await.unwrap(); match commit_request.request { Some(Request::Barrier(Barrier { epoch, - is_checkpoint: true, + is_checkpoint: false, })) => { assert_eq!(epoch, 2022); } From 5381129c57188d2ff11d2a49cfd7343509b0ccb5 Mon Sep 17 00:00:00 2001 From: William Wen Date: Wed, 1 Nov 2023 10:48:06 +0800 Subject: [PATCH 06/14] feat(sink): support jdbc sink async truncate --- .../connector/api/sink/SinkWriter.java | 2 +- .../connector/api/sink/SinkWriterV1.java | 3 +- .../connector/SinkWriterStreamObserver.java | 22 +- .../AppendOnlyIcebergSinkWriter.java | 3 +- .../connector/UpsertIcebergSinkWriter.java | 3 +- proto/connector_service.proto | 6 + src/connector/src/sink/iceberg.rs | 1 + src/connector/src/sink/remote.rs | 233 +++++++++++++----- 8 files changed, 203 insertions(+), 70 deletions(-) diff --git a/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/sink/SinkWriter.java b/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/sink/SinkWriter.java index df7c20b30474f..0192dbfe28b2b 100644 --- a/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/sink/SinkWriter.java +++ b/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/sink/SinkWriter.java @@ -23,7 +23,7 @@ public interface SinkWriter { void beginEpoch(long epoch); - void write(Iterator rows); + boolean write(Iterator rows); Optional barrier(boolean isCheckpoint); diff --git a/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/sink/SinkWriterV1.java b/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/sink/SinkWriterV1.java index dae28c9ab13f2..a605c4e063be1 100644 --- a/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/sink/SinkWriterV1.java +++ b/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/sink/SinkWriterV1.java @@ -43,11 +43,12 @@ public SinkWriterV1 getInner() { public void beginEpoch(long epoch) {} @Override - public void write(Iterator rows) { + public boolean write(Iterator rows) { if (!hasBegun) { hasBegun = true; } this.inner.write(rows); + return false; } @Override diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkWriterStreamObserver.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkWriterStreamObserver.java index 5d5bf482384a3..1b43791f41133 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkWriterStreamObserver.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkWriterStreamObserver.java @@ -125,13 +125,29 @@ public void onNext(ConnectorServiceProto.SinkWriterStreamRequest sinkTask) { .asRuntimeException(); } + boolean batchWritten; + try (CloseableIterator rowIter = deserializer.deserialize(batch)) { - sink.write( - new MonitoredRowIterator( - rowIter, connectorName, String.valueOf(sinkId))); + batchWritten = + sink.write( + new MonitoredRowIterator( + rowIter, connectorName, String.valueOf(sinkId))); } currentBatchId = batch.getBatchId(); + + if (batchWritten) { + responseObserver.onNext( + ConnectorServiceProto.SinkWriterStreamResponse.newBuilder() + .setBatch( + ConnectorServiceProto.SinkWriterStreamResponse + .BatchWrittenResponse.newBuilder() + .setEpoch(currentEpoch) + .setBatchId(currentBatchId) + .build()) + .build()); + } + LOG.debug("Batch {} written to epoch {}", currentBatchId, batch.getEpoch()); } else if (sinkTask.hasBarrier()) { if (!isInitialized()) { diff --git a/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/AppendOnlyIcebergSinkWriter.java b/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/AppendOnlyIcebergSinkWriter.java index 6b60eedd23d37..f2e70cc9cc5c1 100644 --- a/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/AppendOnlyIcebergSinkWriter.java +++ b/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/AppendOnlyIcebergSinkWriter.java @@ -53,7 +53,7 @@ public AppendOnlyIcebergSinkWriter( } @Override - public void write(Iterator rows) { + public boolean write(Iterator rows) { while (rows.hasNext()) { SinkRow row = rows.next(); switch (row.getOp()) { @@ -108,6 +108,7 @@ public void write(Iterator rows) { .asRuntimeException(); } } + return false; } @Override diff --git a/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/UpsertIcebergSinkWriter.java b/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/UpsertIcebergSinkWriter.java index e1d649f028bf8..0ffb8ae36b820 100644 --- a/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/UpsertIcebergSinkWriter.java +++ b/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/UpsertIcebergSinkWriter.java @@ -140,7 +140,7 @@ private List> getKeyFromRow(SinkRow row) { } @Override - public void write(Iterator rows) { + public boolean write(Iterator rows) { while (rows.hasNext()) { SinkRow row = rows.next(); if (row.size() != tableSchema.getColumnNames().length) { @@ -190,6 +190,7 @@ public void write(Iterator rows) { .asRuntimeException(); } } + return false; } @Override diff --git a/proto/connector_service.proto b/proto/connector_service.proto index e750c8ce96e9d..ac19e14502c53 100644 --- a/proto/connector_service.proto +++ b/proto/connector_service.proto @@ -88,9 +88,15 @@ message SinkWriterStreamResponse { SinkMetadata metadata = 2; } + message BatchWrittenResponse { + uint64 epoch = 1; + uint64 batch_id = 2; + } + oneof response { StartResponse start = 1; CommitResponse commit = 2; + BatchWrittenResponse batch = 3; } } diff --git a/src/connector/src/sink/iceberg.rs b/src/connector/src/sink/iceberg.rs index 1031c5181d81e..cde0b4b508236 100644 --- a/src/connector/src/sink/iceberg.rs +++ b/src/connector/src/sink/iceberg.rs @@ -55,6 +55,7 @@ pub const REMOTE_ICEBERG_SINK: &str = "iceberg_java"; pub struct RemoteIceberg; impl RemoteSinkTrait for RemoteIceberg { + const IS_ASYNC_TRUNCATE: bool = false; const SINK_NAME: &'static str = REMOTE_ICEBERG_SINK; } diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs index bfeb08cc19946..f2cb3402677cd 100644 --- a/src/connector/src/sink/remote.rs +++ b/src/connector/src/sink/remote.rs @@ -13,7 +13,7 @@ // limitations under the License. use std::any::Any; -use std::collections::HashMap; +use std::collections::{HashMap, VecDeque}; use std::future::Future; use std::marker::PhantomData; use std::ops::Deref; @@ -23,7 +23,7 @@ use std::time::Instant; use anyhow::anyhow; use async_trait::async_trait; use futures::future::select; -use futures::{StreamExt, TryFutureExt, TryStreamExt}; +use futures::{StreamExt, TryStreamExt}; use itertools::Itertools; use jni::objects::{JByteArray, JValue, JValueOwned}; use jni::JavaVM; @@ -31,7 +31,7 @@ use prost::Message; use risingwave_common::array::StreamChunk; use risingwave_common::error::anyhow_error; use risingwave_common::types::DataType; -use risingwave_common::util::await_future_with_monitor_error_stream; +use risingwave_common::util::{await_future_with_monitor_error_stream, drop_either_future}; use risingwave_jni_core::jvm_runtime::JVM; use risingwave_jni_core::{gen_class_name, gen_jni_sig, JniReceiverType, JniSenderType}; use risingwave_pb::connector_service::sink_coordinator_stream_request::StartCoordinator; @@ -56,7 +56,7 @@ use tokio::sync::mpsc::{unbounded_channel, Receiver, Sender}; use tokio_stream::wrappers::ReceiverStream; use crate::sink::coordinate::CoordinatedSinkWriter; -use crate::sink::log_store::{LogReader, LogStoreReadItem, TruncateOffset}; +use crate::sink::log_store::{ChunkId, LogReader, LogStoreReadItem, TruncateOffset}; use crate::sink::writer::{LogSinkerOf, SinkWriter, SinkWriterExt}; use crate::sink::{ DummySinkCommitCoordinator, LogSinker, Result, Sink, SinkCommitCoordinator, SinkError, @@ -67,18 +67,19 @@ use crate::ConnectorParams; macro_rules! def_remote_sink { () => { def_remote_sink! { - { ElasticSearch, ElasticSearchSink, "elasticsearch" }, - { Cassandra, CassandraSink, "cassandra" }, - { Jdbc, JdbcSink, "jdbc" }, - { DeltaLake, DeltaLakeSink, "deltalake" } + { ElasticSearch, ElasticSearchSink, "elasticsearch", false }, + { Cassandra, CassandraSink, "cassandra", false }, + { Jdbc, JdbcSink, "jdbc", true }, + { DeltaLake, DeltaLakeSink, "deltalake", false } } }; - ($({ $variant_name:ident, $sink_type_name:ident, $sink_name:expr }),*) => { + ($({ $variant_name:ident, $sink_type_name:ident, $sink_name:expr, $is_async_truncate:expr }),*) => { $( #[derive(Debug)] pub struct $variant_name; impl RemoteSinkTrait for $variant_name { const SINK_NAME: &'static str = $sink_name; + const IS_ASYNC_TRUNCATE: bool = $is_async_truncate; } pub type $sink_type_name = RemoteSink<$variant_name>; )* @@ -89,6 +90,7 @@ def_remote_sink!(); pub trait RemoteSinkTrait: Send + Sync + 'static { const SINK_NAME: &'static str; + const IS_ASYNC_TRUNCATE: bool; } #[derive(Debug)] @@ -115,7 +117,7 @@ impl Sink for RemoteSink { const SINK_NAME: &'static str = R::SINK_NAME; async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result { - RemoteLogSinker::new(self.param.clone(), writer_param).await + RemoteLogSinker::new(self.param.clone(), writer_param, R::IS_ASYNC_TRUNCATE).await } async fn validate(&self) -> Result<()> { @@ -204,10 +206,15 @@ pub struct RemoteLogSinker { request_sender: BidiStreamSender, response_stream: BidiStreamReceiver, sink_metrics: SinkMetrics, + is_async_truncate: bool, } impl RemoteLogSinker { - async fn new(sink_param: SinkParam, writer_param: SinkWriterParam) -> Result { + async fn new( + sink_param: SinkParam, + writer_param: SinkWriterParam, + is_async_truncate: bool, + ) -> Result { let SinkWriterStreamHandle { request_sender, response_stream, @@ -220,6 +227,7 @@ impl RemoteLogSinker { request_sender, response_stream, sink_metrics, + is_async_truncate, }) } } @@ -241,6 +249,7 @@ impl LogSinker for RemoteLogSinker { async fn consume_log_and_sink(self, mut log_reader: impl LogReader) -> Result<()> { let mut request_tx = self.request_sender; let mut response_err_stream_rx = self.response_stream; + let is_async_truncate = self.is_async_truncate; let sink_metrics = self.sink_metrics; let (response_tx, mut response_rx) = unbounded_channel(); @@ -262,68 +271,166 @@ impl LogSinker for RemoteLogSinker { let poll_consume_log_and_sink = pin!(async move { let mut prev_offset: Option = None; + let mut sent_chunk_id_queue: VecDeque<(u64, ChunkId)> = VecDeque::new(); log_reader.init().await?; + async fn truncate_matched_chunk_id( + queue: &mut VecDeque<(u64, ChunkId)>, + (epoch, batch_id): (u64, u64), + log_reader: &mut impl LogReader, + ) -> Result<()> { + let (prev_epoch, prev_batch_id) = queue.pop_front().ok_or_else(|| { + anyhow!("batch {} {} is not buffered for response", epoch, batch_id) + })?; + if prev_epoch != epoch || prev_batch_id != batch_id as ChunkId { + return Err(SinkError::Remote(anyhow!( + "epoch and batch_id not match the first buffered one" + ))); + } + log_reader + .truncate(TruncateOffset::Chunk { + epoch, + chunk_id: prev_batch_id, + }) + .await?; + Ok(()) + } + loop { - let (epoch, item): (u64, LogStoreReadItem) = - log_reader.next_item().map_err(SinkError::Internal).await?; - - match item { - LogStoreReadItem::StreamChunk { chunk, chunk_id } => { - let offset = TruncateOffset::Chunk { epoch, chunk_id }; - if let Some(prev_offset) = &prev_offset { - prev_offset.check_next_offset(offset)?; - } else { - request_tx.start_epoch(epoch).await?; - } - let cardinality = chunk.cardinality(); - sink_metrics - .connector_sink_rows_received - .inc_by(cardinality as _); - - let payload = build_chunk_payload(chunk); - request_tx - .write_batch(epoch, chunk_id as u64, payload) - .await?; - prev_offset = Some(offset); - } - LogStoreReadItem::Barrier { is_checkpoint } => { - let offset = TruncateOffset::Barrier { epoch }; - if let Some(prev_offset) = &prev_offset { - prev_offset.check_next_offset(offset)?; - } else { - // TODO: this start epoch is actually unnecessary - request_tx.start_epoch(epoch).await?; - } - if is_checkpoint { - let start_time = Instant::now(); - request_tx.barrier(epoch, true).await?; - match response_rx.recv().await.ok_or_else(|| { - SinkError::Remote(anyhow!("end of response stream")) - })? { - SinkWriterStreamResponse { - response: Some(sink_writer_stream_response::Response::Commit(_)), - } => {} - response => { + let either_result: futures::future::Either< + Option, + anyhow::Result<(u64, LogStoreReadItem)>, + > = drop_either_future( + select(pin!(response_rx.recv()), pin!(log_reader.next_item())).await, + ); + match either_result { + futures::future::Either::Left(opt) => { + let response = opt.ok_or_else(|| anyhow!("end of response stream"))?; + match response { + SinkWriterStreamResponse { + response: + Some(sink_writer_stream_response::Response::Batch( + sink_writer_stream_response::BatchWrittenResponse { + epoch, + batch_id, + }, + )), + } => { + if !is_async_truncate { return Err(SinkError::Remote(anyhow!( - "expected commit response, but get {:?}", - response + "batch written response only for async truncate" ))); } - }; - sink_metrics - .sink_commit_duration_metrics - .observe(start_time.elapsed().as_millis() as f64); - log_reader - .truncate(TruncateOffset::Barrier { epoch }) + truncate_matched_chunk_id( + &mut sent_chunk_id_queue, + (epoch, batch_id), + &mut log_reader, + ) .await?; - } else { - request_tx.barrier(epoch, false).await?; + } + response => { + return Err(SinkError::Remote(anyhow!( + "expected batch written response, but get {:?}", + response + ))); + } + } + } + futures::future::Either::Right(result) => { + let (epoch, item): (u64, LogStoreReadItem) = result?; + match item { + LogStoreReadItem::StreamChunk { chunk, chunk_id } => { + let offset = TruncateOffset::Chunk { epoch, chunk_id }; + if let Some(prev_offset) = &prev_offset { + prev_offset.check_next_offset(offset)?; + } else { + request_tx.start_epoch(epoch).await?; + } + let cardinality = chunk.cardinality(); + sink_metrics + .connector_sink_rows_received + .inc_by(cardinality as _); + + let payload = build_chunk_payload(chunk); + request_tx + .write_batch(epoch, chunk_id as u64, payload) + .await?; + prev_offset = Some(offset); + if is_async_truncate { + sent_chunk_id_queue.push_back((epoch, chunk_id)); + } + } + LogStoreReadItem::Barrier { is_checkpoint } => { + let offset = TruncateOffset::Barrier { epoch }; + if let Some(prev_offset) = &prev_offset { + prev_offset.check_next_offset(offset)?; + } else { + // TODO: this start epoch is actually unnecessary + request_tx.start_epoch(epoch).await?; + } + if is_checkpoint { + let start_time = Instant::now(); + request_tx.barrier(epoch, true).await?; + // waiting for previous response + loop { + match response_rx.recv().await.ok_or_else(|| { + SinkError::Remote(anyhow!("end of response stream")) + })? { + SinkWriterStreamResponse { + response: + Some(sink_writer_stream_response::Response::Commit( + _, + )), + } => { + if is_async_truncate && !sent_chunk_id_queue.is_empty() { + return Err(SinkError::Remote(anyhow!("get commit response with chunk not acked: {:?}", sent_chunk_id_queue))); + } + break; + } + SinkWriterStreamResponse { + response: + Some(sink_writer_stream_response::Response::Batch( + sink_writer_stream_response::BatchWrittenResponse { + epoch, + batch_id, + }, + )), + } => { + if !is_async_truncate { + return Err(SinkError::Remote(anyhow!( + "batch written response only for async truncate" + ))); + } + truncate_matched_chunk_id( + &mut sent_chunk_id_queue, + (epoch, batch_id), + &mut log_reader, + ) + .await?; + } + response => { + return Err(SinkError::Remote(anyhow!( + "expected commit or batch written response, but get {:?}", + response + ))); + } + }; + } + sink_metrics + .sink_commit_duration_metrics + .observe(start_time.elapsed().as_millis() as f64); + log_reader + .truncate(TruncateOffset::Barrier { epoch }) + .await?; + } else { + request_tx.barrier(epoch, false).await?; + } + prev_offset = Some(offset); + } + LogStoreReadItem::UpdateVnodeBitmap(_) => {} } - prev_offset = Some(offset); } - LogStoreReadItem::UpdateVnodeBitmap(_) => {} } } }); From c03963e3c68c156d94e34426211b363a657b305e Mon Sep 17 00:00:00 2001 From: William Wen Date: Thu, 2 Nov 2023 17:24:34 +0800 Subject: [PATCH 07/14] start epoch also for state just receive barrier --- .../JniSinkWriterResponseObserver.java | 9 ++++-- .../connector/SinkWriterStreamObserver.java | 2 +- src/connector/src/sink/remote.rs | 28 ++++++------------- 3 files changed, 15 insertions(+), 24 deletions(-) diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/JniSinkWriterResponseObserver.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/JniSinkWriterResponseObserver.java index 735d85a06c1fc..0bb3d686cf82f 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/JniSinkWriterResponseObserver.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/JniSinkWriterResponseObserver.java @@ -16,6 +16,7 @@ import com.risingwave.java.binding.Binding; import com.risingwave.proto.ConnectorServiceProto; +import io.grpc.Status; import io.grpc.stub.StreamObserver; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -25,7 +26,7 @@ public class JniSinkWriterResponseObserver private static final Logger LOG = LoggerFactory.getLogger(JniSinkWriterResponseObserver.class); private long responseTxPtr; - private boolean success; + private boolean success = true; public JniSinkWriterResponseObserver(long responseTxPtr) { this.responseTxPtr = responseTxPtr; @@ -33,12 +34,14 @@ public JniSinkWriterResponseObserver(long responseTxPtr) { @Override public void onNext(ConnectorServiceProto.SinkWriterStreamResponse response) { - this.success = - Binding.sendSinkWriterResponseToChannel(this.responseTxPtr, response.toByteArray()); + if (Binding.sendSinkWriterResponseToChannel(this.responseTxPtr, response.toByteArray())) { + throw Status.INTERNAL.withDescription("unable to send response").asRuntimeException(); + } } @Override public void onError(Throwable throwable) { + this.success = false; LOG.error("JniSinkWriterHandler onError: ", throwable); } diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkWriterStreamObserver.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkWriterStreamObserver.java index 5d5bf482384a3..c0f7ea73ea82c 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkWriterStreamObserver.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkWriterStreamObserver.java @@ -172,7 +172,7 @@ public void onNext(ConnectorServiceProto.SinkWriterStreamRequest sinkTask) { } else { throw INVALID_ARGUMENT.withDescription("invalid sink task").asRuntimeException(); } - } catch (Exception e) { + } catch (Throwable e) { LOG.error("sink writer error: ", e); cleanup(); responseObserver.onError(e); diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs index bfeb08cc19946..258bce34655d5 100644 --- a/src/connector/src/sink/remote.rs +++ b/src/connector/src/sink/remote.rs @@ -12,9 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::any::Any; use std::collections::HashMap; -use std::future::Future; use std::marker::PhantomData; use std::ops::Deref; use std::pin::pin; @@ -31,7 +29,6 @@ use prost::Message; use risingwave_common::array::StreamChunk; use risingwave_common::error::anyhow_error; use risingwave_common::types::DataType; -use risingwave_common::util::await_future_with_monitor_error_stream; use risingwave_jni_core::jvm_runtime::JVM; use risingwave_jni_core::{gen_class_name, gen_jni_sig, JniReceiverType, JniSenderType}; use risingwave_pb::connector_service::sink_coordinator_stream_request::StartCoordinator; @@ -224,18 +221,6 @@ impl RemoteLogSinker { } } -/// Await the given future while monitoring on error of the receiver stream. -async fn await_future_with_monitor_receiver_err>>( - receiver: &mut BidiStreamReceiver, - future: F, -) -> Result { - match await_future_with_monitor_error_stream(&mut receiver.stream, future).await { - Ok(result) => Ok(result?), - Err(None) => Err(SinkError::Remote(anyhow!("end of remote receiver stream"))), - Err(Some(err)) => Err(SinkError::Remote(err.into())), - } -} - #[async_trait] impl LogSinker for RemoteLogSinker { async fn consume_log_and_sink(self, mut log_reader: impl LogReader) -> Result<()> { @@ -269,13 +254,19 @@ impl LogSinker for RemoteLogSinker { let (epoch, item): (u64, LogStoreReadItem) = log_reader.next_item().map_err(SinkError::Internal).await?; + match &prev_offset { + Some(TruncateOffset::Barrier { .. }) | None => { + // TODO: this start epoch is actually unnecessary + request_tx.start_epoch(epoch).await?; + } + _ => {} + } + match item { LogStoreReadItem::StreamChunk { chunk, chunk_id } => { let offset = TruncateOffset::Chunk { epoch, chunk_id }; if let Some(prev_offset) = &prev_offset { prev_offset.check_next_offset(offset)?; - } else { - request_tx.start_epoch(epoch).await?; } let cardinality = chunk.cardinality(); sink_metrics @@ -292,9 +283,6 @@ impl LogSinker for RemoteLogSinker { let offset = TruncateOffset::Barrier { epoch }; if let Some(prev_offset) = &prev_offset { prev_offset.check_next_offset(offset)?; - } else { - // TODO: this start epoch is actually unnecessary - request_tx.start_epoch(epoch).await?; } if is_checkpoint { let start_time = Instant::now(); From c2b3ba0863a8f719ac7e50059f985a252dd541f4 Mon Sep 17 00:00:00 2001 From: William Wen Date: Thu, 2 Nov 2023 18:21:35 +0800 Subject: [PATCH 08/14] fix --- .../com/risingwave/connector/JniSinkWriterResponseObserver.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/JniSinkWriterResponseObserver.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/JniSinkWriterResponseObserver.java index 8cb046b60b5ea..2f1ed7b496a35 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/JniSinkWriterResponseObserver.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/JniSinkWriterResponseObserver.java @@ -34,7 +34,7 @@ public JniSinkWriterResponseObserver(long responseTxPtr) { @Override public void onNext(ConnectorServiceProto.SinkWriterStreamResponse response) { - if (Binding.sendSinkWriterResponseToChannel(this.responseTxPtr, response.toByteArray())) { + if (!Binding.sendSinkWriterResponseToChannel(this.responseTxPtr, response.toByteArray())) { throw Status.INTERNAL.withDescription("unable to send response").asRuntimeException(); } } From 1449692b4489f9e1be0e7a6681e1aab21c7281d0 Mon Sep 17 00:00:00 2001 From: William Wen Date: Fri, 3 Nov 2023 14:02:07 +0800 Subject: [PATCH 09/14] fix comment --- .../connector/JniSinkCoordinatorResponseObserver.java | 11 +++++++---- .../connector/JniSinkWriterResponseObserver.java | 5 +++-- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/JniSinkCoordinatorResponseObserver.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/JniSinkCoordinatorResponseObserver.java index 2a04e23f1a0b2..790953a793521 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/JniSinkCoordinatorResponseObserver.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/JniSinkCoordinatorResponseObserver.java @@ -16,6 +16,7 @@ import com.risingwave.java.binding.Binding; import com.risingwave.proto.ConnectorServiceProto; +import io.grpc.Status; import io.grpc.stub.StreamObserver; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -26,7 +27,7 @@ public class JniSinkCoordinatorResponseObserver LoggerFactory.getLogger(JniSinkCoordinatorResponseObserver.class); private long responseTxPtr; - private boolean success; + private boolean success = true; public JniSinkCoordinatorResponseObserver(long responseTxPtr) { this.responseTxPtr = responseTxPtr; @@ -34,13 +35,15 @@ public JniSinkCoordinatorResponseObserver(long responseTxPtr) { @Override public void onNext(ConnectorServiceProto.SinkCoordinatorStreamResponse response) { - this.success = - Binding.sendSinkCoordinatorResponseToChannel( - this.responseTxPtr, response.toByteArray()); + if (!Binding.sendSinkCoordinatorResponseToChannel( + this.responseTxPtr, response.toByteArray())) { + throw Status.INTERNAL.withDescription("unable to send response").asRuntimeException(); + } } @Override public void onError(Throwable throwable) { + this.success = false; LOG.error("JniSinkCoordinatorHandler onError: ", throwable); } diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/JniSinkWriterResponseObserver.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/JniSinkWriterResponseObserver.java index 2f1ed7b496a35..895f995b144b2 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/JniSinkWriterResponseObserver.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/JniSinkWriterResponseObserver.java @@ -41,8 +41,9 @@ public void onNext(ConnectorServiceProto.SinkWriterStreamResponse response) { @Override public void onError(Throwable throwable) { - this.success = - Binding.sendSinkWriterErrorToChannel(this.responseTxPtr, throwable.getMessage()); + if (!Binding.sendSinkWriterErrorToChannel(this.responseTxPtr, throwable.getMessage())) { + LOG.warn("unable to send error: {}", throwable.getMessage()); + } this.success = false; LOG.error("JniSinkWriterHandler onError: ", throwable); } From 7d64ae57d3d5a9d503607a0bfcd9405753765845 Mon Sep 17 00:00:00 2001 From: William Wen Date: Mon, 6 Nov 2023 15:00:17 +0800 Subject: [PATCH 10/14] remove is_async_truncate flag --- src/connector/src/sink/iceberg.rs | 1 - src/connector/src/sink/remote.rs | 190 +++++++++++++----------------- 2 files changed, 80 insertions(+), 111 deletions(-) diff --git a/src/connector/src/sink/iceberg.rs b/src/connector/src/sink/iceberg.rs index add3ac4cf200d..7fdbcc26e2693 100644 --- a/src/connector/src/sink/iceberg.rs +++ b/src/connector/src/sink/iceberg.rs @@ -55,7 +55,6 @@ pub const REMOTE_ICEBERG_SINK: &str = "iceberg_java"; pub struct RemoteIceberg; impl RemoteSinkTrait for RemoteIceberg { - const IS_ASYNC_TRUNCATE: bool = false; const SINK_NAME: &'static str = REMOTE_ICEBERG_SINK; } diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs index e981c7ff26749..f53eff8831e47 100644 --- a/src/connector/src/sink/remote.rs +++ b/src/connector/src/sink/remote.rs @@ -53,9 +53,10 @@ use tokio::sync::mpsc; use tokio::sync::mpsc::{unbounded_channel, Receiver, Sender}; use tokio::task::spawn_blocking; use tokio_stream::wrappers::ReceiverStream; +use tracing::warn; use crate::sink::coordinate::CoordinatedSinkWriter; -use crate::sink::log_store::{ChunkId, LogReader, LogStoreReadItem, TruncateOffset}; +use crate::sink::log_store::{LogReader, LogStoreReadItem, TruncateOffset}; use crate::sink::writer::{LogSinkerOf, SinkWriter, SinkWriterExt}; use crate::sink::{ DummySinkCommitCoordinator, LogSinker, Result, Sink, SinkCommitCoordinator, SinkError, @@ -66,19 +67,18 @@ use crate::ConnectorParams; macro_rules! def_remote_sink { () => { def_remote_sink! { - { ElasticSearch, ElasticSearchSink, "elasticsearch", false }, - { Cassandra, CassandraSink, "cassandra", false }, - { Jdbc, JdbcSink, "jdbc", true }, - { DeltaLake, DeltaLakeSink, "deltalake", false } + { ElasticSearch, ElasticSearchSink, "elasticsearch" }, + { Cassandra, CassandraSink, "cassandra" }, + { Jdbc, JdbcSink, "jdbc" }, + { DeltaLake, DeltaLakeSink, "deltalake" } } }; - ($({ $variant_name:ident, $sink_type_name:ident, $sink_name:expr, $is_async_truncate:expr }),*) => { + ($({ $variant_name:ident, $sink_type_name:ident, $sink_name:expr }),*) => { $( #[derive(Debug)] pub struct $variant_name; impl RemoteSinkTrait for $variant_name { const SINK_NAME: &'static str = $sink_name; - const IS_ASYNC_TRUNCATE: bool = $is_async_truncate; } pub type $sink_type_name = RemoteSink<$variant_name>; )* @@ -89,7 +89,6 @@ def_remote_sink!(); pub trait RemoteSinkTrait: Send + Sync + 'static { const SINK_NAME: &'static str; - const IS_ASYNC_TRUNCATE: bool; } #[derive(Debug)] @@ -116,7 +115,7 @@ impl Sink for RemoteSink { const SINK_NAME: &'static str = R::SINK_NAME; async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result { - RemoteLogSinker::new(self.param.clone(), writer_param, R::IS_ASYNC_TRUNCATE).await + RemoteLogSinker::new(self.param.clone(), writer_param).await } async fn validate(&self) -> Result<()> { @@ -209,15 +208,10 @@ pub struct RemoteLogSinker { request_sender: BidiStreamSender, response_stream: BidiStreamReceiver, sink_metrics: SinkMetrics, - is_async_truncate: bool, } impl RemoteLogSinker { - async fn new( - sink_param: SinkParam, - writer_param: SinkWriterParam, - is_async_truncate: bool, - ) -> Result { + async fn new(sink_param: SinkParam, writer_param: SinkWriterParam) -> Result { let SinkWriterStreamHandle { request_sender, response_stream, @@ -230,7 +224,6 @@ impl RemoteLogSinker { request_sender, response_stream, sink_metrics, - is_async_truncate, }) } } @@ -240,12 +233,11 @@ impl LogSinker for RemoteLogSinker { async fn consume_log_and_sink(self, mut log_reader: impl LogReader) -> Result<()> { let mut request_tx = self.request_sender; let mut response_err_stream_rx = self.response_stream; - let is_async_truncate = self.is_async_truncate; let sink_metrics = self.sink_metrics; let (response_tx, mut response_rx) = unbounded_channel(); - let poll_response_stream = pin!(async move { + let poll_response_stream = async move { loop { let result = response_err_stream_rx.stream.try_next().await; match result { @@ -258,36 +250,48 @@ impl LogSinker for RemoteLogSinker { Err(e) => return Err(SinkError::Remote(anyhow!(e))), } } - }); - - let poll_consume_log_and_sink = pin!(async move { - let mut prev_offset: Option = None; - let mut sent_chunk_id_queue: VecDeque<(u64, ChunkId)> = VecDeque::new(); + }; + let poll_consume_log_and_sink = async move { log_reader.init().await?; - async fn truncate_matched_chunk_id( - queue: &mut VecDeque<(u64, ChunkId)>, - (epoch, batch_id): (u64, u64), + async fn truncate_matched_offset( + queue: &mut VecDeque<(TruncateOffset, Option)>, + offset: TruncateOffset, log_reader: &mut impl LogReader, + metrics: &SinkMetrics, ) -> Result<()> { - let (prev_epoch, prev_batch_id) = queue.pop_front().ok_or_else(|| { - anyhow!("batch {} {} is not buffered for response", epoch, batch_id) - })?; - if prev_epoch != epoch || prev_batch_id != batch_id as ChunkId { - return Err(SinkError::Remote(anyhow!( - "epoch and batch_id not match the first buffered one" - ))); + while let Some((sent_offset, _)) = queue.front() && sent_offset < &offset { + queue.pop_front(); + } + + let (sent_offset, start_time) = queue + .pop_front() + .ok_or_else(|| anyhow!("get unsent offset {:?} in response", offset))?; + if sent_offset != offset { + return Err(anyhow!( + "new response offset {:?} not match the buffer offset {:?}", + offset, + sent_offset + ) + .into()); } - log_reader - .truncate(TruncateOffset::Chunk { - epoch, - chunk_id: prev_batch_id, - }) - .await?; + + if let (TruncateOffset::Barrier { .. }, Some(start_time)) = (offset, start_time) { + metrics + .sink_commit_duration_metrics + .observe(start_time.elapsed().as_millis() as f64); + } + + log_reader.truncate(offset).await?; Ok(()) } + let mut prev_offset: Option = None; + // Push from back and pop from front + let mut sent_offset_queue: VecDeque<(TruncateOffset, Option)> = + VecDeque::new(); + loop { let either_result: futures::future::Either< Option, @@ -308,21 +312,40 @@ impl LogSinker for RemoteLogSinker { }, )), } => { - if !is_async_truncate { - return Err(SinkError::Remote(anyhow!( - "batch written response only for async truncate" - ))); + truncate_matched_offset( + &mut sent_offset_queue, + TruncateOffset::Chunk { + epoch, + chunk_id: batch_id as _, + }, + &mut log_reader, + &sink_metrics, + ) + .await?; + } + SinkWriterStreamResponse { + response: + Some(sink_writer_stream_response::Response::Commit( + sink_writer_stream_response::CommitResponse { + epoch, + metadata, + }, + )), + } => { + if let Some(metadata) = metadata { + warn!("get unexpected non-empty metadata: {:?}", metadata); } - truncate_matched_chunk_id( - &mut sent_chunk_id_queue, - (epoch, batch_id), + truncate_matched_offset( + &mut sent_offset_queue, + TruncateOffset::Barrier { epoch }, &mut log_reader, + &sink_metrics, ) .await?; } response => { return Err(SinkError::Remote(anyhow!( - "expected batch written response, but get {:?}", + "get unexpected response: {:?}", response ))); } @@ -344,8 +367,6 @@ impl LogSinker for RemoteLogSinker { let offset = TruncateOffset::Chunk { epoch, chunk_id }; if let Some(prev_offset) = &prev_offset { prev_offset.check_next_offset(offset)?; - } else { - request_tx.start_epoch(epoch).await?; } let cardinality = chunk.cardinality(); sink_metrics @@ -357,85 +378,34 @@ impl LogSinker for RemoteLogSinker { .write_batch(epoch, chunk_id as u64, payload) .await?; prev_offset = Some(offset); - if is_async_truncate { - sent_chunk_id_queue.push_back((epoch, chunk_id)); - } + sent_offset_queue + .push_back((TruncateOffset::Chunk { epoch, chunk_id }, None)); } LogStoreReadItem::Barrier { is_checkpoint } => { let offset = TruncateOffset::Barrier { epoch }; if let Some(prev_offset) = &prev_offset { prev_offset.check_next_offset(offset)?; - } else { - // TODO: this start epoch is actually unnecessary - request_tx.start_epoch(epoch).await?; } - if is_checkpoint { + let start_time = if is_checkpoint { let start_time = Instant::now(); request_tx.barrier(epoch, true).await?; - // waiting for previous response - loop { - match response_rx.recv().await.ok_or_else(|| { - SinkError::Remote(anyhow!("end of response stream")) - })? { - SinkWriterStreamResponse { - response: - Some(sink_writer_stream_response::Response::Commit( - _, - )), - } => { - if is_async_truncate && !sent_chunk_id_queue.is_empty() { - return Err(SinkError::Remote(anyhow!("get commit response with chunk not acked: {:?}", sent_chunk_id_queue))); - } - break; - } - SinkWriterStreamResponse { - response: - Some(sink_writer_stream_response::Response::Batch( - sink_writer_stream_response::BatchWrittenResponse { - epoch, - batch_id, - }, - )), - } => { - if !is_async_truncate { - return Err(SinkError::Remote(anyhow!( - "batch written response only for async truncate" - ))); - } - truncate_matched_chunk_id( - &mut sent_chunk_id_queue, - (epoch, batch_id), - &mut log_reader, - ) - .await?; - } - response => { - return Err(SinkError::Remote(anyhow!( - "expected commit or batch written response, but get {:?}", - response - ))); - } - }; - } - sink_metrics - .sink_commit_duration_metrics - .observe(start_time.elapsed().as_millis() as f64); - log_reader - .truncate(TruncateOffset::Barrier { epoch }) - .await?; + Some(start_time) } else { request_tx.barrier(epoch, false).await?; - } + None + }; prev_offset = Some(offset); + sent_offset_queue + .push_back((TruncateOffset::Barrier { epoch }, start_time)); } LogStoreReadItem::UpdateVnodeBitmap(_) => {} } } } } - }); + }; - select(poll_response_stream, poll_consume_log_and_sink) + select(pin!(poll_response_stream), pin!(poll_consume_log_and_sink)) .await .factor_first() .0 From e41af89a50034fedb13b87842fe3a3ce23e2bb81 Mon Sep 17 00:00:00 2001 From: William Wen Date: Tue, 7 Nov 2023 14:17:13 +0800 Subject: [PATCH 11/14] enable kv log store for append only jdbc sink by default --- src/connector/src/sink/remote.rs | 53 +++++++++++++++++++++++++------- 1 file changed, 42 insertions(+), 11 deletions(-) diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs index f53eff8831e47..bcf7bac7b82fc 100644 --- a/src/connector/src/sink/remote.rs +++ b/src/connector/src/sink/remote.rs @@ -55,6 +55,7 @@ use tokio::task::spawn_blocking; use tokio_stream::wrappers::ReceiverStream; use tracing::warn; +use crate::sink::catalog::desc::SinkDesc; use crate::sink::coordinate::CoordinatedSinkWriter; use crate::sink::log_store::{LogReader, LogStoreReadItem, TruncateOffset}; use crate::sink::writer::{LogSinkerOf, SinkWriter, SinkWriterExt}; @@ -67,28 +68,54 @@ use crate::ConnectorParams; macro_rules! def_remote_sink { () => { def_remote_sink! { - { ElasticSearch, ElasticSearchSink, "elasticsearch" }, - { Cassandra, CassandraSink, "cassandra" }, - { Jdbc, JdbcSink, "jdbc" }, + { ElasticSearch, ElasticSearchSink, "elasticsearch" } + { Cassandra, CassandraSink, "cassandra" } + { Jdbc, JdbcSink, "jdbc", |desc| { + desc.sink_type.is_append_only() + } } { DeltaLake, DeltaLakeSink, "deltalake" } } }; - ($({ $variant_name:ident, $sink_type_name:ident, $sink_name:expr }),*) => { - $( - #[derive(Debug)] - pub struct $variant_name; - impl RemoteSinkTrait for $variant_name { - const SINK_NAME: &'static str = $sink_name; + () => {}; + ({ $variant_name:ident, $sink_type_name:ident, $sink_name:expr }) => { + #[derive(Debug)] + pub struct $variant_name; + impl RemoteSinkTrait for $variant_name { + const SINK_NAME: &'static str = $sink_name; + } + pub type $sink_type_name = RemoteSink<$variant_name>; + }; + ({ $variant_name:ident, $sink_type_name:ident, $sink_name:expr, |$desc:ident| $body:expr }) => { + #[derive(Debug)] + pub struct $variant_name; + impl RemoteSinkTrait for $variant_name { + const SINK_NAME: &'static str = $sink_name; + fn default_sink_decouple($desc: &SinkDesc) -> bool { + $body } - pub type $sink_type_name = RemoteSink<$variant_name>; - )* + } + pub type $sink_type_name = RemoteSink<$variant_name>; + }; + ({ $($first:tt)+ } $({$($rest:tt)+})*) => { + def_remote_sink! { + {$($first)+} + } + def_remote_sink! { + $({$($rest)+})* + } }; + ($($invalid:tt)*) => { + compile_error! {concat! {"invalid `", stringify!{$($invalid)*}, "`"}} + } } def_remote_sink!(); pub trait RemoteSinkTrait: Send + Sync + 'static { const SINK_NAME: &'static str; + fn default_sink_decouple(_desc: &SinkDesc) -> bool { + false + } } #[derive(Debug)] @@ -114,6 +141,10 @@ impl Sink for RemoteSink { const SINK_NAME: &'static str = R::SINK_NAME; + fn default_sink_decouple(desc: &SinkDesc) -> bool { + R::default_sink_decouple(desc) + } + async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result { RemoteLogSinker::new(self.param.clone(), writer_param).await } From 85340c8593b8e344749cc78652daf2d0b8d977ff Mon Sep 17 00:00:00 2001 From: William Wen Date: Wed, 8 Nov 2023 12:25:48 +0800 Subject: [PATCH 12/14] port the async truncate logic to jdbc sink --- .../connector/sink/jdbc/JDBCSinkTest.java | 18 ++++++------ .../com/risingwave/connector/JDBCSink.java | 29 ++++++++++++------- .../risingwave/connector/JDBCSinkFactory.java | 3 +- 3 files changed, 28 insertions(+), 22 deletions(-) diff --git a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/jdbc/JDBCSinkTest.java b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/jdbc/JDBCSinkTest.java index da9b9d866583b..918ea46f9e49f 100644 --- a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/jdbc/JDBCSinkTest.java +++ b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/jdbc/JDBCSinkTest.java @@ -16,7 +16,6 @@ import static org.junit.Assert.*; -import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.risingwave.connector.JDBCSink; import com.risingwave.connector.JDBCSinkConfig; @@ -26,6 +25,7 @@ import com.risingwave.proto.Data.DataType.TypeName; import com.risingwave.proto.Data.Op; import java.sql.*; +import java.util.Arrays; import org.junit.Test; import org.testcontainers.containers.JdbcDatabaseContainer; import org.testcontainers.containers.MySQLContainer; @@ -84,7 +84,7 @@ static void testJDBCSync(JdbcDatabaseContainer container, TestType testType) Connection conn = sink.getConn(); sink.write( - Iterators.forArray( + Arrays.asList( new ArraySinkRow( Op.INSERT, 1, @@ -94,7 +94,7 @@ static void testJDBCSync(JdbcDatabaseContainer container, TestType testType) new Timestamp(1000000000), "{\"key\": \"password\", \"value\": \"Singularity123\"}", "I want to sleep".getBytes()))); - sink.sync(); + sink.barrier(true); Statement stmt = conn.createStatement(); try (var rs = stmt.executeQuery(String.format("SELECT * FROM %s", tableName))) { @@ -106,7 +106,7 @@ static void testJDBCSync(JdbcDatabaseContainer container, TestType testType) } sink.write( - Iterators.forArray( + Arrays.asList( new ArraySinkRow( Op.INSERT, 2, @@ -116,7 +116,7 @@ static void testJDBCSync(JdbcDatabaseContainer container, TestType testType) new Timestamp(1000000000), "{\"key\": \"password\", \"value\": \"Singularity123\"}", "I want to sleep".getBytes()))); - sink.sync(); + sink.barrier(true); try (var rs = stmt.executeQuery(String.format("SELECT * FROM %s", tableName))) { int count; for (count = 0; rs.next(); ) { @@ -126,7 +126,7 @@ static void testJDBCSync(JdbcDatabaseContainer container, TestType testType) } stmt.close(); - sink.sync(); + sink.barrier(true); sink.drop(); } @@ -144,7 +144,7 @@ static void testJDBCWrite(JdbcDatabaseContainer container, TestType testType) Statement stmt = conn.createStatement(); sink.write( - Iterators.forArray( + Arrays.asList( new ArraySinkRow( Op.INSERT, 1, @@ -171,7 +171,7 @@ static void testJDBCWrite(JdbcDatabaseContainer container, TestType testType) } sink.write( - Iterators.forArray( + Arrays.asList( new ArraySinkRow( Op.UPDATE_DELETE, 1, @@ -216,7 +216,7 @@ static void testJDBCWrite(JdbcDatabaseContainer container, TestType testType) assertFalse(rs.next()); } - sink.sync(); + sink.barrier(true); stmt.close(); } diff --git a/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSink.java b/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSink.java index ea8429536c03c..220caca4d13fe 100644 --- a/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSink.java +++ b/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSink.java @@ -16,8 +16,9 @@ import com.risingwave.connector.api.TableSchema; import com.risingwave.connector.api.sink.SinkRow; -import com.risingwave.connector.api.sink.SinkWriterBase; +import com.risingwave.connector.api.sink.SinkWriter; import com.risingwave.connector.jdbc.JdbcDialect; +import com.risingwave.proto.ConnectorServiceProto; import com.risingwave.proto.Data; import io.grpc.Status; import java.sql.*; @@ -25,7 +26,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class JDBCSink extends SinkWriterBase { +public class JDBCSink implements SinkWriter { private static final String ERROR_REPORT_TEMPLATE = "Error when exec %s, message %s"; private final JdbcDialect jdbcDialect; @@ -33,6 +34,8 @@ public class JDBCSink extends SinkWriterBase { private final Connection conn; private final List pkColumnNames; + private final TableSchema tableSchema; + public static final String JDBC_COLUMN_NAME_KEY = "COLUMN_NAME"; public static final String JDBC_DATA_TYPE_KEY = "DATA_TYPE"; @@ -45,7 +48,7 @@ public class JDBCSink extends SinkWriterBase { private static final Logger LOG = LoggerFactory.getLogger(JDBCSink.class); public JDBCSink(JDBCSinkConfig config, TableSchema tableSchema) { - super(tableSchema); + this.tableSchema = tableSchema; var jdbcUrl = config.getJdbcUrl().toLowerCase(); var factory = JdbcUtils.getDialectFactory(jdbcUrl); @@ -175,7 +178,7 @@ private PreparedStatement prepareInsertStatement(SinkRow row) { } try { var preparedStmt = insertPreparedStmt; - jdbcDialect.bindInsertIntoStatement(preparedStmt, conn, getTableSchema(), row); + jdbcDialect.bindInsertIntoStatement(preparedStmt, conn, tableSchema, row); preparedStmt.addBatch(); return preparedStmt; } catch (SQLException e) { @@ -192,7 +195,7 @@ private PreparedStatement prepareUpsertStatement(SinkRow row) { var preparedStmt = upsertPreparedStmt; switch (row.getOp()) { case INSERT: - jdbcDialect.bindUpsertStatement(preparedStmt, conn, getTableSchema(), row); + jdbcDialect.bindUpsertStatement(preparedStmt, conn, tableSchema, row); break; case UPDATE_INSERT: if (!updateFlag) { @@ -200,7 +203,7 @@ private PreparedStatement prepareUpsertStatement(SinkRow row) { .withDescription("an UPDATE_DELETE should precede an UPDATE_INSERT") .asRuntimeException(); } - jdbcDialect.bindUpsertStatement(preparedStmt, conn, getTableSchema(), row); + jdbcDialect.bindUpsertStatement(preparedStmt, conn, tableSchema, row); updateFlag = false; break; default: @@ -235,7 +238,7 @@ private PreparedStatement prepareDeleteStatement(SinkRow row) { try { int placeholderIdx = 1; for (String primaryKey : pkColumnNames) { - Object fromRow = getTableSchema().getFromRow(primaryKey, row); + Object fromRow = tableSchema.getFromRow(primaryKey, row); deletePreparedStmt.setObject(placeholderIdx++, fromRow); } deletePreparedStmt.addBatch(); @@ -249,13 +252,15 @@ private PreparedStatement prepareDeleteStatement(SinkRow row) { } @Override - public void write(Iterator rows) { + public void beginEpoch(long epoch) {} + + @Override + public boolean write(Iterable rows) { PreparedStatement deleteStatement = null; PreparedStatement upsertStatement = null; PreparedStatement insertStatement = null; - while (rows.hasNext()) { - SinkRow row = rows.next(); + for (SinkRow row : rows) { if (row.getOp() == Data.Op.UPDATE_DELETE) { updateFlag = true; continue; @@ -285,6 +290,7 @@ public void write(Iterator rows) { String.format(ERROR_REPORT_TEMPLATE, e.getSQLState(), e.getMessage())) .asRuntimeException(); } + return true; } private void executeStatement(PreparedStatement stmt) throws SQLException { @@ -297,13 +303,14 @@ private void executeStatement(PreparedStatement stmt) throws SQLException { } @Override - public void sync() { + public Optional barrier(boolean isCheckpoint) { if (updateFlag) { throw Status.FAILED_PRECONDITION .withDescription( "expected UPDATE_INSERT to complete an UPDATE operation, got `sync`") .asRuntimeException(); } + return Optional.empty(); } @Override diff --git a/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSinkFactory.java b/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSinkFactory.java index 39a86772d19e5..e4d009d32b1a0 100644 --- a/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSinkFactory.java +++ b/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSinkFactory.java @@ -19,7 +19,6 @@ import com.risingwave.connector.api.TableSchema; import com.risingwave.connector.api.sink.SinkFactory; import com.risingwave.connector.api.sink.SinkWriter; -import com.risingwave.connector.api.sink.SinkWriterV1; import com.risingwave.proto.Catalog.SinkType; import io.grpc.Status; import java.sql.*; @@ -40,7 +39,7 @@ public class JDBCSinkFactory implements SinkFactory { public SinkWriter createWriter(TableSchema tableSchema, Map tableProperties) { ObjectMapper mapper = new ObjectMapper(); JDBCSinkConfig config = mapper.convertValue(tableProperties, JDBCSinkConfig.class); - return new SinkWriterV1.Adapter(new JDBCSink(config, tableSchema)); + return new JDBCSink(config, tableSchema); } @Override From f038997f4d3f23f5fe7688e4d09a4619bce67437 Mon Sep 17 00:00:00 2001 From: William Wen Date: Wed, 8 Nov 2023 14:36:52 +0800 Subject: [PATCH 13/14] add java doc for SinkWriter interface --- .../connector/api/sink/SinkWriter.java | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/sink/SinkWriter.java b/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/sink/SinkWriter.java index 9ec92001846a3..a211d8f9750f0 100644 --- a/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/sink/SinkWriter.java +++ b/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/sink/SinkWriter.java @@ -20,11 +20,31 @@ import java.util.Optional; public interface SinkWriter { + /** + * Begin writing an epoch. + * + * @param epoch + */ void beginEpoch(long epoch); + /** + * Write a series of rows to the external sink. + * + * @return Flag to indicate whether the rows are written and persisting in the external sink. + * `true` means persisted. + */ boolean write(Iterable rows); + /** + * Mark the end of the previous begun epoch. + * + * @param isCheckpoint `isCheckpoint` = `true` means that the RW kernel will do a checkpoint for + * data before this barrier. External sink should have its data persisted before it returns. + * @return Optionally return the metadata of this checkpoint. Only return some metadata for + * coordinated remote sink when `isCheckpoint` == `true`. + */ Optional barrier(boolean isCheckpoint); + /** Clean up */ void drop(); } From 095d91f182ed83ebdba17a5098d376dc7354535a Mon Sep 17 00:00:00 2001 From: William Wen Date: Wed, 8 Nov 2023 14:40:10 +0800 Subject: [PATCH 14/14] minor rename --- src/connector/src/sink/remote.rs | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs index 770ffe00f97ed..1c827309f30bb 100644 --- a/src/connector/src/sink/remote.rs +++ b/src/connector/src/sink/remote.rs @@ -281,33 +281,35 @@ impl LogSinker for RemoteLogSinker { async fn truncate_matched_offset( queue: &mut VecDeque<(TruncateOffset, Option)>, - offset: TruncateOffset, + persisted_offset: TruncateOffset, log_reader: &mut impl LogReader, metrics: &SinkMetrics, ) -> Result<()> { - while let Some((sent_offset, _)) = queue.front() && sent_offset < &offset { + while let Some((sent_offset, _)) = queue.front() && sent_offset < &persisted_offset { queue.pop_front(); } - let (sent_offset, start_time) = queue - .pop_front() - .ok_or_else(|| anyhow!("get unsent offset {:?} in response", offset))?; - if sent_offset != offset { + let (sent_offset, start_time) = queue.pop_front().ok_or_else(|| { + anyhow!("get unsent offset {:?} in response", persisted_offset) + })?; + if sent_offset != persisted_offset { return Err(anyhow!( "new response offset {:?} not match the buffer offset {:?}", - offset, + persisted_offset, sent_offset ) .into()); } - if let (TruncateOffset::Barrier { .. }, Some(start_time)) = (offset, start_time) { + if let (TruncateOffset::Barrier { .. }, Some(start_time)) = + (persisted_offset, start_time) + { metrics .sink_commit_duration_metrics .observe(start_time.elapsed().as_millis() as f64); } - log_reader.truncate(offset).await?; + log_reader.truncate(persisted_offset).await?; Ok(()) }