Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/clickhouse sink issue with collapsing merge tree #2477

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions dozer-ingestion/aerospike/src/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -807,6 +807,11 @@ pub(crate) fn map_value_to_field(
AerospikeConnectorError::ParsingIntFailed
})?))
}
FieldType::Int8 => {
check_type("int8")?;
let string = value.as_str().ok_or_else(unsupported_type)?;
Ok(Field::Int8(string.parse()?))
}
FieldType::U128 => {
check_type("str")?;
let string = value.as_str().ok_or_else(unsupported_type)?;
Expand Down
1 change: 1 addition & 0 deletions dozer-ingestion/mysql/src/conversion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ impl<'a> IntoField<'a> for Value {
FieldType::UInt => Field::UInt(from_value_opt::<u64>(value)?),
FieldType::U128 => Field::U128(from_value_opt::<u128>(value)?),
FieldType::Int => Field::Int(from_value_opt::<i64>(value)?),
FieldType::Int8 => Field::Int8(from_value_opt::<i8>(value)?),
FieldType::I128 => Field::I128(from_value_opt::<i128>(value)?),
FieldType::Float => Field::Float(from_value_opt::<f64>(value)?.into()),
FieldType::Boolean => Field::Boolean(from_value_opt::<bool>(value)?),
Expand Down
3 changes: 3 additions & 0 deletions dozer-ingestion/tests/test_suite/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,9 @@ fn assert_record_matches_schema(record: &Record, schema: &Schema, only_match_pk:
FieldType::Int => {
assert!(value.as_int().is_some())
}
FieldType::Int8 => {
assert!(value.as_int().is_some())
}
FieldType::I128 => {
assert!(value.as_i128().is_some())
}
Expand Down
13 changes: 13 additions & 0 deletions dozer-ingestion/tests/test_suite/connectors/object_store/arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ fn field_type_to_arrow(field_type: FieldType) -> Option<arrow::datatypes::DataTy
FieldType::UInt => Some(arrow::datatypes::DataType::UInt64),
FieldType::U128 => None,
FieldType::Int => Some(arrow::datatypes::DataType::Int64),
FieldType::Int8 => Some(arrow::datatypes::DataType::Int64),
FieldType::I128 => None,
FieldType::Float => Some(arrow::datatypes::DataType::Float64),
FieldType::Boolean => Some(arrow::datatypes::DataType::Boolean),
Expand Down Expand Up @@ -349,6 +350,18 @@ fn fields_to_arrow<'a, F: IntoIterator<Item = &'a Field>>(
}
Arc::new(builder.finish())
}
FieldType::Int8 => {
let mut builder = arrow::array::Int64Array::builder(count);
for field in fields {
match field {
Field::Int(value) => builder.append_value(*value),
Field::Int8(value) => builder.append_value(*value as i64),
Field::Null => builder.append_null(),
_ => panic!("Unexpected field type"),
}
}
Arc::new(builder.finish())
}
FieldType::I128 => panic!("Unexpected field type"),
FieldType::Float => {
let mut builder = arrow::array::Float64Array::builder(count);
Expand Down
2 changes: 2 additions & 0 deletions dozer-ingestion/tests/test_suite/connectors/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ fn field_type_to_sql(field_type: FieldType) -> Option<String> {
FieldType::UInt => None,
FieldType::U128 => None,
FieldType::Int => Some("INT8".to_string()),
FieldType::Int8 => Some("INT8".to_string()),
FieldType::I128 => None,
FieldType::Float => Some("FLOAT8".to_string()),
FieldType::Boolean => Some("BOOLEAN".to_string()),
Expand Down Expand Up @@ -229,6 +230,7 @@ fn field_to_sql(field: &Field) -> String {
Field::UInt(i) => i.to_string(),
Field::U128(i) => i.to_string(),
Field::Int(i) => i.to_string(),
Field::Int8(i) => i.to_string(),
Field::I128(i) => i.to_string(),
Field::Float(f) => f.to_string(),
Field::Boolean(b) => b.to_string(),
Expand Down
5 changes: 5 additions & 0 deletions dozer-ingestion/webhook/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ pub fn map_record(
let field = Field::Int(i64_value);
values.push(field);
}
FieldType::Int8 => {
let i8_value: i8 = serde_json::from_value(value.clone())?;
let field = Field::Int8(i8_value);
values.push(field);
}
FieldType::Float => {
let float_value: f64 = serde_json::from_value(value.clone())?;
let field = Field::Float(OrderedFloat(float_value));
Expand Down
18 changes: 18 additions & 0 deletions dozer-sink-aerospike/src/aerospike.rs
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,9 @@ unsafe fn init_key_single(
Field::Int(v) => {
as_key_init_int64(key, namespace.as_ptr(), set.as_ptr(), *v);
}
Field::Int8(v) => {
as_key_init_int64(key, namespace.as_ptr(), set.as_ptr(), (*v).into());
}
Field::U128(v) => set_str_key(key, namespace, set, v.to_string(), allocated_strings),
Field::I128(v) => set_str_key(key, namespace, set, v.to_string(), allocated_strings),
Field::Decimal(v) => set_str_key(key, namespace, set, v.to_string(), allocated_strings),
Expand Down Expand Up @@ -554,6 +557,13 @@ pub(crate) unsafe fn new_record_map(
Field::Int(v) => {
as_orderedmap_set(map, key, check_alloc(as_integer_new(*v)) as *const as_val);
}
Field::Int8(v) => {
as_orderedmap_set(
map,
key,
check_alloc(as_integer_new((*v).into())) as *const as_val,
);
}
Field::I128(v) => {
map_set_str(map, key, v, allocated_strings);
}
Expand Down Expand Up @@ -662,6 +672,9 @@ pub(crate) unsafe fn init_batch_write_operations(
Field::Int(v) => {
as_operations_add_write_int64(ops, name, *v);
}
Field::Int8(v) => {
as_operations_add_write_int64(ops, name, (*v).into());
}
Field::I128(v) => {
set_operation_str(ops, name, v.to_string(), allocated_strings);
}
Expand Down Expand Up @@ -806,6 +819,11 @@ fn parse_val(
Some(Field::Int(v.value))
})
}
dozer_types::types::FieldType::Int8 => {
map(val, as_val_type_e_AS_INTEGER, |v: &as_integer| {
Some(Field::Int8(v.value as i8))
})
}
dozer_types::types::FieldType::I128 => {
map(val, as_val_type_e_AS_STRING, |v: &as_string| {
Some(Field::I128(unsafe {
Expand Down
1 change: 1 addition & 0 deletions dozer-sink-aerospike/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ impl SinkFactory for AerospikeSinkFactory {
dozer_types::types::FieldType::UInt
| dozer_types::types::FieldType::U128
| dozer_types::types::FieldType::Int
| dozer_types::types::FieldType::Int8
| dozer_types::types::FieldType::I128
| dozer_types::types::FieldType::String
| dozer_types::types::FieldType::Text
Expand Down
19 changes: 13 additions & 6 deletions dozer-sink-clickhouse/src/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,25 @@ pub fn get_create_table_query(
fields: &[FieldDefinition],
table_options: Option<ClickhouseTableOptions>,
) -> String {
let engine = table_options
.as_ref()
.and_then(|c| c.engine.clone())
.unwrap_or_else(|| DEFAULT_TABLE_ENGINE.to_string());
let engine_name = if engine == "CollapsingMergeTree" {
"CollapsingMergeTree(sign)".to_string()
} else {
engine.to_owned()
};
let mut parts = fields
.iter()
.map(|field| {
let typ = map_field_to_type(field);
format!("{} {}", field.name, typ)
})
.collect::<Vec<_>>();

let engine = table_options
.as_ref()
.and_then(|c| c.engine.clone())
.unwrap_or_else(|| DEFAULT_TABLE_ENGINE.to_string());
if engine == "CollapsingMergeTree" {
parts.push("sign Int8".to_string());
}

parts.push(
table_options
Expand Down Expand Up @@ -63,7 +70,7 @@ pub fn get_create_table_query(
"CREATE TABLE IF NOT EXISTS {table_name} {cluster} (
{query}
)
ENGINE = {engine}
ENGINE = {engine_name}
{order_by}
{partition_by}
{sample_by}
Expand Down
1 change: 1 addition & 0 deletions dozer-sink-clickhouse/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ pub fn map_field_to_type(field: &FieldDefinition) -> String {
FieldType::UInt => "UInt64",
FieldType::U128 => "UInt128",
FieldType::Int => "Int64",
FieldType::Int8 => "Int8",
FieldType::I128 => "Int128",
FieldType::Float => "Float64",
FieldType::Boolean => "Boolean",
Expand Down
27 changes: 20 additions & 7 deletions dozer-sink-clickhouse/src/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::metadata::{
};
use crate::schema::{ClickhouseSchema, ClickhouseTable};
use dozer_types::tonic::async_trait;
use dozer_types::types::{Field, Operation, Schema, TableOperation};
use dozer_types::types::{Field, FieldDefinition, Operation, Schema, TableOperation};
use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::Arc;
Expand Down Expand Up @@ -113,7 +113,6 @@ impl SinkFactory for ClickhouseSinkFactory {
let table = ClickhouseSchema::get_clickhouse_table(client.clone(), &self.config).await?;

ClickhouseSchema::compare_with_dozer_schema(client.clone(), &schema, &table).await?;

let sink = ClickhouseSink::new(
client,
self.config.clone(),
Expand Down Expand Up @@ -155,6 +154,19 @@ impl ClickhouseSink {
runtime: Arc<Runtime>,
table: ClickhouseTable,
) -> Self {
let mut schema = schema.clone();

if table.engine == "CollapsingMergeTree" && !schema.fields.is_empty() {
// get source from any field in schema
let source = schema.fields[0].source.clone();
schema.fields.push(FieldDefinition {
name: "sign".to_string(),
typ: dozer_types::types::FieldType::Int8,
nullable: false,
description: None,
source,
});
}
Self {
client,
runtime,
Expand Down Expand Up @@ -249,8 +261,7 @@ impl Sink for ClickhouseSink {
Operation::Insert { new } => {
if self.table.engine == "CollapsingMergeTree" {
let mut values = new.values;
values.push(Field::Int(1));

values.push(Field::Int8(1));
self.insert_values(&values)?;
} else {
self.insert_values(&new.values)?;
Expand All @@ -273,17 +284,19 @@ impl Sink for ClickhouseSink {
return Err(BoxedError::from(ClickhouseSinkError::UnsupportedOperation));
}
let mut values = old.values;
values.push(Field::Int(-1));
values.push(Field::Int8(-1));
self.insert_values(&values)?;

let mut values = new.values;
values.push(Field::Int(1));
values.push(Field::Int8(1));
self.insert_values(&values)?;
}
Operation::BatchInsert { new } => {
for record in new {
let mut values = record.values;
values.push(Field::Int(1));
if self.table.engine == "CollapsingMergeTree" {
values.push(Field::Int8(1));
}
self.insert_values(&values)?;
}
self.commit_batch()?;
Expand Down
2 changes: 1 addition & 1 deletion dozer-sink-clickhouse/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ fn add_last_column_to_block(
FieldType::UInt => add_last_column.call(trivial_mapper!(Field::UInt)),
FieldType::U128 => add_last_column.call(trivial_mapper!(Field::U128)),
FieldType::Int => add_last_column.call(trivial_mapper!(Field::Int)),
FieldType::Int8 => add_last_column.call(trivial_mapper!(Field::Int8)),
FieldType::I128 => add_last_column.call(trivial_mapper!(Field::I128)),
FieldType::Boolean => add_last_column.call(trivial_mapper!(Field::Boolean)),
FieldType::Float => add_last_column.call(|field| match field {
Expand Down Expand Up @@ -276,7 +277,6 @@ pub async fn insert_multi(
query_id: Option<String>,
) -> Result<(), QueryError> {
let mut block = Block::<clickhouse_rs::Simple>::new();

for field in fields.iter().rev() {
block = add_last_column_to_block(block, &field.name, &mut rows, field.typ, field.nullable)?;
}
Expand Down
1 change: 1 addition & 0 deletions dozer-sink-oracle/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ impl OracleSinkFactory {
FieldType::UInt => "NUMBER(20)",
FieldType::U128 => unimplemented!(),
FieldType::Int => "NUMBER(20)",
FieldType::Int8 => unimplemented!(),
FieldType::I128 => unimplemented!(),
// Should this be BINARY_DOUBLE?
FieldType::Float => "NUMBER",
Expand Down
24 changes: 24 additions & 0 deletions dozer-sql/expression/src/cast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ impl Display for CastOperatorType {
FieldType::UInt => f.write_str("CAST AS UINT"),
FieldType::U128 => f.write_str("CAST AS U128"),
FieldType::Int => f.write_str("CAST AS INT"),
FieldType::Int8 => f.write_str("CAST AS INT8"),
FieldType::I128 => f.write_str("CAST AS I128"),
FieldType::Float => f.write_str("CAST AS FLOAT"),
FieldType::Boolean => f.write_str("CAST AS BOOLEAN"),
Expand Down Expand Up @@ -79,6 +80,19 @@ impl CastOperatorType {
FieldType::Int => (
vec![
FieldType::Int,
FieldType::Int8,
FieldType::String,
FieldType::UInt,
FieldType::I128,
FieldType::U128,
FieldType::Json,
],
FieldType::Int,
),
FieldType::Int8 => (
vec![
FieldType::Int,
FieldType::Int8,
FieldType::String,
FieldType::UInt,
FieldType::I128,
Expand Down Expand Up @@ -252,6 +266,16 @@ pub fn cast_field(input: &Field, output_type: FieldType) -> Result<Field, Error>
})
}
}
FieldType::Int8 => {
if let Some(value) = input.to_int8() {
Ok(Field::Int8(value))
} else {
Err(Error::InvalidCast {
from: input.clone(),
to: FieldType::Int,
})
}
}
FieldType::I128 => {
if let Some(value) = input.to_i128() {
Ok(Field::I128(value))
Expand Down
Loading
Loading