Skip to content

Commit

Permalink
add files
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Oct 20, 2023
1 parent 5dc5f1f commit 62f463b
Show file tree
Hide file tree
Showing 2 changed files with 213 additions and 0 deletions.
151 changes: 151 additions & 0 deletions src/connector/sink_impl/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
[package]
name = "risingwave_sink_impl"
version = { workspace = true }
edition = { workspace = true }
homepage = { workspace = true }
keywords = { workspace = true }
license = { workspace = true }
repository = { workspace = true }

[package.metadata.cargo-machete]
ignored = ["workspace-hack"]

[package.metadata.cargo-udeps.ignore]
normal = ["workspace-hack"]

[dependencies]
anyhow = "1"
apache-avro = { git = "https://github.com/risingwavelabs/avro", branch = "idx0dev/resolved_schema", features = [
"snappy",
"zstandard",
"bzip",
"xz",
] }
arrow-array = { workspace = true }
arrow-schema = { workspace = true }
async-nats = "0.32"
async-trait = "0.1"
auto_enums = { version = "0.8", features = ["futures03"] }
auto_impl = "1"
aws-config = { workspace = true }
aws-credential-types = { workspace = true }
aws-sdk-kinesis = { workspace = true }
aws-sdk-s3 = { workspace = true }
aws-smithy-http = { workspace = true }
aws-types = { workspace = true }
base64 = "0.21"
bincode = "1"
byteorder = "1"
bytes = { version = "1", features = ["serde"] }
chrono = { version = "0.4", default-features = false, features = [
"clock",
"std",
] }
clickhouse = { git = "https://github.com/risingwavelabs/clickhouse.rs", rev = "622501c1c98c80baaf578c716d6903dde947804e", features = [
"time",
] }
csv = "1.3"
duration-str = "0.7.0"
easy-ext = "1"
enum-as-inner = "0.6"
futures = { version = "0.3", default-features = false, features = ["alloc"] }
futures-async-stream = { workspace = true }
glob = "0.3"
google-cloud-pubsub = "0.20"
http = "0.2"
hyper = { version = "0.14", features = [
"client",
"tcp",
"http1",
"http2",
"stream",
] }
hyper-tls = "0.5"
icelake = { workspace = true }
indexmap = { version = "1.9.3", features = ["serde"] }
itertools = "0.11"
jni = { version = "0.21.1", features = ["invocation"] }
jsonschema-transpiler = { git = "https://github.com/mozilla/jsonschema-transpiler", rev = "c1a89d720d118843d8bcca51084deb0ed223e4b4" }
maplit = "1.0.2"
moka = { version = "0.12", features = ["future"] }
mysql_async = { version = "0.32", default-features = false, features = [
"default",
] }
mysql_common = { version = "0.30", default-features = false, features = [
"chrono",
] }
nexmark = { version = "0.2", features = ["serde"] }
nkeys = "0.3.2"
num-bigint = "0.4"
opendal = "0.39"
parking_lot = "0.12"
paste = "1"
prometheus = { version = "0.13", features = ["process"] }
prost = { version = "0.12", features = ["no-recursion-limit"] }
prost-reflect = "0.12"
prost-types = "0.12"
protobuf-native = "0.2.1"
pulsar = { version = "6.0", default-features = false, features = [
"tokio-runtime",
"telemetry",
"auth-oauth2",
] }
rdkafka = { workspace = true, features = [
"cmake-build",
# "ssl",
# FIXME: temporary workaround before we find an ideal solution.
# See why it's needed and why it's not ideal in https://github.com/risingwavelabs/risingwave/issues/9852
"ssl-vendored",
"gssapi",
"zstd",
] }
reqwest = { version = "0.11", features = ["json"] }
risingwave_common = { workspace = true }
risingwave_connector_common = { workspace = true }
risingwave_jni_core = { workspace = true }
risingwave_pb = { workspace = true }
risingwave_rpc_client = { workspace = true }
rust_decimal = "1"
serde = { version = "1", features = ["derive", "rc"] }
serde_derive = "1"
serde_json = "1"
serde_with = { version = "3", features = ["json"] }
simd-json = "0.12.0"
strum = "0.25"
strum_macros = "0.25"
tempfile = "3"
thiserror = "1"
time = "0.3.28"
tokio = { version = "0.2", package = "madsim-tokio", features = [
"rt",
"rt-multi-thread",
"sync",
"macros",
"time",
"signal",
"fs",
] }
tokio-retry = "0.3"
tokio-stream = "0.1"
tokio-util = { version = "0.7", features = ["codec", "io"] }
tonic = { workspace = true }
tonic_0_9 = { package = "tonic", version = "0.9" }
tracing = "0.1"
tracing-futures = { version = "0.2", features = ["futures-03"] }
url = "2"
urlencoding = "2"

[target.'cfg(not(madsim))'.dependencies]
workspace-hack = { path = "../../workspace-hack" }

[dev-dependencies]
criterion = { workspace = true, features = ["async_tokio", "async"] }
rand = "0.8"
tempfile = "3"
tracing-test = "0.2"

[build-dependencies]
prost-build = "0.12"

[lints]
workspace = true
62 changes: 62 additions & 0 deletions src/connector/sink_impl/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
#![expect(dead_code)]
#![allow(clippy::derive_partial_eq_without_eq)]
#![feature(generators)]
#![feature(proc_macro_hygiene)]
#![feature(stmt_expr_attributes)]
#![feature(box_patterns)]
#![feature(trait_alias)]
#![feature(binary_heap_drain_sorted)]
#![feature(lint_reasons)]
#![feature(lazy_cell)]
#![feature(result_option_inspect)]
#![feature(let_chains)]
#![feature(box_into_inner)]
#![feature(type_alias_impl_trait)]
#![feature(return_position_impl_trait_in_trait)]
#![feature(async_fn_in_trait)]
#![feature(associated_type_defaults)]
#![feature(impl_trait_in_assoc_type)]
#![feature(iter_from_generator)]
#![feature(if_let_guard)]
#![feature(iterator_try_collect)]

pub mod sink;

use futures::future::BoxFuture;
use futures::FutureExt;
use risingwave_connector_common::sink::boxed::BoxCoordinator;
use risingwave_connector_common::sink::catalog::SinkCatalog;
use risingwave_connector_common::sink::{Sink, SinkError, SinkParam};
pub(crate) use risingwave_connector_common::*;
use risingwave_pb::catalog::PbSink;

use crate::sink::build_sink;

#[export_name = "__exported_validate_sink"]
pub fn validate_sink(
prost_sink_catalog: &PbSink,
) -> BoxFuture<'_, std::result::Result<(), SinkError>> {
async move {
let sink_catalog = SinkCatalog::from(prost_sink_catalog);
let param = SinkParam::from(sink_catalog);

let sink = build_sink(param)?;

dispatch_sink!(sink, sink, Ok(sink.validate().await?))
}
.boxed()
}

#[export_name = "__exported_build_box_coordinator"]
pub fn build_box_coordinator(
param: SinkParam,
) -> BoxFuture<'static, std::result::Result<BoxCoordinator, SinkError>> {
async move {
let sink = build_sink(param)?;
dispatch_sink!(sink, sink, {
let coordinator = sink.new_coordinator().await?;
Ok(Box::new(coordinator) as BoxCoordinator)
})
}
.boxed()
}

0 comments on commit 62f463b

Please sign in to comment.