Skip to content

Commit

Permalink
fix(sink): ClickHouse DateTime64 should be timestamptz (#13672)
Browse files Browse the repository at this point in the history
  • Loading branch information
xiangjinwu authored Nov 28, 2023
1 parent 6072963 commit 4e28163
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 13 deletions.
4 changes: 2 additions & 2 deletions integration_tests/clickhouse-sink/create_source.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ CREATE TABLE user_behaviors (
user_id INT,
target_id VARCHAR,
target_type VARCHAR,
event_timestamp TIMESTAMP,
event_timestamp TIMESTAMPTZ,
behavior_type VARCHAR,
parent_target_type VARCHAR,
parent_target_id VARCHAR,
Expand All @@ -15,4 +15,4 @@ CREATE TABLE user_behaviors (
fields.user_name.kind = 'random',
fields.user_name.length = '10',
datagen.rows.per.second = '50'
) FORMAT PLAIN ENCODE JSON;
) FORMAT PLAIN ENCODE JSON;
35 changes: 24 additions & 11 deletions src/connector/src/sink/clickhouse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,12 +277,12 @@ impl ClickHouseSink {
risingwave_common::types::DataType::Time => Err(SinkError::ClickHouse(
"clickhouse can not support Time".to_string(),
)),
risingwave_common::types::DataType::Timestamp => {
risingwave_common::types::DataType::Timestamp => Err(SinkError::ClickHouse(
"clickhouse does not have a type corresponding to naive timestamp".to_string(),
)),
risingwave_common::types::DataType::Timestamptz => {
Ok(ck_column.r#type.contains("DateTime64"))
}
risingwave_common::types::DataType::Timestamptz => Err(SinkError::ClickHouse(
"clickhouse can not support Timestamptz".to_string(),
)),
risingwave_common::types::DataType::Interval => Err(SinkError::ClickHouse(
"clickhouse can not support Interval".to_string(),
)),
Expand Down Expand Up @@ -422,6 +422,7 @@ impl ClickHouseSinkWriter {
/// `column_correct_vec`
fn build_column_correct_vec(ck_column: &SystemColumn) -> Result<ClickHouseSchemaFeature> {
let can_null = ck_column.r#type.contains("Nullable");
// `DateTime64` without precision is already displayed as `DateTime(3)` in `system.columns`.
let accuracy_time = if ck_column.r#type.contains("DateTime64(") {
ck_column
.r#type
Expand All @@ -431,6 +432,9 @@ impl ClickHouseSinkWriter {
.split(')')
.next()
.ok_or_else(|| SinkError::ClickHouse("must have next".to_string()))?
.split(',')
.next()
.ok_or_else(|| SinkError::ClickHouse("must have next".to_string()))?
.parse::<u8>()
.map_err(|e| SinkError::ClickHouse(format!("clickhouse sink error {}", e)))?
} else {
Expand Down Expand Up @@ -696,16 +700,25 @@ impl ClickHouseFieldWithNull {
"clickhouse can not support Time".to_string(),
))
}
ScalarRefImpl::Timestamp(v) => {
let time = v.get_timestamp_nanos()
/ 10_i32.pow((9 - clickhouse_schema_feature.accuracy_time).into()) as i64;
ClickHouseField::Int64(time)
}
ScalarRefImpl::Timestamptz(_) => {
ScalarRefImpl::Timestamp(_) => {
return Err(SinkError::ClickHouse(
"clickhouse can not support Timestamptz".to_string(),
"clickhouse does not have a type corresponding to naive timestamp".to_string(),
))
}
ScalarRefImpl::Timestamptz(v) => {
let micros = v.timestamp_micros();
let ticks = match clickhouse_schema_feature.accuracy_time <= 6 {
true => {
micros / 10_i64.pow((6 - clickhouse_schema_feature.accuracy_time).into())
}
false => micros
.checked_mul(
10_i64.pow((clickhouse_schema_feature.accuracy_time - 6).into()),
)
.ok_or_else(|| SinkError::ClickHouse("DateTime64 overflow".to_string()))?,
};
ClickHouseField::Int64(ticks)
}
ScalarRefImpl::Jsonb(_) => {
return Err(SinkError::ClickHouse(
"clickhouse rust interface can not support Json".to_string(),
Expand Down

0 comments on commit 4e28163

Please sign in to comment.