
Commit

timestamptz.handling.mode
xiangjinwu committed Oct 31, 2023
1 parent 94e6ab2 commit a8a0022
Showing 8 changed files with 146 additions and 24 deletions.
57 changes: 49 additions & 8 deletions src/connector/src/sink/encoder/json.rs
@@ -29,14 +29,15 @@ use serde_json::{json, Map, Value};

use super::{
CustomJsonType, KafkaConnectParams, KafkaConnectParamsRef, Result, RowEncoder, SerTo,
TimestampHandlingMode,
TimestampHandlingMode, TimestamptzHandlingMode,
};
use crate::sink::SinkError;

pub struct JsonEncoder {
schema: Schema,
col_indices: Option<Vec<usize>>,
timestamp_handling_mode: TimestampHandlingMode,
timestamptz_handling_mode: TimestamptzHandlingMode,
custom_json_type: CustomJsonType,
kafka_connect: Option<KafkaConnectParamsRef>,
}
@@ -46,11 +46,13 @@ impl JsonEncoder {
schema: Schema,
col_indices: Option<Vec<usize>>,
timestamp_handling_mode: TimestampHandlingMode,
timestamptz_handling_mode: TimestamptzHandlingMode,
) -> Self {
Self {
schema,
col_indices,
timestamp_handling_mode,
timestamptz_handling_mode,
custom_json_type: CustomJsonType::None,
kafka_connect: None,
}
@@ -66,6 +69,7 @@ impl JsonEncoder {
schema,
col_indices,
timestamp_handling_mode,
timestamptz_handling_mode: TimestamptzHandlingMode::UtcWithoutSuffix,
custom_json_type: CustomJsonType::Doris(map),
kafka_connect: None,
}
@@ -104,6 +108,7 @@ impl RowEncoder for JsonEncoder {
field,
row.datum_at(*idx),
self.timestamp_handling_mode,
self.timestamptz_handling_mode,
&self.custom_json_type,
)
.map_err(|e| SinkError::Encode(e.to_string()))?;
@@ -138,6 +143,7 @@ fn datum_to_json_object(
field: &Field,
datum: DatumRef<'_>,
timestamp_handling_mode: TimestampHandlingMode,
timestamptz_handling_mode: TimestamptzHandlingMode,
custom_json_type: &CustomJsonType,
) -> ArrayResult<Value> {
let scalar_ref = match datum {
@@ -191,13 +197,20 @@
json!(v.to_text())
}
},
(DataType::Timestamptz, ScalarRefImpl::Timestamptz(v)) => {
// risingwave's timestamp with timezone is stored in UTC and does not maintain the
// timezone info and the time is in microsecond.
let parsed = v.to_datetime_utc();
let v = parsed.to_rfc3339_opts(chrono::SecondsFormat::Micros, true);
json!(v)
}
(DataType::Timestamptz, ScalarRefImpl::Timestamptz(v)) => match timestamptz_handling_mode {
TimestamptzHandlingMode::UtcString => {
let parsed = v.to_datetime_utc();
let v = parsed.to_rfc3339_opts(chrono::SecondsFormat::Micros, true);
json!(v)
}
TimestamptzHandlingMode::UtcWithoutSuffix => {
let parsed = v.to_datetime_utc().naive_utc();
let v = parsed.format("%Y-%m-%d %H:%M:%S%.6f").to_string();
json!(v)
}
TimestamptzHandlingMode::Micro => json!(v.timestamp_micros()),
TimestamptzHandlingMode::Milli => json!(v.timestamp_millis()),
},
(DataType::Time, ScalarRefImpl::Time(v)) => {
// todo: just ignore the nanos part to avoid leap second complex
json!(v.0.num_seconds_from_midnight() as i64 * 1000)
@@ -232,6 +245,7 @@ fn datum_to_json_object(
&inner_field,
sub_datum_ref,
timestamp_handling_mode,
timestamptz_handling_mode,
custom_json_type,
)?;
vec.push(value);
@@ -251,6 +265,7 @@
&sub_field,
sub_datum_ref,
timestamp_handling_mode,
timestamptz_handling_mode,
custom_json_type,
)?;
map.insert(sub_field.name.clone(), value);
@@ -269,6 +284,7 @@
&sub_field,
sub_datum_ref,
timestamp_handling_mode,
timestamptz_handling_mode,
custom_json_type,
)?;
map.insert(sub_field.name.clone(), value);
@@ -385,6 +401,7 @@ mod tests {
},
Some(ScalarImpl::Bool(false).as_scalar_ref_impl()),
TimestampHandlingMode::String,
TimestamptzHandlingMode::UtcString,
&CustomJsonType::None,
)
.unwrap();
@@ -397,6 +414,7 @@
},
Some(ScalarImpl::Int16(16).as_scalar_ref_impl()),
TimestampHandlingMode::String,
TimestamptzHandlingMode::UtcString,
&CustomJsonType::None,
)
.unwrap();
@@ -409,6 +427,7 @@
},
Some(ScalarImpl::Int64(std::i64::MAX).as_scalar_ref_impl()),
TimestampHandlingMode::String,
TimestamptzHandlingMode::UtcString,
&CustomJsonType::None,
)
.unwrap();
@@ -426,11 +445,26 @@
},
Some(ScalarImpl::Timestamptz(tstz_inner).as_scalar_ref_impl()),
TimestampHandlingMode::String,
TimestamptzHandlingMode::UtcString,
&CustomJsonType::None,
)
.unwrap();
assert_eq!(tstz_value, "2018-01-26T18:30:09.453000Z");

let tstz_inner = "2018-01-26T18:30:09.453Z".parse().unwrap();
let tstz_value = datum_to_json_object(
&Field {
data_type: DataType::Timestamptz,
..mock_field.clone()
},
Some(ScalarImpl::Timestamptz(tstz_inner).as_scalar_ref_impl()),
TimestampHandlingMode::String,
TimestamptzHandlingMode::UtcWithoutSuffix,
&CustomJsonType::None,
)
.unwrap();
assert_eq!(tstz_value, "2018-01-26 18:30:09.453000");

let ts_value = datum_to_json_object(
&Field {
data_type: DataType::Timestamp,
@@ -441,6 +475,7 @@
.as_scalar_ref_impl(),
),
TimestampHandlingMode::Milli,
TimestamptzHandlingMode::UtcString,
&CustomJsonType::None,
)
.unwrap();
@@ -456,6 +491,7 @@
.as_scalar_ref_impl(),
),
TimestampHandlingMode::String,
TimestamptzHandlingMode::UtcString,
&CustomJsonType::None,
)
.unwrap();
@@ -472,6 +508,7 @@
.as_scalar_ref_impl(),
),
TimestampHandlingMode::String,
TimestamptzHandlingMode::UtcString,
&CustomJsonType::None,
)
.unwrap();
@@ -487,6 +524,7 @@
.as_scalar_ref_impl(),
),
TimestampHandlingMode::String,
TimestamptzHandlingMode::UtcString,
&CustomJsonType::None,
)
.unwrap();
@@ -502,6 +540,7 @@
},
Some(ScalarImpl::Decimal(Decimal::try_from(1.1111111).unwrap()).as_scalar_ref_impl()),
TimestampHandlingMode::String,
TimestamptzHandlingMode::UtcString,
&CustomJsonType::Doris(map),
)
.unwrap();
@@ -514,6 +553,7 @@
},
Some(ScalarImpl::Date(Date::from_ymd_uncheck(2010, 10, 10)).as_scalar_ref_impl()),
TimestampHandlingMode::String,
TimestamptzHandlingMode::UtcString,
&CustomJsonType::Doris(HashMap::default()),
)
.unwrap();
@@ -536,6 +576,7 @@
},
Some(ScalarRefImpl::Struct(StructRef::ValueRef { val: &value })),
TimestampHandlingMode::String,
TimestamptzHandlingMode::UtcString,
&CustomJsonType::Doris(HashMap::default()),
)
.unwrap();
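For reference, the four branches added to datum_to_json_object above produce the following JSON representations for the timestamptz value used in the tests. This is a standalone chrono sketch mirroring the format strings in this diff, not the sink's actual code path; it assumes only the chrono crate:

```rust
use chrono::{DateTime, SecondsFormat, Utc};

fn main() {
    // Same input value as the encoder tests above.
    let v: DateTime<Utc> = "2018-01-26T18:30:09.453Z".parse().unwrap();

    // utc_string (frontend default for new sinks): RFC 3339 with microseconds and a 'Z' suffix.
    let utc_string = v.to_rfc3339_opts(SecondsFormat::Micros, true);
    assert_eq!(utc_string, "2018-01-26T18:30:09.453000Z");

    // utc_without_suffix (legacy behavior, also used by the Doris encoder): naive UTC, no suffix.
    let utc_without_suffix = v.naive_utc().format("%Y-%m-%d %H:%M:%S%.6f").to_string();
    assert_eq!(utc_without_suffix, "2018-01-26 18:30:09.453000");

    // micro / milli: integer offsets from the Unix epoch.
    println!("micro = {}", v.timestamp_micros()); // 1516991409453000
    println!("milli = {}", v.timestamp_millis()); // 1516991409453
}
```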
33 changes: 32 additions & 1 deletion src/connector/src/sink/encoder/mod.rs
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;

use risingwave_common::catalog::Schema;
@@ -89,6 +89,37 @@ pub enum TimestampHandlingMode {
String,
}

#[derive(Clone, Copy, Default)]
pub enum TimestamptzHandlingMode {
#[default]
UtcString,
UtcWithoutSuffix,
Micro,
Milli,
}

impl TimestamptzHandlingMode {
pub const FRONTEND_DEFAULT: &'static str = "utc_string";
pub const OPTION_KEY: &'static str = "timestamptz.handling.mode";

pub fn from_options(options: &BTreeMap<String, String>) -> Result<Self> {
match options.get(Self::OPTION_KEY).map(std::ops::Deref::deref) {
Some(Self::FRONTEND_DEFAULT) => Ok(Self::UtcString),
Some("utc_without_suffix") => Ok(Self::UtcWithoutSuffix),
Some("micro") => Ok(Self::Micro),
Some("milli") => Ok(Self::Milli),
Some(v) => Err(super::SinkError::Config(anyhow::anyhow!(
"unrecognized {} value {}",
Self::OPTION_KEY,
v
))),
// This is not a good default. We just have to select it when no option is provided
// for compatibility with old versions.
None => Ok(Self::UtcWithoutSuffix),
}
}
}

#[derive(Clone)]
pub enum CustomJsonType {
Doris(HashMap<String, (u8, u8)>),
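The fallback above is deliberately asymmetric: the frontend default for new sinks is utc_string, but an absent key still maps to UtcWithoutSuffix so that sinks created before this option existed keep their old output. A minimal, self-contained sketch of that lookup logic (simplified enum and plain string errors, not the crate's actual SinkError/Result types):

```rust
use std::collections::BTreeMap;

// Simplified stand-in for TimestamptzHandlingMode; not the crate's actual type.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Mode {
    UtcString,
    UtcWithoutSuffix,
    Micro,
    Milli,
}

fn mode_from_options(options: &BTreeMap<String, String>) -> Result<Mode, String> {
    match options.get("timestamptz.handling.mode").map(String::as_str) {
        Some("utc_string") => Ok(Mode::UtcString),
        Some("utc_without_suffix") => Ok(Mode::UtcWithoutSuffix),
        Some("micro") => Ok(Mode::Micro),
        Some("milli") => Ok(Mode::Milli),
        Some(v) => Err(format!("unrecognized timestamptz.handling.mode value {v}")),
        // An absent key keeps the pre-option behavior (UTC without suffix).
        None => Ok(Mode::UtcWithoutSuffix),
    }
}

fn main() {
    let mut opts = BTreeMap::new();
    assert_eq!(mode_from_options(&opts), Ok(Mode::UtcWithoutSuffix)); // legacy default
    opts.insert("timestamptz.handling.mode".to_string(), "utc_string".to_string());
    assert_eq!(mode_from_options(&opts), Ok(Mode::UtcString));
}
```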
19 changes: 16 additions & 3 deletions src/connector/src/sink/formatter/debezium_json.rs
@@ -20,7 +20,9 @@ use serde_json::{json, Map, Value};
use tracing::warn;

use super::{Result, SinkFormatter, StreamChunk};
use crate::sink::encoder::{JsonEncoder, RowEncoder, TimestampHandlingMode};
use crate::sink::encoder::{
JsonEncoder, RowEncoder, TimestampHandlingMode, TimestamptzHandlingMode,
};
use crate::tri;

const DEBEZIUM_NAME_FIELD_PREFIX: &str = "RisingWave";
@@ -63,8 +65,14 @@ impl DebeziumJsonFormatter {
schema.clone(),
Some(pk_indices.clone()),
TimestampHandlingMode::Milli,
TimestamptzHandlingMode::UtcString,
);
let val_encoder = JsonEncoder::new(
schema.clone(),
None,
TimestampHandlingMode::Milli,
TimestamptzHandlingMode::UtcString,
);
let val_encoder = JsonEncoder::new(schema.clone(), None, TimestampHandlingMode::Milli);
Self {
schema,
pk_indices,
@@ -360,7 +368,12 @@ mod tests {
},
]);

let encoder = JsonEncoder::new(schema.clone(), None, TimestampHandlingMode::Milli);
let encoder = JsonEncoder::new(
schema.clone(),
None,
TimestampHandlingMode::Milli,
TimestamptzHandlingMode::UtcString,
);
let json_chunk = chunk_to_json(chunk, &encoder).unwrap();
let schema_json = schema_to_json(&schema, "test_db", "test_table");
assert_eq!(
21 changes: 16 additions & 5 deletions src/connector/src/sink/formatter/mod.rs
@@ -28,7 +28,7 @@ pub use upsert::UpsertFormatter;

use super::catalog::{SinkEncode, SinkFormat, SinkFormatDesc};
use super::encoder::template::TemplateEncoder;
use super::encoder::KafkaConnectParams;
use super::encoder::{KafkaConnectParams, TimestamptzHandlingMode};
use super::redis::{KEY_FORMAT, VALUE_FORMAT};
use crate::sink::encoder::{
AvroEncoder, AvroHeader, JsonEncoder, ProtoEncoder, TimestampHandlingMode,
@@ -91,6 +91,7 @@ impl SinkFormatterImpl {
format_desc.encode,
)))
};
let timestamptz_mode = TimestamptzHandlingMode::from_options(&format_desc.options)?;

match format_desc.format {
SinkFormat::AppendOnly => {
@@ -99,13 +100,18 @@
schema.clone(),
Some(pk_indices.clone()),
TimestampHandlingMode::Milli,
timestamptz_mode,
)
});

match format_desc.encode {
SinkEncode::Json => {
let val_encoder =
JsonEncoder::new(schema, None, TimestampHandlingMode::Milli);
let val_encoder = JsonEncoder::new(
schema,
None,
TimestampHandlingMode::Milli,
timestamptz_mode,
);
let formatter = AppendOnlyFormatter::new(key_encoder, val_encoder);
Ok(SinkFormatterImpl::AppendOnlyJson(formatter))
}
@@ -164,9 +170,14 @@
schema.clone(),
Some(pk_indices),
TimestampHandlingMode::Milli,
timestamptz_mode,
);
let mut val_encoder = JsonEncoder::new(
schema,
None,
TimestampHandlingMode::Milli,
timestamptz_mode,
);
let mut val_encoder =
JsonEncoder::new(schema, None, TimestampHandlingMode::Milli);

if let Some(s) = format_desc.options.get("schemas.enable") {
match s.to_lowercase().parse::<bool>() {
9 changes: 7 additions & 2 deletions src/connector/src/sink/kafka.rs
@@ -564,7 +564,7 @@ mod test {
use risingwave_common::types::DataType;

use super::*;
use crate::sink::encoder::{JsonEncoder, TimestampHandlingMode};
use crate::sink::encoder::{JsonEncoder, TimestampHandlingMode, TimestamptzHandlingMode};
use crate::sink::formatter::AppendOnlyFormatter;

#[test]
@@ -729,7 +729,12 @@
SinkFormatterImpl::AppendOnlyJson(AppendOnlyFormatter::new(
// We do not specify primary key for this schema
None,
JsonEncoder::new(schema, None, TimestampHandlingMode::Milli),
JsonEncoder::new(
schema,
None,
TimestampHandlingMode::Milli,
TimestamptzHandlingMode::UtcString,
),
)),
)
.await