Skip to content

Commit

Permalink
feat: clickhouse sink (#2353)
Browse files: browse the repository at this point in the history
* feat: clickhouse sink

* Use dozer_types::serde

* Simplify code

* Handle null value for binary
  • Loading branch information
karolisg authored Jan 30, 2024
1 parent c6850f8 commit 99281ed
Show file tree
Hide file tree
Showing 22 changed files with 1,365 additions and 228 deletions.
536 changes: 328 additions & 208 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ members = [
"dozer-lambda",
"dozer-sinks",
"dozer-sink-aerospike",
"dozer-sink-clickhouse",
]
resolver = "2"

Expand Down
1 change: 1 addition & 0 deletions dozer-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ dozer-recordstore = { path = "../dozer-recordstore" }
dozer-lambda = { path = "../dozer-lambda" }
dozer-sinks = { path = "../dozer-sinks" }
dozer-sink-aerospike = { path = "../dozer-sink-aerospike" }
dozer-sink-clickhouse = { path = "../dozer-sink-clickhouse" }

uuid = { version = "1.6.1", features = ["v4", "serde"] }
tokio = { version = "1", features = ["full"] }
Expand Down
7 changes: 6 additions & 1 deletion dozer-cli/src/pipeline/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use dozer_sql::builder::{OutputNodeInfo, QueryContext};
use dozer_tracing::LabelsAndProgress;
use dozer_types::log::debug;
use dozer_types::models::connection::Connection;
use dozer_types::models::endpoint::AerospikeSinkConfig;
use dozer_types::models::endpoint::{AerospikeSinkConfig, ClickhouseSinkConfig};
use dozer_types::models::flags::Flags;
use dozer_types::models::source::Source;
use dozer_types::models::udf_config::UdfConfig;
Expand All @@ -26,6 +26,7 @@ use tokio::sync::Mutex;
use crate::pipeline::dummy_sink::DummySinkFactory;
use crate::pipeline::LogSinkFactory;
use dozer_sink_aerospike::AerospikeSinkFactory;
use dozer_sink_clickhouse::ClickhouseSinkFactory;

use super::connector_source::ConnectorSourceFactoryError;
use super::source_builder::SourceBuilder;
Expand Down Expand Up @@ -60,6 +61,7 @@ pub enum EndpointLogKind {
Api { log: Arc<Mutex<Log>> },
Dummy,
Aerospike { config: AerospikeSinkConfig },
Clickhouse { config: ClickhouseSinkConfig },
}

pub struct PipelineBuilder<'a> {
Expand Down Expand Up @@ -290,6 +292,9 @@ impl<'a> PipelineBuilder<'a> {
EndpointLogKind::Aerospike { config } => {
Box::new(AerospikeSinkFactory::new(config))
}
EndpointLogKind::Clickhouse { config } => {
Box::new(ClickhouseSinkFactory::new(config.clone(), runtime.clone()))
}
};

match table_info {
Expand Down
4 changes: 3 additions & 1 deletion dozer-cli/src/pipeline/dummy_sink.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use dozer_api::async_trait::async_trait;
use std::{collections::HashMap, time::Instant};

use dozer_cache::dozer_log::storage::Queue;
Expand All @@ -17,6 +18,7 @@ use dozer_types::{
#[derive(Debug)]
pub struct DummySinkFactory;

#[async_trait]
impl SinkFactory for DummySinkFactory {
fn get_input_ports(&self) -> Vec<PortHandle> {
vec![DEFAULT_PORT_HANDLE]
Expand All @@ -26,7 +28,7 @@ impl SinkFactory for DummySinkFactory {
Ok(())
}

fn build(
async fn build(
&self,
input_schemas: HashMap<PortHandle, Schema>,
) -> Result<Box<dyn Sink>, BoxedError> {
Expand Down
4 changes: 3 additions & 1 deletion dozer-cli/src/pipeline/log_sink.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::{collections::HashMap, fmt::Debug, ops::Deref, sync::Arc};

use dozer_api::async_trait::async_trait;
use dozer_cache::dozer_log::{
replication::{Log, LogOperation},
storage::Queue,
Expand Down Expand Up @@ -40,6 +41,7 @@ impl LogSinkFactory {
}
}

#[async_trait]
impl SinkFactory for LogSinkFactory {
fn get_input_ports(&self) -> Vec<PortHandle> {
vec![DEFAULT_PORT_HANDLE]
Expand All @@ -50,7 +52,7 @@ impl SinkFactory for LogSinkFactory {
Ok(())
}

fn build(
async fn build(
&self,
_input_schemas: HashMap<PortHandle, Schema>,
) -> Result<Box<dyn Sink>, BoxedError> {
Expand Down
11 changes: 10 additions & 1 deletion dozer-cli/src/simple/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ use dozer_cache::dozer_log::home_dir::{BuildPath, HomeDir};
use dozer_cache::dozer_log::replication::Log;
use dozer_core::checkpoint::{CheckpointOptions, OptionCheckpoint};
use dozer_tracing::LabelsAndProgress;
use dozer_types::models::endpoint::{AerospikeSinkConfig, Endpoint, EndpointKind};
use dozer_types::models::endpoint::{
AerospikeSinkConfig, ClickhouseSinkConfig, Endpoint, EndpointKind,
};
use dozer_types::models::flags::Flags;
use tokio::runtime::Runtime;
use tokio::sync::Mutex;
Expand Down Expand Up @@ -45,6 +47,7 @@ enum ExecutorEndpointKind {
Api { log_endpoint: LogEndpoint },
Dummy,
Aerospike { config: AerospikeSinkConfig },
Clickhouse { config: ClickhouseSinkConfig },
}

impl<'a> Executor<'a> {
Expand Down Expand Up @@ -89,6 +92,9 @@ impl<'a> Executor<'a> {
EndpointKind::Aerospike(config) => ExecutorEndpointKind::Aerospike {
config: config.clone(),
},
EndpointKind::Clickhouse(config) => ExecutorEndpointKind::Clickhouse {
config: config.clone(),
},
};

executor_endpoints.push(ExecutorEndpoint {
Expand Down Expand Up @@ -147,6 +153,9 @@ impl<'a> Executor<'a> {
ExecutorEndpointKind::Aerospike { config } => {
EndpointLogKind::Aerospike { config }
}
ExecutorEndpointKind::Clickhouse { config } => {
EndpointLogKind::Clickhouse { config }
}
};
EndpointLog {
table_name: endpoint.table_name,
Expand Down
1 change: 1 addition & 0 deletions dozer-core/src/builder_dag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ impl BuilderDag {
.remove(&node_index)
.expect("we collected all input schemas"),
)
.await
.map_err(ExecutionError::Factory)?;
NodeType {
handle: node.handle,
Expand Down
3 changes: 2 additions & 1 deletion dozer-core/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,11 @@ pub trait Processor: Send + Sync + Debug {
) -> Result<(), BoxedError>;
}

#[async_trait]
pub trait SinkFactory: Send + Sync + Debug {
fn get_input_ports(&self) -> Vec<PortHandle>;
fn prepare(&self, input_schemas: HashMap<PortHandle, Schema>) -> Result<(), BoxedError>;
fn build(
async fn build(
&self,
input_schemas: HashMap<PortHandle, Schema>,
) -> Result<Box<dyn Sink>, BoxedError>;
Expand Down
3 changes: 2 additions & 1 deletion dozer-core/src/tests/dag_base_errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,7 @@ impl ErrSinkFactory {
}
}

#[async_trait]
impl SinkFactory for ErrSinkFactory {
fn get_input_ports(&self) -> Vec<PortHandle> {
vec![COUNTING_SINK_INPUT_PORT]
Expand All @@ -428,7 +429,7 @@ impl SinkFactory for ErrSinkFactory {
Ok(())
}

fn build(
async fn build(
&self,
_input_schemas: HashMap<PortHandle, Schema>,
) -> Result<Box<dyn Sink>, BoxedError> {
Expand Down
3 changes: 2 additions & 1 deletion dozer-core/src/tests/dag_schemas.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ impl ProcessorFactory for TestJoinProcessorFactory {
#[derive(Debug)]
struct TestSinkFactory {}

#[async_trait]
impl SinkFactory for TestSinkFactory {
fn get_input_ports(&self) -> Vec<PortHandle> {
vec![DEFAULT_PORT_HANDLE]
Expand All @@ -174,7 +175,7 @@ impl SinkFactory for TestSinkFactory {
Ok(())
}

fn build(
async fn build(
&self,
_input_schemas: HashMap<PortHandle, Schema>,
) -> Result<Box<dyn crate::node::Sink>, BoxedError> {
Expand Down
10 changes: 7 additions & 3 deletions dozer-core/src/tests/sinks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use dozer_types::types::{Operation, Schema};
use dozer_types::log::debug;
use std::collections::HashMap;

use dozer_types::tonic::async_trait;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

Expand All @@ -29,6 +30,7 @@ impl CountingSinkFactory {
}
}

#[async_trait]
impl SinkFactory for CountingSinkFactory {
fn get_input_ports(&self) -> Vec<PortHandle> {
vec![COUNTING_SINK_INPUT_PORT]
Expand All @@ -38,7 +40,7 @@ impl SinkFactory for CountingSinkFactory {
Ok(())
}

fn build(
async fn build(
&self,
_input_schemas: HashMap<PortHandle, Schema>,
) -> Result<Box<dyn Sink>, BoxedError> {
Expand Down Expand Up @@ -104,6 +106,7 @@ impl Sink for CountingSink {
#[derive(Debug)]
pub struct ConnectivityTestSinkFactory;

#[async_trait]
impl SinkFactory for ConnectivityTestSinkFactory {
fn get_input_ports(&self) -> Vec<PortHandle> {
vec![DEFAULT_PORT_HANDLE]
Expand All @@ -113,7 +116,7 @@ impl SinkFactory for ConnectivityTestSinkFactory {
unimplemented!("This struct is for connectivity test, only input ports are defined")
}

fn build(
async fn build(
&self,
_input_schemas: HashMap<PortHandle, Schema>,
) -> Result<Box<dyn Sink>, BoxedError> {
Expand All @@ -124,6 +127,7 @@ impl SinkFactory for ConnectivityTestSinkFactory {
#[derive(Debug)]
pub struct NoInputPortSinkFactory;

#[async_trait]
impl SinkFactory for NoInputPortSinkFactory {
fn get_input_ports(&self) -> Vec<PortHandle> {
vec![]
Expand All @@ -133,7 +137,7 @@ impl SinkFactory for NoInputPortSinkFactory {
unimplemented!("This struct is for connectivity test, only input ports are defined")
}

fn build(
async fn build(
&self,
_input_schemas: HashMap<PortHandle, Schema>,
) -> Result<Box<dyn Sink>, BoxedError> {
Expand Down
20 changes: 12 additions & 8 deletions dozer-sink-aerospike/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use dozer_types::errors::internal::BoxedError;
use dozer_types::geo::{Coord, Point};
use dozer_types::log::{error, info};
use dozer_types::ordered_float::OrderedFloat;
use dozer_types::tonic::async_trait;
use dozer_types::types::DozerPoint;
use dozer_types::{
errors::types::TypeError,
Expand Down Expand Up @@ -217,6 +218,7 @@ impl AerospikeSinkFactory {
}
}

#[async_trait]
impl SinkFactory for AerospikeSinkFactory {
fn get_input_ports(&self) -> Vec<PortHandle> {
vec![DEFAULT_PORT_HANDLE]
Expand All @@ -227,7 +229,7 @@ impl SinkFactory for AerospikeSinkFactory {
Ok(())
}

fn build(
async fn build(
&self,
mut input_schemas: HashMap<PortHandle, Schema>,
) -> Result<Box<dyn dozer_core::node::Sink>, BoxedError> {
Expand Down Expand Up @@ -920,6 +922,7 @@ impl Sink for AerospikeSink {
#[cfg(test)]
mod tests {

use dozer_log::tokio;
use std::time::Duration;

use dozer_recordstore::ProcessorRecordStore;
Expand All @@ -944,12 +947,12 @@ mod tests {
const N_RECORDS: usize = 1000;
const BATCH_SIZE: usize = 100;

#[test]
#[tokio::test]
#[ignore]
fn test_inserts() {
async fn test_inserts() {
let rs = ProcessorRecordStore::new(dozer_types::models::app_config::RecordStore::InMemory)
.unwrap();
let mut sink = sink("inserts");
let mut sink = sink("inserts").await;
for i in 0..N_RECORDS {
sink.process(
DEFAULT_PORT_HANDLE,
Expand All @@ -962,9 +965,9 @@ mod tests {
}
}

#[test]
#[tokio::test]
#[ignore]
fn test_inserts_batch() {
async fn test_inserts_batch() {
let mut batches = Vec::with_capacity(N_RECORDS / BATCH_SIZE);
for i in 0..N_RECORDS / BATCH_SIZE {
let mut batch = Vec::with_capacity(BATCH_SIZE);
Expand All @@ -973,7 +976,7 @@ mod tests {
}
batches.push(batch);
}
let mut sink = sink("inserts_batch");
let mut sink = sink("inserts_batch").await;
let rs = ProcessorRecordStore::new(dozer_types::models::app_config::RecordStore::InMemory)
.unwrap();
for batch in batches {
Expand All @@ -986,7 +989,7 @@ mod tests {
}
}

fn sink(set: &str) -> Box<dyn Sink> {
async fn sink(set: &str) -> Box<dyn Sink> {
let mut schema = Schema::new();
schema
.field(f("uint", FieldType::UInt), true)
Expand Down Expand Up @@ -1020,6 +1023,7 @@ mod tests {
});
factory
.build([(DEFAULT_PORT_HANDLE, schema)].into())
.await
.unwrap()
}

Expand Down
13 changes: 13 additions & 0 deletions dozer-sink-clickhouse/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Manifest for the `dozer-sink-clickhouse` crate.
[package]
name = "dozer-sink-clickhouse"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
# Local crates from this repository, referenced by relative path.
dozer-core = { path = "../dozer-core" }
dozer-types = { path = "../dozer-types" }
dozer-log = { path = "../dozer-log" }
dozer-recordstore = { path = "../dozer-recordstore" }
# ClickHouse client pulled from the getdozer/clickhouse.rs git repository.
# NOTE(review): no `branch`, `tag`, or `rev` is specified, so the resolved commit
# is whatever Cargo.lock records; consider pinning a `rev` for reproducible builds.
clickhouse = { git = "https://github.com/getdozer/clickhouse.rs.git" }
Loading

0 comments on commit 99281ed

Please sign in to comment.