From 3474214f9d604aa908a8e8c6454ca3aec27dedde Mon Sep 17 00:00:00 2001 From: Kiv Chen <34561254+KivenChen@users.noreply.github.com> Date: Mon, 28 Nov 2022 15:12:15 +0800 Subject: [PATCH] feat(connector): add connector remote sink (#6493) * add remote sink * pass connector_sink_endpoint thru CN opts instead * refactor remote sink * refactor connector params * fix source ci * Optionize source endpoint * fix proto lint * fix naming style linting * fix naming style linting * fix sinkerror parsing Co-authored-by: William Wen <44139337+wenym1@users.noreply.github.com> Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com> --- ci/scripts/e2e-source-test.sh | 3 + ci/scripts/misc-check.sh | 1 - proto/connector_service.proto | 88 +++++ src/common/src/config.rs | 13 + src/compute/src/lib.rs | 8 +- src/compute/src/server.rs | 6 +- src/connector/src/lib.rs | 18 + src/connector/src/sink/mod.rs | 33 +- src/connector/src/sink/remote.rs | 535 ++++++++++++++++++++++++++++ src/prost/build.rs | 1 + src/prost/src/lib.rs | 3 + src/rpc_client/src/lib.rs | 1 + src/source/src/connector_source.rs | 19 +- src/source/src/manager.rs | 21 +- src/stream/src/executor/sink.rs | 28 +- src/stream/src/from_proto/sink.rs | 1 + src/stream/src/from_proto/source.rs | 2 +- src/stream/src/task/env.rs | 15 +- 18 files changed, 759 insertions(+), 37 deletions(-) create mode 100644 proto/connector_service.proto create mode 100644 src/connector/src/sink/remote.rs diff --git a/ci/scripts/e2e-source-test.sh b/ci/scripts/e2e-source-test.sh index 976a3c1e138b5..e083f54e1b203 100755 --- a/ci/scripts/e2e-source-test.sh +++ b/ci/scripts/e2e-source-test.sh @@ -5,6 +5,9 @@ set -euo pipefail source ci/scripts/common.env.sh +# prepare environment +export CONNECTOR_SOURCE_ENDPOINT="localhost:60061" + while getopts 'p:' opt; do case ${opt} in p ) diff --git a/ci/scripts/misc-check.sh b/ci/scripts/misc-check.sh index ae84ac626119e..2c53d7414c538 100755 --- a/ci/scripts/misc-check.sh +++ b/ci/scripts/misc-check.sh @@ -9,4 +9,3 @@ echo "--- Check protobuf code format && Lint protobuf" cd proto buf format -d --exit-code buf lint - diff --git a/proto/connector_service.proto b/proto/connector_service.proto new file mode 100644 index 0000000000000..d65d03c219f27 --- /dev/null +++ b/proto/connector_service.proto @@ -0,0 +1,88 @@ +syntax = "proto3"; + +package connector_service; + +import "data.proto"; + +option java_outer_classname = "ConnectorServiceProto"; +option java_package = "com.risingwave.proto"; + +message SinkConfig { + message TableSchema { + message Column { + string name = 1; + data.DataType.TypeName data_type = 2; + } + repeated Column columns = 1; + repeated uint32 pk_indices = 2; + } + string sink_type = 1; + map properties = 2; + TableSchema table_schema = 3; +} + +message SinkStreamRequest { + message StartSink { + SinkConfig sink_config = 1; + } + + message WriteBatch { + message JsonPayload { + message RowOp { + data.Op op_type = 1; + string line = 2; + } + repeated RowOp row_ops = 1; + } + + oneof payload { + JsonPayload json_payload = 1; + } + + uint64 batch_id = 3; + uint64 epoch = 4; + } + + message StartEpoch { + uint64 epoch = 1; + } + + message SyncBatch { + uint64 epoch = 1; + } + + oneof request { + StartSink start = 1; + StartEpoch start_epoch = 2; + WriteBatch write = 3; + SyncBatch sync = 4; + } +} + +message SinkResponse { + message SyncResponse { + uint64 epoch = 1; + } + + message StartEpochResponse { + uint64 epoch = 1; + } + + message WriteResponse { + uint64 epoch = 1; + uint64 batch_id = 2; + } 
+ + message StartResponse {} + + oneof response { + SyncResponse sync = 2; + StartEpochResponse start_epoch = 3; + WriteResponse write = 4; + StartResponse start = 5; + } +} + +service ConnectorService { + rpc SinkStream(stream SinkStreamRequest) returns (stream SinkResponse); +} diff --git a/src/common/src/config.rs b/src/common/src/config.rs index b1a5e195b26d7..18684fbd0e0fc 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -223,6 +223,19 @@ impl Default for StorageConfig { } } +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(deny_unknown_fields)] +pub struct ConnectorConfig { + #[serde(default)] + pub connector_addr: Option, +} + +impl Default for ConnectorConfig { + fn default() -> Self { + toml::from_str("").unwrap() + } +} + #[derive(Clone, Debug, Serialize, Deserialize)] #[serde(deny_unknown_fields)] pub struct FileCacheConfig { diff --git a/src/compute/src/lib.rs b/src/compute/src/lib.rs index 1d16133a57052..79ef0236a9c4d 100644 --- a/src/compute/src/lib.rs +++ b/src/compute/src/lib.rs @@ -80,8 +80,12 @@ pub struct ComputeNodeOpts { pub file_cache_dir: String, /// Endpoint of the connector node - #[clap(long, default_value = "127.0.0.1:60061")] - pub connector_source_endpoint: String, + #[clap(long, env = "CONNECTOR_SOURCE_ENDPOINT")] + pub connector_source_endpoint: Option, + + /// Endpoint of connector sink node + #[clap(long, env = "CONNECTOR_SINK_ENDPOINT")] + pub connector_sink_endpoint: Option, } use std::future::Future; diff --git a/src/compute/src/server.rs b/src/compute/src/server.rs index 541c187e38ed6..ede5fc285e1a6 100644 --- a/src/compute/src/server.rs +++ b/src/compute/src/server.rs @@ -224,11 +224,15 @@ pub async fn compute_node_serve( dml_mgr.clone(), ); + let connector_params = risingwave_connector::ConnectorParams { + connector_source_endpoint: opts.connector_source_endpoint, + connector_sink_endpoint: opts.connector_sink_endpoint, + }; // Initialize the streaming environment. 
let stream_env = StreamEnvironment::new( source_mgr, client_addr.clone(), - opts.connector_source_endpoint, + connector_params, stream_config, worker_id, state_store, diff --git a/src/connector/src/lib.rs b/src/connector/src/lib.rs index 66bd82c285358..5c44a08f01f15 100644 --- a/src/connector/src/lib.rs +++ b/src/connector/src/lib.rs @@ -30,3 +30,21 @@ pub mod error; mod macros; pub mod sink; pub mod source; + +#[derive(Clone, Debug, Default)] +pub struct ConnectorParams { + pub connector_source_endpoint: Option, + pub connector_sink_endpoint: Option, +} + +impl ConnectorParams { + pub fn new( + connector_source_endpoint: Option, + connector_sink_endpoint: Option, + ) -> Self { + Self { + connector_source_endpoint, + connector_sink_endpoint, + } + } +} diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index f504291cf02c7..ef5e574e70943 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -16,6 +16,7 @@ pub mod console; pub mod kafka; pub mod mysql; pub mod redis; +pub mod remote; use std::collections::HashMap; @@ -24,6 +25,7 @@ use enum_as_inner::EnumAsInner; use risingwave_common::array::StreamChunk; use risingwave_common::catalog::Schema; use risingwave_common::error::{ErrorCode, RwError}; +use risingwave_rpc_client::error::RpcError; use serde::{Deserialize, Serialize}; use thiserror::Error; pub use tracing; @@ -32,6 +34,8 @@ use crate::sink::console::{ConsoleConfig, ConsoleSink, CONSOLE_SINK}; use crate::sink::kafka::{KafkaConfig, KafkaSink, KAFKA_SINK}; pub use crate::sink::mysql::{MySqlConfig, MySqlSink, MYSQL_SINK}; use crate::sink::redis::{RedisConfig, RedisSink}; +use crate::sink::remote::{RemoteConfig, RemoteSink}; +use crate::ConnectorParams; #[async_trait] pub trait Sink { @@ -54,6 +58,7 @@ pub enum SinkConfig { Mysql(MySqlConfig), Redis(RedisConfig), Kafka(KafkaConfig), + Remote(RemoteConfig), Console(ConsoleConfig), } @@ -63,6 +68,7 @@ pub enum SinkState { Mysql, Redis, Console, + Remote, } impl SinkConfig { @@ -77,7 +83,7 @@ impl SinkConfig { CONSOLE_SINK => Ok(SinkConfig::Console(ConsoleConfig::from_hashmap( properties, )?)), - _ => unimplemented!(), + _ => Ok(SinkConfig::Remote(RemoteConfig::from_hashmap(properties)?)), } } @@ -86,6 +92,7 @@ impl SinkConfig { SinkConfig::Mysql(_) => "mysql", SinkConfig::Kafka(_) => "kafka", SinkConfig::Redis(_) => "redis", + SinkConfig::Remote(_) => "remote", SinkConfig::Console(_) => "console", } } @@ -96,16 +103,25 @@ pub enum SinkImpl { MySql(Box), Redis(Box), Kafka(Box), + Remote(Box), Console(Box), } impl SinkImpl { - pub async fn new(cfg: SinkConfig, schema: Schema) -> Result { + pub async fn new( + cfg: SinkConfig, + schema: Schema, + pk_indices: Vec, + connector_params: ConnectorParams, + ) -> Result { Ok(match cfg { SinkConfig::Mysql(cfg) => SinkImpl::MySql(Box::new(MySqlSink::new(cfg, schema).await?)), SinkConfig::Redis(cfg) => SinkImpl::Redis(Box::new(RedisSink::new(cfg, schema)?)), SinkConfig::Kafka(cfg) => SinkImpl::Kafka(Box::new(KafkaSink::new(cfg, schema).await?)), SinkConfig::Console(cfg) => SinkImpl::Console(Box::new(ConsoleSink::new(cfg, schema)?)), + SinkConfig::Remote(cfg) => SinkImpl::Remote(Box::new( + RemoteSink::new(cfg, schema, pk_indices, connector_params).await?, + )), }) } @@ -114,6 +130,7 @@ impl SinkImpl { SinkImpl::MySql(_) => true, SinkImpl::Redis(_) => false, SinkImpl::Kafka(_) => false, + SinkImpl::Remote(_) => false, SinkImpl::Console(_) => false, } } @@ -133,6 +150,7 @@ impl Sink for SinkImpl { SinkImpl::MySql(sink) => sink.write_batch(chunk).await, 
SinkImpl::Redis(sink) => sink.write_batch(chunk).await, SinkImpl::Kafka(sink) => sink.write_batch(chunk).await, + SinkImpl::Remote(sink) => sink.write_batch(chunk).await, SinkImpl::Console(sink) => sink.write_batch(chunk).await, } } @@ -142,6 +160,7 @@ impl Sink for SinkImpl { SinkImpl::MySql(sink) => sink.begin_epoch(epoch).await, SinkImpl::Redis(sink) => sink.begin_epoch(epoch).await, SinkImpl::Kafka(sink) => sink.begin_epoch(epoch).await, + SinkImpl::Remote(sink) => sink.begin_epoch(epoch).await, SinkImpl::Console(sink) => sink.begin_epoch(epoch).await, } } @@ -151,6 +170,7 @@ impl Sink for SinkImpl { SinkImpl::MySql(sink) => sink.commit().await, SinkImpl::Redis(sink) => sink.commit().await, SinkImpl::Kafka(sink) => sink.commit().await, + SinkImpl::Remote(sink) => sink.commit().await, SinkImpl::Console(sink) => sink.commit().await, } } @@ -160,6 +180,7 @@ impl Sink for SinkImpl { SinkImpl::MySql(sink) => sink.abort().await, SinkImpl::Redis(sink) => sink.abort().await, SinkImpl::Kafka(sink) => sink.abort().await, + SinkImpl::Remote(sink) => sink.abort().await, SinkImpl::Console(sink) => sink.abort().await, } } @@ -175,12 +196,20 @@ pub enum SinkError { MySqlInner(#[from] mysql_async::Error), #[error("Kafka error: {0}")] Kafka(#[from] rdkafka::error::KafkaError), + #[error("Remote sink error: {0}")] + Remote(String), #[error("Json parse error: {0}")] JsonParse(String), #[error("config error: {0}")] Config(String), } +impl From for SinkError { + fn from(value: RpcError) -> Self { + SinkError::Remote(format!("{:?}", value)) + } +} + impl From for RwError { fn from(e: SinkError) -> Self { ErrorCode::SinkError(Box::new(e)).into() diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs new file mode 100644 index 0000000000000..9112eb02be511 --- /dev/null +++ b/src/connector/src/sink/remote.rs @@ -0,0 +1,535 @@ +// Copyright 2022 Singularity Data +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::collections::HashMap; +use std::time::Duration; + +use async_trait::async_trait; +use itertools::Itertools; +use risingwave_common::array::StreamChunk; +#[cfg(test)] +use risingwave_common::catalog::Field; +use risingwave_common::catalog::Schema; +use risingwave_common::config::{MAX_CONNECTION_WINDOW_SIZE, STREAM_WINDOW_SIZE}; +#[cfg(test)] +use risingwave_common::types::DataType; +use risingwave_common::types::{DatumRef, ScalarRefImpl}; +use risingwave_pb::connector_service::connector_service_client::ConnectorServiceClient; +use risingwave_pb::connector_service::sink_config::table_schema::Column; +use risingwave_pb::connector_service::sink_config::TableSchema; +use risingwave_pb::connector_service::sink_stream_request::write_batch::json_payload::RowOp; +use risingwave_pb::connector_service::sink_stream_request::write_batch::{JsonPayload, Payload}; +use risingwave_pb::connector_service::sink_stream_request::{ + Request as SinkRequest, StartEpoch, StartSink, SyncBatch, WriteBatch, +}; +use risingwave_pb::connector_service::{SinkConfig, SinkResponse, SinkStreamRequest}; +use serde_json::Value; +use serde_json::Value::Number; +use tokio::sync::mpsc; +use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; +use tokio_stream::wrappers::UnboundedReceiverStream; +use tokio_stream::StreamExt; +use tonic::transport::{Channel, Endpoint}; +use tonic::{Request, Status, Streaming}; + +use crate::sink::{Result, Sink, SinkError}; +use crate::ConnectorParams; + +pub const VALID_REMOTE_SINKS: [&str; 2] = ["jdbc", "file"]; + +pub fn is_valid_remote_sink(sink_type: String) -> bool { + return VALID_REMOTE_SINKS.contains(&sink_type.as_str()); +} + +#[derive(Clone, Debug)] +pub struct RemoteConfig { + pub sink_type: String, + pub properties: HashMap, +} + +impl RemoteConfig { + pub fn from_hashmap(values: HashMap) -> Result { + let sink_type = values + .get("connector") + .expect("sink type must be specified") + .to_string(); + + if !is_valid_remote_sink(sink_type.clone()) { + return Err(SinkError::Config(format!("invalid sink type: {sink_type}"))); + } + + Ok(RemoteConfig { + sink_type, + properties: values, + }) + } +} + +#[derive(Debug)] +enum ResponseStreamImpl { + Grpc(Streaming), + Receiver(UnboundedReceiver), +} + +impl ResponseStreamImpl { + pub async fn next(&mut self) -> Result { + return match self { + ResponseStreamImpl::Grpc(ref mut response) => response + .next() + .await + .unwrap_or_else(|| Err(Status::cancelled("response stream closed unexpectedly"))) + .map_err(|e| SinkError::Remote(e.message().to_string())), + ResponseStreamImpl::Receiver(ref mut receiver) => { + receiver.recv().await.ok_or_else(|| { + SinkError::Remote("response stream closed unexpectedly".to_string()) + }) + } + }; + } +} + +#[derive(Debug)] +pub struct RemoteSink { + pub sink_type: String, + properties: HashMap, + epoch: Option, + batch_id: u64, + schema: Schema, + _client: Option>, + request_sender: Option>, + response_stream: ResponseStreamImpl, +} + +impl RemoteSink { + pub async fn new( + config: RemoteConfig, + schema: Schema, + pk_indices: Vec, + connector_params: ConnectorParams, + ) -> Result { + let channel = Endpoint::from_shared(format!( + "http://{}", + connector_params + .connector_sink_endpoint + .ok_or_else(|| SinkError::Remote( + "connector sink endpoint not specified".parse().unwrap() + ))? + )) + .map_err(|e| SinkError::Remote(format!("failed to connect channel: {:?}", e)))? 
+ .initial_connection_window_size(MAX_CONNECTION_WINDOW_SIZE) + .initial_stream_window_size(STREAM_WINDOW_SIZE) + .tcp_nodelay(true) + .connect_timeout(Duration::from_secs(5)) + .connect() + .await + .map_err(|e| SinkError::Remote(format!("failed to connect channel: {:?}", e)))?; // create client and start sink + let mut client = ConnectorServiceClient::new(channel); + + let (request_sender, request_receiver) = mpsc::unbounded_channel::(); + + // send initial request in case of the blocking receive call from creating streaming request + request_sender + .send(SinkStreamRequest { + request: Some(SinkRequest::Start(StartSink { + sink_config: Some(SinkConfig { + sink_type: config.sink_type.clone(), + properties: config.properties.clone(), + table_schema: Some(TableSchema { + columns: schema + .fields() + .iter() + .map(|c| Column { + name: c.name.clone(), + data_type: c.data_type().to_protobuf().type_name, + }) + .collect(), + pk_indices: pk_indices.iter().map(|i| *i as u32).collect(), + }), + }), + })), + }) + .map_err(|e| SinkError::Remote(e.to_string()))?; + + let mut response = tokio::time::timeout( + Duration::from_secs(3), + client.sink_stream(Request::new(UnboundedReceiverStream::new(request_receiver))), + ) + .await + .map_err(|e| SinkError::Remote(format!("failed to start sink: {:?}", e)))? + .map_err(|e| SinkError::Remote(format!("{:?}", e)))? + .into_inner(); + let _ = response.next().await.unwrap(); + + Ok(RemoteSink { + sink_type: config.sink_type, + properties: config.properties, + epoch: None, + batch_id: 0, + schema, + _client: Some(client), + request_sender: Some(request_sender), + response_stream: ResponseStreamImpl::Grpc(response), + }) + } + + fn on_sender_alive(&mut self) -> Result<&UnboundedSender> { + self.request_sender + .as_ref() + .ok_or_else(|| SinkError::Remote("sink has been dropped".to_string())) + } + + #[cfg(test)] + fn for_test( + response_receiver: UnboundedReceiver, + request_sender: UnboundedSender, + ) -> Self { + let properties = HashMap::from([("output_path".to_string(), "/tmp/rw".to_string())]); + + let schema = Schema::new(vec![ + Field { + data_type: DataType::Int32, + name: "id".into(), + sub_fields: vec![], + type_name: "".into(), + }, + Field { + data_type: DataType::Varchar, + name: "name".into(), + sub_fields: vec![], + type_name: "".into(), + }, + ]); + + Self { + sink_type: "file".to_string(), + properties, + epoch: None, + batch_id: 0, + schema, + _client: None, + request_sender: Some(request_sender), + response_stream: ResponseStreamImpl::Receiver(response_receiver), + } + } +} + +#[async_trait] +impl Sink for RemoteSink { + async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> { + let mut row_ops = vec![]; + for (op, row_ref) in chunk.rows() { + let mut map = serde_json::Map::new(); + row_ref + .values() + .zip_eq(self.schema.fields.iter()) + .for_each(|(v, f)| { + map.insert(f.name.clone(), parse_datum(v)); + }); + let row_op = RowOp { + op_type: op.to_protobuf() as i32, + line: serde_json::to_string(&map) + .map_err(|e| SinkError::Remote(format!("{:?}", e)))?, + }; + + row_ops.push(row_op); + } + + let epoch = self.epoch.ok_or_else(|| { + SinkError::Remote("epoch has not been initialize, call `begin_epoch`".to_string()) + })?; + let batch_id = self.batch_id; + self.on_sender_alive()? 
+ .send(SinkStreamRequest { + request: Some(SinkRequest::Write(WriteBatch { + epoch, + batch_id, + payload: Some(Payload::JsonPayload(JsonPayload { row_ops })), + })), + }) + .map_err(|e| SinkError::Remote(e.to_string()))?; + self.response_stream + .next() + .await + .map(|_| self.batch_id += 1) + } + + async fn begin_epoch(&mut self, epoch: u64) -> Result<()> { + self.on_sender_alive()? + .send(SinkStreamRequest { + request: Some(SinkRequest::StartEpoch(StartEpoch { epoch })), + }) + .map_err(|e| SinkError::Remote(e.to_string()))?; + self.response_stream + .next() + .await + .map(|_| self.epoch = Some(epoch)) + } + + async fn commit(&mut self) -> Result<()> { + let epoch = self.epoch.ok_or_else(|| { + SinkError::Remote("epoch has not been initialize, call `begin_epoch`".to_string()) + })?; + self.on_sender_alive()? + .send(SinkStreamRequest { + request: Some(SinkRequest::Sync(SyncBatch { epoch })), + }) + .map_err(|e| SinkError::Remote(e.to_string()))?; + self.response_stream.next().await.map(|_| ()) + } + + async fn abort(&mut self) -> Result<()> { + self.request_sender = None; + Ok(()) + } +} + +fn parse_datum(datum: DatumRef<'_>) -> Value { + match datum { + None => Value::Null, + Some(ScalarRefImpl::Int32(v)) => Value::from(v), + Some(ScalarRefImpl::Int64(v)) => Value::from(v), + Some(ScalarRefImpl::Float32(v)) => Value::from(v.into_inner()), + Some(ScalarRefImpl::Float64(v)) => Value::from(v.into_inner()), + Some(ScalarRefImpl::Decimal(v)) => Number(v.to_string().parse().unwrap()), + Some(ScalarRefImpl::Utf8(v)) => Value::from(v), + Some(ScalarRefImpl::Bool(v)) => Value::from(v), + Some(ScalarRefImpl::NaiveDate(v)) => Value::from(v.to_string()), + Some(ScalarRefImpl::NaiveTime(v)) => Value::from(v.to_string()), + Some(ScalarRefImpl::Interval(v)) => Value::from(v.to_string()), + Some(ScalarRefImpl::Struct(v)) => Value::from( + v.fields_ref() + .iter() + .map(|v| parse_datum(*v)) + .collect::>(), + ), + Some(ScalarRefImpl::List(v)) => Value::from( + v.values_ref() + .iter() + .map(|v| parse_datum(*v)) + .collect::>(), + ), + _ => unimplemented!(), + } +} + +#[cfg(test)] +mod test { + use std::sync::Arc; + use std::time::Duration; + + use risingwave_common::array; + use risingwave_common::array::column::Column; + use risingwave_common::array::{ArrayImpl, I32Array, Op, StreamChunk, Utf8Array}; + use risingwave_pb::connector_service::sink_response::{ + Response, StartEpochResponse, SyncResponse, WriteResponse, + }; + use risingwave_pb::connector_service::sink_stream_request::write_batch::Payload; + use risingwave_pb::connector_service::sink_stream_request::Request; + use risingwave_pb::connector_service::{SinkResponse, SinkStreamRequest}; + use risingwave_pb::data; + use tokio::sync::mpsc; + + use crate::sink::remote::RemoteSink; + use crate::sink::Sink; + + #[tokio::test] + async fn test_epoch_check() { + let (request_sender, mut request_recv) = mpsc::unbounded_channel(); + let (_, resp_recv) = mpsc::unbounded_channel(); + + let mut sink = RemoteSink::for_test(resp_recv, request_sender); + let chunk = StreamChunk::new( + vec![Op::Insert], + vec![ + Column::new(Arc::new(ArrayImpl::from(array!(I32Array, [Some(1)])))), + Column::new(Arc::new(ArrayImpl::from(array!( + Utf8Array, + [Some("Ripper")] + )))), + ], + None, + ); + + // test epoch check + assert!( + tokio::time::timeout(Duration::from_secs(10), sink.commit()) + .await + .expect("test failed: should not commit without epoch") + .is_err(), + "test failed: no epoch check for commit()" + ); + assert!( + 
request_recv.try_recv().is_err(), + "test failed: unchecked epoch before request" + ); + + assert!( + tokio::time::timeout(Duration::from_secs(1), sink.write_batch(chunk)) + .await + .expect("test failed: should not write without epoch") + .is_err(), + "test failed: no epoch check for write_batch()" + ); + assert!( + request_recv.try_recv().is_err(), + "test failed: unchecked epoch before request" + ); + } + + #[tokio::test] + async fn test_remote_sink() { + let (request_sender, mut request_receiver) = mpsc::unbounded_channel(); + let (response_sender, response_receiver) = mpsc::unbounded_channel(); + let mut sink = RemoteSink::for_test(response_receiver, request_sender); + + let chunk_a = StreamChunk::new( + vec![Op::Insert, Op::Insert, Op::Insert], + vec![ + Column::new(Arc::new(ArrayImpl::from(array!( + I32Array, + [Some(1), Some(2), Some(3)] + )))), + Column::new(Arc::new(ArrayImpl::from(array!( + Utf8Array, + [Some("Alice"), Some("Bob"), Some("Clare")] + )))), + ], + None, + ); + + let chunk_b = StreamChunk::new( + vec![Op::Insert, Op::Insert, Op::Insert], + vec![ + Column::new(Arc::new(ArrayImpl::from(array!( + I32Array, + [Some(4), Some(5), Some(6)] + )))), + Column::new(Arc::new(ArrayImpl::from(array!( + Utf8Array, + [Some("David"), Some("Eve"), Some("Frank")] + )))), + ], + None, + ); + + // test write batch + response_sender + .send(SinkResponse { + response: Some(Response::StartEpoch(StartEpochResponse { epoch: 2022 })), + }) + .expect("test failed: failed to start epoch"); + sink.begin_epoch(2022).await.unwrap(); + assert_eq!(sink.epoch, Some(2022)); + + request_receiver + .recv() + .await + .expect("test failed: failed to construct start_epoch request"); + + response_sender + .send(SinkResponse { + response: Some(Response::Write(WriteResponse { + epoch: 2022, + batch_id: 0, + })), + }) + .expect("test failed: failed to start epoch"); + sink.write_batch(chunk_a.clone()).await.unwrap(); + assert_eq!(sink.epoch, Some(2022)); + assert_eq!(sink.batch_id, 1); + match request_receiver.recv().await { + Some(SinkStreamRequest { + request: Some(Request::Write(write)), + }) => { + assert_eq!(write.epoch, 2022); + assert_eq!(write.batch_id, 0); + match write.payload.unwrap() { + Payload::JsonPayload(json) => { + let row_0 = json.row_ops.get(0).unwrap(); + assert_eq!(row_0.line, "{\"id\":1,\"name\":\"Alice\"}"); + assert_eq!(row_0.op_type, data::Op::Insert as i32); + let row_1 = json.row_ops.get(1).unwrap(); + assert_eq!(row_1.line, "{\"id\":2,\"name\":\"Bob\"}"); + assert_eq!(row_1.op_type, data::Op::Insert as i32); + let row_2 = json.row_ops.get(2).unwrap(); + assert_eq!(row_2.line, "{\"id\":3,\"name\":\"Clare\"}"); + assert_eq!(row_2.op_type, data::Op::Insert as i32); + } + } + } + _ => panic!("test failed: failed to construct write request"), + } + + // test commit + response_sender + .send(SinkResponse { + response: Some(Response::Sync(SyncResponse { epoch: 2022 })), + }) + .expect("test failed: failed to sync epoch"); + sink.commit().await.unwrap(); + let commit_request = request_receiver.recv().await.unwrap(); + match commit_request.request { + Some(Request::Sync(sync_batch)) => { + assert_eq!(sync_batch.epoch, 2022); + } + _ => panic!("test failed: failed to construct sync request "), + } + + // begin another epoch + response_sender + .send(SinkResponse { + response: Some(Response::StartEpoch(StartEpochResponse { epoch: 2023 })), + }) + .expect("test failed: failed to start epoch"); + sink.begin_epoch(2023).await.unwrap(); + // simply keep the channel empty since we've tested 
begin_epoch + let _ = request_receiver.recv().await.unwrap(); + assert_eq!(sink.epoch, Some(2023)); + + // test another write + response_sender + .send(SinkResponse { + response: Some(Response::Write(WriteResponse { + epoch: 2022, + batch_id: 1, + })), + }) + .expect("test failed: failed to start epoch"); + sink.write_batch(chunk_b.clone()).await.unwrap(); + assert_eq!(sink.epoch, Some(2023)); + assert_eq!(sink.batch_id, 2); + match request_receiver.recv().await { + Some(SinkStreamRequest { + request: Some(Request::Write(write)), + }) => { + assert_eq!(write.epoch, 2023); + assert_eq!(write.batch_id, 1); + match write.payload.unwrap() { + Payload::JsonPayload(json) => { + let row_0 = json.row_ops.get(0).unwrap(); + assert_eq!(row_0.line, "{\"id\":4,\"name\":\"David\"}"); + assert_eq!(row_0.op_type, data::Op::Insert as i32); + let row_1 = json.row_ops.get(1).unwrap(); + assert_eq!(row_1.line, "{\"id\":5,\"name\":\"Eve\"}"); + assert_eq!(row_1.op_type, data::Op::Insert as i32); + let row_2 = json.row_ops.get(2).unwrap(); + assert_eq!(row_2.line, "{\"id\":6,\"name\":\"Frank\"}"); + assert_eq!(row_2.op_type, data::Op::Insert as i32); + } + } + } + _ => panic!("test failed: failed to construct write request"), + } + } +} diff --git a/src/prost/build.rs b/src/prost/build.rs index 83be6521ebf62..3745e66276be3 100644 --- a/src/prost/build.rs +++ b/src/prost/build.rs @@ -30,6 +30,7 @@ fn main() -> Result<(), Box> { "meta", "batch_plan", "task_service", + "connector_service", "stream_plan", "stream_service", "compactor", diff --git a/src/prost/src/lib.rs b/src/prost/src/lib.rs index ce0d0d9ae70fb..091b571103cd4 100644 --- a/src/prost/src/lib.rs +++ b/src/prost/src/lib.rs @@ -48,6 +48,9 @@ pub mod batch_plan; #[cfg_attr(madsim, path = "sim/task_service.rs")] pub mod task_service; #[rustfmt::skip] +#[cfg_attr(madsim, path="sim/connector_service.rs")] +pub mod connector_service; +#[rustfmt::skip] #[cfg_attr(madsim, path = "sim/stream_plan.rs")] pub mod stream_plan; #[rustfmt::skip] diff --git a/src/rpc_client/src/lib.rs b/src/rpc_client/src/lib.rs index e622c67b362f0..a66ad9fb2c8d7 100644 --- a/src/rpc_client/src/lib.rs +++ b/src/rpc_client/src/lib.rs @@ -46,6 +46,7 @@ mod cdc_client; mod compute_client; mod hummock_meta_client; mod meta_client; +// mod sink_client; mod stream_client; pub use cdc_client::CdcClient; diff --git a/src/source/src/connector_source.rs b/src/source/src/connector_source.rs index 067d49e31bc22..fc873456b9a87 100644 --- a/src/source/src/connector_source.rs +++ b/src/source/src/connector_source.rs @@ -28,6 +28,7 @@ use risingwave_connector::source::{ Column, ConnectorProperties, ConnectorState, SourceMessage, SplitId, SplitMetaData, SplitReaderImpl, }; +use risingwave_connector::ConnectorParams; use risingwave_pb::catalog::{ ColumnIndex as ProstColumnIndex, StreamSourceInfo as ProstStreamSourceInfo, }; @@ -193,16 +194,14 @@ impl ConnectorSource { proto_message_name: String, properties: HashMap, columns: Vec, - connector_node_addr: String, + connector_node_addr: Option, connector_message_buffer_size: usize, ) -> Result { // Store the connector node address to properties for later use. 
let mut source_props: HashMap = HashMap::from_iter(properties.clone().into_iter()); - source_props.insert( - "connector_node_addr".to_string(), - connector_node_addr.clone(), - ); + connector_node_addr + .map(|addr| source_props.insert("connector_node_addr".to_string(), addr)); let config = ConnectorProperties::extract(source_props).map_err(|e| ConnectorError(e.into()))?; let parser = SourceParserImpl::create( @@ -298,7 +297,7 @@ pub struct SourceDescBuilderV2 { pk_column_ids: Vec, properties: HashMap, source_info: ProstStreamSourceInfo, - connector_node_addr: String, + connector_params: ConnectorParams, connector_message_buffer_size: usize, } @@ -311,7 +310,7 @@ impl SourceDescBuilderV2 { pk_column_ids: Vec, properties: HashMap, source_info: ProstStreamSourceInfo, - connector_node_addr: String, + connector_params: ConnectorParams, connector_message_buffer_size: usize, ) -> Self { Self { @@ -321,7 +320,7 @@ impl SourceDescBuilderV2 { pk_column_ids, properties, source_info, - connector_node_addr, + connector_params, connector_message_buffer_size, } } @@ -361,7 +360,7 @@ impl SourceDescBuilderV2 { self.source_info.proto_message_name, self.properties, columns.clone(), - self.connector_node_addr, + self.connector_params.connector_source_endpoint, self.connector_message_buffer_size, ) .await?; @@ -418,7 +417,7 @@ pub mod test_utils { pk_column_ids, properties, source_info, - connector_node_addr: "127.0.0.1:60061".to_string(), + connector_params: Default::default(), connector_message_buffer_size: DEFAULT_CONNECTOR_MESSAGE_BUFFER_SIZE, } } diff --git a/src/source/src/manager.rs b/src/source/src/manager.rs index ace9b0af5efe1..f875d73700d5c 100644 --- a/src/source/src/manager.rs +++ b/src/source/src/manager.rs @@ -24,6 +24,7 @@ use risingwave_common::error::{Result, RwError}; use risingwave_common::try_match_expand; use risingwave_common::types::DataType; use risingwave_connector::source::ConnectorProperties; +use risingwave_connector::ConnectorParams; use risingwave_pb::catalog::source_info::SourceInfo as ProstSourceInfo; use risingwave_pb::catalog::ColumnIndex as ProstColumnIndex; use risingwave_pb::plan_common::{ColumnCatalog as ProstColumnCatalog, RowFormatType}; @@ -204,7 +205,7 @@ pub struct SourceDescBuilder { properties: HashMap, info: ProstSourceInfo, source_manager: TableSourceManagerRef, - connector_node_addr: String, + connector_params: ConnectorParams, } impl SourceDescBuilder { @@ -217,7 +218,7 @@ impl SourceDescBuilder { properties: HashMap, info: ProstSourceInfo, source_manager: TableSourceManagerRef, - connector_node_addr: String, + connector_params: ConnectorParams, ) -> Self { Self { source_id, @@ -227,7 +228,7 @@ impl SourceDescBuilder { properties, info, source_manager, - connector_node_addr, + connector_params, } } @@ -295,10 +296,10 @@ impl SourceDescBuilder { // store the connector node address to properties for later use let mut source_props: HashMap = HashMap::from_iter(self.properties.clone().into_iter()); - source_props.insert( - "connector_node_addr".to_string(), - self.connector_node_addr.clone(), - ); + self.connector_params + .connector_source_endpoint + .as_ref() + .map(|addr| source_props.insert("connector_node_addr".to_string(), addr.clone())); let config = ConnectorProperties::extract(source_props) .map_err(|e| RwError::from(ConnectorError(e.into())))?; @@ -322,6 +323,7 @@ impl SourceDescBuilder { pub mod test_utils { use risingwave_common::catalog::{ColumnDesc, ColumnId, Schema, TableId}; + use risingwave_connector::ConnectorParams; use 
risingwave_pb::catalog::source_info::SourceInfo as ProstSourceInfo; use risingwave_pb::catalog::{ColumnIndex, TableSourceInfo}; use risingwave_pb::plan_common::ColumnCatalog; @@ -363,7 +365,10 @@ pub mod test_utils { properties: Default::default(), info, source_manager, - connector_node_addr: "127.0.0.1:60061".to_string(), + connector_params: ConnectorParams { + connector_source_endpoint: None, + connector_sink_endpoint: None, + }, } } } diff --git a/src/stream/src/executor/sink.rs b/src/stream/src/executor/sink.rs index ae6124e904289..8560dcd31eb08 100644 --- a/src/stream/src/executor/sink.rs +++ b/src/stream/src/executor/sink.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use core::default::Default; use std::collections::HashMap; use std::sync::Arc; use std::time::Instant; @@ -21,6 +20,7 @@ use futures::StreamExt; use futures_async_stream::try_stream; use risingwave_common::catalog::Schema; use risingwave_connector::sink::{Sink, SinkConfig, SinkImpl}; +use risingwave_connector::ConnectorParams; use risingwave_storage::StateStore; use super::error::{StreamExecutorError, StreamExecutorResult}; @@ -34,11 +34,19 @@ pub struct SinkExecutor { metrics: Arc, properties: HashMap, identity: String, + connector_params: ConnectorParams, pk_indices: PkIndices, } -async fn build_sink(config: SinkConfig, schema: Schema) -> StreamExecutorResult> { - Ok(Box::new(SinkImpl::new(config, schema).await?)) +async fn build_sink( + config: SinkConfig, + schema: Schema, + pk_indices: PkIndices, + connector_params: ConnectorParams, +) -> StreamExecutorResult> { + Ok(Box::new( + SinkImpl::new(config, schema, pk_indices, connector_params).await?, + )) } impl SinkExecutor { @@ -48,17 +56,20 @@ impl SinkExecutor { metrics: Arc, mut properties: HashMap, executor_id: u64, + connector_params: ConnectorParams, ) -> Self { // This field can be used to distinguish a specific actor in parallelism to prevent // transaction execution errors properties.insert("identifier".to_string(), format!("sink-{:?}", executor_id)); + let pk_indices = materialize_executor.pk_indices().to_vec(); Self { input: materialize_executor, _store, metrics, properties, identity: format!("SinkExecutor_{:?}", executor_id), - pk_indices: Default::default(), // todo + pk_indices, + connector_params, } } @@ -72,7 +83,13 @@ impl SinkExecutor { let schema = self.schema().clone(); let sink_config = SinkConfig::from_hashmap(self.properties.clone())?; - let mut sink = build_sink(sink_config.clone(), schema).await?; + let mut sink = build_sink( + sink_config.clone(), + schema, + self.pk_indices, + self.connector_params, + ) + .await?; // prepare the external sink before writing if needed. 
if sink.needs_preparation() { @@ -201,6 +218,7 @@ mod test { Arc::new(StreamingMetrics::unused()), properties, 0, + Default::default(), ); let mut executor = SinkExecutor::execute(Box::new(sink_executor)); diff --git a/src/stream/src/from_proto/sink.rs b/src/stream/src/from_proto/sink.rs index 37abbe443db30..c7fc7b6744db6 100644 --- a/src/stream/src/from_proto/sink.rs +++ b/src/stream/src/from_proto/sink.rs @@ -45,6 +45,7 @@ impl ExecutorBuilder for SinkExecutorBuilder { stream.streaming_metrics.clone(), node.properties.clone(), params.executor_id, + params.env.connector_params(), ))) } } diff --git a/src/stream/src/from_proto/source.rs b/src/stream/src/from_proto/source.rs index a472f6935068a..e3170262483a6 100644 --- a/src/stream/src/from_proto/source.rs +++ b/src/stream/src/from_proto/source.rs @@ -51,7 +51,7 @@ impl ExecutorBuilder for SourceExecutorBuilder { node.properties.clone(), node.get_info()?.get_source_info()?.clone(), params.env.source_manager_ref(), - params.env.connector_source_endpoint(), + params.env.connector_params(), ); let columns = node.columns.clone(); diff --git a/src/stream/src/task/env.rs b/src/stream/src/task/env.rs index 5b105e7684924..cfe62014a4b54 100644 --- a/src/stream/src/task/env.rs +++ b/src/stream/src/task/env.rs @@ -16,6 +16,7 @@ use std::sync::Arc; use risingwave_common::config::StreamingConfig; use risingwave_common::util::addr::HostAddr; +use risingwave_connector::ConnectorParams; use risingwave_source::dml_manager::DmlManagerRef; use risingwave_source::{TableSourceManager, TableSourceManagerRef}; use risingwave_storage::StateStoreImpl; @@ -29,8 +30,8 @@ pub struct StreamEnvironment { /// Endpoint the stream manager listens on. server_addr: HostAddr, - /// Endpoint of the source connector node - connector_source_endpoint: String, + /// Parameters used by connector nodes. + connector_params: ConnectorParams, /// Reference to the source manager. source_manager: TableSourceManagerRef, @@ -52,7 +53,7 @@ impl StreamEnvironment { pub fn new( source_manager: TableSourceManagerRef, server_addr: HostAddr, - connector_source_endpoint: String, + connector_params: ConnectorParams, config: Arc, worker_id: WorkerNodeId, state_store: StateStoreImpl, @@ -60,7 +61,7 @@ impl StreamEnvironment { ) -> Self { StreamEnvironment { server_addr, - connector_source_endpoint, + connector_params, source_manager, config, worker_id, @@ -76,7 +77,7 @@ impl StreamEnvironment { use risingwave_storage::monitor::StateStoreMetrics; StreamEnvironment { server_addr: "127.0.0.1:5688".parse().unwrap(), - connector_source_endpoint: "127.0.0.1:60061".parse().unwrap(), + connector_params: ConnectorParams::new(None, None), source_manager: Arc::new(TableSourceManager::default()), config: Arc::new(StreamingConfig::default()), worker_id: WorkerNodeId::default(), @@ -111,8 +112,8 @@ impl StreamEnvironment { self.state_store.clone() } - pub fn connector_source_endpoint(&self) -> String { - self.connector_source_endpoint.clone() + pub fn connector_params(&self) -> ConnectorParams { + self.connector_params.clone() } pub fn dml_manager_ref(&self) -> DmlManagerRef {
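// ---------------------------------------------------------------------------
// A minimal sketch of a stand-in connector node that speaks the SinkStream
// protocol defined in proto/connector_service.proto above, useful for
// exercising RemoteSink locally. It assumes the tonic server stubs
// (`connector_service_server`) are generated next to the client stubs in
// `risingwave_pb`; the names EchoConnector and the listen address are
// illustrative only. It simply acknowledges each request in the order
// RemoteSink expects: StartSink -> StartResponse, StartEpoch ->
// StartEpochResponse, WriteBatch -> WriteResponse, SyncBatch -> SyncResponse.
// A real connector node would persist the batches instead of echoing them.
use risingwave_pb::connector_service::connector_service_server::{
    ConnectorService, ConnectorServiceServer,
};
use risingwave_pb::connector_service::sink_response::{
    Response, StartEpochResponse, StartResponse, SyncResponse, WriteResponse,
};
use risingwave_pb::connector_service::sink_stream_request::Request as SinkRequest;
use risingwave_pb::connector_service::{SinkResponse, SinkStreamRequest};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tokio_stream::StreamExt;
use tonic::{Request, Response as TonicResponse, Status, Streaming};

#[derive(Default)]
struct EchoConnector;

#[tonic::async_trait]
impl ConnectorService for EchoConnector {
    type SinkStreamStream = UnboundedReceiverStream<Result<SinkResponse, Status>>;

    async fn sink_stream(
        &self,
        request: Request<Streaming<SinkStreamRequest>>,
    ) -> Result<TonicResponse<Self::SinkStreamStream>, Status> {
        let mut requests = request.into_inner();
        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
        tokio::spawn(async move {
            while let Some(Ok(req)) = requests.next().await {
                // Echo back the matching acknowledgement for each request.
                let response = match req.request {
                    Some(SinkRequest::Start(_)) => Response::Start(StartResponse {}),
                    Some(SinkRequest::StartEpoch(e)) => {
                        Response::StartEpoch(StartEpochResponse { epoch: e.epoch })
                    }
                    Some(SinkRequest::Write(w)) => Response::Write(WriteResponse {
                        epoch: w.epoch,
                        batch_id: w.batch_id,
                    }),
                    Some(SinkRequest::Sync(s)) => Response::Sync(SyncResponse { epoch: s.epoch }),
                    None => break,
                };
                if tx
                    .send(Ok(SinkResponse {
                        response: Some(response),
                    }))
                    .is_err()
                {
                    // RemoteSink dropped its end of the stream; stop serving.
                    break;
                }
            }
        });
        Ok(TonicResponse::new(UnboundedReceiverStream::new(rx)))
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Port 60061 matches the endpoint exported in ci/scripts/e2e-source-test.sh.
    tonic::transport::Server::builder()
        .add_service(ConnectorServiceServer::new(EchoConnector::default()))
        .serve("0.0.0.0:60061".parse()?)
        .await?;
    Ok(())
}
// With such a stub listening on localhost:60061, a compute node started with
// `--connector-sink-endpoint localhost:60061` (or the CONNECTOR_SINK_ENDPOINT
// environment variable added in this patch) routes any
// `CREATE SINK ... WITH (connector = 'jdbc' | 'file', ...)` through RemoteSink,
// since SinkConfig::from_hashmap now falls back to the remote sink for
// connector names it does not handle natively.
// ---------------------------------------------------------------------------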