diff --git a/dozer-sink-clickhouse/src/schema.rs b/dozer-sink-clickhouse/src/schema.rs index f3c617512c..0d99871e17 100644 --- a/dozer-sink-clickhouse/src/schema.rs +++ b/dozer-sink-clickhouse/src/schema.rs @@ -1,5 +1,6 @@ use crate::client::ClickhouseClient; use crate::errors::ClickhouseSinkError::{self, SinkTableDoesNotExist}; +use crate::types::DECIMAL_SCALE; use clickhouse_rs::types::Complex; use clickhouse_rs::{Block, ClientHandle}; use dozer_types::errors::internal::BoxedError; @@ -175,6 +176,7 @@ impl ClickhouseSchema { } pub fn map_field_to_type(field: &FieldDefinition) -> String { + let decimal = format!("Decimal(10, {})", DECIMAL_SCALE); let typ: &str = match field.typ { FieldType::UInt => "UInt64", FieldType::U128 => "UInt128", @@ -185,7 +187,7 @@ pub fn map_field_to_type(field: &FieldDefinition) -> String { FieldType::String => "String", FieldType::Text => "String", FieldType::Binary => "Array(UInt8)", - FieldType::Decimal => "Decimal(10, 0)", + FieldType::Decimal => &decimal, FieldType::Timestamp => "DateTime64(3)", FieldType::Date => "Date", FieldType::Json => "JSON", diff --git a/dozer-sink-clickhouse/src/types.rs b/dozer-sink-clickhouse/src/types.rs index 75c10df85e..c2cf3b831b 100644 --- a/dozer-sink-clickhouse/src/types.rs +++ b/dozer-sink-clickhouse/src/types.rs @@ -13,6 +13,7 @@ use either::Either; use clickhouse_rs::types::{FromSql, Value, ValueRef}; +pub const DECIMAL_SCALE: u8 = 4; pub struct ValueWrapper(pub Value); impl<'a> FromSql<'a> for ValueWrapper { @@ -263,10 +264,13 @@ pub async fn insert_multi( nullable, b, Decimal, - f64, + clickhouse_rs::types::Decimal, column_values, n, - |f: &rust_decimal::Decimal| -> Option { f.to_f64() } + |f: &rust_decimal::Decimal| -> Option { + f.to_f64() + .map(|f| clickhouse_rs::types::Decimal::of(f, DECIMAL_SCALE)) + } ) } FieldType::Timestamp => {