Skip to content

Commit

Permalink
feat(connector): add connector remote sink (risingwavelabs#6493)
Browse files Browse the repository at this point in the history
* add remote sink

* pass connector_sink_endpoint thru CN opts instead

* refactor remote sink

* refactor connector params

* fix source ci

* Optionize source endpoint

* fix proto lint

* fix naming style linting

* fix naming style linting

* fix sinkerror parsing

Co-authored-by: William Wen <[email protected]>
Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
3 people authored Nov 28, 2022
1 parent e6c9116 commit 3474214
Show file tree
Hide file tree
Showing 18 changed files with 759 additions and 37 deletions.
3 changes: 3 additions & 0 deletions ci/scripts/e2e-source-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ set -euo pipefail

source ci/scripts/common.env.sh

# prepare environment
export CONNECTOR_SOURCE_ENDPOINT="localhost:60061"

while getopts 'p:' opt; do
case ${opt} in
p )
Expand Down
1 change: 0 additions & 1 deletion ci/scripts/misc-check.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,3 @@ echo "--- Check protobuf code format && Lint protobuf"
cd proto
buf format -d --exit-code
buf lint

88 changes: 88 additions & 0 deletions proto/connector_service.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
syntax = "proto3";

package connector_service;

import "data.proto";

option java_outer_classname = "ConnectorServiceProto";
option java_package = "com.risingwave.proto";

message SinkConfig {
message TableSchema {
message Column {
string name = 1;
data.DataType.TypeName data_type = 2;
}
repeated Column columns = 1;
repeated uint32 pk_indices = 2;
}
string sink_type = 1;
map<string, string> properties = 2;
TableSchema table_schema = 3;
}

message SinkStreamRequest {
message StartSink {
SinkConfig sink_config = 1;
}

message WriteBatch {
message JsonPayload {
message RowOp {
data.Op op_type = 1;
string line = 2;
}
repeated RowOp row_ops = 1;
}

oneof payload {
JsonPayload json_payload = 1;
}

uint64 batch_id = 3;
uint64 epoch = 4;
}

message StartEpoch {
uint64 epoch = 1;
}

message SyncBatch {
uint64 epoch = 1;
}

oneof request {
StartSink start = 1;
StartEpoch start_epoch = 2;
WriteBatch write = 3;
SyncBatch sync = 4;
}
}

message SinkResponse {
message SyncResponse {
uint64 epoch = 1;
}

message StartEpochResponse {
uint64 epoch = 1;
}

message WriteResponse {
uint64 epoch = 1;
uint64 batch_id = 2;
}

message StartResponse {}

oneof response {
SyncResponse sync = 2;
StartEpochResponse start_epoch = 3;
WriteResponse write = 4;
StartResponse start = 5;
}
}

service ConnectorService {
rpc SinkStream(stream SinkStreamRequest) returns (stream SinkResponse);
}
13 changes: 13 additions & 0 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,19 @@ impl Default for StorageConfig {
}
}

#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct ConnectorConfig {
#[serde(default)]
pub connector_addr: Option<String>,
}

impl Default for ConnectorConfig {
fn default() -> Self {
toml::from_str("").unwrap()
}
}

#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct FileCacheConfig {
Expand Down
8 changes: 6 additions & 2 deletions src/compute/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,12 @@ pub struct ComputeNodeOpts {
pub file_cache_dir: String,

/// Endpoint of the connector node
#[clap(long, default_value = "127.0.0.1:60061")]
pub connector_source_endpoint: String,
#[clap(long, env = "CONNECTOR_SOURCE_ENDPOINT")]
pub connector_source_endpoint: Option<String>,

/// Endpoint of connector sink node
#[clap(long, env = "CONNECTOR_SINK_ENDPOINT")]
pub connector_sink_endpoint: Option<String>,
}

use std::future::Future;
Expand Down
6 changes: 5 additions & 1 deletion src/compute/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,11 +224,15 @@ pub async fn compute_node_serve(
dml_mgr.clone(),
);

