Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: value encoding error #4671

Merged
merged 6 commits into from
Aug 16, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 0 additions & 17 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ members = [
"src/utils/runtime",
"src/utils/static-link",
"src/utils/stats_alloc",
"src/utils/value-encoding",
"src/workspace-hack",
]

Expand Down
4 changes: 2 additions & 2 deletions src/batch/src/executor/row_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ fn get_scan_bound(
.iter()
.map(|v| {
let ty = pk_types.next().unwrap();
let scalar = ScalarImpl::bytes_to_scalar(v, &ty.to_protobuf()).unwrap();
let scalar = ScalarImpl::from_proto_bytes(v, &ty.to_protobuf()).unwrap();
Some(scalar)
})
.collect_vec());
Expand All @@ -102,7 +102,7 @@ fn get_scan_bound(

let bound_ty = pk_types.next().unwrap();
let build_bound = |bound: &scan_range::Bound| -> Bound<Datum> {
let scalar = ScalarImpl::bytes_to_scalar(&bound.value, &bound_ty.to_protobuf()).unwrap();
let scalar = ScalarImpl::from_proto_bytes(&bound.value, &bound_ty.to_protobuf()).unwrap();

let datum = Some(scalar);
if bound.inclusive {
Expand Down
1 change: 0 additions & 1 deletion src/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ tonic = { version = "0.2", package = "madsim-tonic" }
tracing = { version = "0.1" }
twox-hash = "1"
url = "2"
value-encoding = { path = "../utils/value-encoding" }
workspace-hack = { version = "0.1", path = "../workspace-hack" }

[target.'cfg(target_os = "linux")'.dependencies]
Expand Down
6 changes: 3 additions & 3 deletions src/common/src/array/data_chunk_iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ use itertools::Itertools;

use super::column::Column;
use crate::array::DataChunk;
use crate::error::Result;
use crate::hash::HashCode;
use crate::types::{hash_datum, DataType, Datum, DatumRef, ToOwnedDatum};
use crate::util::value_encoding;
use crate::util::value_encoding::{deserialize_datum, serialize_datum};

impl DataChunk {
Expand Down Expand Up @@ -254,7 +254,7 @@ impl Row {
/// [`crate::util::ordered::OrderedRow`]
///
/// All values are nullable. Each value will have 1 extra byte to indicate whether it is null.
pub fn serialize(&self) -> Result<Vec<u8>> {
pub fn serialize(&self) -> value_encoding::Result<Vec<u8>> {
let mut result = vec![];
for cell in &self.0 {
result.extend(serialize_datum(cell)?);
Expand Down Expand Up @@ -315,7 +315,7 @@ impl RowDeserializer {
}

/// Deserialize the row from value encoding bytes.
pub fn deserialize(&self, mut data: impl Buf) -> Result<Row> {
pub fn deserialize(&self, mut data: impl Buf) -> value_encoding::Result<Row> {
let mut values = Vec::with_capacity(self.data_types.len());
for typ in &self.data_types {
values.push(deserialize_datum(&mut data, typ)?);
Expand Down
2 changes: 1 addition & 1 deletion src/common/src/array/list_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ impl ListValue {
if b.is_empty() {
Ok(None)
} else {
Ok(Some(ScalarImpl::bytes_to_scalar(b, d)?))
Ok(Some(ScalarImpl::from_proto_bytes(b, d)?))
}
})
.collect::<ArrayResult<Vec<Datum>>>()?;
Expand Down
2 changes: 1 addition & 1 deletion src/common/src/array/struct_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ impl StructValue {
if b.is_empty() {
Ok(None)
} else {
Ok(Some(ScalarImpl::bytes_to_scalar(b, d)?))
Ok(Some(ScalarImpl::from_proto_bytes(b, d)?))
}
})
.collect::<ArrayResult<Vec<Datum>>>()?;
Expand Down
9 changes: 5 additions & 4 deletions src/common/src/types/chrono_wrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ use chrono::{Datelike, Duration, NaiveDate, NaiveDateTime, NaiveTime, Timelike};

use super::{CheckedAdd, IntervalUnit};
use crate::array::ArrayResult;
use crate::error::Result;
use crate::util::value_encoding;
use crate::util::value_encoding::error::ValueEncodingError;

/// The same as `NaiveDate::from_ymd(1970, 1, 1).num_days_from_ce()`.
/// Minus this magic number to store the number of days since 1970-01-01.
pub const UNIX_EPOCH_DAYS: i32 = 719_163;
Expand Down Expand Up @@ -103,7 +104,7 @@ impl NaiveDateWrapper {
))
}

pub fn new_with_days_value_encoding(days: i32) -> Result<Self> {
pub fn with_days_value(days: i32) -> value_encoding::Result<Self> {
Ok(NaiveDateWrapper::new(
NaiveDate::from_num_days_from_ce_opt(days)
.ok_or(ValueEncodingError::InvalidNaiveDateEncoding(days))?,
Expand Down Expand Up @@ -141,7 +142,7 @@ impl NaiveTimeWrapper {
))
}

pub fn new_with_secs_nano_value_encoding(secs: u32, nano: u32) -> Result<Self> {
pub fn with_secs_nano_value(secs: u32, nano: u32) -> value_encoding::Result<Self> {
Ok(NaiveTimeWrapper::new(
NaiveTime::from_num_seconds_from_midnight_opt(secs, nano)
.ok_or(ValueEncodingError::InvalidNaiveTimeEncoding(secs, nano))?,
Expand Down Expand Up @@ -189,7 +190,7 @@ impl NaiveDateTimeWrapper {
}))
}

pub fn new_with_secs_nsecs_value_encoding(secs: i64, nsecs: u32) -> Result<Self> {
pub fn with_secs_nsecs_value(secs: i64, nsecs: u32) -> value_encoding::Result<Self> {
Ok(NaiveDateTimeWrapper::new({
NaiveDateTime::from_timestamp_opt(secs, nsecs).ok_or(
ValueEncodingError::InvalidNaiveDateTimeEncoding(secs, nsecs),
Expand Down
10 changes: 6 additions & 4 deletions src/common/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -866,7 +866,9 @@ impl ScalarImpl {
body
}

pub fn bytes_to_scalar(b: &Vec<u8>, data_type: &ProstDataType) -> ArrayResult<Self> {
/// This encoding should only be used in proto
/// TODO: replace with value encoding?
pub fn from_proto_bytes(b: &Vec<u8>, data_type: &ProstDataType) -> ArrayResult<Self> {
let value = match data_type.get_type_name()? {
TypeName::Boolean => ScalarImpl::Bool(
i8::from_be_bytes(
Expand Down Expand Up @@ -1057,18 +1059,18 @@ mod tests {
fn test_protobuf_conversion() {
let v = ScalarImpl::NaiveDateTime(NaiveDateTimeWrapper::default());
let actual =
ScalarImpl::bytes_to_scalar(&v.to_protobuf(), &DataType::Timestamp.to_protobuf())
ScalarImpl::from_proto_bytes(&v.to_protobuf(), &DataType::Timestamp.to_protobuf())
.unwrap();
assert_eq!(v, actual);

let v = ScalarImpl::NaiveDate(NaiveDateWrapper::default());
let actual =
ScalarImpl::bytes_to_scalar(&v.to_protobuf(), &DataType::Date.to_protobuf()).unwrap();
ScalarImpl::from_proto_bytes(&v.to_protobuf(), &DataType::Date.to_protobuf()).unwrap();
assert_eq!(v, actual);

let v = ScalarImpl::NaiveTime(NaiveTimeWrapper::default());
let actual =
ScalarImpl::bytes_to_scalar(&v.to_protobuf(), &DataType::Time.to_protobuf()).unwrap();
ScalarImpl::from_proto_bytes(&v.to_protobuf(), &DataType::Time.to_protobuf()).unwrap();
assert_eq!(v, actual);
}
}
6 changes: 5 additions & 1 deletion src/common/src/util/value_encoding/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use thiserror::Error;
pub enum ValueEncodingError {
#[error("Invalid bool value encoding: {0}")]
InvalidBoolEncoding(u8),
#[error("Invalid UTF8 value encofing: {0}")]
#[error("Invalid UTF8 value encoding: {0}")]
InvalidUtf8(#[from] std::string::FromUtf8Error),
#[error("Invalid NaiveDate value encoding: days: {0}")]
InvalidNaiveDateEncoding(i32),
Expand All @@ -28,4 +28,8 @@ pub enum ValueEncodingError {
InvalidNaiveTimeEncoding(u32, u32),
#[error("Invalid null tag value encoding: {0}")]
InvalidTagEncoding(u8),
#[error("Invalid struct encoding: {0}")]
InvalidStructEncoding(crate::array::ArrayError),
#[error("Invalid list encoding: {0}")]
InvalidListEncoding(crate::array::ArrayError),
}
35 changes: 24 additions & 11 deletions src/common/src/util/value_encoding/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,16 @@
use bytes::{Buf, BufMut};
use chrono::{Datelike, Timelike};

use crate::error::Result;
use crate::types::{
DataType, Datum, Decimal, IntervalUnit, NaiveDateTimeWrapper, NaiveDateWrapper,
NaiveTimeWrapper, OrderedF32, OrderedF64, ScalarImpl, ScalarRefImpl,
};

pub mod error;
use error::ValueEncodingError;
use risingwave_pb::data::data_type::TypeName;

pub type Result<T> = std::result::Result<T, ValueEncodingError>;

use crate::array::{ListRef, StructRef};

Expand Down Expand Up @@ -64,7 +66,7 @@ pub fn deserialize_datum(mut data: impl Buf, ty: &DataType) -> Result<Datum> {
match null_tag {
0 => Ok(None),
1 => deserialize_value(ty, data),
_ => Err(ValueEncodingError::InvalidTagEncoding(null_tag).into()),
_ => Err(ValueEncodingError::InvalidTagEncoding(null_tag)),
}
}

Expand Down Expand Up @@ -147,30 +149,41 @@ fn deserialize_value(ty: &DataType, mut data: impl Buf) -> Result<Datum> {
DataType::Timestamp => ScalarImpl::NaiveDateTime(deserialize_naivedatetime(data)?),
DataType::Timestampz => ScalarImpl::Int64(data.get_i64_le()),
DataType::Date => ScalarImpl::NaiveDate(deserialize_naivedate(data)?),
DataType::Struct { .. } => deserialize_struct_or_list(ty, data)?,
DataType::List { .. } => deserialize_struct_or_list(ty, data)?,
DataType::Struct { .. } => deserialize_struct(ty, data)?,
DataType::List { .. } => deserialize_list(ty, data)?,
}))
}

fn deserialize_struct_or_list(data_type: &DataType, mut data: impl Buf) -> Result<ScalarImpl> {
/// Deserializes a struct scalar from a length-prefixed byte payload.
///
/// Reads a little-endian `u32` length from `data`, copies that many bytes out of the buffer,
/// and decodes them with `ScalarImpl::from_proto_bytes` using the protobuf form of `data_type`.
///
/// # Errors
///
/// A failure reported by `ScalarImpl::from_proto_bytes` is returned as
/// `ValueEncodingError::InvalidStructEncoding`.
///
/// # Panics
///
/// Panics if `data_type.prost_type_name()` is not `TypeName::Struct`.
/// NOTE(review): `get_u32_le` / `copy_to_slice` presumably also panic when `data` holds fewer
/// bytes than requested — confirm against the `bytes::Buf` documentation.
fn deserialize_struct(data_type: &DataType, mut data: impl Buf) -> Result<ScalarImpl> {
// Guard against being called with a non-struct type.
assert_eq!(data_type.prost_type_name(), TypeName::Struct);
// Length prefix: number of payload bytes that follow.
let len = data.get_u32_le();
let mut bytes = vec![0; len as usize];
data.copy_to_slice(&mut bytes);
// The payload itself uses the proto byte encoding, so decoding is delegated to it.
ScalarImpl::from_proto_bytes(&bytes, &data_type.to_protobuf())
.map_err(ValueEncodingError::InvalidStructEncoding)
}

fn deserialize_list(data_type: &DataType, mut data: impl Buf) -> Result<ScalarImpl> {
assert_eq!(data_type.prost_type_name(), TypeName::List);
let len = data.get_u32_le();
let mut bytes = vec![0; len as usize];
data.copy_to_slice(&mut bytes);
ScalarImpl::bytes_to_scalar(&bytes, &data_type.to_protobuf()).map_err(Into::into)
ScalarImpl::from_proto_bytes(&bytes, &data_type.to_protobuf())
.map_err(ValueEncodingError::InvalidListEncoding)
}

fn deserialize_str(mut data: impl Buf) -> Result<String> {
let len = data.get_u32_le();
let mut bytes = vec![0; len as usize];
data.copy_to_slice(&mut bytes);
Ok(String::from_utf8(bytes).map_err(ValueEncodingError::InvalidUtf8)?)
String::from_utf8(bytes).map_err(ValueEncodingError::InvalidUtf8)
}

fn deserialize_bool(mut data: impl Buf) -> Result<bool> {
match data.get_u8() {
1 => Ok(true),
0 => Ok(false),
value => Err(ValueEncodingError::InvalidBoolEncoding(value).into()),
value => Err(ValueEncodingError::InvalidBoolEncoding(value)),
}
}

Expand All @@ -184,18 +197,18 @@ fn deserialize_interval(mut data: impl Buf) -> Result<IntervalUnit> {
fn deserialize_naivetime(mut data: impl Buf) -> Result<NaiveTimeWrapper> {
let secs = data.get_u32_le();
let nano = data.get_u32_le();
NaiveTimeWrapper::new_with_secs_nano_value_encoding(secs, nano)
NaiveTimeWrapper::with_secs_nano_value(secs, nano)
}

fn deserialize_naivedatetime(mut data: impl Buf) -> Result<NaiveDateTimeWrapper> {
let secs = data.get_i64_le();
let nsecs = data.get_u32_le();
NaiveDateTimeWrapper::new_with_secs_nsecs_value_encoding(secs, nsecs)
NaiveDateTimeWrapper::with_secs_nsecs_value(secs, nsecs)
}

fn deserialize_naivedate(mut data: impl Buf) -> Result<NaiveDateWrapper> {
let days = data.get_i32_le();
NaiveDateWrapper::new_with_days_value_encoding(days)
NaiveDateWrapper::with_days_value(days)
}

fn deserialize_decimal(mut data: impl Buf) -> Result<Decimal> {
Expand Down
1 change: 0 additions & 1 deletion src/expr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,4 @@ tokio = { version = "0.2", package = "madsim-tokio", features = ["rt", "rt-multi
tokio-stream = "0.1"
toml = "0.5"
tonic = { version = "0.2", package = "madsim-tonic" }
value-encoding = { path = "../utils/value-encoding" }
workspace-hack = { version = "0.1", path = "../workspace-hack" }
2 changes: 1 addition & 1 deletion src/expr/src/expr/expr_literal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ impl<'a> TryFrom<&'a ExprNode> for LiteralExpression {

if let RexNode::Constant(prost_value) = prost.get_rex_node().unwrap() {
// TODO: We need to unify these
let value = ScalarImpl::bytes_to_scalar(
let value = ScalarImpl::from_proto_bytes(
prost_value.get_body(),
prost.get_return_type().unwrap(),
)?;
Expand Down
2 changes: 1 addition & 1 deletion src/expr/src/expr/expr_regexp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ impl<'a> TryFrom<&'a ExprNode> for RegexpMatchExpression {
let RexNode::Constant(pattern_value) = pattern_node.get_rex_node().unwrap() else {
return Err(ExprError::UnsupportedFunction("non-constant pattern in regexp_match".to_string()))
};
let pattern_scalar = ScalarImpl::bytes_to_scalar(
let pattern_scalar = ScalarImpl::from_proto_bytes(
pattern_value.get_body(),
pattern_node.get_return_type().unwrap(),
)?;
Expand Down
2 changes: 1 addition & 1 deletion src/expr/src/table_function/regexp_matches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ pub fn new_regexp_matches(prost: &TableFunctionProst) -> Result<BoxedTableFuncti
let RexNode::Constant(pattern_value) = pattern_node.get_rex_node().unwrap() else {
return Err(ExprError::UnsupportedFunction("non-constant pattern in regexp_match".to_string()))
};
let pattern_scalar = ScalarImpl::bytes_to_scalar(
let pattern_scalar = ScalarImpl::from_proto_bytes(
pattern_value.get_body(),
pattern_node.get_return_type().unwrap(),
)?;
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/expr/literal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ mod tests {
let data = Some(ScalarImpl::Struct(value.clone()));
let node = literal_to_protobuf(&data);
if let RexNode::Constant(prost) = node.as_ref().unwrap() {
let data2 = ScalarImpl::bytes_to_scalar(
let data2 = ScalarImpl::from_proto_bytes(
prost.get_body(),
&DataType::Struct {
fields: vec![DataType::Varchar, DataType::Int32, DataType::Int32].into(),
Expand All @@ -119,7 +119,7 @@ mod tests {
let data = Some(ScalarImpl::List(value.clone()));
let node = literal_to_protobuf(&data);
if let RexNode::Constant(prost) = node.as_ref().unwrap() {
let data2 = ScalarImpl::bytes_to_scalar(
let data2 = ScalarImpl::from_proto_bytes(
prost.get_body(),
&DataType::List {
datatype: Box::new(DataType::Int32),
Expand Down
1 change: 0 additions & 1 deletion src/storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ tokio-stream = "0.1"
tonic = { version = "0.2", package = "madsim-tonic" }
tracing = { version = "0.1" }
twox-hash = "1"
value-encoding = { path = "../utils/value-encoding" }
workspace-hack = { version = "0.1", path = "../workspace-hack" }
zstd = "0.11.2"

Expand Down
1 change: 0 additions & 1 deletion src/stream/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ tracing = { version = "0.1" }
tracing-futures = "0.2"
twox-hash = "1"
url = "2"
value-encoding = { path = "../utils/value-encoding" }
workspace-hack = { version = "0.1", path = "../workspace-hack" }

[dev-dependencies]
Expand Down
Loading