From cb3200c493ae08c50105571b7e56de9207c46498 Mon Sep 17 00:00:00 2001 From: Eric Fu Date: Tue, 16 Aug 2022 16:16:26 +0800 Subject: [PATCH 1/5] remove useless value-encoding crate --- Cargo.lock | 17 ---------- Cargo.toml | 1 - src/common/Cargo.toml | 1 - src/expr/Cargo.toml | 1 - src/storage/Cargo.toml | 1 - src/stream/Cargo.toml | 1 - src/utils/value-encoding/Cargo.toml | 18 ---------- src/utils/value-encoding/src/de.rs | 51 ----------------------------- src/utils/value-encoding/src/lib.rs | 24 -------------- src/utils/value-encoding/src/ser.rs | 44 ------------------------- 10 files changed, 159 deletions(-) delete mode 100644 src/utils/value-encoding/Cargo.toml delete mode 100644 src/utils/value-encoding/src/de.rs delete mode 100644 src/utils/value-encoding/src/lib.rs delete mode 100644 src/utils/value-encoding/src/ser.rs diff --git a/Cargo.lock b/Cargo.lock index 348b93a2befb7..a4ddad045c8da 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4660,7 +4660,6 @@ dependencies = [ "tracing", "twox-hash", "url", - "value-encoding", "workspace-hack", ] @@ -4887,7 +4886,6 @@ dependencies = [ "thiserror", "tokio-stream", "toml", - "value-encoding", "workspace-hack", ] @@ -5343,7 +5341,6 @@ dependencies = [ "tracing", "twox-hash", "uuid", - "value-encoding", "workspace-hack", "zstd", ] @@ -5409,7 +5406,6 @@ dependencies = [ "tracing-futures", "twox-hash", "url", - "value-encoding", "workspace-hack", ] @@ -6683,19 +6679,6 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" -[[package]] -name = "value-encoding" -version = "0.1.11" -dependencies = [ - "bytes", - "memcomparable", - "rand 0.8.5", - "rust_decimal", - "serde", - "thiserror", - "workspace-hack", -] - [[package]] name = "vcpkg" version = "0.2.15" diff --git a/Cargo.toml b/Cargo.toml index 7be4721aa5b34..b88cb4f9b39d6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,7 +36,6 @@ members = [ "src/utils/runtime", "src/utils/static-link", "src/utils/stats_alloc", - "src/utils/value-encoding", "src/workspace-hack", ] diff --git a/src/common/Cargo.toml b/src/common/Cargo.toml index 292c8c48b7842..19102735e194f 100644 --- a/src/common/Cargo.toml +++ b/src/common/Cargo.toml @@ -49,7 +49,6 @@ tonic = { version = "0.2", package = "madsim-tonic" } tracing = { version = "0.1" } twox-hash = "1" url = "2" -value-encoding = { path = "../utils/value-encoding" } workspace-hack = { version = "0.1", path = "../workspace-hack" } [target.'cfg(target_os = "linux")'.dependencies] diff --git a/src/expr/Cargo.toml b/src/expr/Cargo.toml index 11f61f6fa3f3f..a93d3f17f0baf 100644 --- a/src/expr/Cargo.toml +++ b/src/expr/Cargo.toml @@ -34,5 +34,4 @@ tokio = { version = "0.2", package = "madsim-tokio", features = ["rt", "rt-multi tokio-stream = "0.1" toml = "0.5" tonic = { version = "0.2", package = "madsim-tonic" } -value-encoding = { path = "../utils/value-encoding" } workspace-hack = { version = "0.1", path = "../workspace-hack" } diff --git a/src/storage/Cargo.toml b/src/storage/Cargo.toml index b4cd813fa84e8..d20ef1772c2ac 100644 --- a/src/storage/Cargo.toml +++ b/src/storage/Cargo.toml @@ -74,7 +74,6 @@ tokio-stream = "0.1" tonic = { version = "0.2", package = "madsim-tonic" } tracing = { version = "0.1" } twox-hash = "1" -value-encoding = { path = "../utils/value-encoding" } workspace-hack = { version = "0.1", path = "../workspace-hack" } zstd = "0.11.2" diff --git a/src/stream/Cargo.toml b/src/stream/Cargo.toml index 5fa4bdfca5055..519855e793f67 100644 --- a/src/stream/Cargo.toml +++ b/src/stream/Cargo.toml @@ -69,7 +69,6 @@ tracing = { version = "0.1" } tracing-futures = "0.2" twox-hash = "1" url = "2" -value-encoding = { path = "../utils/value-encoding" } workspace-hack = { version = "0.1", path = "../workspace-hack" } [dev-dependencies] diff --git a/src/utils/value-encoding/Cargo.toml b/src/utils/value-encoding/Cargo.toml deleted file mode 100644 index c872e86530f3a..0000000000000 --- a/src/utils/value-encoding/Cargo.toml +++ /dev/null @@ -1,18 +0,0 @@ -[package] -name = "value-encoding" -version = "0.1.11" -edition = "2021" -description = "Value encoding format." -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -bytes = "1" -memcomparable = { path = "../memcomparable" } -serde = { version = "1", features = ["derive"] } -thiserror = "1" -workspace-hack = { version = "0.1", path = "../../workspace-hack" } - -[dev-dependencies] -rand = "0.8" -rust_decimal = "1" -serde = { version = "1", features = ["derive"] } diff --git a/src/utils/value-encoding/src/de.rs b/src/utils/value-encoding/src/de.rs deleted file mode 100644 index 6013bfcf20d7f..0000000000000 --- a/src/utils/value-encoding/src/de.rs +++ /dev/null @@ -1,51 +0,0 @@ -// Copyright 2022 Singularity Data -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use bytes::Buf; -use memcomparable::Result; -/// A structure that deserializes memcomparable bytes into Rust values. -pub struct Deserializer { - inner: memcomparable::Deserializer, -} - -impl Deserializer { - /// Creates a deserializer from a buffer. - pub fn new(input: B) -> Self { - Deserializer { - inner: memcomparable::Deserializer::new(input), - } - } - - /// Set whether data is serialized in reverse order. - pub fn set_reverse(&mut self, reverse: bool) { - self.inner.set_reverse(reverse); - } - - /// Unwrap the inner buffer from the `Deserializer`. - pub fn into_inner(self) -> B { - self.inner.into_inner() - } - - /// The inner is just a memcomparable deserializer. Removed in future. - pub fn memcom_de(&mut self) -> &mut memcomparable::Deserializer { - &mut self.inner - } - - /// Read u8 from Bytes input in decimal form (Do not include null tag). Used by value encoding - /// (`serialize_cell` in risingwave_common crate). TODO: It is a temporal solution For value - /// encoding. Will moved to value encoding serializer in future. - pub fn read_decimal_v2(&mut self) -> Result> { - self.inner.read_decimal_v2() - } -} diff --git a/src/utils/value-encoding/src/lib.rs b/src/utils/value-encoding/src/lib.rs deleted file mode 100644 index 0fe8454a285e2..0000000000000 --- a/src/utils/value-encoding/src/lib.rs +++ /dev/null @@ -1,24 +0,0 @@ -// Copyright 2022 Singularity Data -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//! value encoding is an encoding format which converts the data into a form that do not guarantee -//! directly compared with memcmp. - -#![deny(missing_docs)] - -mod de; -mod ser; - -pub use de::Deserializer; -pub use ser::Serializer; diff --git a/src/utils/value-encoding/src/ser.rs b/src/utils/value-encoding/src/ser.rs deleted file mode 100644 index dfc1adf6ee280..0000000000000 --- a/src/utils/value-encoding/src/ser.rs +++ /dev/null @@ -1,44 +0,0 @@ -// Copyright 2022 Singularity Data -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use bytes::BufMut; - -/// A structure for serializing Rust values into a memcomparable bytes. -pub struct Serializer { - inner: memcomparable::Serializer, -} - -impl Serializer { - /// Create a new `Serializer`. - pub fn new(buffer: B) -> Self { - Serializer { - inner: memcomparable::Serializer::new(buffer), - } - } - - /// Unwrap the inner buffer from the `Serializer`. - pub fn into_inner(self) -> B { - self.inner.into_inner() - } - - /// Set whether data is serialized in reverse order. - pub fn set_reverse(&mut self, reverse: bool) { - self.inner.set_reverse(reverse) - } - - /// The inner is just a memcomparable serializer. Removed in future. - pub fn memcom_ser(&mut self) -> &mut memcomparable::Serializer { - &mut self.inner - } -} From 04da21798e80ed877c3e61fa7543c9e4650c35bd Mon Sep 17 00:00:00 2001 From: Eric Fu Date: Tue, 16 Aug 2022 16:18:53 +0800 Subject: [PATCH 2/5] rename bytes_to_scalar -> from_proto_bytes --- src/batch/src/executor/row_seq_scan.rs | 4 ++-- src/common/src/array/list_array.rs | 2 +- src/common/src/array/struct_array.rs | 2 +- src/common/src/types/mod.rs | 10 ++++++---- src/expr/src/expr/expr_literal.rs | 2 +- src/expr/src/expr/expr_regexp.rs | 2 +- src/expr/src/table_function/regexp_matches.rs | 2 +- src/frontend/src/expr/literal.rs | 4 ++-- 8 files changed, 15 insertions(+), 13 deletions(-) diff --git a/src/batch/src/executor/row_seq_scan.rs b/src/batch/src/executor/row_seq_scan.rs index 12915be8a2cba..1ac377e974ffb 100644 --- a/src/batch/src/executor/row_seq_scan.rs +++ b/src/batch/src/executor/row_seq_scan.rs @@ -92,7 +92,7 @@ fn get_scan_bound( .iter() .map(|v| { let ty = pk_types.next().unwrap(); - let scalar = ScalarImpl::bytes_to_scalar(v, &ty.to_protobuf()).unwrap(); + let scalar = ScalarImpl::from_proto_bytes(v, &ty.to_protobuf()).unwrap(); Some(scalar) }) .collect_vec()); @@ -102,7 +102,7 @@ fn get_scan_bound( let bound_ty = pk_types.next().unwrap(); let build_bound = |bound: &scan_range::Bound| -> Bound { - let scalar = ScalarImpl::bytes_to_scalar(&bound.value, &bound_ty.to_protobuf()).unwrap(); + let scalar = ScalarImpl::from_proto_bytes(&bound.value, &bound_ty.to_protobuf()).unwrap(); let datum = Some(scalar); if bound.inclusive { diff --git a/src/common/src/array/list_array.rs b/src/common/src/array/list_array.rs index 1c3f55065eba7..01c7d716318ec 100644 --- a/src/common/src/array/list_array.rs +++ b/src/common/src/array/list_array.rs @@ -344,7 +344,7 @@ impl ListValue { if b.is_empty() { Ok(None) } else { - Ok(Some(ScalarImpl::bytes_to_scalar(b, d)?)) + Ok(Some(ScalarImpl::from_proto_bytes(b, d)?)) } }) .collect::>>()?; diff --git a/src/common/src/array/struct_array.rs b/src/common/src/array/struct_array.rs index f184c2c225a95..dde19a041c741 100644 --- a/src/common/src/array/struct_array.rs +++ b/src/common/src/array/struct_array.rs @@ -364,7 +364,7 @@ impl StructValue { if b.is_empty() { Ok(None) } else { - Ok(Some(ScalarImpl::bytes_to_scalar(b, d)?)) + Ok(Some(ScalarImpl::from_proto_bytes(b, d)?)) } }) .collect::>>()?; diff --git a/src/common/src/types/mod.rs b/src/common/src/types/mod.rs index 21fbcfe7cb737..8ba5bfc77d208 100644 --- a/src/common/src/types/mod.rs +++ b/src/common/src/types/mod.rs @@ -866,7 +866,9 @@ impl ScalarImpl { body } - pub fn bytes_to_scalar(b: &Vec, data_type: &ProstDataType) -> ArrayResult { + /// This encoding should only be used in proto + /// TODO: replace with value encoding? + pub fn from_proto_bytes(b: &Vec, data_type: &ProstDataType) -> ArrayResult { let value = match data_type.get_type_name()? { TypeName::Boolean => ScalarImpl::Bool( i8::from_be_bytes( @@ -1057,18 +1059,18 @@ mod tests { fn test_protobuf_conversion() { let v = ScalarImpl::NaiveDateTime(NaiveDateTimeWrapper::default()); let actual = - ScalarImpl::bytes_to_scalar(&v.to_protobuf(), &DataType::Timestamp.to_protobuf()) + ScalarImpl::from_proto_bytes(&v.to_protobuf(), &DataType::Timestamp.to_protobuf()) .unwrap(); assert_eq!(v, actual); let v = ScalarImpl::NaiveDate(NaiveDateWrapper::default()); let actual = - ScalarImpl::bytes_to_scalar(&v.to_protobuf(), &DataType::Date.to_protobuf()).unwrap(); + ScalarImpl::from_proto_bytes(&v.to_protobuf(), &DataType::Date.to_protobuf()).unwrap(); assert_eq!(v, actual); let v = ScalarImpl::NaiveTime(NaiveTimeWrapper::default()); let actual = - ScalarImpl::bytes_to_scalar(&v.to_protobuf(), &DataType::Time.to_protobuf()).unwrap(); + ScalarImpl::from_proto_bytes(&v.to_protobuf(), &DataType::Time.to_protobuf()).unwrap(); assert_eq!(v, actual); } } diff --git a/src/expr/src/expr/expr_literal.rs b/src/expr/src/expr/expr_literal.rs index e313031ef98cf..8d03543272246 100644 --- a/src/expr/src/expr/expr_literal.rs +++ b/src/expr/src/expr/expr_literal.rs @@ -110,7 +110,7 @@ impl<'a> TryFrom<&'a ExprNode> for LiteralExpression { if let RexNode::Constant(prost_value) = prost.get_rex_node().unwrap() { // TODO: We need to unify these - let value = ScalarImpl::bytes_to_scalar( + let value = ScalarImpl::from_proto_bytes( prost_value.get_body(), prost.get_return_type().unwrap(), )?; diff --git a/src/expr/src/expr/expr_regexp.rs b/src/expr/src/expr/expr_regexp.rs index e8cee479a209f..829db298f6a56 100644 --- a/src/expr/src/expr/expr_regexp.rs +++ b/src/expr/src/expr/expr_regexp.rs @@ -61,7 +61,7 @@ impl<'a> TryFrom<&'a ExprNode> for RegexpMatchExpression { let RexNode::Constant(pattern_value) = pattern_node.get_rex_node().unwrap() else { return Err(ExprError::UnsupportedFunction("non-constant pattern in regexp_match".to_string())) }; - let pattern_scalar = ScalarImpl::bytes_to_scalar( + let pattern_scalar = ScalarImpl::from_proto_bytes( pattern_value.get_body(), pattern_node.get_return_type().unwrap(), )?; diff --git a/src/expr/src/table_function/regexp_matches.rs b/src/expr/src/table_function/regexp_matches.rs index b51c2b39310a9..27fca8b8dd149 100644 --- a/src/expr/src/table_function/regexp_matches.rs +++ b/src/expr/src/table_function/regexp_matches.rs @@ -139,7 +139,7 @@ pub fn new_regexp_matches(prost: &TableFunctionProst) -> Result Date: Tue, 16 Aug 2022 16:59:03 +0800 Subject: [PATCH 3/5] done partially --- src/common/src/array/data_chunk_iter.rs | 1 + src/common/src/types/chrono_wrapper.rs | 9 +++--- src/common/src/util/value_encoding/error.rs | 6 +++- src/common/src/util/value_encoding/mod.rs | 29 ++++++++++++++----- src/stream/src/executor/error.rs | 10 ++++++- .../managed_state/join/join_entry_state.rs | 5 ++-- 6 files changed, 43 insertions(+), 17 deletions(-) diff --git a/src/common/src/array/data_chunk_iter.rs b/src/common/src/array/data_chunk_iter.rs index 4634b6bcb8931..cc81f15aa9829 100644 --- a/src/common/src/array/data_chunk_iter.rs +++ b/src/common/src/array/data_chunk_iter.rs @@ -23,6 +23,7 @@ use crate::array::DataChunk; use crate::error::Result; use crate::hash::HashCode; use crate::types::{hash_datum, DataType, Datum, DatumRef, ToOwnedDatum}; +use crate::util::value_encoding; use crate::util::value_encoding::{deserialize_datum, serialize_datum}; impl DataChunk { diff --git a/src/common/src/types/chrono_wrapper.rs b/src/common/src/types/chrono_wrapper.rs index 0c457c2d274d6..3133170b7b164 100644 --- a/src/common/src/types/chrono_wrapper.rs +++ b/src/common/src/types/chrono_wrapper.rs @@ -22,8 +22,9 @@ use chrono::{Datelike, Duration, NaiveDate, NaiveDateTime, NaiveTime, Timelike}; use super::{CheckedAdd, IntervalUnit}; use crate::array::ArrayResult; -use crate::error::Result; +use crate::util::value_encoding; use crate::util::value_encoding::error::ValueEncodingError; + /// The same as `NaiveDate::from_ymd(1970, 1, 1).num_days_from_ce()`. /// Minus this magic number to store the number of days since 1970-01-01. pub const UNIX_EPOCH_DAYS: i32 = 719_163; @@ -103,7 +104,7 @@ impl NaiveDateWrapper { )) } - pub fn new_with_days_value_encoding(days: i32) -> Result { + pub fn with_days_value(days: i32) -> value_encoding::Result { Ok(NaiveDateWrapper::new( NaiveDate::from_num_days_from_ce_opt(days) .ok_or(ValueEncodingError::InvalidNaiveDateEncoding(days))?, @@ -141,7 +142,7 @@ impl NaiveTimeWrapper { )) } - pub fn new_with_secs_nano_value_encoding(secs: u32, nano: u32) -> Result { + pub fn with_secs_nano_value(secs: u32, nano: u32) -> value_encoding::Result { Ok(NaiveTimeWrapper::new( NaiveTime::from_num_seconds_from_midnight_opt(secs, nano) .ok_or(ValueEncodingError::InvalidNaiveTimeEncoding(secs, nano))?, @@ -189,7 +190,7 @@ impl NaiveDateTimeWrapper { })) } - pub fn new_with_secs_nsecs_value_encoding(secs: i64, nsecs: u32) -> Result { + pub fn with_secs_nsecs_value(secs: i64, nsecs: u32) -> value_encoding::Result { Ok(NaiveDateTimeWrapper::new({ NaiveDateTime::from_timestamp_opt(secs, nsecs).ok_or( ValueEncodingError::InvalidNaiveDateTimeEncoding(secs, nsecs), diff --git a/src/common/src/util/value_encoding/error.rs b/src/common/src/util/value_encoding/error.rs index 9292f42704e02..be5e4287aefd6 100644 --- a/src/common/src/util/value_encoding/error.rs +++ b/src/common/src/util/value_encoding/error.rs @@ -18,7 +18,7 @@ use thiserror::Error; pub enum ValueEncodingError { #[error("Invalid bool value encoding: {0}")] InvalidBoolEncoding(u8), - #[error("Invalid UTF8 value encofing: {0}")] + #[error("Invalid UTF8 value encoding: {0}")] InvalidUtf8(#[from] std::string::FromUtf8Error), #[error("Invalid NaiveDate value encoding: days: {0}")] InvalidNaiveDateEncoding(i32), @@ -28,4 +28,8 @@ pub enum ValueEncodingError { InvalidNaiveTimeEncoding(u32, u32), #[error("Invalid null tag value encoding: {0}")] InvalidTagEncoding(u8), + #[error("Invalid struct encoding: {0}")] + InvalidStructEncoding(crate::array::ArrayError), + #[error("Invalid list encoding: {0}")] + InvalidListEncoding(crate::array::ArrayError), } diff --git a/src/common/src/util/value_encoding/mod.rs b/src/common/src/util/value_encoding/mod.rs index 89104775b641b..8f7bc256af036 100644 --- a/src/common/src/util/value_encoding/mod.rs +++ b/src/common/src/util/value_encoding/mod.rs @@ -18,7 +18,6 @@ use bytes::{Buf, BufMut}; use chrono::{Datelike, Timelike}; -use crate::error::Result; use crate::types::{ DataType, Datum, Decimal, IntervalUnit, NaiveDateTimeWrapper, NaiveDateWrapper, NaiveTimeWrapper, OrderedF32, OrderedF64, ScalarImpl, ScalarRefImpl, @@ -26,6 +25,9 @@ use crate::types::{ pub mod error; use error::ValueEncodingError; +use risingwave_pb::data::data_type::TypeName; + +pub type Result = std::result::Result; use crate::array::{ListRef, StructRef}; @@ -147,16 +149,27 @@ fn deserialize_value(ty: &DataType, mut data: impl Buf) -> Result { DataType::Timestamp => ScalarImpl::NaiveDateTime(deserialize_naivedatetime(data)?), DataType::Timestampz => ScalarImpl::Int64(data.get_i64_le()), DataType::Date => ScalarImpl::NaiveDate(deserialize_naivedate(data)?), - DataType::Struct { .. } => deserialize_struct_or_list(ty, data)?, - DataType::List { .. } => deserialize_struct_or_list(ty, data)?, + DataType::Struct { .. } => deserialize_struct(ty, data)?, + DataType::List { .. } => deserialize_list(ty, data)?, })) } -fn deserialize_struct_or_list(data_type: &DataType, mut data: impl Buf) -> Result { +fn deserialize_struct(data_type: &DataType, mut data: impl Buf) -> Result { + assert_eq!(data_type.prost_type_name(), TypeName::Struct); + let len = data.get_u32_le(); + let mut bytes = vec![0; len as usize]; + data.copy_to_slice(&mut bytes); + ScalarImpl::from_proto_bytes(&bytes, &data_type.to_protobuf()) + .map_err(|e| ValueEncodingError::InvalidStructEncoding(e)) +} + +fn deserialize_list(data_type: &DataType, mut data: impl Buf) -> Result { + assert_eq!(data_type.prost_type_name(), TypeName::List); let len = data.get_u32_le(); let mut bytes = vec![0; len as usize]; data.copy_to_slice(&mut bytes); - ScalarImpl::bytes_to_scalar(&bytes, &data_type.to_protobuf()).map_err(Into::into) + ScalarImpl::from_proto_bytes(&bytes, &data_type.to_protobuf()) + .map_err(|e| ValueEncodingError::InvalidListEncoding(e)) } fn deserialize_str(mut data: impl Buf) -> Result { @@ -184,18 +197,18 @@ fn deserialize_interval(mut data: impl Buf) -> Result { fn deserialize_naivetime(mut data: impl Buf) -> Result { let secs = data.get_u32_le(); let nano = data.get_u32_le(); - NaiveTimeWrapper::new_with_secs_nano_value_encoding(secs, nano) + NaiveTimeWrapper::with_secs_nano_value(secs, nano) } fn deserialize_naivedatetime(mut data: impl Buf) -> Result { let secs = data.get_i64_le(); let nsecs = data.get_u32_le(); - NaiveDateTimeWrapper::new_with_secs_nsecs_value_encoding(secs, nsecs) + NaiveDateTimeWrapper::with_secs_nsecs_value(secs, nsecs) } fn deserialize_naivedate(mut data: impl Buf) -> Result { let days = data.get_i32_le(); - NaiveDateWrapper::new_with_days_value_encoding(days) + NaiveDateWrapper::with_days_value(days) } fn deserialize_decimal(mut data: impl Buf) -> Result { diff --git a/src/stream/src/executor/error.rs b/src/stream/src/executor/error.rs index e200937af8521..9db4d9abdba79 100644 --- a/src/stream/src/executor/error.rs +++ b/src/stream/src/executor/error.rs @@ -17,6 +17,7 @@ use std::backtrace::Backtrace; use either::Either; use risingwave_common::array::ArrayError; use risingwave_common::error::{BoxedError, Error, ErrorCode, RwError, TrackingIssue}; +use risingwave_common::util::value_encoding::error::ValueEncodingError; use risingwave_connector::error::ConnectorError; use risingwave_expr::ExprError; use risingwave_rpc_client::error::RpcError; @@ -123,12 +124,13 @@ impl From for StreamExecutorError { } } -// Chunk operation error. +/// Chunk operation error. impl From for StreamExecutorError { fn from(e: ArrayError) -> Self { StreamExecutorErrorInner::EvalError(Either::Left(e)).into() } } + impl From for StreamExecutorError { fn from(e: ExprError) -> Self { StreamExecutorErrorInner::EvalError(Either::Right(e)).into() @@ -169,6 +171,12 @@ impl From for StreamExecutorError { } } +impl From for StreamExecutorError { + fn from(e: ValueEncodingError) -> Self { + StreamExecutorErrorInner::SerdeError(Box::new(e)).into() + } +} + pub type StreamExecutorResult = std::result::Result; #[cfg(test)] diff --git a/src/stream/src/executor/managed_state/join/join_entry_state.rs b/src/stream/src/executor/managed_state/join/join_entry_state.rs index 69b7ae6001ed1..c4e4a18e99a77 100644 --- a/src/stream/src/executor/managed_state/join/join_entry_state.rs +++ b/src/stream/src/executor/managed_state/join/join_entry_state.rs @@ -64,9 +64,8 @@ impl JoinEntryState { data_types: &'b [DataType], ) -> impl Iterator)> + 'a { self.cached.values_mut().map(|encoded| { - let decoded = encoded.decode(data_types); - // TODO(yuhao): remove the `RwError` in value encoding. - (encoded, decoded.map_err(StreamExecutorError::serde_error)) + let decoded_row = encoded.decode(data_types)?; + (encoded, decoded_row) }) } } From fc70e5747f4b1d0759906d97d24fc2a7c8a6dbb6 Mon Sep 17 00:00:00 2001 From: Eric Fu Date: Tue, 16 Aug 2022 17:02:28 +0800 Subject: [PATCH 4/5] fix remaining todos --- src/common/src/array/data_chunk_iter.rs | 5 ++--- .../executor/managed_state/join/join_entry_state.rs | 5 ++--- src/stream/src/executor/managed_state/join/mod.rs | 13 +++---------- 3 files changed, 7 insertions(+), 16 deletions(-) diff --git a/src/common/src/array/data_chunk_iter.rs b/src/common/src/array/data_chunk_iter.rs index cc81f15aa9829..26c01ddde279c 100644 --- a/src/common/src/array/data_chunk_iter.rs +++ b/src/common/src/array/data_chunk_iter.rs @@ -20,7 +20,6 @@ use itertools::Itertools; use super::column::Column; use crate::array::DataChunk; -use crate::error::Result; use crate::hash::HashCode; use crate::types::{hash_datum, DataType, Datum, DatumRef, ToOwnedDatum}; use crate::util::value_encoding; @@ -255,7 +254,7 @@ impl Row { /// [`crate::util::ordered::OrderedRow`] /// /// All values are nullable. Each value will have 1 extra byte to indicate whether it is null. - pub fn serialize(&self) -> Result> { + pub fn serialize(&self) -> value_encoding::Result> { let mut result = vec![]; for cell in &self.0 { result.extend(serialize_datum(cell)?); @@ -316,7 +315,7 @@ impl RowDeserializer { } /// Deserialize the row from value encoding bytes. - pub fn deserialize(&self, mut data: impl Buf) -> Result { + pub fn deserialize(&self, mut data: impl Buf) -> value_encoding::Result { let mut values = Vec::with_capacity(self.data_types.len()); for typ in &self.data_types { values.push(deserialize_datum(&mut data, typ)?); diff --git a/src/stream/src/executor/managed_state/join/join_entry_state.rs b/src/stream/src/executor/managed_state/join/join_entry_state.rs index c4e4a18e99a77..c3e6033323a58 100644 --- a/src/stream/src/executor/managed_state/join/join_entry_state.rs +++ b/src/stream/src/executor/managed_state/join/join_entry_state.rs @@ -15,7 +15,6 @@ use std::collections::{btree_map, BTreeMap}; use super::*; -use crate::executor::error::StreamExecutorError; #[expect(dead_code)] type JoinEntryStateIter<'a> = btree_map::Iter<'a, PkType, StateValueType>; @@ -64,8 +63,8 @@ impl JoinEntryState { data_types: &'b [DataType], ) -> impl Iterator)> + 'a { self.cached.values_mut().map(|encoded| { - let decoded_row = encoded.decode(data_types)?; - (encoded, decoded_row) + let decoded = encoded.decode(data_types); + (encoded, decoded) }) } } diff --git a/src/stream/src/executor/managed_state/join/mod.rs b/src/stream/src/executor/managed_state/join/mod.rs index 77df7aa2cb72c..46bc3328b0985 100644 --- a/src/stream/src/executor/managed_state/join/mod.rs +++ b/src/stream/src/executor/managed_state/join/mod.rs @@ -31,7 +31,7 @@ use risingwave_storage::table::state_table::RowBasedStateTable; use risingwave_storage::StateStore; use stats_alloc::{SharedStatsAlloc, StatsAlloc}; -use crate::executor::error::{StreamExecutorError, StreamExecutorResult}; +use crate::executor::error::StreamExecutorResult; use crate::executor::monitor::StreamingMetrics; type DegreeType = u64; @@ -106,11 +106,7 @@ impl JoinRow { pub fn encode(&self) -> StreamExecutorResult { Ok(EncodedJoinRow { - // TODO(yuhao): remove the `RwError` in value encoding. - row: self - .row - .serialize() - .map_err(StreamExecutorError::serde_error)?, + row: self.row.serialize()?, degree: self.degree, }) } @@ -125,10 +121,7 @@ pub struct EncodedJoinRow { impl EncodedJoinRow { fn decode(&self, data_types: &[DataType]) -> StreamExecutorResult { let deserializer = RowDeserializer::new(data_types.to_vec()); - // TODO(yuhao): remove the `RwError` in value encoding. - let row = deserializer - .deserialize(self.row.as_ref()) - .map_err(StreamExecutorError::serde_error)?; + let row = deserializer.deserialize(self.row.as_ref())?; Ok(JoinRow { row, degree: self.degree, From 3eb54485ffa02029a8649221b299598be3e5725a Mon Sep 17 00:00:00 2001 From: Eric Fu Date: Tue, 16 Aug 2022 17:07:08 +0800 Subject: [PATCH 5/5] fix clippy --- src/common/src/util/value_encoding/mod.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/common/src/util/value_encoding/mod.rs b/src/common/src/util/value_encoding/mod.rs index 8f7bc256af036..63e24c3ffa530 100644 --- a/src/common/src/util/value_encoding/mod.rs +++ b/src/common/src/util/value_encoding/mod.rs @@ -66,7 +66,7 @@ pub fn deserialize_datum(mut data: impl Buf, ty: &DataType) -> Result { match null_tag { 0 => Ok(None), 1 => deserialize_value(ty, data), - _ => Err(ValueEncodingError::InvalidTagEncoding(null_tag).into()), + _ => Err(ValueEncodingError::InvalidTagEncoding(null_tag)), } } @@ -160,7 +160,7 @@ fn deserialize_struct(data_type: &DataType, mut data: impl Buf) -> Result Result { @@ -169,21 +169,21 @@ fn deserialize_list(data_type: &DataType, mut data: impl Buf) -> Result Result { let len = data.get_u32_le(); let mut bytes = vec![0; len as usize]; data.copy_to_slice(&mut bytes); - Ok(String::from_utf8(bytes).map_err(ValueEncodingError::InvalidUtf8)?) + String::from_utf8(bytes).map_err(ValueEncodingError::InvalidUtf8) } fn deserialize_bool(mut data: impl Buf) -> Result { match data.get_u8() { 1 => Ok(true), 0 => Ok(false), - value => Err(ValueEncodingError::InvalidBoolEncoding(value).into()), + value => Err(ValueEncodingError::InvalidBoolEncoding(value)), } }