diff --git a/Cargo.lock b/Cargo.lock index 82309c8406572..19716c39bc67c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7309,7 +7309,6 @@ dependencies = [ "regex", "reqwest", "risingwave_common", - "risingwave_connector_common", "risingwave_jni_core", "risingwave_pb", "risingwave_rpc_client", @@ -7336,89 +7335,6 @@ dependencies = [ "workspace-hack", ] -[[package]] -name = "risingwave_connector_common" -version = "1.3.0-alpha" -dependencies = [ - "anyhow", - "apache-avro 0.15.0 (git+https://github.com/risingwavelabs/avro?branch=idx0dev/resolved_schema)", - "arrow-array", - "arrow-schema", - "async-nats", - "async-trait", - "auto_enums", - "auto_impl", - "aws-config", - "aws-credential-types", - "aws-sdk-kinesis", - "aws-sdk-s3", - "aws-smithy-http", - "aws-types", - "base64 0.21.4", - "byteorder", - "bytes", - "chrono", - "clickhouse", - "csv", - "duration-str", - "easy-ext", - "enum-as-inner", - "futures", - "futures-async-stream", - "glob", - "google-cloud-pubsub", - "http", - "hyper", - "hyper-tls", - "icelake", - "indexmap 1.9.3", - "itertools 0.11.0", - "jni", - "jsonschema-transpiler", - "madsim-rdkafka", - "madsim-tokio", - "maplit", - "moka", - "mysql_async", - "mysql_common", - "nexmark", - "num-bigint", - "parking_lot 0.12.1", - "paste", - "prometheus", - "prost 0.12.1", - "prost-build 0.12.1", - "prost-reflect", - "prost-types 0.12.1", - "protobuf-native", - "pulsar", - "redis", - "regex", - "reqwest", - "risingwave_common", - "risingwave_pb", - "risingwave_rpc_client", - "rust_decimal", - "serde", - "serde_derive", - "serde_json", - "serde_with 3.4.0", - "simd-json", - "strum 0.25.0", - "strum_macros 0.25.2", - "tempfile", - "thiserror", - "time", - "tokio-retry", - "tokio-stream", - "tokio-util", - "tonic 0.9.2", - "tracing", - "tracing-futures", - "url", - "urlencoding", -] - [[package]] name = "risingwave_ctl" version = "1.3.0-alpha" @@ -8082,9 +7998,11 @@ dependencies = [ "protobuf-native", "pulsar", "rand", + "redis", + "regex", "reqwest", "risingwave_common", - "risingwave_connector_common", + "risingwave_connector", "risingwave_jni_core", "risingwave_pb", "risingwave_rpc_client", diff --git a/Cargo.toml b/Cargo.toml index 8682690394bb7..7963f4561a7bc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,7 +10,6 @@ members = [ "src/common/heap_profiling", "src/compute", "src/connector", - "src/connector/connector_common", "src/connector/sink_impl", "src/ctl", "src/expr/core", @@ -137,7 +136,6 @@ risingwave_compactor = { path = "./src/storage/compactor" } risingwave_compute = { path = "./src/compute" } risingwave_ctl = { path = "./src/ctl" } risingwave_connector = { path = "./src/connector" } -risingwave_connector_common = { path = "./src/connector/connector_common" } risingwave_expr = { path = "./src/expr/core" } risingwave_expr_impl = { path = "./src/expr/impl" } risingwave_frontend = { path = "./src/frontend" } diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index 7f117837988f5..4886b1b52fcc5 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -100,7 +100,6 @@ redis = { version = "0.23.3", features = ["aio","tokio-comp","async-std-comp"] } regex = "1.4" reqwest = { version = "0.11", features = ["json"] } risingwave_common = { workspace = true } -risingwave_connector_common = { workspace = true } risingwave_jni_core = { workspace = true } risingwave_pb = { workspace = true } risingwave_rpc_client = { workspace = true } diff --git a/src/connector/build.rs b/src/connector/build.rs index 9d4d448753bdf..3ace772d46039 100644 --- a/src/connector/build.rs +++ b/src/connector/build.rs @@ -23,7 +23,7 @@ fn main() { .map(|f| format!("{}/{}.proto", proto_dir, f)) .collect(); prost_build::Config::new() - .out_dir("./connector_common/src/parser/protobuf") + .out_dir("./src/parser/protobuf") .compile_protos(&protos, &Vec::::new()) .unwrap(); } diff --git a/src/connector/connector_common/Cargo.toml b/src/connector/connector_common/Cargo.toml deleted file mode 100644 index 93bbdc3da953b..0000000000000 --- a/src/connector/connector_common/Cargo.toml +++ /dev/null @@ -1,221 +0,0 @@ -[package] -name = "risingwave_connector_common" -version = { workspace = true } -edition = { workspace = true } -homepage = { workspace = true } -keywords = { workspace = true } -license = { workspace = true } -repository = { workspace = true } - -[package.metadata.cargo-machete] -ignored = ["workspace-hack"] - -[package.metadata.cargo-udeps.ignore] -normal = ["workspace-hack"] - -[dependencies] -anyhow = "1" -apache-avro = { git = "https://github.com/risingwavelabs/avro", branch = "idx0dev/resolved_schema", features = [ - "snappy", - "zstandard", - "bzip", - "xz", -] } -arrow-array = { workspace = true } -arrow-schema = { workspace = true } -async-nats = "0.32" -async-trait = "0.1" -auto_enums = { version = "0.8", features = ["futures03"] } -auto_impl = "1" -aws-config = { workspace = true } -aws-credential-types = { workspace = true } -aws-sdk-kinesis = { workspace = true } -aws-sdk-s3 = { workspace = true } -aws-smithy-http = { workspace = true } -aws-types = { workspace = true } -base64 = "0.21" -byteorder = "1" -bytes = { version = "1", features = ["serde"] } -chrono = { version = "0.4", default-features = false, features = [ - "clock", - "std", -] } -clickhouse = { git = "https://github.com/risingwavelabs/clickhouse.rs", rev = "622501c1c98c80baaf578c716d6903dde947804e", features = [ - "time", -] } -csv = "1.3" -duration-str = "0.7.0" -easy-ext = "1" -enum-as-inner = "0.6" -futures = { version = "0.3", default-features = false, features = ["alloc"] } -futures-async-stream = { workspace = true } -glob = "0.3" -google-cloud-pubsub = "0.20" -http = "0.2" -hyper = { version = "0.14", features = [ - "client", - "tcp", - "http1", - "http2", - "stream", -] } -hyper-tls = "0.5" -icelake = { workspace = true } -indexmap = { version = "1.9.3", features = ["serde"] } -itertools = "0.11" -jni = { version = "0.21.1", features = ["invocation"] } -jst = { package = 'jsonschema-transpiler', git = "https://github.com/mozilla/jsonschema-transpiler", rev = "c1a89d720d118843d8bcca51084deb0ed223e4b4" } -maplit = "1.0.2" -moka = { version = "0.12", features = ["future"] } -mysql_async = { version = "0.32", default-features = false, features = [ - "default", -] } -mysql_common = { version = "0.30", default-features = false, features = [ - "chrono", -] } -nexmark = { version = "0.2", features = ["serde"] } -num-bigint = "0.4" -parking_lot = "0.12" -paste = "1" -prometheus = { version = "0.13", features = ["process"] } -prost = { version = "0.12", features = ["no-recursion-limit"] } -prost-reflect = "0.12" -prost-types = "0.12" -protobuf-native = "0.2.1" -pulsar = { version = "6.0", default-features = false, features = [ - "tokio-runtime", - "telemetry", - "auth-oauth2", -] } -rdkafka = { workspace = true, features = [ - "cmake-build", - # "ssl", - # FIXME: temporary workaround before we find an ideal solution. - # See why it's needed and why it's not ideal in https://github.com/risingwavelabs/risingwave/issues/9852 - "ssl-vendored", - "gssapi", - "zstd", -] } -redis = { version = "0.23.3", features = ["aio","tokio-comp","async-std-comp"] } -regex = "1.4" -reqwest = { version = "0.11", features = ["json"] } -risingwave_common = { workspace = true } -risingwave_pb = { workspace = true } -risingwave_rpc_client = { workspace = true } -rust_decimal = "1" -serde = { version = "1", features = ["derive", "rc"] } -serde_derive = "1" -serde_json = "1" -serde_with = { version = "3", features = ["json"] } -simd-json = "0.12.0" -strum = "0.25" -strum_macros = "0.25" -tempfile = "3" -thiserror = "1" -time = "0.3.28" -tokio = { version = "0.2", package = "madsim-tokio", features = [ - "rt", - "rt-multi-thread", - "sync", - "macros", - "time", - "signal", - "fs", -] } -tokio-retry = "0.3" -tokio-stream = "0.1" -tokio-util = { version = "0.7", features = ["codec", "io"] } -tonic_0_9 = { package = "tonic", version = "0.9" } -tracing = "0.1" -tracing-futures = { version = "0.2", features = ["futures-03"] } -url = "2" -urlencoding = "2" -#anyhow = "1" -#apache-avro = { git = "https://github.com/risingwavelabs/avro", branch = "idx0dev/resolved_schema", features = [ -# "snappy", -# "zstandard", -# "bzip", -# "xz", -#] } -#async-nats = "0.32" -#async-trait = "0.1" -#aws-config = { workspace = true } -#aws-credential-types = { workspace = true } -#aws-sdk-kinesis = { workspace = true } -#aws-sdk-s3 = { workspace = true } -#aws-smithy-http = { workspace = true } -#aws-types = { workspace = true } -#base64 = "0.21" -#byteorder = "1" -#bytes = { version = "1", features = ["serde"] } -#chrono = { version = "0.4", default-features = false, features = [ -# "clock", -# "std", -#] } -#clickhouse = { git = "https://github.com/risingwavelabs/clickhouse.rs", rev = "622501c1c98c80baaf578c716d6903dde947804e", features = [ -# "time", -#] } -#csv = "1.3" -#duration-str = "0.7.0" -#easy-ext = "1" -#futures = { version = "0.3", default-features = false, features = ["alloc"] } -#hyper = { version = "0.14", features = [ -# "client", -# "tcp", -# "http1", -# "http2", -# "stream", -#] } -#hyper-tls = "0.5" -#icelake = { workspace = true } -#indexmap = { version = "1.9.3", features = ["serde"] } -#itertools = "0.11" -#prometheus = { version = "0.13", features = ["process"] } -#prost = { version = "0.12", features = ["no-recursion-limit"] } -#prost-reflect = "0.12" -#prost-types = "0.12" -#pulsar = { version = "6.0", default-features = false, features = [ -# "tokio-runtime", -# "telemetry", -# "auth-oauth2", -#] } -#rdkafka = { workspace = true, features = [ -# "cmake-build", -# # "ssl", -# # FIXME: temporary workaround before we find an ideal solution. -# # See why it's needed and why it's not ideal in https://github.com/risingwavelabs/risingwave/issues/9852 -# "ssl-vendored", -# "gssapi", -# "zstd", -#] } -#redis = { version = "0.23.3", features = ["aio","tokio-comp","async-std-comp"] } -#reqwest = { version = "0.11", features = ["json"] } -#risingwave_common = { workspace = true } -#risingwave_pb = { workspace = true } -#risingwave_rpc_client = { workspace = true } -#rust_decimal = "1" -#serde = { version = "1", features = ["derive", "rc"] } -#serde_derive = "1" -#serde_json = "1" -#serde_with = { version = "3", features = ["json"] } -#simd-json = "0.12.0" -#tempfile = "3" -#thiserror = "1" -#time = "0.3.28" -#tokio = { version = "0.2", package = "madsim-tokio", features = [ -# "rt", -# "rt-multi-thread", -# "sync", -# "macros", -# "time", -# "signal", -# "fs", -#] } -#tracing = "0.1" -#url = "2" - -[build-dependencies] -prost-build = "0.12" - -[lints] -workspace = true diff --git a/src/connector/connector_common/src/kafka/mod.rs b/src/connector/connector_common/src/kafka/mod.rs deleted file mode 100644 index 571bdfda01fde..0000000000000 --- a/src/connector/connector_common/src/kafka/mod.rs +++ /dev/null @@ -1,21 +0,0 @@ -// Copyright 2023 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -pub mod private_link; -pub mod stats; - -pub const KAFKA_PROPS_BROKER_KEY: &str = "properties.bootstrap.server"; -pub const KAFKA_PROPS_BROKER_KEY_ALIAS: &str = "kafka.brokers"; -pub const PRIVATELINK_CONNECTION: &str = "privatelink"; -pub const KAFKA_ISOLATION_LEVEL: &str = "read_committed"; diff --git a/src/connector/connector_common/src/lib.rs b/src/connector/connector_common/src/lib.rs deleted file mode 100644 index 62a3c3368e9da..0000000000000 --- a/src/connector/connector_common/src/lib.rs +++ /dev/null @@ -1,99 +0,0 @@ -// Copyright 2023 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#![feature(return_position_impl_trait_in_trait)] -#![feature(async_fn_in_trait)] -#![feature(lint_reasons)] -#![feature(let_chains)] -#![feature(result_option_inspect)] -#![feature(associated_type_defaults)] -#![feature(iter_from_generator)] -#![feature(generators)] -#![feature(iterator_try_collect)] -#![feature(box_into_inner)] -#![feature(type_alias_impl_trait)] -#![feature(if_let_guard)] - -pub mod aws_auth; -pub mod aws_utils; -pub mod common; -pub mod kafka; -pub mod parser; -pub mod schema; -pub mod sink; - -use std::time::Duration; - -use duration_str::parse_std; -use risingwave_pb::connector_service::SinkPayloadFormat; -use risingwave_rpc_client::ConnectorClient; -use serde::de; - -#[derive(Clone, Debug, Default)] -pub struct ConnectorParams { - pub connector_client: Option, - pub sink_payload_format: SinkPayloadFormat, -} - -impl ConnectorParams { - pub fn new( - connector_client: Option, - sink_payload_format: SinkPayloadFormat, - ) -> Self { - Self { - connector_client, - sink_payload_format, - } - } -} - -pub fn deserialize_u32_from_string<'de, D>(deserializer: D) -> Result -where - D: de::Deserializer<'de>, -{ - let s: String = de::Deserialize::deserialize(deserializer)?; - s.parse().map_err(|_| { - de::Error::invalid_value( - de::Unexpected::Str(&s), - &"integer greater than or equal to 0", - ) - }) -} - -pub fn deserialize_bool_from_string<'de, D>(deserializer: D) -> Result -where - D: de::Deserializer<'de>, -{ - let s: String = de::Deserialize::deserialize(deserializer)?; - let s = s.to_ascii_lowercase(); - match s.as_str() { - "true" => Ok(true), - "false" => Ok(false), - _ => Err(de::Error::invalid_value( - de::Unexpected::Str(&s), - &"true or false", - )), - } -} - -pub fn deserialize_duration_from_string<'de, D>(deserializer: D) -> Result -where - D: de::Deserializer<'de>, -{ - let s: String = de::Deserialize::deserialize(deserializer)?; - parse_std(&s).map_err(|_| de::Error::invalid_value( - de::Unexpected::Str(&s), - &"The String value unit support for one of:[“y”,“mon”,“w”,“d”,“h”,“m”,“s”, “ms”, “µs”, “ns”]", - )) -} diff --git a/src/connector/sink_impl/Cargo.toml b/src/connector/sink_impl/Cargo.toml index e801302ffe6a5..2b4869a1cda3d 100644 --- a/src/connector/sink_impl/Cargo.toml +++ b/src/connector/sink_impl/Cargo.toml @@ -99,9 +99,11 @@ rdkafka = { workspace = true, features = [ "gssapi", "zstd", ] } +redis = { version = "0.23.3", features = ["aio", "tokio-comp", "async-std-comp"] } +regex = "1.4" reqwest = { version = "0.11", features = ["json"] } risingwave_common = { workspace = true } -risingwave_connector_common = { workspace = true } +risingwave_connector = { workspace = true } risingwave_jni_core = { workspace = true } risingwave_pb = { workspace = true } risingwave_rpc_client = { workspace = true } diff --git a/src/connector/sink_impl/src/lib.rs b/src/connector/sink_impl/src/lib.rs index 5a5547f10bbe6..117831678601a 100644 --- a/src/connector/sink_impl/src/lib.rs +++ b/src/connector/sink_impl/src/lib.rs @@ -38,13 +38,12 @@ pub mod sink; use futures::future::BoxFuture; use futures::FutureExt; -use risingwave_connector_common::sink::boxed::BoxCoordinator; -use risingwave_connector_common::sink::catalog::SinkCatalog; -use risingwave_connector_common::sink::{Sink, SinkError, SinkParam}; -pub(crate) use risingwave_connector_common::*; +pub(crate) use risingwave_connector::*; use risingwave_pb::catalog::PbSink; -use crate::sink::build_sink; +use crate::sink::boxed::BoxCoordinator; +use crate::sink::catalog::SinkCatalog; +use crate::sink::{build_sink, Sink, SinkError, SinkParam}; #[export_name = "__exported_validate_sink"] pub fn validate_sink( diff --git a/src/connector/sink_impl/src/sink/doris_connector.rs b/src/connector/sink_impl/src/sink/doris_connector.rs index 61c96d302c772..116cd91d86542 100644 --- a/src/connector/sink_impl/src/sink/doris_connector.rs +++ b/src/connector/sink_impl/src/sink/doris_connector.rs @@ -356,33 +356,6 @@ impl DorisGet { Ok(schema) } } - -#[derive(Deserialize, Serialize, Debug, Clone)] -pub struct DorisCommon { - #[serde(rename = "doris.url")] - pub url: String, - #[serde(rename = "doris.user")] - pub user: String, - #[serde(rename = "doris.password")] - pub password: String, - #[serde(rename = "doris.database")] - pub database: String, - #[serde(rename = "doris.table")] - pub table: String, -} - -impl DorisCommon { - pub(crate) fn build_get_client(&self) -> DorisGet { - DorisGet::new( - self.url.clone(), - self.table.clone(), - self.database.clone(), - self.user.clone(), - self.password.clone(), - ) - } -} - #[derive(Debug, Serialize, Deserialize)] pub struct DorisSchema { status: i32, diff --git a/src/connector/sink_impl/src/sink/iceberg.rs b/src/connector/sink_impl/src/sink/iceberg.rs index 324a6f92868eb..1031c5181d81e 100644 --- a/src/connector/sink_impl/src/sink/iceberg.rs +++ b/src/connector/sink_impl/src/sink/iceberg.rs @@ -911,7 +911,8 @@ fn try_matches_arrow_schema(rw_schema: &Schema, arrow_schema: &ArrowSchema) -> R #[cfg(test)] mod test { use risingwave_common::catalog::Field; - use risingwave_common::types::DataType; + + use crate::source::DataType; #[test] fn test_compatible_arrow_schema() { diff --git a/src/connector/sink_impl/src/sink/kafka.rs b/src/connector/sink_impl/src/sink/kafka.rs index b9cfdea2e81ce..6307d80d0f3db 100644 --- a/src/connector/sink_impl/src/sink/kafka.rs +++ b/src/connector/sink_impl/src/sink/kafka.rs @@ -15,6 +15,7 @@ use std::collections::HashMap; use std::fmt::Debug; use std::pin::pin; +use std::sync::Arc; use std::time::Duration; use anyhow::anyhow; @@ -22,13 +23,11 @@ use futures::future::{select, Either}; use futures::{Future, FutureExt, TryFuture}; use rdkafka::error::KafkaError; use rdkafka::message::ToBytes; -use rdkafka::producer::{DeliveryFuture, FutureProducer, FutureRecord, Producer}; +use rdkafka::producer::{DeliveryFuture, FutureProducer, FutureRecord}; use rdkafka::types::RDKafkaErrorCode; use rdkafka::ClientConfig; use risingwave_common::catalog::Schema; use risingwave_common::util::drop_either_future; -use risingwave_connector_common::common::KAFKA_CONNECTOR_NAME; -use risingwave_connector_common::kafka::private_link::PrivateLinkProducerContext; use serde_derive::{Deserialize, Serialize}; use serde_with::{serde_as, DisplayFromStr}; use strum_macros::{Display, EnumString}; @@ -36,6 +35,7 @@ use strum_macros::{Display, EnumString}; use super::catalog::{SinkFormat, SinkFormatDesc}; use super::{Sink, SinkError, SinkParam}; use crate::common::KafkaCommon; +pub use crate::common::KAFKA_CONNECTOR_NAME as KAFKA_SINK; use crate::sink::catalog::desc::SinkDesc; use crate::sink::formatter::SinkFormatterImpl; use crate::sink::log_store::{ @@ -43,12 +43,12 @@ use crate::sink::log_store::{ }; use crate::sink::writer::FormattedSink; use crate::sink::{DummySinkCommitCoordinator, LogSinker, Result, SinkWriterParam}; +use crate::source::kafka::{KafkaProperties, KafkaSplitEnumerator, PrivateLinkProducerContext}; +use crate::source::{SourceEnumeratorContext, SplitEnumerator}; use crate::{ deserialize_duration_from_string, deserialize_u32_from_string, dispatch_sink_formatter_impl, }; -pub const KAFKA_SINK: &str = "kafka"; - const fn _default_max_retries() -> u32 { 3 } @@ -244,33 +244,26 @@ impl KafkaConfig { Ok(config) } - fn set_client(&self, c: &mut rdkafka::ClientConfig) { + pub(crate) fn set_client(&self, c: &mut rdkafka::ClientConfig) { self.common.set_client(c); self.rdkafka_properties.set_client(c); tracing::info!("kafka client starts with: {:?}", c); } +} - async fn build_producer(&self) -> Result> { - let mut c = ClientConfig::new(); - - // KafkaConfig configuration - self.common.set_security_properties(&mut c); - self.set_client(&mut c); - - // ClientConfig configuration - c.set("bootstrap.servers", &self.common.brokers); - - // Create the producer context, will be used to create the producer - let producer_ctx = PrivateLinkProducerContext::new( - self.common.broker_rewrite_map.clone(), - // fixme: enable kafka native metrics for sink - None, - None, - )?; - - // Generate the producer - Ok(c.create_with_context(producer_ctx).await?) +impl From for KafkaProperties { + fn from(val: KafkaConfig) -> Self { + KafkaProperties { + bytes_per_second: None, + max_num_messages: None, + scan_startup_mode: None, + time_offset: None, + consumer_group: None, + upsert: None, + common: val.common, + rdkafka_properties: Default::default(), + } } } @@ -307,7 +300,7 @@ impl Sink for KafkaSink { type Coordinator = DummySinkCommitCoordinator; type LogSinker = KafkaLogSinker; - const SINK_NAME: &'static str = KAFKA_CONNECTOR_NAME; + const SINK_NAME: &'static str = KAFKA_SINK; fn default_sink_decouple(desc: &SinkDesc) -> bool { desc.sink_type.is_append_only() @@ -346,15 +339,12 @@ impl Sink for KafkaSink { // Try Kafka connection. // There is no such interface for kafka producer to validate a connection // use enumerator to validate broker reachability and existence of topic - if self - .config - .build_producer() - .await? - .client() - .fetch_metadata(None, self.config.common.sync_call_timeout) - .await - .is_err() - { + let check = KafkaSplitEnumerator::new( + KafkaProperties::from(self.config.clone()), + Arc::new(SourceEnumeratorContext::default()), + ) + .await?; + if !check.check_reachability().await { return Err(SinkError::Config(anyhow!( "cannot connect to kafka broker ({})", self.config.common.brokers @@ -390,7 +380,27 @@ pub struct KafkaLogSinker { impl KafkaLogSinker { async fn new(config: KafkaConfig, formatter: SinkFormatterImpl) -> Result { - let inner: FutureProducer = config.build_producer().await?; + let inner: FutureProducer = { + let mut c = ClientConfig::new(); + + // KafkaConfig configuration + config.common.set_security_properties(&mut c); + config.set_client(&mut c); + + // ClientConfig configuration + c.set("bootstrap.servers", &config.common.brokers); + + // Create the producer context, will be used to create the producer + let producer_ctx = PrivateLinkProducerContext::new( + config.common.broker_rewrite_map.clone(), + // fixme: enable kafka native metrics for sink + None, + None, + )?; + + // Generate the producer + c.create_with_context(producer_ctx).await? + }; let max_delivery_buffer_size = (config .rdkafka_properties diff --git a/src/connector/sink_impl/src/sink/kinesis.rs b/src/connector/sink_impl/src/sink/kinesis.rs index 492367f93586e..90231c8eb2968 100644 --- a/src/connector/sink_impl/src/sink/kinesis.rs +++ b/src/connector/sink_impl/src/sink/kinesis.rs @@ -21,7 +21,6 @@ use aws_sdk_kinesis::primitives::Blob; use aws_sdk_kinesis::Client as KinesisClient; use risingwave_common::array::StreamChunk; use risingwave_common::catalog::Schema; -use risingwave_connector_common::common::KINESIS_CONNECTOR_NAME; use serde_derive::Deserialize; use serde_with::serde_as; use tokio_retry::strategy::{jitter, ExponentialBackoff}; @@ -29,7 +28,7 @@ use tokio_retry::Retry; use super::catalog::SinkFormatDesc; use super::SinkParam; -use crate::common::KinesisCommon; +use crate::common::{KinesisCommon, KINESIS_CONNECTOR_NAME}; use crate::dispatch_sink_formatter_impl; use crate::sink::formatter::SinkFormatterImpl; use crate::sink::writer::{FormattedSink, LogSinkerOf, SinkWriter, SinkWriterExt}; diff --git a/src/connector/sink_impl/src/sink/mod.rs b/src/connector/sink_impl/src/sink/mod.rs index 1e41b569c948e..544f654061362 100644 --- a/src/connector/sink_impl/src/sink/mod.rs +++ b/src/connector/sink_impl/src/sink/mod.rs @@ -27,7 +27,8 @@ pub mod remote; pub mod test_sink; use anyhow::anyhow; -pub use risingwave_connector_common::sink::*; +pub use risingwave_connector::sink::Sink; +pub(crate) use risingwave_connector::sink::*; pub use tracing; use crate::sink::writer::SinkWriter; diff --git a/src/connector/sink_impl/src/sink/pulsar.rs b/src/connector/sink_impl/src/sink/pulsar.rs index 37d316a954843..859f76cc720d1 100644 --- a/src/connector/sink_impl/src/sink/pulsar.rs +++ b/src/connector/sink_impl/src/sink/pulsar.rs @@ -24,13 +24,12 @@ use pulsar::producer::{Message, SendFuture}; use pulsar::{Producer, ProducerOptions, Pulsar, TokioExecutor}; use risingwave_common::array::StreamChunk; use risingwave_common::catalog::Schema; -use risingwave_connector_common::common::PULSAR_CONNECTOR_NAME; use serde::Deserialize; use serde_with::{serde_as, DisplayFromStr}; use super::catalog::{SinkFormat, SinkFormatDesc}; use super::{Sink, SinkError, SinkParam, SinkWriter, SinkWriterParam}; -use crate::common::PulsarCommon; +use crate::common::{PulsarCommon, PULSAR_CONNECTOR_NAME}; use crate::sink::formatter::SinkFormatterImpl; use crate::sink::writer::{FormattedSink, LogSinkerOf, SinkWriterExt}; use crate::sink::{DummySinkCommitCoordinator, Result}; diff --git a/src/connector/connector_common/src/aws_auth.rs b/src/connector/src/aws_auth.rs similarity index 100% rename from src/connector/connector_common/src/aws_auth.rs rename to src/connector/src/aws_auth.rs diff --git a/src/connector/connector_common/src/aws_utils.rs b/src/connector/src/aws_utils.rs similarity index 100% rename from src/connector/connector_common/src/aws_utils.rs rename to src/connector/src/aws_utils.rs diff --git a/src/connector/connector_common/src/common.rs b/src/connector/src/common.rs similarity index 99% rename from src/connector/connector_common/src/common.rs rename to src/connector/src/common.rs index 943db01921827..9f4a05f46572e 100644 --- a/src/connector/connector_common/src/common.rs +++ b/src/connector/src/common.rs @@ -37,6 +37,7 @@ use crate::aws_auth::AwsAuthProps; use crate::aws_utils::load_file_descriptor_from_s3; use crate::deserialize_duration_from_string; use crate::sink::SinkError; +use crate::source::nats::source::NatsOffset; // The file describes the common abstractions for each connector and can be used in both source and // sink. @@ -415,15 +416,6 @@ pub struct UpsertMessage<'a> { pub record: Cow<'a, [u8]>, } -#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, Hash)] -pub enum NatsOffset { - Earliest, - Latest, - SequenceNumber(String), - Timestamp(i128), - None, -} - #[serde_as] #[derive(Deserialize, Serialize, Debug, Clone)] pub struct NatsCommon { diff --git a/src/connector/src/lib.rs b/src/connector/src/lib.rs index 69c46e60c87d6..ea0acada0d5d0 100644 --- a/src/connector/src/lib.rs +++ b/src/connector/src/lib.rs @@ -33,11 +33,81 @@ #![feature(if_let_guard)] #![feature(iterator_try_collect)] -pub use risingwave_connector_common::*; +use std::time::Duration; +use duration_str::parse_std; +use risingwave_pb::connector_service::SinkPayloadFormat; +use risingwave_rpc_client::ConnectorClient; +use serde::de; + +pub mod aws_auth; +pub mod aws_utils; pub mod error; mod macros; +pub mod parser; +pub mod schema; +pub mod sink; pub mod source; +pub mod common; + pub use paste::paste; + +#[derive(Clone, Debug, Default)] +pub struct ConnectorParams { + pub connector_client: Option, + pub sink_payload_format: SinkPayloadFormat, +} + +impl ConnectorParams { + pub fn new( + connector_client: Option, + sink_payload_format: SinkPayloadFormat, + ) -> Self { + Self { + connector_client, + sink_payload_format, + } + } +} + +pub fn deserialize_u32_from_string<'de, D>(deserializer: D) -> Result +where + D: de::Deserializer<'de>, +{ + let s: String = de::Deserialize::deserialize(deserializer)?; + s.parse().map_err(|_| { + de::Error::invalid_value( + de::Unexpected::Str(&s), + &"integer greater than or equal to 0", + ) + }) +} + +pub fn deserialize_bool_from_string<'de, D>(deserializer: D) -> Result +where + D: de::Deserializer<'de>, +{ + let s: String = de::Deserialize::deserialize(deserializer)?; + let s = s.to_ascii_lowercase(); + match s.as_str() { + "true" => Ok(true), + "false" => Ok(false), + _ => Err(de::Error::invalid_value( + de::Unexpected::Str(&s), + &"true or false", + )), + } +} + +pub fn deserialize_duration_from_string<'de, D>(deserializer: D) -> Result +where + D: de::Deserializer<'de>, +{ + let s: String = de::Deserialize::deserialize(deserializer)?; + parse_std(&s).map_err(|_| de::Error::invalid_value( + de::Unexpected::Str(&s), + &"The String value unit support for one of:[“y”,“mon”,“w”,“d”,“h”,“m”,“s”, “ms”, “µs”, “ns”]", + )) +} diff --git a/src/connector/connector_common/src/parser/avro/mod.rs b/src/connector/src/parser/avro/mod.rs similarity index 100% rename from src/connector/connector_common/src/parser/avro/mod.rs rename to src/connector/src/parser/avro/mod.rs diff --git a/src/connector/connector_common/src/parser/avro/parser.rs b/src/connector/src/parser/avro/parser.rs similarity index 100% rename from src/connector/connector_common/src/parser/avro/parser.rs rename to src/connector/src/parser/avro/parser.rs diff --git a/src/connector/connector_common/src/parser/avro/schema_resolver.rs b/src/connector/src/parser/avro/schema_resolver.rs similarity index 100% rename from src/connector/connector_common/src/parser/avro/schema_resolver.rs rename to src/connector/src/parser/avro/schema_resolver.rs diff --git a/src/connector/connector_common/src/parser/avro/util.rs b/src/connector/src/parser/avro/util.rs similarity index 100% rename from src/connector/connector_common/src/parser/avro/util.rs rename to src/connector/src/parser/avro/util.rs diff --git a/src/connector/connector_common/src/parser/bytes_parser.rs b/src/connector/src/parser/bytes_parser.rs similarity index 100% rename from src/connector/connector_common/src/parser/bytes_parser.rs rename to src/connector/src/parser/bytes_parser.rs diff --git a/src/connector/connector_common/src/parser/canal/mod.rs b/src/connector/src/parser/canal/mod.rs similarity index 100% rename from src/connector/connector_common/src/parser/canal/mod.rs rename to src/connector/src/parser/canal/mod.rs diff --git a/src/connector/connector_common/src/parser/canal/operators.rs b/src/connector/src/parser/canal/operators.rs similarity index 100% rename from src/connector/connector_common/src/parser/canal/operators.rs rename to src/connector/src/parser/canal/operators.rs diff --git a/src/connector/connector_common/src/parser/canal/simd_json_parser.rs b/src/connector/src/parser/canal/simd_json_parser.rs similarity index 100% rename from src/connector/connector_common/src/parser/canal/simd_json_parser.rs rename to src/connector/src/parser/canal/simd_json_parser.rs diff --git a/src/connector/connector_common/src/parser/common.rs b/src/connector/src/parser/common.rs similarity index 100% rename from src/connector/connector_common/src/parser/common.rs rename to src/connector/src/parser/common.rs diff --git a/src/connector/connector_common/src/parser/csv_parser.rs b/src/connector/src/parser/csv_parser.rs similarity index 100% rename from src/connector/connector_common/src/parser/csv_parser.rs rename to src/connector/src/parser/csv_parser.rs diff --git a/src/connector/connector_common/src/parser/debezium/avro_parser.rs b/src/connector/src/parser/debezium/avro_parser.rs similarity index 100% rename from src/connector/connector_common/src/parser/debezium/avro_parser.rs rename to src/connector/src/parser/debezium/avro_parser.rs diff --git a/src/connector/connector_common/src/parser/debezium/debezium_parser.rs b/src/connector/src/parser/debezium/debezium_parser.rs similarity index 100% rename from src/connector/connector_common/src/parser/debezium/debezium_parser.rs rename to src/connector/src/parser/debezium/debezium_parser.rs diff --git a/src/connector/connector_common/src/parser/debezium/mod.rs b/src/connector/src/parser/debezium/mod.rs similarity index 100% rename from src/connector/connector_common/src/parser/debezium/mod.rs rename to src/connector/src/parser/debezium/mod.rs diff --git a/src/connector/connector_common/src/parser/debezium/mongo_json_parser.rs b/src/connector/src/parser/debezium/mongo_json_parser.rs similarity index 100% rename from src/connector/connector_common/src/parser/debezium/mongo_json_parser.rs rename to src/connector/src/parser/debezium/mongo_json_parser.rs diff --git a/src/connector/connector_common/src/parser/debezium/operators.rs b/src/connector/src/parser/debezium/operators.rs similarity index 100% rename from src/connector/connector_common/src/parser/debezium/operators.rs rename to src/connector/src/parser/debezium/operators.rs diff --git a/src/connector/connector_common/src/parser/debezium/simd_json_parser.rs b/src/connector/src/parser/debezium/simd_json_parser.rs similarity index 100% rename from src/connector/connector_common/src/parser/debezium/simd_json_parser.rs rename to src/connector/src/parser/debezium/simd_json_parser.rs diff --git a/src/connector/connector_common/src/parser/json_parser.rs b/src/connector/src/parser/json_parser.rs similarity index 100% rename from src/connector/connector_common/src/parser/json_parser.rs rename to src/connector/src/parser/json_parser.rs diff --git a/src/connector/connector_common/src/parser/maxwell/maxwell_parser.rs b/src/connector/src/parser/maxwell/maxwell_parser.rs similarity index 100% rename from src/connector/connector_common/src/parser/maxwell/maxwell_parser.rs rename to src/connector/src/parser/maxwell/maxwell_parser.rs diff --git a/src/connector/connector_common/src/parser/maxwell/mod.rs b/src/connector/src/parser/maxwell/mod.rs similarity index 100% rename from src/connector/connector_common/src/parser/maxwell/mod.rs rename to src/connector/src/parser/maxwell/mod.rs diff --git a/src/connector/connector_common/src/parser/maxwell/operators.rs b/src/connector/src/parser/maxwell/operators.rs similarity index 100% rename from src/connector/connector_common/src/parser/maxwell/operators.rs rename to src/connector/src/parser/maxwell/operators.rs diff --git a/src/connector/connector_common/src/parser/maxwell/simd_json_parser.rs b/src/connector/src/parser/maxwell/simd_json_parser.rs similarity index 100% rename from src/connector/connector_common/src/parser/maxwell/simd_json_parser.rs rename to src/connector/src/parser/maxwell/simd_json_parser.rs diff --git a/src/connector/connector_common/src/parser/mod.rs b/src/connector/src/parser/mod.rs similarity index 100% rename from src/connector/connector_common/src/parser/mod.rs rename to src/connector/src/parser/mod.rs diff --git a/src/connector/connector_common/src/parser/mysql.rs b/src/connector/src/parser/mysql.rs similarity index 100% rename from src/connector/connector_common/src/parser/mysql.rs rename to src/connector/src/parser/mysql.rs diff --git a/src/connector/connector_common/src/parser/plain_parser.rs b/src/connector/src/parser/plain_parser.rs similarity index 100% rename from src/connector/connector_common/src/parser/plain_parser.rs rename to src/connector/src/parser/plain_parser.rs diff --git a/src/connector/connector_common/src/parser/protobuf/.gitignore b/src/connector/src/parser/protobuf/.gitignore similarity index 100% rename from src/connector/connector_common/src/parser/protobuf/.gitignore rename to src/connector/src/parser/protobuf/.gitignore diff --git a/src/connector/connector_common/src/parser/protobuf/mod.rs b/src/connector/src/parser/protobuf/mod.rs similarity index 100% rename from src/connector/connector_common/src/parser/protobuf/mod.rs rename to src/connector/src/parser/protobuf/mod.rs diff --git a/src/connector/connector_common/src/parser/protobuf/parser.rs b/src/connector/src/parser/protobuf/parser.rs similarity index 100% rename from src/connector/connector_common/src/parser/protobuf/parser.rs rename to src/connector/src/parser/protobuf/parser.rs diff --git a/src/connector/connector_common/src/parser/protobuf/schema_resolver.rs b/src/connector/src/parser/protobuf/schema_resolver.rs similarity index 100% rename from src/connector/connector_common/src/parser/protobuf/schema_resolver.rs rename to src/connector/src/parser/protobuf/schema_resolver.rs diff --git a/src/connector/connector_common/src/parser/unified/avro.rs b/src/connector/src/parser/unified/avro.rs similarity index 100% rename from src/connector/connector_common/src/parser/unified/avro.rs rename to src/connector/src/parser/unified/avro.rs diff --git a/src/connector/connector_common/src/parser/unified/bytes.rs b/src/connector/src/parser/unified/bytes.rs similarity index 100% rename from src/connector/connector_common/src/parser/unified/bytes.rs rename to src/connector/src/parser/unified/bytes.rs diff --git a/src/connector/connector_common/src/parser/unified/debezium.rs b/src/connector/src/parser/unified/debezium.rs similarity index 100% rename from src/connector/connector_common/src/parser/unified/debezium.rs rename to src/connector/src/parser/unified/debezium.rs diff --git a/src/connector/connector_common/src/parser/unified/json.rs b/src/connector/src/parser/unified/json.rs similarity index 100% rename from src/connector/connector_common/src/parser/unified/json.rs rename to src/connector/src/parser/unified/json.rs diff --git a/src/connector/connector_common/src/parser/unified/maxwell.rs b/src/connector/src/parser/unified/maxwell.rs similarity index 100% rename from src/connector/connector_common/src/parser/unified/maxwell.rs rename to src/connector/src/parser/unified/maxwell.rs diff --git a/src/connector/connector_common/src/parser/unified/mod.rs b/src/connector/src/parser/unified/mod.rs similarity index 100% rename from src/connector/connector_common/src/parser/unified/mod.rs rename to src/connector/src/parser/unified/mod.rs diff --git a/src/connector/connector_common/src/parser/unified/protobuf.rs b/src/connector/src/parser/unified/protobuf.rs similarity index 100% rename from src/connector/connector_common/src/parser/unified/protobuf.rs rename to src/connector/src/parser/unified/protobuf.rs diff --git a/src/connector/connector_common/src/parser/unified/upsert.rs b/src/connector/src/parser/unified/upsert.rs similarity index 100% rename from src/connector/connector_common/src/parser/unified/upsert.rs rename to src/connector/src/parser/unified/upsert.rs diff --git a/src/connector/connector_common/src/parser/unified/util.rs b/src/connector/src/parser/unified/util.rs similarity index 100% rename from src/connector/connector_common/src/parser/unified/util.rs rename to src/connector/src/parser/unified/util.rs diff --git a/src/connector/connector_common/src/parser/upsert_parser.rs b/src/connector/src/parser/upsert_parser.rs similarity index 100% rename from src/connector/connector_common/src/parser/upsert_parser.rs rename to src/connector/src/parser/upsert_parser.rs diff --git a/src/connector/connector_common/src/parser/util.rs b/src/connector/src/parser/util.rs similarity index 100% rename from src/connector/connector_common/src/parser/util.rs rename to src/connector/src/parser/util.rs diff --git a/src/connector/connector_common/src/schema/mod.rs b/src/connector/src/schema/mod.rs similarity index 100% rename from src/connector/connector_common/src/schema/mod.rs rename to src/connector/src/schema/mod.rs diff --git a/src/connector/connector_common/src/schema/protobuf.rs b/src/connector/src/schema/protobuf.rs similarity index 100% rename from src/connector/connector_common/src/schema/protobuf.rs rename to src/connector/src/schema/protobuf.rs diff --git a/src/connector/connector_common/src/schema/schema_registry/client.rs b/src/connector/src/schema/schema_registry/client.rs similarity index 100% rename from src/connector/connector_common/src/schema/schema_registry/client.rs rename to src/connector/src/schema/schema_registry/client.rs diff --git a/src/connector/connector_common/src/schema/schema_registry/mod.rs b/src/connector/src/schema/schema_registry/mod.rs similarity index 100% rename from src/connector/connector_common/src/schema/schema_registry/mod.rs rename to src/connector/src/schema/schema_registry/mod.rs diff --git a/src/connector/connector_common/src/schema/schema_registry/util.rs b/src/connector/src/schema/schema_registry/util.rs similarity index 100% rename from src/connector/connector_common/src/schema/schema_registry/util.rs rename to src/connector/src/schema/schema_registry/util.rs diff --git a/src/connector/connector_common/src/sink/boxed.rs b/src/connector/src/sink/boxed.rs similarity index 100% rename from src/connector/connector_common/src/sink/boxed.rs rename to src/connector/src/sink/boxed.rs diff --git a/src/connector/connector_common/src/sink/catalog/desc.rs b/src/connector/src/sink/catalog/desc.rs similarity index 100% rename from src/connector/connector_common/src/sink/catalog/desc.rs rename to src/connector/src/sink/catalog/desc.rs diff --git a/src/connector/connector_common/src/sink/catalog/mod.rs b/src/connector/src/sink/catalog/mod.rs similarity index 98% rename from src/connector/connector_common/src/sink/catalog/mod.rs rename to src/connector/src/sink/catalog/mod.rs index 640af7179422d..9d688e7ba65cd 100644 --- a/src/connector/connector_common/src/sink/catalog/mod.rs +++ b/src/connector/src/sink/catalog/mod.rs @@ -26,11 +26,9 @@ use risingwave_common::util::sort_util::ColumnOrder; use risingwave_pb::catalog::{PbSink, PbSinkFormatDesc, PbSinkType, PbStreamJobStatus}; use super::{ - CONNECTOR_TYPE_KEY, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION, + SinkError, CONNECTOR_TYPE_KEY, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, }; -use crate::common::*; -use crate::sink::SinkError; #[derive(Clone, Copy, Debug, Default, Hash, PartialOrd, PartialEq, Eq)] pub struct SinkId { @@ -138,6 +136,8 @@ pub enum SinkEncode { impl SinkFormatDesc { pub fn from_legacy_type(connector: &str, r#type: &str) -> Result, SinkError> { + use crate::common::{KAFKA_CONNECTOR_NAME, KINESIS_CONNECTOR_NAME, PULSAR_CONNECTOR_NAME}; + let format = match r#type { SINK_TYPE_APPEND_ONLY => SinkFormat::AppendOnly, SINK_TYPE_UPSERT => SinkFormat::Upsert, diff --git a/src/connector/connector_common/src/sink/encoder/avro.rs b/src/connector/src/sink/encoder/avro.rs similarity index 100% rename from src/connector/connector_common/src/sink/encoder/avro.rs rename to src/connector/src/sink/encoder/avro.rs diff --git a/src/connector/connector_common/src/sink/encoder/json.rs b/src/connector/src/sink/encoder/json.rs similarity index 100% rename from src/connector/connector_common/src/sink/encoder/json.rs rename to src/connector/src/sink/encoder/json.rs diff --git a/src/connector/connector_common/src/sink/encoder/mod.rs b/src/connector/src/sink/encoder/mod.rs similarity index 100% rename from src/connector/connector_common/src/sink/encoder/mod.rs rename to src/connector/src/sink/encoder/mod.rs diff --git a/src/connector/connector_common/src/sink/encoder/proto.rs b/src/connector/src/sink/encoder/proto.rs similarity index 99% rename from src/connector/connector_common/src/sink/encoder/proto.rs rename to src/connector/src/sink/encoder/proto.rs index 02c84d546436f..489023f80e145 100644 --- a/src/connector/connector_common/src/sink/encoder/proto.rs +++ b/src/connector/src/sink/encoder/proto.rs @@ -201,7 +201,6 @@ fn encode_fields<'a>( // Full name of Well-Known Types const WKT_TIMESTAMP: &str = "google.protobuf.Timestamp"; -#[allow(dead_code)] const WKT_BOOL_VALUE: &str = "google.protobuf.BoolValue"; /// Handles both `validate` (without actual data) and `encode`. diff --git a/src/connector/connector_common/src/sink/encoder/template.rs b/src/connector/src/sink/encoder/template.rs similarity index 100% rename from src/connector/connector_common/src/sink/encoder/template.rs rename to src/connector/src/sink/encoder/template.rs diff --git a/src/connector/connector_common/src/sink/formatter/append_only.rs b/src/connector/src/sink/formatter/append_only.rs similarity index 100% rename from src/connector/connector_common/src/sink/formatter/append_only.rs rename to src/connector/src/sink/formatter/append_only.rs diff --git a/src/connector/connector_common/src/sink/formatter/debezium_json.rs b/src/connector/src/sink/formatter/debezium_json.rs similarity index 100% rename from src/connector/connector_common/src/sink/formatter/debezium_json.rs rename to src/connector/src/sink/formatter/debezium_json.rs diff --git a/src/connector/connector_common/src/sink/formatter/mod.rs b/src/connector/src/sink/formatter/mod.rs similarity index 100% rename from src/connector/connector_common/src/sink/formatter/mod.rs rename to src/connector/src/sink/formatter/mod.rs diff --git a/src/connector/connector_common/src/sink/formatter/upsert.rs b/src/connector/src/sink/formatter/upsert.rs similarity index 100% rename from src/connector/connector_common/src/sink/formatter/upsert.rs rename to src/connector/src/sink/formatter/upsert.rs diff --git a/src/connector/connector_common/src/sink/log_store.rs b/src/connector/src/sink/log_store.rs similarity index 100% rename from src/connector/connector_common/src/sink/log_store.rs rename to src/connector/src/sink/log_store.rs diff --git a/src/connector/connector_common/src/sink/mod.rs b/src/connector/src/sink/mod.rs similarity index 93% rename from src/connector/connector_common/src/sink/mod.rs rename to src/connector/src/sink/mod.rs index 8be52a5b6c6a1..d6b52cd9cf3f2 100644 --- a/src/connector/connector_common/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -26,6 +26,7 @@ use std::future::Future; use anyhow::anyhow; use async_trait::async_trait; use clickhouse::error::Error as ClickHouseError; +use futures::future::BoxFuture; use redis::RedisError; use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::{ColumnDesc, Field, Schema}; @@ -33,12 +34,13 @@ use risingwave_common::error::{anyhow_error, ErrorCode, RwError}; use risingwave_common::metrics::{ LabelGuardedHistogram, LabelGuardedIntCounter, LabelGuardedIntGauge, }; -use risingwave_pb::catalog::PbSinkType; +use risingwave_pb::catalog::{PbSink, PbSinkType}; use risingwave_pb::connector_service::{PbSinkParam, SinkMetadata, TableSchema}; use risingwave_rpc_client::error::RpcError; use risingwave_rpc_client::MetaClient; use thiserror::Error; +use crate::sink::boxed::BoxCoordinator; use crate::sink::catalog::desc::SinkDesc; use crate::sink::catalog::{SinkCatalog, SinkFormatDesc, SinkId, SinkType}; use crate::sink::log_store::LogReader; @@ -52,6 +54,24 @@ pub const SINK_TYPE_DEBEZIUM: &str = "debezium"; pub const SINK_TYPE_UPSERT: &str = "upsert"; pub const SINK_USER_FORCE_APPEND_ONLY_OPTION: &str = "force_append_only"; +extern "Rust" { + fn __exported_build_box_coordinator( + param: SinkParam, + ) -> BoxFuture<'static, std::result::Result>; + + fn __exported_validate_sink( + prost_sink_catalog: &PbSink, + ) -> BoxFuture<'_, std::result::Result<(), SinkError>>; +} + +pub async fn build_box_coordinator(param: SinkParam) -> Result { + unsafe { __exported_build_box_coordinator(param).await } +} + +pub async fn validate_sink(prost_sink_catalog: &PbSink) -> std::result::Result<(), SinkError> { + unsafe { __exported_validate_sink(prost_sink_catalog).await } +} + #[derive(Debug, Clone, PartialEq, Eq)] pub struct SinkParam { pub sink_id: SinkId, diff --git a/src/connector/connector_common/src/sink/utils.rs b/src/connector/src/sink/utils.rs similarity index 100% rename from src/connector/connector_common/src/sink/utils.rs rename to src/connector/src/sink/utils.rs diff --git a/src/connector/connector_common/src/sink/writer.rs b/src/connector/src/sink/writer.rs similarity index 100% rename from src/connector/connector_common/src/sink/writer.rs rename to src/connector/src/sink/writer.rs diff --git a/src/connector/src/source/kafka/enumerator/client.rs b/src/connector/src/source/kafka/enumerator/client.rs index 06577aa4e3433..2c0d03306366b 100644 --- a/src/connector/src/source/kafka/enumerator/client.rs +++ b/src/connector/src/source/kafka/enumerator/client.rs @@ -342,6 +342,13 @@ impl KafkaSplitEnumerator { .set(offset); } + pub async fn check_reachability(&self) -> bool { + self.client + .fetch_metadata(None, self.sync_call_timeout) + .await + .is_ok() + } + async fn fetch_topic_partition(&self) -> anyhow::Result> { // for now, we only support one topic let metadata = self diff --git a/src/connector/src/source/kafka/mod.rs b/src/connector/src/source/kafka/mod.rs index ca9be5876ba31..b661cc84f8de4 100644 --- a/src/connector/src/source/kafka/mod.rs +++ b/src/connector/src/source/kafka/mod.rs @@ -16,19 +16,25 @@ use serde::Deserialize; use serde_with::{serde_as, DisplayFromStr}; pub mod enumerator; +pub mod private_link; pub mod source; pub mod split; +pub mod stats; pub use enumerator::*; pub use private_link::*; -use risingwave_connector_common::common::KAFKA_CONNECTOR_NAME; -pub use risingwave_connector_common::kafka::*; pub use source::*; pub use split::*; -use crate::common::KafkaCommon; +pub use crate::common::KAFKA_CONNECTOR_NAME as KAFKA_CONNECTOR; +use crate::common::{KafkaCommon, KAFKA_CONNECTOR_NAME}; use crate::source::SourceProperties; +pub const KAFKA_PROPS_BROKER_KEY: &str = "properties.bootstrap.server"; +pub const KAFKA_PROPS_BROKER_KEY_ALIAS: &str = "kafka.brokers"; +pub const PRIVATELINK_CONNECTION: &str = "privatelink"; +pub const KAFKA_ISOLATION_LEVEL: &str = "read_committed"; + /// Properties for the rdkafka library. Leave a field as `None` to use the default value. /// These properties are not intended to be exposed to users in the majority of cases. /// diff --git a/src/connector/connector_common/src/kafka/private_link.rs b/src/connector/src/source/kafka/private_link.rs similarity index 94% rename from src/connector/connector_common/src/kafka/private_link.rs rename to src/connector/src/source/kafka/private_link.rs index 34f7773fcbc0c..573e14c3e073f 100644 --- a/src/connector/connector_common/src/kafka/private_link.rs +++ b/src/connector/src/source/kafka/private_link.rs @@ -27,8 +27,9 @@ use risingwave_common::util::iter_util::ZipEqFast; use risingwave_pb::catalog::connection::PrivateLinkService; use crate::common::{AwsPrivateLinkItem, BROKER_REWRITE_MAP_KEY, PRIVATE_LINK_TARGETS_KEY}; -use crate::kafka::stats::RdKafkaStats; -use crate::kafka::{KAFKA_PROPS_BROKER_KEY, KAFKA_PROPS_BROKER_KEY_ALIAS}; +use crate::source::kafka::stats::RdKafkaStats; +use crate::source::kafka::{KAFKA_PROPS_BROKER_KEY, KAFKA_PROPS_BROKER_KEY_ALIAS}; +use crate::source::KAFKA_CONNECTOR; pub const PRIVATELINK_ENDPOINT_KEY: &str = "privatelink.endpoint"; pub const CONNECTION_NAME_KEY: &str = "connection.name"; @@ -49,7 +50,7 @@ impl std::fmt::Display for PrivateLinkContextRole { } struct BrokerAddrRewriter { - _role: PrivateLinkContextRole, + role: PrivateLinkContextRole, rewrite_map: BTreeMap, } @@ -87,10 +88,7 @@ impl BrokerAddrRewriter { .collect() }); let rewrite_map = rewrite_map?; - Ok(Self { - _role: role, - rewrite_map, - }) + Ok(Self { role, rewrite_map }) } } @@ -198,6 +196,16 @@ fn get_property_required( .ok_or_else(|| anyhow!("Required property \"{property}\" is not provided")) } +#[inline(always)] +fn is_kafka_connector(with_properties: &BTreeMap) -> bool { + const UPSTREAM_SOURCE_KEY: &str = "connector"; + with_properties + .get(UPSTREAM_SOURCE_KEY) + .unwrap_or(&"".to_string()) + .to_lowercase() + .eq_ignore_ascii_case(KAFKA_CONNECTOR) +} + pub fn insert_privatelink_broker_rewrite_map( properties: &mut BTreeMap, svc: Option<&PrivateLinkService>, diff --git a/src/connector/connector_common/src/kafka/stats.rs b/src/connector/src/source/kafka/stats.rs similarity index 100% rename from src/connector/connector_common/src/kafka/stats.rs rename to src/connector/src/source/kafka/stats.rs diff --git a/src/connector/src/source/kinesis/mod.rs b/src/connector/src/source/kinesis/mod.rs index 259b5f40b3808..4b3596ec772ea 100644 --- a/src/connector/src/source/kinesis/mod.rs +++ b/src/connector/src/source/kinesis/mod.rs @@ -16,10 +16,10 @@ pub mod enumerator; pub mod source; pub mod split; -use risingwave_connector_common::common::KINESIS_CONNECTOR_NAME; use serde::Deserialize; use crate::common::KinesisCommon; +pub use crate::common::KINESIS_CONNECTOR_NAME as KINESIS_CONNECTOR; use crate::source::kinesis::enumerator::client::KinesisSplitEnumerator; use crate::source::kinesis::source::reader::KinesisSplitReader; use crate::source::kinesis::split::KinesisSplit; @@ -43,5 +43,5 @@ impl SourceProperties for KinesisProperties { type SplitEnumerator = KinesisSplitEnumerator; type SplitReader = KinesisSplitReader; - const SOURCE_NAME: &'static str = KINESIS_CONNECTOR_NAME; + const SOURCE_NAME: &'static str = KINESIS_CONNECTOR; } diff --git a/src/connector/src/source/mod.rs b/src/connector/src/source/mod.rs index aa052ad7124ac..869b7089ac271 100644 --- a/src/connector/src/source/mod.rs +++ b/src/connector/src/source/mod.rs @@ -27,6 +27,8 @@ pub mod pulsar; pub use base::*; pub(crate) use common::*; pub use google_pubsub::GOOGLE_PUBSUB_CONNECTOR; +pub use kafka::KAFKA_CONNECTOR; +pub use kinesis::KINESIS_CONNECTOR; pub use nats::NATS_CONNECTOR; mod common; pub mod external; @@ -39,3 +41,4 @@ pub use mock_external_table::MockExternalTableReader; pub use crate::source::filesystem::{S3_CONNECTOR, S3_V2_CONNECTOR}; pub use crate::source::nexmark::NEXMARK_CONNECTOR; +pub use crate::source::pulsar::PULSAR_CONNECTOR; diff --git a/src/connector/src/source/nats/enumerator/mod.rs b/src/connector/src/source/nats/enumerator/mod.rs index 0ede740c181f8..e987a45188114 100644 --- a/src/connector/src/source/nats/enumerator/mod.rs +++ b/src/connector/src/source/nats/enumerator/mod.rs @@ -16,9 +16,8 @@ use std::sync::Arc; use anyhow; use async_trait::async_trait; -use risingwave_connector_common::common::NatsOffset; -use super::source::NatsSplit; +use super::source::{NatsOffset, NatsSplit}; use super::NatsProperties; use crate::source::{SourceEnumeratorContextRef, SplitEnumerator, SplitId}; diff --git a/src/connector/src/source/nats/source/reader.rs b/src/connector/src/source/nats/source/reader.rs index c1fb2c5e1dddd..6e22748bcf468 100644 --- a/src/connector/src/source/nats/source/reader.rs +++ b/src/connector/src/source/nats/source/reader.rs @@ -17,12 +17,11 @@ use async_nats::jetstream::consumer; use async_trait::async_trait; use futures::StreamExt; use futures_async_stream::try_stream; -use risingwave_connector_common::common::NatsOffset; use super::message::NatsMessage; +use super::{NatsOffset, NatsSplit}; use crate::parser::ParserConfig; use crate::source::common::{into_chunk_stream, CommonSplitReader}; -use crate::source::nats::source::NatsSplit; use crate::source::nats::NatsProperties; use crate::source::{ BoxSourceWithStateStream, Column, SourceContextRef, SourceMessage, SplitId, SplitReader, diff --git a/src/connector/src/source/nats/split.rs b/src/connector/src/source/nats/split.rs index 3c0838118a02e..d9b3e11b98f87 100644 --- a/src/connector/src/source/nats/split.rs +++ b/src/connector/src/source/nats/split.rs @@ -14,11 +14,19 @@ use anyhow::{anyhow, Ok}; use risingwave_common::types::JsonbVal; -use risingwave_connector_common::common::NatsOffset; use serde::{Deserialize, Serialize}; use crate::source::{SplitId, SplitMetaData}; +#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, Hash)] +pub enum NatsOffset { + Earliest, + Latest, + SequenceNumber(String), + Timestamp(i128), + None, +} + /// The states of a NATS split, which will be persisted to checkpoint. #[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Hash)] pub struct NatsSplit { diff --git a/src/connector/src/source/pulsar/mod.rs b/src/connector/src/source/pulsar/mod.rs index f9f19960ad9d5..c5a5e00522c33 100644 --- a/src/connector/src/source/pulsar/mod.rs +++ b/src/connector/src/source/pulsar/mod.rs @@ -18,11 +18,11 @@ pub mod split; pub mod topic; pub use enumerator::*; -use risingwave_connector_common::common::PULSAR_CONNECTOR_NAME; use serde::Deserialize; pub use split::*; use crate::common::PulsarCommon; +pub use crate::common::PULSAR_CONNECTOR_NAME as PULSAR_CONNECTOR; use crate::source::pulsar::source::reader::PulsarSplitReader; use crate::source::SourceProperties; @@ -31,7 +31,7 @@ impl SourceProperties for PulsarProperties { type SplitEnumerator = PulsarSplitEnumerator; type SplitReader = PulsarSplitReader; - const SOURCE_NAME: &'static str = PULSAR_CONNECTOR_NAME; + const SOURCE_NAME: &'static str = PULSAR_CONNECTOR; } #[derive(Clone, Debug, Deserialize)] diff --git a/src/frontend/src/catalog/connection_catalog.rs b/src/frontend/src/catalog/connection_catalog.rs index 065a63c194cd7..7913d04379cd5 100644 --- a/src/frontend/src/catalog/connection_catalog.rs +++ b/src/frontend/src/catalog/connection_catalog.rs @@ -17,8 +17,8 @@ use std::sync::Arc; use anyhow::anyhow; use risingwave_common::error::{Result, RwError}; -use risingwave_connector::common::KAFKA_CONNECTOR_NAME; use risingwave_connector::source::kafka::private_link::insert_privatelink_broker_rewrite_map; +use risingwave_connector::source::KAFKA_CONNECTOR; use risingwave_pb::catalog::connection::private_link_service::PrivateLinkProvider; use risingwave_pb::catalog::connection::Info; use risingwave_pb::catalog::{connection, PbConnection}; @@ -72,7 +72,7 @@ fn is_kafka_connector(with_properties: &BTreeMap) -> bool { .get(UPSTREAM_SOURCE_KEY) .unwrap_or(&"".to_string()) .to_lowercase() - .eq_ignore_ascii_case(KAFKA_CONNECTOR_NAME) + .eq_ignore_ascii_case(KAFKA_CONNECTOR) } pub(crate) fn resolve_private_link_connection( diff --git a/src/frontend/src/handler/util.rs b/src/frontend/src/handler/util.rs index e3894b7c0e9a5..66494be928d42 100644 --- a/src/frontend/src/handler/util.rs +++ b/src/frontend/src/handler/util.rs @@ -31,7 +31,7 @@ use risingwave_common::error::{ErrorCode, Result as RwResult}; use risingwave_common::row::Row as _; use risingwave_common::types::{DataType, ScalarRefImpl, Timestamptz}; use risingwave_common::util::iter_util::ZipEqFast; -use risingwave_connector::common::KAFKA_CONNECTOR_NAME; +use risingwave_connector::source::KAFKA_CONNECTOR; use risingwave_sqlparser::ast::display_comma_separated; use crate::catalog::IndexCatalog; @@ -250,7 +250,7 @@ pub fn is_kafka_connector(with_properties: &HashMap) -> bool { return false; }; - connector == KAFKA_CONNECTOR_NAME + connector == KAFKA_CONNECTOR } #[inline(always)] diff --git a/src/frontend/src/utils/with_options.rs b/src/frontend/src/utils/with_options.rs index 4d494dde495a7..4b0a70ef856dc 100644 --- a/src/frontend/src/utils/with_options.rs +++ b/src/frontend/src/utils/with_options.rs @@ -18,10 +18,10 @@ use std::num::NonZeroU32; use itertools::Itertools; use risingwave_common::error::{ErrorCode, Result as RwResult, RwError}; -use risingwave_connector::common::KAFKA_CONNECTOR_NAME; use risingwave_connector::source::kafka::{ insert_privatelink_broker_rewrite_map, PRIVATELINK_ENDPOINT_KEY, }; +use risingwave_connector::source::KAFKA_CONNECTOR; use risingwave_sqlparser::ast::{ CompatibleSourceSchema, CreateConnectionStatement, CreateSinkStatement, CreateSourceStatement, SqlOption, Statement, Value, @@ -126,7 +126,7 @@ fn is_kafka_connector(with_options: &WithOptions) -> bool { else { return false; }; - connector == KAFKA_CONNECTOR_NAME + connector == KAFKA_CONNECTOR } pub(crate) fn resolve_privatelink_in_with_option( diff --git a/src/meta/src/manager/sink_coordination/coordinator_worker.rs b/src/meta/src/manager/sink_coordination/coordinator_worker.rs index 65bea8bdc390a..a7e6f8d1bc7ef 100644 --- a/src/meta/src/manager/sink_coordination/coordinator_worker.rs +++ b/src/meta/src/manager/sink_coordination/coordinator_worker.rs @@ -16,13 +16,12 @@ use std::collections::HashSet; use std::pin::pin; use anyhow::anyhow; -use futures::future::{select, BoxFuture, Either}; +use futures::future::{select, Either}; use futures::stream::FuturesUnordered; use futures::{StreamExt, TryStreamExt}; use risingwave_common::buffer::Bitmap; use risingwave_common::hash::{VirtualNode, VnodeBitmapExt}; -use risingwave_connector::sink::boxed::BoxCoordinator; -use risingwave_connector::sink::{SinkCommitCoordinator, SinkError, SinkParam}; +use risingwave_connector::sink::{build_box_coordinator, SinkCommitCoordinator, SinkParam}; use risingwave_pb::connector_service::coordinate_request::CommitRequest; use risingwave_pb::connector_service::coordinate_response::{ CommitResponse, StartCoordinationResponse, @@ -53,19 +52,12 @@ pub struct CoordinatorWorker { request_rx: UnboundedReceiver, } -extern "Rust" { - fn __exported_build_box_coordinator( - param: SinkParam, - ) -> BoxFuture<'static, std::result::Result>; -} - impl CoordinatorWorker { pub async fn run( first_writer_request: NewSinkWriterRequest, request_rx: UnboundedReceiver, ) { - let coordinator_result = - unsafe { __exported_build_box_coordinator(first_writer_request.param.clone()).await }; + let coordinator_result = build_box_coordinator(first_writer_request.param.clone()).await; let coordinator = match coordinator_result { Ok(coordinator) => coordinator, Err(e) => { diff --git a/src/meta/src/stream/sink.rs b/src/meta/src/stream/sink.rs index 272e95f9f973e..33ef66c115041 100644 --- a/src/meta/src/stream/sink.rs +++ b/src/meta/src/stream/sink.rs @@ -12,16 +12,4 @@ // See the License for the specific language governing permissions and // limitations under the License. -use futures::future::BoxFuture; -use risingwave_connector::sink::SinkError; -use risingwave_pb::catalog::PbSink; - -extern "Rust" { - fn __exported_validate_sink( - prost_sink_catalog: &PbSink, - ) -> BoxFuture<'_, std::result::Result<(), SinkError>>; -} - -pub async fn validate_sink(prost_sink_catalog: &PbSink) -> std::result::Result<(), SinkError> { - unsafe { __exported_validate_sink(prost_sink_catalog).await } -} +pub use risingwave_connector::sink::validate_sink;