Skip to content

Commit

Permalink
refactor(sink): impl RowEncoder for JsonEncoder (#12264)
Browse files Browse the repository at this point in the history
  • Loading branch information
xiangjinwu authored and Li0k committed Sep 15, 2023
1 parent 9a873a2 commit 56341aa
Show file tree
Hide file tree
Showing 7 changed files with 446 additions and 315 deletions.
309 changes: 309 additions & 0 deletions src/connector/src/sink/encoder/json.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,309 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use base64::engine::general_purpose;
use base64::Engine as _;
use chrono::{Datelike, Timelike};
use risingwave_common::array::{ArrayError, ArrayResult};
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::row::Row;
use risingwave_common::types::{DataType, DatumRef, ScalarRefImpl, ToText};
use risingwave_common::util::iter_util::ZipEqDebug;
use serde_json::{json, Map, Value};

use super::{Result, RowEncoder, SerTo, TimestampHandlingMode};
use crate::sink::SinkError;

pub struct JsonEncoder<'a> {
schema: &'a Schema,
col_indices: Option<&'a [usize]>,
timestamp_handling_mode: TimestampHandlingMode,
}

impl<'a> JsonEncoder<'a> {
pub fn new(
schema: &'a Schema,
col_indices: Option<&'a [usize]>,
timestamp_handling_mode: TimestampHandlingMode,
) -> Self {
Self {
schema,
col_indices,
timestamp_handling_mode,
}
}
}

impl<'a> RowEncoder for JsonEncoder<'a> {
type Output = Map<String, Value>;

fn schema(&self) -> &Schema {
self.schema
}

fn col_indices(&self) -> Option<&[usize]> {
self.col_indices
}

fn encode_cols(
&self,
row: impl Row,
col_indices: impl Iterator<Item = usize>,
) -> Result<Self::Output> {
let mut mappings = Map::with_capacity(self.schema.len());
for idx in col_indices {
let field = &self.schema[idx];
let key = field.name.clone();
let value =
datum_to_json_object(field, row.datum_at(idx), self.timestamp_handling_mode)
.map_err(|e| SinkError::JsonParse(e.to_string()))?;
mappings.insert(key, value);
}
Ok(mappings)
}
}

impl SerTo<String> for Map<String, Value> {
fn ser_to(self) -> Result<String> {
Value::Object(self).ser_to()
}
}

impl SerTo<String> for Value {
fn ser_to(self) -> Result<String> {
Ok(self.to_string())
}
}

