Skip to content

Commit

Permalink
feat(sink): support sink map to avro (#18551)
Browse files Browse the repository at this point in the history
Signed-off-by: xxchan <[email protected]>
  • Loading branch information
xxchan authored and xxchan committed Sep 19, 2024
1 parent 680c134 commit 0a6192e
Show file tree
Hide file tree
Showing 4 changed files with 245 additions and 39 deletions.
38 changes: 36 additions & 2 deletions src/common/src/array/map_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@ use std::fmt::{self, Debug, Display};
use bytes::{Buf, BufMut};
use itertools::Itertools;
use risingwave_common_estimate_size::EstimateSize;
use risingwave_error::BoxedError;
use risingwave_pb::data::{PbArray, PbArrayType};
use serde::Serializer;

use super::{
Array, ArrayBuilder, ArrayImpl, ArrayResult, DatumRef, DefaultOrdered, ListArray,
ListArrayBuilder, ListRef, ListValue, MapType, ScalarRef, ScalarRefImpl, StructArray,
StructRef,
ListArrayBuilder, ListRef, ListValue, MapType, ScalarImpl, ScalarRef, ScalarRefImpl,
StructArray, StructRef,
};
use crate::bitmap::Bitmap;
use crate::types::{DataType, Scalar, ToText};
Expand Down Expand Up @@ -525,3 +526,36 @@ impl ToText for MapRef<'_> {
}
}
}

impl MapValue {
pub fn from_str_for_test(s: &str, data_type: &MapType) -> Result<Self, BoxedError> {
// TODO: this is a quick trivial implementation. Implement the full version later.

// example: {1:1,2:NULL,3:3}

if !s.starts_with('{') {
return Err(format!("Missing left parenthesis: {}", s).into());
}
if !s.ends_with('}') {
return Err(format!("Missing right parenthesis: {}", s).into());
}
let mut key_builder = data_type.key().create_array_builder(100);
let mut value_builder = data_type.value().create_array_builder(100);
for kv in s[1..s.len() - 1].split(',') {
let (k, v) = kv.split_once(':').ok_or("Invalid map format")?;
key_builder.append(Some(ScalarImpl::from_text(k, data_type.key())?));
if v == "NULL" {
value_builder.append_null();
} else {
value_builder.append(Some(ScalarImpl::from_text(v, data_type.value())?));
}
}
let key_array = key_builder.finish();
let value_array = value_builder.finish();

Ok(MapValue::try_from_kv(
ListValue::new(key_array),
ListValue::new(value_array),
)?)
}
}
12 changes: 3 additions & 9 deletions src/common/src/array/struct_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,17 +337,14 @@ impl StructValue {
.map(Self::new)
}

/// Construct an array from literal string.
/// Construct a struct from literal string.
///
/// # Example
///
/// ```
/// # use risingwave_common::types::{StructValue, StructType, DataType, ScalarImpl};
///
/// let ty = DataType::Struct(StructType::unnamed(vec![
/// DataType::Int32,
/// DataType::Float64,
/// ]));
/// let ty = StructType::unnamed(vec![DataType::Int32, DataType::Float64]);
/// let s = StructValue::from_str("(1, 2.0)", &ty).unwrap();
/// assert_eq!(s.fields()[0], Some(ScalarImpl::Int32(1)));
/// assert_eq!(s.fields()[1], Some(ScalarImpl::Float64(2.0.into())));
Expand All @@ -356,11 +353,8 @@ impl StructValue {
/// assert_eq!(s.fields()[0], None);
/// assert_eq!(s.fields()[1], None);
/// ```
pub fn from_str(s: &str, data_type: &DataType) -> Result<Self, BoxedError> {
pub fn from_str(s: &str, ty: &StructType) -> Result<Self, BoxedError> {
// FIXME(runji): this is a trivial implementation which does not support nested struct.
let DataType::Struct(ty) = data_type else {
return Err(format!("Expect struct type, got {:?}", data_type).into());
};
if !s.starts_with('(') {
return Err("Missing left parenthesis".into());
}
Expand Down
13 changes: 9 additions & 4 deletions src/common/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -918,12 +918,17 @@ impl ScalarImpl {
DataType::Time => Time::from_str(s)?.into(),
DataType::Interval => Interval::from_str(s)?.into(),
DataType::List(_) => ListValue::from_str(s, data_type)?.into(),
DataType::Struct(_) => StructValue::from_str(s, data_type)?.into(),
DataType::Struct(st) => StructValue::from_str(s, st)?.into(),
DataType::Jsonb => JsonbVal::from_str(s)?.into(),
DataType::Bytea => str_to_bytea(s)?.into(),
DataType::Map(_) => {
todo!()
}
DataType::Map(_m) => return Err("map from text is not supported".into()),
})
}

pub fn from_text_for_test(s: &str, data_type: &DataType) -> Result<Self, BoxedError> {
Ok(match data_type {
DataType::Map(map_type) => MapValue::from_str_for_test(s, map_type)?.into(),
_ => ScalarImpl::from_text(s, data_type)?,
})
}
}
Expand Down
Loading

0 comments on commit 0a6192e

Please sign in to comment.