diff --git a/src/connector/sink_impl/Cargo.toml b/src/connector/sink_impl/Cargo.toml new file mode 100644 index 0000000000000..e801302ffe6a5 --- /dev/null +++ b/src/connector/sink_impl/Cargo.toml @@ -0,0 +1,151 @@ +[package] +name = "risingwave_sink_impl" +version = { workspace = true } +edition = { workspace = true } +homepage = { workspace = true } +keywords = { workspace = true } +license = { workspace = true } +repository = { workspace = true } + +[package.metadata.cargo-machete] +ignored = ["workspace-hack"] + +[package.metadata.cargo-udeps.ignore] +normal = ["workspace-hack"] + +[dependencies] +anyhow = "1" +apache-avro = { git = "https://github.com/risingwavelabs/avro", branch = "idx0dev/resolved_schema", features = [ + "snappy", + "zstandard", + "bzip", + "xz", +] } +arrow-array = { workspace = true } +arrow-schema = { workspace = true } +async-nats = "0.32" +async-trait = "0.1" +auto_enums = { version = "0.8", features = ["futures03"] } +auto_impl = "1" +aws-config = { workspace = true } +aws-credential-types = { workspace = true } +aws-sdk-kinesis = { workspace = true } +aws-sdk-s3 = { workspace = true } +aws-smithy-http = { workspace = true } +aws-types = { workspace = true } +base64 = "0.21" +bincode = "1" +byteorder = "1" +bytes = { version = "1", features = ["serde"] } +chrono = { version = "0.4", default-features = false, features = [ + "clock", + "std", +] } +clickhouse = { git = "https://github.com/risingwavelabs/clickhouse.rs", rev = "622501c1c98c80baaf578c716d6903dde947804e", features = [ + "time", +] } +csv = "1.3" +duration-str = "0.7.0" +easy-ext = "1" +enum-as-inner = "0.6" +futures = { version = "0.3", default-features = false, features = ["alloc"] } +futures-async-stream = { workspace = true } +glob = "0.3" +google-cloud-pubsub = "0.20" +http = "0.2" +hyper = { version = "0.14", features = [ + "client", + "tcp", + "http1", + "http2", + "stream", +] } +hyper-tls = "0.5" +icelake = { workspace = true } +indexmap = { version = "1.9.3", features = ["serde"] } +itertools = "0.11" +jni = { version = "0.21.1", features = ["invocation"] } +jsonschema-transpiler = { git = "https://github.com/mozilla/jsonschema-transpiler", rev = "c1a89d720d118843d8bcca51084deb0ed223e4b4" } +maplit = "1.0.2" +moka = { version = "0.12", features = ["future"] } +mysql_async = { version = "0.32", default-features = false, features = [ + "default", +] } +mysql_common = { version = "0.30", default-features = false, features = [ + "chrono", +] } +nexmark = { version = "0.2", features = ["serde"] } +nkeys = "0.3.2" +num-bigint = "0.4" +opendal = "0.39" +parking_lot = "0.12" +paste = "1" +prometheus = { version = "0.13", features = ["process"] } +prost = { version = "0.12", features = ["no-recursion-limit"] } +prost-reflect = "0.12" +prost-types = "0.12" +protobuf-native = "0.2.1" +pulsar = { version = "6.0", default-features = false, features = [ + "tokio-runtime", + "telemetry", + "auth-oauth2", +] } +rdkafka = { workspace = true, features = [ + "cmake-build", + # "ssl", + # FIXME: temporary workaround before we find an ideal solution. + # See why it's needed and why it's not ideal in https://github.com/risingwavelabs/risingwave/issues/9852 + "ssl-vendored", + "gssapi", + "zstd", +] } +reqwest = { version = "0.11", features = ["json"] } +risingwave_common = { workspace = true } +risingwave_connector_common = { workspace = true } +risingwave_jni_core = { workspace = true } +risingwave_pb = { workspace = true } +risingwave_rpc_client = { workspace = true } +rust_decimal = "1" +serde = { version = "1", features = ["derive", "rc"] } +serde_derive = "1" +serde_json = "1" +serde_with = { version = "3", features = ["json"] } +simd-json = "0.12.0" +strum = "0.25" +strum_macros = "0.25" +tempfile = "3" +thiserror = "1" +time = "0.3.28" +tokio = { version = "0.2", package = "madsim-tokio", features = [ + "rt", + "rt-multi-thread", + "sync", + "macros", + "time", + "signal", + "fs", +] } +tokio-retry = "0.3" +tokio-stream = "0.1" +tokio-util = { version = "0.7", features = ["codec", "io"] } +tonic = { workspace = true } +tonic_0_9 = { package = "tonic", version = "0.9" } +tracing = "0.1" +tracing-futures = { version = "0.2", features = ["futures-03"] } +url = "2" +urlencoding = "2" + +[target.'cfg(not(madsim))'.dependencies] +workspace-hack = { path = "../../workspace-hack" } + +[dev-dependencies] +criterion = { workspace = true, features = ["async_tokio", "async"] } +rand = "0.8" +tempfile = "3" +tracing-test = "0.2" + +[build-dependencies] +prost-build = "0.12" + +[lints] +workspace = true diff --git a/src/connector/sink_impl/src/lib.rs b/src/connector/sink_impl/src/lib.rs new file mode 100644 index 0000000000000..2b9807fc36b4f --- /dev/null +++ b/src/connector/sink_impl/src/lib.rs @@ -0,0 +1,62 @@ +#![expect(dead_code)] +#![allow(clippy::derive_partial_eq_without_eq)] +#![feature(generators)] +#![feature(proc_macro_hygiene)] +#![feature(stmt_expr_attributes)] +#![feature(box_patterns)] +#![feature(trait_alias)] +#![feature(binary_heap_drain_sorted)] +#![feature(lint_reasons)] +#![feature(lazy_cell)] +#![feature(result_option_inspect)] +#![feature(let_chains)] +#![feature(box_into_inner)] +#![feature(type_alias_impl_trait)] +#![feature(return_position_impl_trait_in_trait)] +#![feature(async_fn_in_trait)] +#![feature(associated_type_defaults)] +#![feature(impl_trait_in_assoc_type)] +#![feature(iter_from_generator)] +#![feature(if_let_guard)] +#![feature(iterator_try_collect)] + +pub mod sink; + +use futures::future::BoxFuture; +use futures::FutureExt; +use risingwave_connector_common::sink::boxed::BoxCoordinator; +use risingwave_connector_common::sink::catalog::SinkCatalog; +use risingwave_connector_common::sink::{Sink, SinkError, SinkParam}; +pub(crate) use risingwave_connector_common::*; +use risingwave_pb::catalog::PbSink; + +use crate::sink::build_sink; + +#[export_name = "__exported_validate_sink"] +pub fn validate_sink( + prost_sink_catalog: &PbSink, +) -> BoxFuture<'_, std::result::Result<(), SinkError>> { + async move { + let sink_catalog = SinkCatalog::from(prost_sink_catalog); + let param = SinkParam::from(sink_catalog); + + let sink = build_sink(param)?; + + dispatch_sink!(sink, sink, Ok(sink.validate().await?)) + } + .boxed() +} + +#[export_name = "__exported_build_box_coordinator"] +pub fn build_box_coordinator( + param: SinkParam, +) -> BoxFuture<'static, std::result::Result> { + async move { + let sink = build_sink(param)?; + dispatch_sink!(sink, sink, { + let coordinator = sink.new_coordinator().await?; + Ok(Box::new(coordinator) as BoxCoordinator) + }) + } + .boxed() +}