Skip to content

Commit

Permalink
avro
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan committed Sep 17, 2024
1 parent 80ec46d commit 20bc6e2
Show file tree
Hide file tree
Showing 4 changed files with 240 additions and 35 deletions.
38 changes: 36 additions & 2 deletions src/common/src/array/map_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@ use std::fmt::{self, Debug, Display};
use bytes::{Buf, BufMut};
use itertools::Itertools;
use risingwave_common_estimate_size::EstimateSize;
use risingwave_error::BoxedError;
use risingwave_pb::data::{PbArray, PbArrayType};
use serde::Serializer;

use super::{
Array, ArrayBuilder, ArrayImpl, ArrayResult, DatumRef, DefaultOrdered, ListArray,
ListArrayBuilder, ListRef, ListValue, MapType, ScalarRef, ScalarRefImpl, StructArray,
StructRef,
ListArrayBuilder, ListRef, ListValue, MapType, ScalarImpl, ScalarRef, ScalarRefImpl,
StructArray, StructRef,
};
use crate::bitmap::Bitmap;
use crate::types::{DataType, Scalar, ToText};
Expand Down Expand Up @@ -525,3 +526,36 @@ impl ToText for MapRef<'_> {
}
}
}

impl MapValue {
pub fn from_str_for_test(s: &str, data_type: &MapType) -> Result<Self, BoxedError> {
// TODO: this is a quick trivial implementation. Implement the full version later.

// example: {1:1,2:NULL,3:3}

if !s.starts_with('{') {
return Err(format!("Missing left parenthesis: {}", s).into());
}
if !s.ends_with('}') {
return Err(format!("Missing right parenthesis: {}", s).into());
}
let mut key_builder = data_type.key().create_array_builder(100);
let mut value_builder = data_type.value().create_array_builder(100);
for kv in s[1..s.len() - 1].split(',') {
let (k, v) = kv.split_once(':').ok_or("Invalid map format")?;
key_builder.append(Some(ScalarImpl::from_text(k, data_type.key())?));
if v == "NULL" {
value_builder.append_null();
} else {
value_builder.append(Some(ScalarImpl::from_text(v, data_type.value())?));
}
}
let key_array = key_builder.finish();
let value_array = value_builder.finish();

Ok(MapValue::try_from_kv(
ListValue::new(key_array),
ListValue::new(value_array),
)?)
}
}
7 changes: 2 additions & 5 deletions src/common/src/array/struct_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ impl StructValue {
.map(Self::new)
}

/// Construct an array from literal string.
/// Construct a struct from literal string.
///
/// # Example
///
Expand All @@ -356,11 +356,8 @@ impl StructValue {
/// assert_eq!(s.fields()[0], None);
/// assert_eq!(s.fields()[1], None);
/// ```
pub fn from_str(s: &str, data_type: &DataType) -> Result<Self, BoxedError> {
pub fn from_str(s: &str, ty: &StructType) -> Result<Self, BoxedError> {
// FIXME(runji): this is a trivial implementation which does not support nested struct.
let DataType::Struct(ty) = data_type else {
return Err(format!("Expect struct type, got {:?}", data_type).into());
};
if !s.starts_with('(') {
return Err("Missing left parenthesis".into());
}
Expand Down
13 changes: 9 additions & 4 deletions src/common/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -917,12 +917,17 @@ impl ScalarImpl {
DataType::Time => Time::from_str(s)?.into(),
DataType::Interval => Interval::from_str(s)?.into(),
DataType::List(_) => ListValue::from_str(s, data_type)?.into(),
DataType::Struct(_) => StructValue::from_str(s, data_type)?.into(),
DataType::Struct(st) => StructValue::from_str(s, st)?.into(),
DataType::Jsonb => JsonbVal::from_str(s)?.into(),
DataType::Bytea => str_to_bytea(s)?.into(),
DataType::Map(_) => {
todo!()
}
DataType::Map(_m) => return Err("map from text is not supported".into()),
})
}

pub fn from_text_for_test(s: &str, data_type: &DataType) -> Result<Self, BoxedError> {
Ok(match data_type {
DataType::Map(map_type) => MapValue::from_str_for_test(s, map_type)?.into(),
_ => ScalarImpl::from_text(s, data_type)?,
})
}
}
Expand Down
Loading

0 comments on commit 20bc6e2

Please sign in to comment.