From a8a00224e646b4db7ae07372ed450dca25bd4f1e Mon Sep 17 00:00:00 2001 From: Xiangjin Date: Tue, 31 Oct 2023 17:28:39 +0800 Subject: [PATCH] timestamptz.handling.mode --- src/connector/src/sink/encoder/json.rs | 57 ++++++++++++++++--- src/connector/src/sink/encoder/mod.rs | 33 ++++++++++- .../src/sink/formatter/debezium_json.rs | 19 ++++++- src/connector/src/sink/formatter/mod.rs | 21 +++++-- src/connector/src/sink/kafka.rs | 9 ++- src/connector/src/sink/nats.rs | 8 ++- src/connector/src/sink/remote.rs | 16 +++++- src/frontend/src/handler/create_sink.rs | 7 ++- 8 files changed, 146 insertions(+), 24 deletions(-) diff --git a/src/connector/src/sink/encoder/json.rs b/src/connector/src/sink/encoder/json.rs index 12caaeef22826..7ef8f1cf98a3b 100644 --- a/src/connector/src/sink/encoder/json.rs +++ b/src/connector/src/sink/encoder/json.rs @@ -29,7 +29,7 @@ use serde_json::{json, Map, Value}; use super::{ CustomJsonType, KafkaConnectParams, KafkaConnectParamsRef, Result, RowEncoder, SerTo, - TimestampHandlingMode, + TimestampHandlingMode, TimestamptzHandlingMode, }; use crate::sink::SinkError; @@ -37,6 +37,7 @@ pub struct JsonEncoder { schema: Schema, col_indices: Option>, timestamp_handling_mode: TimestampHandlingMode, + timestamptz_handling_mode: TimestamptzHandlingMode, custom_json_type: CustomJsonType, kafka_connect: Option, } @@ -46,11 +47,13 @@ impl JsonEncoder { schema: Schema, col_indices: Option>, timestamp_handling_mode: TimestampHandlingMode, + timestamptz_handling_mode: TimestamptzHandlingMode, ) -> Self { Self { schema, col_indices, timestamp_handling_mode, + timestamptz_handling_mode, custom_json_type: CustomJsonType::None, kafka_connect: None, } @@ -66,6 +69,7 @@ impl JsonEncoder { schema, col_indices, timestamp_handling_mode, + timestamptz_handling_mode: TimestamptzHandlingMode::UtcWithoutSuffix, custom_json_type: CustomJsonType::Doris(map), kafka_connect: None, } @@ -104,6 +108,7 @@ impl RowEncoder for JsonEncoder { field, row.datum_at(*idx), self.timestamp_handling_mode, + self.timestamptz_handling_mode, &self.custom_json_type, ) .map_err(|e| SinkError::Encode(e.to_string()))?; @@ -138,6 +143,7 @@ fn datum_to_json_object( field: &Field, datum: DatumRef<'_>, timestamp_handling_mode: TimestampHandlingMode, + timestamptz_handling_mode: TimestamptzHandlingMode, custom_json_type: &CustomJsonType, ) -> ArrayResult { let scalar_ref = match datum { @@ -191,13 +197,20 @@ fn datum_to_json_object( json!(v.to_text()) } }, - (DataType::Timestamptz, ScalarRefImpl::Timestamptz(v)) => { - // risingwave's timestamp with timezone is stored in UTC and does not maintain the - // timezone info and the time is in microsecond. - let parsed = v.to_datetime_utc(); - let v = parsed.to_rfc3339_opts(chrono::SecondsFormat::Micros, true); - json!(v) - } + (DataType::Timestamptz, ScalarRefImpl::Timestamptz(v)) => match timestamptz_handling_mode { + TimestamptzHandlingMode::UtcString => { + let parsed = v.to_datetime_utc(); + let v = parsed.to_rfc3339_opts(chrono::SecondsFormat::Micros, true); + json!(v) + } + TimestamptzHandlingMode::UtcWithoutSuffix => { + let parsed = v.to_datetime_utc().naive_utc(); + let v = parsed.format("%Y-%m-%d %H:%M:%S%.6f").to_string(); + json!(v) + } + TimestamptzHandlingMode::Micro => json!(v.timestamp_micros()), + TimestamptzHandlingMode::Milli => json!(v.timestamp_millis()), + }, (DataType::Time, ScalarRefImpl::Time(v)) => { // todo: just ignore the nanos part to avoid leap second complex json!(v.0.num_seconds_from_midnight() as i64 * 1000) @@ -232,6 +245,7 @@ fn datum_to_json_object( &inner_field, sub_datum_ref, timestamp_handling_mode, + timestamptz_handling_mode, custom_json_type, )?; vec.push(value); @@ -251,6 +265,7 @@ fn datum_to_json_object( &sub_field, sub_datum_ref, timestamp_handling_mode, + timestamptz_handling_mode, custom_json_type, )?; map.insert(sub_field.name.clone(), value); @@ -269,6 +284,7 @@ fn datum_to_json_object( &sub_field, sub_datum_ref, timestamp_handling_mode, + timestamptz_handling_mode, custom_json_type, )?; map.insert(sub_field.name.clone(), value); @@ -385,6 +401,7 @@ mod tests { }, Some(ScalarImpl::Bool(false).as_scalar_ref_impl()), TimestampHandlingMode::String, + TimestamptzHandlingMode::UtcString, &CustomJsonType::None, ) .unwrap(); @@ -397,6 +414,7 @@ mod tests { }, Some(ScalarImpl::Int16(16).as_scalar_ref_impl()), TimestampHandlingMode::String, + TimestamptzHandlingMode::UtcString, &CustomJsonType::None, ) .unwrap(); @@ -409,6 +427,7 @@ mod tests { }, Some(ScalarImpl::Int64(std::i64::MAX).as_scalar_ref_impl()), TimestampHandlingMode::String, + TimestamptzHandlingMode::UtcString, &CustomJsonType::None, ) .unwrap(); @@ -426,11 +445,26 @@ mod tests { }, Some(ScalarImpl::Timestamptz(tstz_inner).as_scalar_ref_impl()), TimestampHandlingMode::String, + TimestamptzHandlingMode::UtcString, &CustomJsonType::None, ) .unwrap(); assert_eq!(tstz_value, "2018-01-26T18:30:09.453000Z"); + let tstz_inner = "2018-01-26T18:30:09.453Z".parse().unwrap(); + let tstz_value = datum_to_json_object( + &Field { + data_type: DataType::Timestamptz, + ..mock_field.clone() + }, + Some(ScalarImpl::Timestamptz(tstz_inner).as_scalar_ref_impl()), + TimestampHandlingMode::String, + TimestamptzHandlingMode::UtcWithoutSuffix, + &CustomJsonType::None, + ) + .unwrap(); + assert_eq!(tstz_value, "2018-01-26 18:30:09.453000"); + let ts_value = datum_to_json_object( &Field { data_type: DataType::Timestamp, @@ -441,6 +475,7 @@ mod tests { .as_scalar_ref_impl(), ), TimestampHandlingMode::Milli, + TimestamptzHandlingMode::UtcString, &CustomJsonType::None, ) .unwrap(); @@ -456,6 +491,7 @@ mod tests { .as_scalar_ref_impl(), ), TimestampHandlingMode::String, + TimestamptzHandlingMode::UtcString, &CustomJsonType::None, ) .unwrap(); @@ -472,6 +508,7 @@ mod tests { .as_scalar_ref_impl(), ), TimestampHandlingMode::String, + TimestamptzHandlingMode::UtcString, &CustomJsonType::None, ) .unwrap(); @@ -487,6 +524,7 @@ mod tests { .as_scalar_ref_impl(), ), TimestampHandlingMode::String, + TimestamptzHandlingMode::UtcString, &CustomJsonType::None, ) .unwrap(); @@ -502,6 +540,7 @@ mod tests { }, Some(ScalarImpl::Decimal(Decimal::try_from(1.1111111).unwrap()).as_scalar_ref_impl()), TimestampHandlingMode::String, + TimestamptzHandlingMode::UtcString, &CustomJsonType::Doris(map), ) .unwrap(); @@ -514,6 +553,7 @@ mod tests { }, Some(ScalarImpl::Date(Date::from_ymd_uncheck(2010, 10, 10)).as_scalar_ref_impl()), TimestampHandlingMode::String, + TimestamptzHandlingMode::UtcString, &CustomJsonType::Doris(HashMap::default()), ) .unwrap(); @@ -536,6 +576,7 @@ mod tests { }, Some(ScalarRefImpl::Struct(StructRef::ValueRef { val: &value })), TimestampHandlingMode::String, + TimestamptzHandlingMode::UtcString, &CustomJsonType::Doris(HashMap::default()), ) .unwrap(); diff --git a/src/connector/src/sink/encoder/mod.rs b/src/connector/src/sink/encoder/mod.rs index b55fd534d5eb3..fda7065453d62 100644 --- a/src/connector/src/sink/encoder/mod.rs +++ b/src/connector/src/sink/encoder/mod.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; +use std::collections::{BTreeMap, HashMap}; use std::sync::Arc; use risingwave_common::catalog::Schema; @@ -89,6 +89,37 @@ pub enum TimestampHandlingMode { String, } +#[derive(Clone, Copy, Default)] +pub enum TimestamptzHandlingMode { + #[default] + UtcString, + UtcWithoutSuffix, + Micro, + Milli, +} + +impl TimestamptzHandlingMode { + pub const FRONTEND_DEFAULT: &'static str = "utc_string"; + pub const OPTION_KEY: &'static str = "timestamptz.handling.mode"; + + pub fn from_options(options: &BTreeMap) -> Result { + match options.get(Self::OPTION_KEY).map(std::ops::Deref::deref) { + Some(Self::FRONTEND_DEFAULT) => Ok(Self::UtcString), + Some("utc_without_suffix") => Ok(Self::UtcWithoutSuffix), + Some("micro") => Ok(Self::Micro), + Some("milli") => Ok(Self::Milli), + Some(v) => Err(super::SinkError::Config(anyhow::anyhow!( + "unrecognized {} value {}", + Self::OPTION_KEY, + v + ))), + // This is not a good default. We just have to select it when no option is provided + // for compatibility with old version. + None => Ok(Self::UtcWithoutSuffix), + } + } +} + #[derive(Clone)] pub enum CustomJsonType { Doris(HashMap), diff --git a/src/connector/src/sink/formatter/debezium_json.rs b/src/connector/src/sink/formatter/debezium_json.rs index ce98daab88756..a40789f7d9c95 100644 --- a/src/connector/src/sink/formatter/debezium_json.rs +++ b/src/connector/src/sink/formatter/debezium_json.rs @@ -20,7 +20,9 @@ use serde_json::{json, Map, Value}; use tracing::warn; use super::{Result, SinkFormatter, StreamChunk}; -use crate::sink::encoder::{JsonEncoder, RowEncoder, TimestampHandlingMode}; +use crate::sink::encoder::{ + JsonEncoder, RowEncoder, TimestampHandlingMode, TimestamptzHandlingMode, +}; use crate::tri; const DEBEZIUM_NAME_FIELD_PREFIX: &str = "RisingWave"; @@ -63,8 +65,14 @@ impl DebeziumJsonFormatter { schema.clone(), Some(pk_indices.clone()), TimestampHandlingMode::Milli, + TimestamptzHandlingMode::UtcString, + ); + let val_encoder = JsonEncoder::new( + schema.clone(), + None, + TimestampHandlingMode::Milli, + TimestamptzHandlingMode::UtcString, ); - let val_encoder = JsonEncoder::new(schema.clone(), None, TimestampHandlingMode::Milli); Self { schema, pk_indices, @@ -360,7 +368,12 @@ mod tests { }, ]); - let encoder = JsonEncoder::new(schema.clone(), None, TimestampHandlingMode::Milli); + let encoder = JsonEncoder::new( + schema.clone(), + None, + TimestampHandlingMode::Milli, + TimestamptzHandlingMode::UtcString, + ); let json_chunk = chunk_to_json(chunk, &encoder).unwrap(); let schema_json = schema_to_json(&schema, "test_db", "test_table"); assert_eq!( diff --git a/src/connector/src/sink/formatter/mod.rs b/src/connector/src/sink/formatter/mod.rs index 1e165268300fa..9b8cb953372b4 100644 --- a/src/connector/src/sink/formatter/mod.rs +++ b/src/connector/src/sink/formatter/mod.rs @@ -28,7 +28,7 @@ pub use upsert::UpsertFormatter; use super::catalog::{SinkEncode, SinkFormat, SinkFormatDesc}; use super::encoder::template::TemplateEncoder; -use super::encoder::KafkaConnectParams; +use super::encoder::{KafkaConnectParams, TimestamptzHandlingMode}; use super::redis::{KEY_FORMAT, VALUE_FORMAT}; use crate::sink::encoder::{ AvroEncoder, AvroHeader, JsonEncoder, ProtoEncoder, TimestampHandlingMode, @@ -91,6 +91,7 @@ impl SinkFormatterImpl { format_desc.encode, ))) }; + let timestamptz_mode = TimestamptzHandlingMode::from_options(&format_desc.options)?; match format_desc.format { SinkFormat::AppendOnly => { @@ -99,13 +100,18 @@ impl SinkFormatterImpl { schema.clone(), Some(pk_indices.clone()), TimestampHandlingMode::Milli, + timestamptz_mode, ) }); match format_desc.encode { SinkEncode::Json => { - let val_encoder = - JsonEncoder::new(schema, None, TimestampHandlingMode::Milli); + let val_encoder = JsonEncoder::new( + schema, + None, + TimestampHandlingMode::Milli, + timestamptz_mode, + ); let formatter = AppendOnlyFormatter::new(key_encoder, val_encoder); Ok(SinkFormatterImpl::AppendOnlyJson(formatter)) } @@ -164,9 +170,14 @@ impl SinkFormatterImpl { schema.clone(), Some(pk_indices), TimestampHandlingMode::Milli, + timestamptz_mode, + ); + let mut val_encoder = JsonEncoder::new( + schema, + None, + TimestampHandlingMode::Milli, + timestamptz_mode, ); - let mut val_encoder = - JsonEncoder::new(schema, None, TimestampHandlingMode::Milli); if let Some(s) = format_desc.options.get("schemas.enable") { match s.to_lowercase().parse::() { diff --git a/src/connector/src/sink/kafka.rs b/src/connector/src/sink/kafka.rs index 07709f182dc47..e0ff1f67471df 100644 --- a/src/connector/src/sink/kafka.rs +++ b/src/connector/src/sink/kafka.rs @@ -564,7 +564,7 @@ mod test { use risingwave_common::types::DataType; use super::*; - use crate::sink::encoder::{JsonEncoder, TimestampHandlingMode}; + use crate::sink::encoder::{JsonEncoder, TimestampHandlingMode, TimestamptzHandlingMode}; use crate::sink::formatter::AppendOnlyFormatter; #[test] @@ -729,7 +729,12 @@ mod test { SinkFormatterImpl::AppendOnlyJson(AppendOnlyFormatter::new( // We do not specify primary key for this schema None, - JsonEncoder::new(schema, None, TimestampHandlingMode::Milli), + JsonEncoder::new( + schema, + None, + TimestampHandlingMode::Milli, + TimestamptzHandlingMode::UtcString, + ), )), ) .await diff --git a/src/connector/src/sink/nats.rs b/src/connector/src/sink/nats.rs index 2f810eed786a9..1cf3bd25a275d 100644 --- a/src/connector/src/sink/nats.rs +++ b/src/connector/src/sink/nats.rs @@ -24,6 +24,7 @@ use serde_with::serde_as; use tokio_retry::strategy::{jitter, ExponentialBackoff}; use tokio_retry::Retry; +use super::encoder::TimestamptzHandlingMode; use super::utils::chunk_to_json; use super::{DummySinkCommitCoordinator, SinkWriterParam}; use crate::common::NatsCommon; @@ -138,7 +139,12 @@ impl NatsSinkWriter { config: config.clone(), context, schema: schema.clone(), - json_encoder: JsonEncoder::new(schema, None, TimestampHandlingMode::Milli), + json_encoder: JsonEncoder::new( + schema, + None, + TimestampHandlingMode::Milli, + TimestamptzHandlingMode::UtcString, + ), }) } diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs index 0b5948646637b..228da05a48108 100644 --- a/src/connector/src/sink/remote.rs +++ b/src/connector/src/sink/remote.rs @@ -57,7 +57,7 @@ use tracing::warn; use super::encoder::{JsonEncoder, RowEncoder}; use crate::sink::coordinate::CoordinatedSinkWriter; -use crate::sink::encoder::TimestampHandlingMode; +use crate::sink::encoder::{TimestampHandlingMode, TimestamptzHandlingMode}; use crate::sink::log_store::{LogReader, LogStoreReadItem, TruncateOffset}; use crate::sink::writer::{LogSinkerOf, SinkWriter, SinkWriterExt}; use crate::sink::{ @@ -412,7 +412,12 @@ impl RemoteSinkWriterInner { batch_id: 0, stream_handle, payload_format: connector_params.sink_payload_format, - json_encoder: JsonEncoder::new(schema, None, TimestampHandlingMode::String), + json_encoder: JsonEncoder::new( + schema, + None, + TimestampHandlingMode::String, + TimestamptzHandlingMode::UtcWithoutSuffix, + ), sink_metrics, _phantom: PhantomData, }) @@ -451,7 +456,12 @@ impl RemoteSinkWriterInner { properties, epoch: None, batch_id: 0, - json_encoder: JsonEncoder::new(schema, None, TimestampHandlingMode::String), + json_encoder: JsonEncoder::new( + schema, + None, + TimestampHandlingMode::String, + TimestamptzHandlingMode::UtcWithoutSuffix, + ), stream_handle, payload_format: SinkPayloadFormat::Json, sink_metrics: SinkMetrics::for_test(), diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs index e4081fbee4fcf..a4db476b6274a 100644 --- a/src/frontend/src/handler/create_sink.rs +++ b/src/frontend/src/handler/create_sink.rs @@ -230,6 +230,7 @@ pub async fn handle_create_sink( /// which transforms sqlparser AST `SourceSchemaV2` into `StreamSourceInfo`. fn bind_sink_format_desc(value: ConnectorSchema) -> Result { use risingwave_connector::sink::catalog::{SinkEncode, SinkFormat}; + use risingwave_connector::sink::encoder::TimestamptzHandlingMode; use risingwave_sqlparser::ast::{Encode as E, Format as F}; let format = match value.format { @@ -249,7 +250,11 @@ fn bind_sink_format_desc(value: ConnectorSchema) -> Result { return Err(ErrorCode::BindError(format!("sink encode unsupported: {e}")).into()) } }; - let options = WithOptions::try_from(value.row_options.as_slice())?.into_inner(); + let mut options = WithOptions::try_from(value.row_options.as_slice())?.into_inner(); + + options + .entry(TimestamptzHandlingMode::OPTION_KEY.to_owned()) + .or_insert(TimestamptzHandlingMode::FRONTEND_DEFAULT.to_owned()); Ok(SinkFormatDesc { format,