fn datum_to_json_object(
field: &Field,
datum: DatumRef<'_>,
timestamp_handling_mode: TimestampHandlingMode,
) -> ArrayResult<Value> {
let scalar_ref = match datum {
None => return Ok(Value::Null),
Some(datum) => datum,
};

let data_type = field.data_type();

tracing::debug!("datum_to_json_object: {:?}, {:?}", data_type, scalar_ref);

let value = match (data_type, scalar_ref) {
(DataType::Boolean, ScalarRefImpl::Bool(v)) => {
json!(v)
}
(DataType::Int16, ScalarRefImpl::Int16(v)) => {
json!(v)
}
(DataType::Int32, ScalarRefImpl::Int32(v)) => {
json!(v)
}
(DataType::Int64, ScalarRefImpl::Int64(v)) => {
json!(v)
}
(DataType::Float32, ScalarRefImpl::Float32(v)) => {
json!(f32::from(v))
}
(DataType::Float64, ScalarRefImpl::Float64(v)) => {
json!(f64::from(v))
}
(DataType::Varchar, ScalarRefImpl::Utf8(v)) => {
json!(v)
}
(DataType::Decimal, ScalarRefImpl::Decimal(v)) => {
json!(v.to_text())
}
(DataType::Timestamptz, ScalarRefImpl::Timestamptz(v)) => {
// risingwave's timestamp with timezone is stored in UTC and does not maintain the
// timezone info and the time is in microsecond.
let parsed = v.to_datetime_utc().naive_utc();
let v = parsed.format("%Y-%m-%d %H:%M:%S%.6f").to_string();
json!(v)
}
(DataType::Time, ScalarRefImpl::Time(v)) => {
// todo: just ignore the nanos part to avoid leap second complex
json!(v.0.num_seconds_from_midnight() as i64 * 1000)
}
(DataType::Date, ScalarRefImpl::Date(v)) => {
json!(v.0.num_days_from_ce())
}
(DataType::Timestamp, ScalarRefImpl::Timestamp(v)) => match timestamp_handling_mode {
TimestampHandlingMode::Milli => json!(v.0.timestamp_millis()),
TimestampHandlingMode::String => json!(v.0.format("%Y-%m-%d %H:%M:%S%.6f").to_string()),
},
(DataType::Bytea, ScalarRefImpl::Bytea(v)) => {
json!(general_purpose::STANDARD_NO_PAD.encode(v))
}
// P<years>Y<months>M<days>DT<hours>H<minutes>M<seconds>S
(DataType::Interval, ScalarRefImpl::Interval(v)) => {
json!(v.as_iso_8601())
}
(DataType::Jsonb, ScalarRefImpl::Jsonb(jsonb_ref)) => {
json!(jsonb_ref.to_string())
}
(DataType::List(datatype), ScalarRefImpl::List(list_ref)) => {
let elems = list_ref.iter();
let mut vec = Vec::with_capacity(elems.len());
let inner_field = Field::unnamed(Box::<DataType>::into_inner(datatype));
for sub_datum_ref in elems {
let value =
datum_to_json_object(&inner_field, sub_datum_ref, timestamp_handling_mode)?;
vec.push(value);
}
json!(vec)
}
(DataType::Struct(st), ScalarRefImpl::Struct(struct_ref)) => {
let mut map = Map::with_capacity(st.len());
for (sub_datum_ref, sub_field) in struct_ref.iter_fields_ref().zip_eq_debug(
st.iter()
.map(|(name, dt)| Field::with_name(dt.clone(), name)),
) {
let value =
datum_to_json_object(&sub_field, sub_datum_ref, timestamp_handling_mode)?;
map.insert(sub_field.name.clone(), value);
}
json!(map)
}
(data_type, scalar_ref) => {
return Err(ArrayError::internal(
format!("datum_to_json_object: unsupported data type: field name: {:?}, logical type: {:?}, physical type: {:?}", field.name, data_type, scalar_ref),
));
}
};

Ok(value)
}

