feat: Refactor clickhouse sink to work with pulse (#2448)
* feat: refactor type mapping

* chore: use sink config

* chore: fix clickhouse sink

* feat: implement batching

* chore: fix decimal

* chore: use a common format

* chore: fix table creation

* chore: fix decimal

* chore: fix clippy

* chore: update client

* chore: update pub crate

* remove println

Signed-off-by: VG <[email protected]>

* feat: implement clickhouse sink checkpointing

---------

Signed-off-by: VG <[email protected]>
v3g42 authored Mar 8, 2024
1 parent 37a9c0d commit b2c3d0e
Showing 17 changed files with 1,225 additions and 930 deletions.
254 changes: 99 additions & 155 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion Cargo.toml
@@ -17,7 +17,6 @@ resolver = "2"
 [workspace.dependencies]
 bincode = { version = "2.0.0-rc.3", features = ["derive"] }
 datafusion = { version = "33.0.0" }
-datafusion-expr = { version = "33.0.0" }
 
 [patch.crates-io]
 postgres = { git = "https://github.com/getdozer/rust-postgres" }
17 changes: 5 additions & 12 deletions dozer-cli/src/main.rs
@@ -9,7 +9,7 @@ use dozer_cli::{set_ctrl_handler, set_panic_hook};
 use dozer_core::shutdown;
 use dozer_tracing::LabelsAndProgress;
 use dozer_types::models::config::Config;
-use dozer_types::models::telemetry::{TelemetryConfig, TelemetryMetricsConfig};
+use dozer_types::models::telemetry::TelemetryConfig;
 use dozer_types::tracing::{error, error_span, info};
 use futures::TryFutureExt;
 use std::process;
@@ -45,17 +45,10 @@ fn run() -> Result<(), OrchestrationError> {
         .map(|(c, _)| c.cloud.app_id.as_deref().unwrap_or(&c.app_name))
         .ok();
 
-    let telemetry_config = if matches!(cli.cmd, Commands::Run) {
-        TelemetryConfig {
-            trace: None,
-            metrics: Some(TelemetryMetricsConfig::Prometheus),
-        }
-    } else {
-        config_res
-            .as_ref()
-            .map(|(c, _)| c.telemetry.clone())
-            .unwrap_or_default()
-    };
+    let telemetry_config = config_res
+        .as_ref()
+        .map(|(c, _)| c.telemetry.clone())
+        .unwrap_or_default();
 
     let _telemetry = runtime.block_on(async { Telemetry::new(app_id, &telemetry_config) });
 
5 changes: 4 additions & 1 deletion dozer-sink-clickhouse/Cargo.toml
@@ -8,4 +8,7 @@ edition = "2021"
 [dependencies]
 dozer-core = { path = "../dozer-core" }
 dozer-types = { path = "../dozer-types" }
-clickhouse = { git = "https://github.com/getdozer/clickhouse.rs.git" }
+clickhouse-rs = { git = "https://github.com/suharev7/clickhouse-rs" }
+either = "1.10.0"
+chrono-tz = "0.8.6"
+serde = "1.0.197"
124 changes: 124 additions & 0 deletions dozer-sink-clickhouse/src/client.rs
@@ -0,0 +1,124 @@
#![allow(dead_code)]
use super::ddl::get_create_table_query;
use super::types::ValueWrapper;
use crate::errors::QueryError;
use crate::types::{insert_multi, map_value_wrapper_to_field};
use clickhouse_rs::{ClientHandle, Pool};
use dozer_types::log::{debug, info};
use dozer_types::models::sink::{ClickhouseSinkConfig, ClickhouseTableOptions};
use dozer_types::types::{Field, FieldDefinition};

pub struct SqlResult {
    pub rows: Vec<Vec<Field>>,
}

#[derive(Clone)]
pub struct ClickhouseClient {
    pool: Pool,
}

impl ClickhouseClient {
    pub fn new(config: ClickhouseSinkConfig) -> Self {
        let url = Self::construct_url(&config);
        let pool = Pool::new(url);
        Self { pool }
    }

    pub fn construct_url(config: &ClickhouseSinkConfig) -> String {
        let user_password = match &config.password {
            Some(password) => format!("{}:{}", config.user, password),
            None => config.user.to_string(),
        };

        let url = format!(
            "{}://{}@{}:{}/{}",
            config.scheme, user_password, config.host, config.port, config.database
        );
        debug!("{url}");
        url
    }

    pub async fn get_client_handle(&self) -> Result<ClientHandle, QueryError> {
        let client = self.pool.get_handle().await?;
        Ok(client)
    }

    pub async fn drop_table(&self, datasource_name: &str) -> Result<(), QueryError> {
        let mut client = self.pool.get_handle().await?;
        let ddl = format!("DROP TABLE IF EXISTS {}", datasource_name);
        info!("#{ddl}");
        client.execute(ddl).await?;
        Ok(())
    }

    pub async fn create_table(
        &self,
        datasource_name: &str,
        fields: &[FieldDefinition],
        table_options: Option<ClickhouseTableOptions>,
    ) -> Result<(), QueryError> {
        let mut client = self.pool.get_handle().await?;
        let ddl = get_create_table_query(datasource_name, fields, table_options);
        info!("Creating Clickhouse Table");
        info!("{ddl}");
        client.execute(ddl).await?;
        Ok(())
    }

    pub async fn fetch_all(
        &self,
        query: &str,
        schema: Vec<FieldDefinition>,
        query_id: Option<String>,
    ) -> Result<SqlResult, QueryError> {
        let mut client = self.pool.get_handle().await?;
        // TODO: query_id doesn't work
        // https://github.com/suharev7/clickhouse-rs/issues/176
        // let query = Query::new(sql).id(query_id.to_string())
        let query = query_id.map_or(query.to_string(), |id| {
            format!("{0} settings log_comment = '{1}'", query, id)
        });

        let block = client.query(&query).fetch_all().await?;

        let mut rows: Vec<Vec<Field>> = vec![];
        for row in block.rows() {
            let mut row_data = vec![];
            for (idx, field) in schema.clone().into_iter().enumerate() {
                let v: ValueWrapper = row.get(idx)?;
                row_data.push(map_value_wrapper_to_field(v, field)?);
            }
            rows.push(row_data);
        }

        Ok(SqlResult { rows })
    }

    pub async fn check_table(&self, table_name: &str) -> Result<bool, QueryError> {
        let mut client = self.pool.get_handle().await?;
        let query = format!("CHECK TABLE {}", table_name);
        client.query(query).fetch_all().await?;

        // CHECK TABLE errors if the table does not exist, so reaching this
        // point means the table exists.
        Ok(true)
    }

    pub async fn insert(
        &self,
        table_name: &str,
        fields: &[FieldDefinition],
        values: &[Field],
    ) -> Result<(), QueryError> {
        let client = self.pool.get_handle().await?;
        insert_multi(client, table_name, fields, &[values.to_vec()]).await
    }

    pub async fn insert_multi(
        &self,
        table_name: &str,
        fields: &[FieldDefinition],
        values: &[Vec<Field>],
    ) -> Result<(), QueryError> {
        let client = self.pool.get_handle().await?;
        insert_multi(client, table_name, fields, values).await
    }
}
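
For orientation, a minimal sketch of how a sink could drive this client. The table name, batch shape, and the decision to treat a failed CHECK TABLE as "table missing" are illustrative assumptions, not code from this commit:

use crate::client::ClickhouseClient;
use crate::errors::QueryError;
use dozer_types::types::{Field, FieldDefinition};

// Hypothetical driver: `check_table` returns an error when the table does
// not exist, so a failed check is read here as "create it first".
async fn write_batch(
    client: &ClickhouseClient,
    fields: &[FieldDefinition],
    batch: Vec<Vec<Field>>,
) -> Result<(), QueryError> {
    if !client.check_table("my_table").await.unwrap_or(false) {
        client.create_table("my_table", fields, None).await?;
    }
    // Push the whole batch in a single insert, as the batching commits intend.
    client.insert_multi("my_table", fields, &batch).await?;
    Ok(())
}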
146 changes: 55 additions & 91 deletions dozer-sink-clickhouse/src/ddl.rs
@@ -1,108 +1,72 @@
-use dozer_types::log::warn;
-use dozer_types::models::sink::ClickhouseSinkTableOptions;
-use dozer_types::types::{FieldDefinition, FieldType, Schema};
+use dozer_types::models::sink::ClickhouseTableOptions;
+use dozer_types::types::FieldDefinition;
 
-pub struct ClickhouseDDL {}
+use crate::schema::map_field_to_type;
 
 const DEFAULT_TABLE_ENGINE: &str = "MergeTree()";
 
-impl ClickhouseDDL {
-    pub fn get_create_table_query(
-        table_name: String,
-        schema: Schema,
-        sink_options: Option<ClickhouseSinkTableOptions>,
-        primary_keys: Option<Vec<String>>,
-    ) -> String {
-        let mut parts = schema
-            .fields
-            .iter()
-            .map(|field| {
-                let typ = Self::map_field_to_type(field);
-                format!("{} {}", field.name, typ)
-            })
-            .collect::<Vec<_>>();
+pub fn get_create_table_query(
+    table_name: &str,
+    fields: &[FieldDefinition],
+    table_options: Option<ClickhouseTableOptions>,
+) -> String {
+    let mut parts = fields
+        .iter()
+        .map(|field| {
+            let typ = map_field_to_type(field);
+            format!("{} {}", field.name, typ)
+        })
+        .collect::<Vec<_>>();
 
-        let engine = sink_options
-            .as_ref()
-            .map(|options| {
-                options
-                    .engine
-                    .clone()
-                    .unwrap_or_else(|| DEFAULT_TABLE_ENGINE.to_string())
-            })
-            .unwrap_or_else(|| DEFAULT_TABLE_ENGINE.to_string());
+    let engine = table_options
+        .as_ref()
+        .and_then(|c| c.engine.clone())
+        .unwrap_or_else(|| DEFAULT_TABLE_ENGINE.to_string());
 
-        if let Some(pk) = primary_keys {
-            parts.push(format!("PRIMARY KEY ({})", pk.join(", ")));
-        }
+    parts.push(
+        table_options
+            .as_ref()
+            .and_then(|options| options.primary_keys.clone())
+            .map_or("".to_string(), |pk| {
+                format!("PRIMARY KEY ({})", pk.join(", "))
+            }),
+    );
 
-        let query = parts.join(",\n");
+    let query = parts.join(",\n");
 
-        let partition_by = sink_options
-            .as_ref()
-            .and_then(|options| options.partition_by.clone())
-            .map_or("".to_string(), |partition_by| {
-                format!("PARTITION BY {}\n", partition_by)
-            });
-        let sample_by = sink_options
-            .as_ref()
-            .and_then(|options| options.sample_by.clone())
-            .map_or("".to_string(), |partition_by| {
-                format!("SAMPLE BY {}\n", partition_by)
-            });
-        let order_by = sink_options
-            .as_ref()
-            .and_then(|options| options.order_by.clone())
-            .map_or("".to_string(), |order_by| {
-                format!("ORDER BY ({})\n", order_by.join(", "))
-            });
-        let cluster = sink_options
-            .as_ref()
-            .and_then(|options| options.cluster.clone())
-            .map_or("".to_string(), |cluster| {
-                format!("ON CLUSTER {}\n", cluster)
-            });
+    let partition_by = table_options
+        .as_ref()
+        .and_then(|options| options.partition_by.clone())
+        .map_or("".to_string(), |partition_by| {
+            format!("PARTITION BY {}\n", partition_by)
+        });
+    let sample_by = table_options
+        .as_ref()
+        .and_then(|options| options.sample_by.clone())
+        .map_or("".to_string(), |partition_by| {
+            format!("SAMPLE BY {}\n", partition_by)
+        });
+    let order_by = table_options
+        .as_ref()
+        .and_then(|options| options.order_by.clone())
+        .map_or("".to_string(), |order_by| {
+            format!("ORDER BY ({})\n", order_by.join(", "))
+        });
+    let cluster = table_options
+        .as_ref()
+        .and_then(|options| options.cluster.clone())
+        .map_or("".to_string(), |cluster| {
+            format!("ON CLUSTER {}\n", cluster)
+        });
 
-        format!(
-            "CREATE TABLE IF NOT EXISTS {table_name} {cluster} (
+    format!(
+        "CREATE TABLE IF NOT EXISTS {table_name} {cluster} (
 {query}
 )
 ENGINE = {engine}
 {order_by}
 {partition_by}
 {sample_by}
 ",
-        )
-    }
-
-    pub fn map_field_to_type(field: &FieldDefinition) -> String {
-        let typ = match field.typ {
-            FieldType::UInt => "UInt64",
-            FieldType::U128 => "UInt128",
-            FieldType::Int => "Int64",
-            FieldType::I128 => "Int128",
-            FieldType::Float => "Float64",
-            FieldType::Boolean => "Boolean",
-            FieldType::String => "String",
-            FieldType::Text => "String",
-            FieldType::Binary => "Array(UInt8)",
-            FieldType::Decimal => "Decimal",
-            FieldType::Timestamp => "DateTime64(3)",
-            FieldType::Date => "Date",
-            FieldType::Json => "JSON",
-            FieldType::Point => "Point",
-            FieldType::Duration => unimplemented!(),
-        };
-
-        if field.nullable {
-            if field.typ != FieldType::Binary {
-                format!("Nullable({})", typ)
-            } else {
-                warn!("Binary field cannot be nullable, ignoring nullable flag");
-                typ.to_string()
-            }
-        } else {
-            typ.to_string()
-        }
-    }
-}
+    )
+}
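
To make the generated DDL concrete, a sketch of calling the new free function with a two-field schema and no table options. The FieldDefinition::new signature is assumed from dozer-types, and the UInt64/Nullable(String) mappings are assumed to match the removed map_field_to_type above (now living in schema.rs):

use dozer_types::types::{FieldDefinition, FieldType, SourceDefinition};

// Illustrative only: two columns, default engine, no primary key.
let fields = vec![
    FieldDefinition::new("id".into(), FieldType::UInt, false, SourceDefinition::Dynamic),
    FieldDefinition::new("name".into(), FieldType::String, true, SourceDefinition::Dynamic),
];
let ddl = get_create_table_query("users", &fields, None);
// With table_options = None the primary-key entry pushed into `parts` is an
// empty string, so the column list ends with a dangling comma:
//
// CREATE TABLE IF NOT EXISTS users  (
// id UInt64,
// name Nullable(String),
//
// )
// ENGINE = MergeTree()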
40 changes: 40 additions & 0 deletions dozer-sink-clickhouse/src/errors.rs
@@ -0,0 +1,40 @@
use dozer_types::thiserror::{self, Error};

#[derive(Error, Debug)]
pub enum ClickhouseSinkError {
    #[error("Only MergeTree engine is supported for delete operation")]
    UnsupportedOperation,

    #[error("Column {0} not found in sink table")]
    ColumnNotFound(String),

    #[error("Column {0} has type {1} in dozer schema but type {2} in sink table")]
    ColumnTypeMismatch(String, String, String),

    #[error("Clickhouse error: {0:?}")]
    ClickhouseError(#[from] clickhouse_rs::errors::Error),

    #[error("Primary key not found")]
    PrimaryKeyNotFound,

    #[error("Sink table does not exist and create_table_options is not set")]
    SinkTableDoesNotExist,

    #[error("Expected primary key {0:?} but got {1:?}")]
    PrimaryKeyMismatch(Vec<String>, Vec<String>),

    #[error("QueryError: {0:?}")]
    QueryError(#[from] QueryError),
}

#[derive(Error, Debug)]
pub enum QueryError {
    #[error("Clickhouse error: {0:?}")]
    DataFetchError(#[from] clickhouse_rs::errors::Error),

    #[error("Unexpected field type for {0:?}, expected {1}")]
    TypeMismatch(String, String),

    #[error("{0:?}")]
    CustomError(String),
}