Skip to content

Commit

Permalink
refactor: value encoding error (#4671)
Browse files Browse the repository at this point in the history
* remove useless value-encoding crate

* rename bytes_to_scalar -> from_proto_bytes

* done partially

* fix remaining todos

* fix clippy

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
fuyufjh and mergify[bot] authored Aug 16, 2022
1 parent 16cd164 commit 32a3546
Show file tree
Hide file tree
Showing 25 changed files with 65 additions and 205 deletions.
17 changes: 0 additions & 17 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ members = [
"src/utils/runtime",
"src/utils/static-link",
"src/utils/stats_alloc",
"src/utils/value-encoding",
"src/workspace-hack",
]

Expand Down
4 changes: 2 additions & 2 deletions src/batch/src/executor/row_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ fn get_scan_bound(
.iter()
.map(|v| {
let ty = pk_types.next().unwrap();
let scalar = ScalarImpl::bytes_to_scalar(v, &ty.to_protobuf()).unwrap();
let scalar = ScalarImpl::from_proto_bytes(v, &ty.to_protobuf()).unwrap();
Some(scalar)
})
.collect_vec());
Expand All @@ -102,7 +102,7 @@ fn get_scan_bound(

let bound_ty = pk_types.next().unwrap();
let build_bound = |bound: &scan_range::Bound| -> Bound<Datum> {
let scalar = ScalarImpl::bytes_to_scalar(&bound.value, &bound_ty.to_protobuf()).unwrap();
let scalar = ScalarImpl::from_proto_bytes(&bound.value, &bound_ty.to_protobuf()).unwrap();

let datum = Some(scalar);
if bound.inclusive {
Expand Down
1 change: 0 additions & 1 deletion src/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ tonic = { version = "0.2", package = "madsim-tonic" }
tracing = { version = "0.1" }
twox-hash = "1"
url = "2"
value-encoding = { path = "../utils/value-encoding" }
workspace-hack = { version = "0.1", path = "../workspace-hack" }

[target.'cfg(target_os = "linux")'.dependencies]
Expand Down
6 changes: 3 additions & 3 deletions src/common/src/array/data_chunk_iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ use itertools::Itertools;

use super::column::Column;
use crate::array::DataChunk;
use crate::error::Result;
use crate::hash::HashCode;
use crate::types::{hash_datum, DataType, Datum, DatumRef, ToOwnedDatum};
use crate::util::value_encoding;
use crate::util::value_encoding::{deserialize_datum, serialize_datum};

impl DataChunk {
Expand Down Expand Up @@ -254,7 +254,7 @@ impl Row {
/// [`crate::util::ordered::OrderedRow`]
///
/// All values are nullable. Each value will have 1 extra byte to indicate whether it is null.
pub fn serialize(&self) -> Result<Vec<u8>> {
pub fn serialize(&self) -> value_encoding::Result<Vec<u8>> {
let mut result = vec![];
for cell in &self.0 {
result.extend(serialize_datum(cell)?);
Expand Down Expand Up @@ -315,7 +315,7 @@ impl RowDeserializer {
}

/// Deserialize the row from value encoding bytes.
pub fn deserialize(&self, mut data: impl Buf) -> Result<Row> {
pub fn deserialize(&self, mut data: impl Buf) -> value_encoding::Result<Row> {
let mut values = Vec::with_capacity(self.data_types.len());
for typ in &self.data_types {
values.push(deserialize_datum(&mut data, typ)?);
Expand Down
2 changes: 1 addition & 1 deletion src/common/src/array/list_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ impl ListValue {
if b.is_empty() {
Ok(None)
} else {
Ok(Some(ScalarImpl::bytes_to_scalar(b, d)?))
Ok(Some(ScalarImpl::from_proto_bytes(b, d)?))
}
})
.collect::<ArrayResult<Vec<Datum>>>()?;
Expand Down
2 changes: 1 addition & 1 deletion src/common/src/array/struct_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ impl StructValue {
if b.is_empty() {
Ok(None)
} else {
Ok(Some(ScalarImpl::bytes_to_scalar(b, d)?))
Ok(Some(ScalarImpl::from_proto_bytes(b, d)?))
}
})
.collect::<ArrayResult<Vec<Datum>>>()?;
Expand Down
9 changes: 5 additions & 4 deletions src/common/src/types/chrono_wrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ use chrono::{Datelike, Duration, NaiveDate, NaiveDateTime, NaiveTime, Timelike};

use super::{CheckedAdd, IntervalUnit};
use crate::array::ArrayResult;
use crate::error::Result;
use crate::util::value_encoding;
use crate::util::value_encoding::error::ValueEncodingError;

/// The same as `NaiveDate::from_ymd(1970, 1, 1).num_days_from_ce()`.
/// Minus this magic number to store the number of days since 1970-01-01.
pub const UNIX_EPOCH_DAYS: i32 = 719_163;
Expand Down Expand Up @@ -103,7 +104,7 @@ impl NaiveDateWrapper {
))
}

pub fn new_with_days_value_encoding(days: i32) -> Result<Self> {
pub fn with_days_value(days: i32) -> value_encoding::Result<Self> {
Ok(NaiveDateWrapper::new(
NaiveDate::from_num_days_from_ce_opt(days)
.ok_or(ValueEncodingError::InvalidNaiveDateEncoding(days))?,
Expand Down Expand Up @@ -141,7 +142,7 @@ impl NaiveTimeWrapper {
))
}

pub fn new_with_secs_nano_value_encoding(secs: u32, nano: u32) -> Result<Self> {
pub fn with_secs_nano_value(secs: u32, nano: u32) -> value_encoding::Result<Self> {
Ok(NaiveTimeWrapper::new(
NaiveTime::from_num_seconds_from_midnight_opt(secs, nano)
.ok_or(ValueEncodingError::InvalidNaiveTimeEncoding(secs, nano))?,
Expand Down Expand Up @@ -189,7 +190,7 @@ impl NaiveDateTimeWrapper {
}))
}

pub fn new_with_secs_nsecs_value_encoding(secs: i64, nsecs: u32) -> Result<Self> {
pub fn with_secs_nsecs_value(secs: i64, nsecs: u32) -> value_encoding::Result<Self> {
Ok(NaiveDateTimeWrapper::new({
NaiveDateTime::from_timestamp_opt(secs, nsecs).ok_or(
ValueEncodingError::InvalidNaiveDateTimeEncoding(secs, nsecs),
Expand Down
10 changes: 6 additions & 4 deletions src/common/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -866,7 +866,9 @@ impl ScalarImpl {
body
}

pub fn bytes_to_scalar(b: &Vec<u8>, data_type: &ProstDataType) -> ArrayResult<Self> {
/// This encoding should only be used in proto
/// TODO: replace with value encoding?
pub fn from_proto_bytes(b: &Vec<u8>, data_type: &ProstDataType) -> ArrayResult<Self> {
let value = match data_type.get_type_name()? {
TypeName::Boolean => ScalarImpl::Bool(
i8::from_be_bytes(
Expand Down Expand Up @@ -1057,18 +1059,18 @@ mod tests {
fn test_protobuf_conversion() {
let v = ScalarImpl::NaiveDateTime(NaiveDateTimeWrapper::default());
let actual =
ScalarImpl::bytes_to_scalar(&v.to_protobuf(), &DataType::Timestamp.to_protobuf())
ScalarImpl::from_proto_bytes(&v.to_protobuf(), &DataType::Timestamp.to_protobuf())
.unwrap();
assert_eq!(v, actual);

let v = ScalarImpl::NaiveDate(NaiveDateWrapper::default());
let actual =
ScalarImpl::bytes_to_scalar(&v.to_protobuf(), &DataType::Date.to_protobuf()).unwrap();
ScalarImpl::from_proto_bytes(&v.to_protobuf(), &DataType::Date.to_protobuf()).unwrap();
assert_eq!(v, actual);

let v = ScalarImpl::NaiveTime(NaiveTimeWrapper::default());
let actual =
ScalarImpl::bytes_to_scalar(&v.to_protobuf(), &DataType::Time.to_protobuf()).unwrap();
ScalarImpl::from_proto_bytes(&v.to_protobuf(), &DataType::Time.to_protobuf()).unwrap();
assert_eq!(v, actual);
}
}
6 changes: 5 additions & 1 deletion src/common/src/util/value_encoding/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use thiserror::Error;
pub enum ValueEncodingError {
#[error("Invalid bool value encoding: {0}")]
InvalidBoolEncoding(u8),
#[error("Invalid UTF8 value encofing: {0}")]
#[error("Invalid UTF8 value encoding: {0}")]
InvalidUtf8(#[from] std::string::FromUtf8Error),
#[error("Invalid NaiveDate value encoding: days: {0}")]
InvalidNaiveDateEncoding(i32),
Expand All @@ -28,4 +28,8 @@ pub enum ValueEncodingError {
InvalidNaiveTimeEncoding(u32, u32),
#[error("Invalid null tag value encoding: {0}")]
InvalidTagEncoding(u8),
#[error("Invalid struct encoding: {0}")]
InvalidStructEncoding(crate::array::ArrayError),
#[error("Invalid list encoding: {0}")]
InvalidListEncoding(crate::array::ArrayError),
}
35 changes: 24 additions & 11 deletions src/common/src/util/value_encoding/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,16 @@
use bytes::{Buf, BufMut};
use chrono::{Datelike, Timelike};

use crate::error::Result;
use crate::types::{
DataType, Datum, Decimal, IntervalUnit, NaiveDateTimeWrapper, NaiveDateWrapper,
NaiveTimeWrapper, OrderedF32, OrderedF64, ScalarImpl, ScalarRefImpl,
};

pub mod error;
use error::ValueEncodingError;
use risingwave_pb::data::data_type::TypeName;

pub type Result<T> = std::result::Result<T, ValueEncodingError>;

use crate::array::{ListRef, StructRef};

Expand Down Expand Up @@ -64,7 +66,7 @@ pub fn deserialize_datum(mut data: impl Buf, ty: &DataType) -> Result<Datum> {
match null_tag {
0 => Ok(None),
1 => deserialize_value(ty, data),
_ => Err(ValueEncodingError::InvalidTagEncoding(null_tag).into()),
_ => Err(ValueEncodingError::InvalidTagEncoding(null_tag)),
}
}

Expand Down Expand Up @@ -147,30 +149,41 @@ fn deserialize_value(ty: &DataType, mut data: impl Buf) -> Result<Datum> {
DataType::Timestamp => ScalarImpl::NaiveDateTime(deserialize_naivedatetime(data)?),
DataType::Timestampz => ScalarImpl::Int64(data.get_i64_le()),
DataType::Date => ScalarImpl::NaiveDate(deserialize_naivedate(data)?),
DataType::Struct { .. } => deserialize_struct_or_list(ty, data)?,
DataType::List { .. } => deserialize_struct_or_list(ty, data)?,
DataType::Struct { .. } => deserialize_struct(ty, data)?,
DataType::List { .. } => deserialize_list(ty, data)?,
}))
}

fn deserialize_struct_or_list(data_type: &DataType, mut data: impl Buf) -> Result<ScalarImpl> {
fn deserialize_struct(data_type: &DataType, mut data: impl Buf) -> Result<ScalarImpl> {
assert_eq!(data_type.prost_type_name(), TypeName::Struct);
let len = data.get_u32_le();
let mut bytes = vec![0; len as usize];
data.copy_to_slice(&mut bytes);
ScalarImpl::from_proto_bytes(&bytes, &data_type.to_protobuf())
.map_err(ValueEncodingError::InvalidStructEncoding)
}

fn deserialize_list(data_type: &DataType, mut data: impl Buf) -> Result<ScalarImpl> {
assert_eq!(data_type.prost_type_name(), TypeName::List);
let len = data.get_u32_le();
let mut bytes = vec![0; len as usize];
data.copy_to_slice(&mut bytes);
ScalarImpl::bytes_to_scalar(&bytes, &data_type.to_protobuf()).map_err(Into::into)
ScalarImpl::from_proto_bytes(&bytes, &data_type.to_protobuf())
.map_err(ValueEncodingError::InvalidListEncoding)
}

fn deserialize_str(mut data: impl Buf) -> Result<String> {
let len = data.get_u32_le();
let mut bytes = vec![0; len as usize];
data.copy_to_slice(&mut bytes);
Ok(String::from_utf8(bytes).map_err(ValueEncodingError::InvalidUtf8)?)
String::from_utf8(bytes).map_err(ValueEncodingError::InvalidUtf8)
}

fn deserialize_bool(mut data: impl Buf) -> Result<bool> {
match data.get_u8() {
1 => Ok(true),
0 => Ok(false),
value => Err(ValueEncodingError::InvalidBoolEncoding(value).into()),
value => Err(ValueEncodingError::InvalidBoolEncoding(value)),
}
}

Expand All @@ -184,18 +197,18 @@ fn deserialize_interval(mut data: impl Buf) -> Result<IntervalUnit> {
fn deserialize_naivetime(mut data: impl Buf) -> Result<NaiveTimeWrapper> {
let secs = data.get_u32_le();
let nano = data.get_u32_le();
NaiveTimeWrapper::new_with_secs_nano_value_encoding(secs, nano)
NaiveTimeWrapper::with_secs_nano_value(secs, nano)
}

fn deserialize_naivedatetime(mut data: impl Buf) -> Result<NaiveDateTimeWrapper> {
let secs = data.get_i64_le();
let nsecs = data.get_u32_le();
NaiveDateTimeWrapper::new_with_secs_nsecs_value_encoding(secs, nsecs)
NaiveDateTimeWrapper::with_secs_nsecs_value(secs, nsecs)
}

fn deserialize_naivedate(mut data: impl Buf) -> Result<NaiveDateWrapper> {
let days = data.get_i32_le();
NaiveDateWrapper::new_with_days_value_encoding(days)
NaiveDateWrapper::with_days_value(days)
}

fn deserialize_decimal(mut data: impl Buf) -> Result<Decimal> {
Expand Down
1 change: 0 additions & 1 deletion src/expr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,4 @@ tokio = { version = "0.2", package = "madsim-tokio", features = ["rt", "rt-multi
tokio-stream = "0.1"
toml = "0.5"
tonic = { version = "0.2", package = "madsim-tonic" }
value-encoding = { path = "../utils/value-encoding" }
workspace-hack = { version = "0.1", path = "../workspace-hack" }
2 changes: 1 addition & 1 deletion src/expr/src/expr/expr_literal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ impl<'a> TryFrom<&'a ExprNode> for LiteralExpression {

if let RexNode::Constant(prost_value) = prost.get_rex_node().unwrap() {
// TODO: We need to unify these
let value = ScalarImpl::bytes_to_scalar(
let value = ScalarImpl::from_proto_bytes(
prost_value.get_body(),
prost.get_return_type().unwrap(),
)?;
Expand Down
2 changes: 1 addition & 1 deletion src/expr/src/expr/expr_regexp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ impl<'a> TryFrom<&'a ExprNode> for RegexpMatchExpression {
let RexNode::Constant(pattern_value) = pattern_node.get_rex_node().unwrap() else {
return Err(ExprError::UnsupportedFunction("non-constant pattern in regexp_match".to_string()))
};
let pattern_scalar = ScalarImpl::bytes_to_scalar(
let pattern_scalar = ScalarImpl::from_proto_bytes(
pattern_value.get_body(),
pattern_node.get_return_type().unwrap(),
)?;
Expand Down
2 changes: 1 addition & 1 deletion src/expr/src/table_function/regexp_matches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ pub fn new_regexp_matches(prost: &TableFunctionProst) -> Result<BoxedTableFuncti
let RexNode::Constant(pattern_value) = pattern_node.get_rex_node().unwrap() else {
return Err(ExprError::UnsupportedFunction("non-constant pattern in regexp_match".to_string()))
};
let pattern_scalar = ScalarImpl::bytes_to_scalar(
let pattern_scalar = ScalarImpl::from_proto_bytes(
pattern_value.get_body(),
pattern_node.get_return_type().unwrap(),
)?;
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/expr/literal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ mod tests {
let data = Some(ScalarImpl::Struct(value.clone()));
let node = literal_to_protobuf(&data);
if let RexNode::Constant(prost) = node.as_ref().unwrap() {
let data2 = ScalarImpl::bytes_to_scalar(
let data2 = ScalarImpl::from_proto_bytes(
prost.get_body(),
&DataType::Struct {
fields: vec![DataType::Varchar, DataType::Int32, DataType::Int32].into(),
Expand All @@ -119,7 +119,7 @@ mod tests {
let data = Some(ScalarImpl::List(value.clone()));
let node = literal_to_protobuf(&data);
if let RexNode::Constant(prost) = node.as_ref().unwrap() {
let data2 = ScalarImpl::bytes_to_scalar(
let data2 = ScalarImpl::from_proto_bytes(
prost.get_body(),
&DataType::List {
datatype: Box::new(DataType::Int32),
Expand Down
1 change: 0 additions & 1 deletion src/storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ tokio-stream = "0.1"
tonic = { version = "0.2", package = "madsim-tonic" }
tracing = { version = "0.1" }
twox-hash = "1"
value-encoding = { path = "../utils/value-encoding" }
workspace-hack = { version = "0.1", path = "../workspace-hack" }
zstd = "0.11.2"

Expand Down
1 change: 0 additions & 1 deletion src/stream/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ tracing = { version = "0.1" }
tracing-futures = "0.2"
twox-hash = "1"
url = "2"
value-encoding = { path = "../utils/value-encoding" }
workspace-hack = { version = "0.1", path = "../workspace-hack" }

[dev-dependencies]
Expand Down
Loading

0 comments on commit 32a3546

Please sign in to comment.