#[cfg(test)]
mod tests {

use risingwave_common::types::{DataType, Interval, ScalarImpl, Time, Timestamp};

use super::*;
#[test]
fn test_to_json_basic_type() {
let mock_field = Field {
data_type: DataType::Boolean,
name: Default::default(),
sub_fields: Default::default(),
type_name: Default::default(),
};
let boolean_value = datum_to_json_object(
&Field {
data_type: DataType::Boolean,
..mock_field.clone()
},
Some(ScalarImpl::Bool(false).as_scalar_ref_impl()),
TimestampHandlingMode::String,
)
.unwrap();
assert_eq!(boolean_value, json!(false));

let int16_value = datum_to_json_object(
&Field {
data_type: DataType::Int16,
..mock_field.clone()
},
Some(ScalarImpl::Int16(16).as_scalar_ref_impl()),
TimestampHandlingMode::String,
)
.unwrap();
assert_eq!(int16_value, json!(16));

let int64_value = datum_to_json_object(
&Field {
data_type: DataType::Int64,
..mock_field.clone()
},
Some(ScalarImpl::Int64(std::i64::MAX).as_scalar_ref_impl()),
TimestampHandlingMode::String,
)
.unwrap();
assert_eq!(
serde_json::to_string(&int64_value).unwrap(),
std::i64::MAX.to_string()
);

// https://github.com/debezium/debezium/blob/main/debezium-core/src/main/java/io/debezium/time/ZonedTimestamp.java
let tstz_inner = "2018-01-26T18:30:09.453Z".parse().unwrap();
let tstz_value = datum_to_json_object(
&Field {
data_type: DataType::Timestamptz,
..mock_field.clone()
},
Some(ScalarImpl::Timestamptz(tstz_inner).as_scalar_ref_impl()),
TimestampHandlingMode::String,
)
.unwrap();
assert_eq!(tstz_value, "2018-01-26 18:30:09.453000");

let ts_value = datum_to_json_object(
&Field {
data_type: DataType::Timestamp,
..mock_field.clone()
},
Some(
ScalarImpl::Timestamp(Timestamp::from_timestamp_uncheck(1000, 0))
.as_scalar_ref_impl(),
),
TimestampHandlingMode::Milli,
)
.unwrap();
assert_eq!(ts_value, json!(1000 * 1000));

let ts_value = datum_to_json_object(
&Field {
data_type: DataType::Timestamp,
..mock_field.clone()
},
Some(
ScalarImpl::Timestamp(Timestamp::from_timestamp_uncheck(1000, 0))
.as_scalar_ref_impl(),
),
TimestampHandlingMode::String,
)
.unwrap();
assert_eq!(ts_value, json!("1970-01-01 00:16:40.000000".to_string()));

// Represents the number of microseconds past midnigh, io.debezium.time.Time
let time_value = datum_to_json_object(
&Field {
data_type: DataType::Time,
..mock_field.clone()
},
Some(
ScalarImpl::Time(Time::from_num_seconds_from_midnight_uncheck(1000, 0))
.as_scalar_ref_impl(),
),
TimestampHandlingMode::String,
)
.unwrap();
assert_eq!(time_value, json!(1000 * 1000));

let interval_value = datum_to_json_object(
&Field {
data_type: DataType::Interval,
..mock_field
},
Some(
ScalarImpl::Interval(Interval::from_month_day_usec(13, 2, 1000000))
.as_scalar_ref_impl(),
),
TimestampHandlingMode::String,
)
.unwrap();
assert_eq!(interval_value, json!("P1Y1M2DT0H0M1S"));
}
}
80 changes: 80 additions & 0 deletions src/connector/src/sink/encoder/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::catalog::Schema;
use risingwave_common::row::Row;

use crate::sink::Result;

mod json;

pub use json::JsonEncoder;

/// Encode a row of a relation into
/// * an object in json
/// * a message in protobuf
/// * a record in avro
pub trait RowEncoder {
type Output: SerTo<Vec<u8>>;

fn encode_cols(
&self,
row: impl Row,
col_indices: impl Iterator<Item = usize>,
) -> Result<Self::Output>;
fn schema(&self) -> &Schema;
fn col_indices(&self) -> Option<&[usize]>;

fn encode(&self, row: impl Row) -> Result<Self::Output> {
assert_eq!(row.len(), self.schema().len());
match self.col_indices() {
Some(col_indices) => self.encode_cols(row, col_indices.iter().copied()),
None => self.encode_cols(row, 0..self.schema().len()),
}
}
}

/// Do the actual encoding from
/// * an json object
/// * a protobuf message
/// * an avro record
/// into
/// * string (required by kinesis key)
/// * bytes
///
/// This is like `TryInto` but allows us to `impl<T: SerTo<String>> SerTo<Vec<u8>> for T`.
///
/// Shall we consider `impl serde::Serialize` in the future?
pub trait SerTo<T> {
fn ser_to(self) -> Result<T>;
}

impl<T: SerTo<String>> SerTo<Vec<u8>> for T {
fn ser_to(self) -> Result<Vec<u8>> {
self.ser_to().map(|s: String| s.into_bytes())
}
}

impl<T> SerTo<T> for T {
fn ser_to(self) -> Result<T> {
Ok(self)
}
}

/// Useful for both json and protobuf
#[derive(Clone, Copy)]
pub enum TimestampHandlingMode {
Milli,
String,
}
Loading

0 comments on commit 56341aa

Please sign in to comment.