let connector_params = risingwave_connector::ConnectorParams {
connector_source_endpoint: opts.connector_source_endpoint,
connector_sink_endpoint: opts.connector_sink_endpoint,
};
// Initialize the streaming environment.
let stream_env = StreamEnvironment::new(
source_mgr,
client_addr.clone(),
opts.connector_source_endpoint,
connector_params,
stream_config,
worker_id,
state_store,
Expand Down
18 changes: 18 additions & 0 deletions src/connector/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,21 @@ pub mod error;
mod macros;
pub mod sink;
pub mod source;

#[derive(Clone, Debug, Default)]
pub struct ConnectorParams {
pub connector_source_endpoint: Option<String>,
pub connector_sink_endpoint: Option<String>,
}

impl ConnectorParams {
pub fn new(
connector_source_endpoint: Option<String>,
connector_sink_endpoint: Option<String>,
) -> Self {
Self {
connector_source_endpoint,
connector_sink_endpoint,
}
}
}
33 changes: 31 additions & 2 deletions src/connector/src/sink/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub mod console;
pub mod kafka;
pub mod mysql;
pub mod redis;
pub mod remote;

use std::collections::HashMap;

Expand All @@ -24,6 +25,7 @@ use enum_as_inner::EnumAsInner;
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::Schema;
use risingwave_common::error::{ErrorCode, RwError};
use risingwave_rpc_client::error::RpcError;
use serde::{Deserialize, Serialize};
use thiserror::Error;
pub use tracing;
Expand All @@ -32,6 +34,8 @@ use crate::sink::console::{ConsoleConfig, ConsoleSink, CONSOLE_SINK};
use crate::sink::kafka::{KafkaConfig, KafkaSink, KAFKA_SINK};
pub use crate::sink::mysql::{MySqlConfig, MySqlSink, MYSQL_SINK};
use crate::sink::redis::{RedisConfig, RedisSink};
use crate::sink::remote::{RemoteConfig, RemoteSink};
use crate::ConnectorParams;

#[async_trait]
pub trait Sink {
Expand All @@ -54,6 +58,7 @@ pub enum SinkConfig {
Mysql(MySqlConfig),
Redis(RedisConfig),
Kafka(KafkaConfig),
Remote(RemoteConfig),
Console(ConsoleConfig),
}

Expand All @@ -63,6 +68,7 @@ pub enum SinkState {
Mysql,
Redis,
Console,
Remote,
}

impl SinkConfig {
Expand All @@ -77,7 +83,7 @@ impl SinkConfig {
CONSOLE_SINK => Ok(SinkConfig::Console(ConsoleConfig::from_hashmap(
properties,
)?)),
_ => unimplemented!(),
_ => Ok(SinkConfig::Remote(RemoteConfig::from_hashmap(properties)?)),
}
}

Expand All @@ -86,6 +92,7 @@ impl SinkConfig {
SinkConfig::Mysql(_) => "mysql",
SinkConfig::Kafka(_) => "kafka",
SinkConfig::Redis(_) => "redis",
SinkConfig::Remote(_) => "remote",
SinkConfig::Console(_) => "console",
}
}
Expand All @@ -96,16 +103,25 @@ pub enum SinkImpl {
MySql(Box<MySqlSink>),
Redis(Box<RedisSink>),
Kafka(Box<KafkaSink>),
Remote(Box<RemoteSink>),
Console(Box<ConsoleSink>),
}

impl SinkImpl {
pub async fn new(cfg: SinkConfig, schema: Schema) -> Result<Self> {
pub async fn new(
cfg: SinkConfig,
schema: Schema,
pk_indices: Vec<usize>,
connector_params: ConnectorParams,
) -> Result<Self> {
Ok(match cfg {
SinkConfig::Mysql(cfg) => SinkImpl::MySql(Box::new(MySqlSink::new(cfg, schema).await?)),
SinkConfig::Redis(cfg) => SinkImpl::Redis(Box::new(RedisSink::new(cfg, schema)?)),
SinkConfig::Kafka(cfg) => SinkImpl::Kafka(Box::new(KafkaSink::new(cfg, schema).await?)),
SinkConfig::Console(cfg) => SinkImpl::Console(Box::new(ConsoleSink::new(cfg, schema)?)),
SinkConfig::Remote(cfg) => SinkImpl::Remote(Box::new(
RemoteSink::new(cfg, schema, pk_indices, connector_params).await?,
)),
})
}

Expand All @@ -114,6 +130,7 @@ impl SinkImpl {
SinkImpl::MySql(_) => true,
SinkImpl::Redis(_) => false,
SinkImpl::Kafka(_) => false,
SinkImpl::Remote(_) => false,
SinkImpl::Console(_) => false,
}
}
Expand All @@ -133,6 +150,7 @@ impl Sink for SinkImpl {
SinkImpl::MySql(sink) => sink.write_batch(chunk).await,
SinkImpl::Redis(sink) => sink.write_batch(chunk).await,
SinkImpl::Kafka(sink) => sink.write_batch(chunk).await,
SinkImpl::Remote(sink) => sink.write_batch(chunk).await,
SinkImpl::Console(sink) => sink.write_batch(chunk).await,
}
}
Expand All @@ -142,6 +160,7 @@ impl Sink for SinkImpl {
SinkImpl::MySql(sink) => sink.begin_epoch(epoch).await,
SinkImpl::Redis(sink) => sink.begin_epoch(epoch).await,
SinkImpl::Kafka(sink) => sink.begin_epoch(epoch).await,
SinkImpl::Remote(sink) => sink.begin_epoch(epoch).await,
SinkImpl::Console(sink) => sink.begin_epoch(epoch).await,
}
}
Expand All @@ -151,6 +170,7 @@ impl Sink for SinkImpl {
SinkImpl::MySql(sink) => sink.commit().await,
SinkImpl::Redis(sink) => sink.commit().await,
SinkImpl::Kafka(sink) => sink.commit().await,
SinkImpl::Remote(sink) => sink.commit().await,
SinkImpl::Console(sink) => sink.commit().await,
}
}
Expand All @@ -160,6 +180,7 @@ impl Sink for SinkImpl {
SinkImpl::MySql(sink) => sink.abort().await,
SinkImpl::Redis(sink) => sink.abort().await,
SinkImpl::Kafka(sink) => sink.abort().await,
SinkImpl::Remote(sink) => sink.abort().await,
SinkImpl::Console(sink) => sink.abort().await,
}
}
Expand All @@ -175,12 +196,20 @@ pub enum SinkError {
MySqlInner(#[from] mysql_async::Error),
#[error("Kafka error: {0}")]
Kafka(#[from] rdkafka::error::KafkaError),
#[error("Remote sink error: {0}")]
Remote(String),
#[error("Json parse error: {0}")]
JsonParse(String),
#[error("config error: {0}")]
Config(String),
}

impl From<RpcError> for SinkError {
fn from(value: RpcError) -> Self {
SinkError::Remote(format!("{:?}", value))
}
}

impl From<SinkError> for RwError {
fn from(e: SinkError) -> Self {
ErrorCode::SinkError(Box::new(e)).into()
Expand Down
Loading

0 comments on commit 3474214

Please sign in to comment.