Skip to content

Commit

Permalink
perf(memcomparable): optimize ser/de for decimal (risingwavelabs#6586)
Browse files Browse the repository at this point in the history
* optimize ser/de for decimal in memcomparable

Signed-off-by: Runji Wang <[email protected]>

* remove date/time from memcomparable crate because they are trivial

Signed-off-by: Runji Wang <[email protected]>

* fix decimal test

Signed-off-by: Runji Wang <[email protected]>

* Update src/utils/memcomparable/src/decimal.rs

Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>

* fix clippy and refactor read_bytes_len

Signed-off-by: Runji Wang <[email protected]>

Signed-off-by: Runji Wang <[email protected]>
Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
3 people authored Nov 28, 2022
1 parent 80f3096 commit 262eb16
Show file tree
Hide file tree
Showing 13 changed files with 360 additions and 493 deletions.
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ futures-async-stream = "0.2"
humantime = "2.1"
itertools = "0.10"
lru = { git = "https://github.com/risingwavelabs/lru-rs.git", branch = "evict_by_timestamp" }
memcomparable = { path = "../utils/memcomparable" }
memcomparable = { path = "../utils/memcomparable", features = ["decimal"] }
more-asserts = "0.3"
num-traits = "0.2"
parking_lot = "0.12"
Expand Down
48 changes: 25 additions & 23 deletions src/common/src/types/decimal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -519,34 +519,14 @@ impl Decimal {
}
}

/// Returns the `(mantissa, scale)` pair used to serialize this decimal.
///
/// Special values are encoded with a zero mantissa and a reserved scale:
/// `NegativeInf` -> 29, `PositiveInf` -> 30, `NaN` -> 31.
///
/// TODO: 1. test whether the decimal in rust, any crate, has the same behavior as PG.
/// 2. support memcomparable encoding for dynamic decimal.
pub fn mantissa_scale_for_serialization(&self) -> (i128, u8) {
// Since the largest scale supported by `rust_decimal` is 28,
// and we first compare scale, we use 29, 30 and 31 to denote -Inf, +Inf and NaN.
match self {
Self::NegativeInf => (0, 29),
Self::Normalized(d) => {
// Note that we do not support dynamic numeric, i.e. the scale of every numeric in
// the system is fixed. So we don't need to do any rescale, just use
// the `scale` of `rust_decimal`. However, it is possible that scale
// may overflow during calculation as `rust_decimal`'s max scale is
// 28.
(d.mantissa(), d.scale() as u8)
}
Self::PositiveInf => (0, 30),
Self::NaN => (0, 31),
}
}

/// Encodes this decimal into a fixed 16-byte array.
///
/// `Normalized` values use the inner decimal's `serialize()`. `NaN`, `PositiveInf` and
/// `NegativeInf` are encoded as a first byte of 1, 2 and 3 respectively, followed by
/// 15 zero bytes.
pub fn unordered_serialize(&self) -> [u8; 16] {
// according to https://docs.rs/rust_decimal/1.18.0/src/rust_decimal/decimal.rs.html#665-684
// the lower 15 bits are not used, so we can use the first byte to distinguish NaN and Inf
match self {
Self::Normalized(d) => d.serialize(),
Self::NaN => [vec![1u8], vec![0u8; 15]].concat().try_into().unwrap(),
Self::PositiveInf => [vec![2u8], vec![0u8; 15]].concat().try_into().unwrap(),
Self::NegativeInf => [vec![3u8], vec![0u8; 15]].concat().try_into().unwrap(),
Self::NaN => [1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
Self::PositiveInf => [2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
Self::NegativeInf => [3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
}
}

Expand Down Expand Up @@ -576,6 +556,28 @@ impl Decimal {
}
}

/// Maps each `Decimal` variant onto the matching `memcomparable::Decimal` variant.
///
/// The wrapped value of `Normalized` is moved across unchanged.
impl From<Decimal> for memcomparable::Decimal {
    fn from(d: Decimal) -> Self {
        match d {
            Decimal::NaN => Self::NaN,
            Decimal::NegativeInf => Self::NegInf,
            Decimal::PositiveInf => Self::Inf,
            Decimal::Normalized(value) => Self::Normalized(value),
        }
    }
}

/// Maps each `memcomparable::Decimal` variant back onto the matching `Decimal` variant.
///
/// The wrapped value of `Normalized` is moved across unchanged.
impl From<memcomparable::Decimal> for Decimal {
    fn from(d: memcomparable::Decimal) -> Self {
        match d {
            memcomparable::Decimal::NaN => Self::NaN,
            memcomparable::Decimal::NegInf => Self::NegativeInf,
            memcomparable::Decimal::Inf => Self::PositiveInf,
            memcomparable::Decimal::Normalized(value) => Self::Normalized(value),
        }
    }
}

impl Default for Decimal {
fn default() -> Self {
Self::Normalized(RustDecimal::default())
Expand Down
40 changes: 18 additions & 22 deletions src/common/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -817,17 +817,16 @@ impl ScalarRefImpl<'_> {
Self::Float64(v) => v.serialize(ser)?,
Self::Utf8(v) => v.serialize(ser)?,
Self::Bool(v) => v.serialize(ser)?,
Self::Decimal(v) => {
let (mantissa, scale) = v.mantissa_scale_for_serialization();
ser.serialize_decimal(mantissa, scale)?;
}
Self::Decimal(v) => ser.serialize_decimal((*v).into())?,
Self::Interval(v) => v.serialize(ser)?,
Self::NaiveDate(v) => ser.serialize_naivedate(v.0.num_days_from_ce())?,
Self::NaiveDate(v) => v.0.num_days_from_ce().serialize(ser)?,
Self::NaiveDateTime(v) => {
ser.serialize_naivedatetime(v.0.timestamp(), v.0.timestamp_subsec_nanos())?
v.0.timestamp().serialize(&mut *ser)?;
v.0.timestamp_subsec_nanos().serialize(ser)?;
}
Self::NaiveTime(v) => {
ser.serialize_naivetime(v.0.num_seconds_from_midnight(), v.0.nanosecond())?
v.0.num_seconds_from_midnight().serialize(&mut *ser)?;
v.0.nanosecond().serialize(ser)?;
}
Self::Struct(v) => v.serialize(ser)?,
Self::List(v) => v.serialize(ser)?,
Expand Down Expand Up @@ -859,27 +858,21 @@ impl ScalarImpl {
Ty::Float64 => Self::Float64(f64::deserialize(de)?.into()),
Ty::Varchar => Self::Utf8(String::deserialize(de)?),
Ty::Boolean => Self::Bool(bool::deserialize(de)?),
Ty::Decimal => Self::Decimal({
let (mantissa, scale) = de.deserialize_decimal()?;
match scale {
29 => Decimal::NegativeInf,
30 => Decimal::PositiveInf,
31 => Decimal::NaN,
_ => Decimal::from_i128_with_scale(mantissa, scale as u32),
}
}),
Ty::Decimal => Self::Decimal(de.deserialize_decimal()?.into()),
Ty::Interval => Self::Interval(IntervalUnit::deserialize(de)?),
Ty::Time => Self::NaiveTime({
let (secs, nano) = de.deserialize_naivetime()?;
let secs = u32::deserialize(&mut *de)?;
let nano = u32::deserialize(de)?;
NaiveTimeWrapper::with_secs_nano(secs, nano)?
}),
Ty::Timestamp => Self::NaiveDateTime({
let (secs, nsecs) = de.deserialize_naivedatetime()?;
let secs = i64::deserialize(&mut *de)?;
let nsecs = u32::deserialize(de)?;
NaiveDateTimeWrapper::with_secs_nsecs(secs, nsecs)?
}),
Ty::Timestampz => Self::Int64(i64::deserialize(de)?),
Ty::Date => Self::NaiveDate({
let days = de.deserialize_naivedate()?;
let days = i32::deserialize(de)?;
NaiveDateWrapper::with_days(days)?
}),
Ty::Struct(t) => StructValue::deserialize(&t.fields, de)?.to_scalar_value(),
Expand Down Expand Up @@ -915,16 +908,19 @@ impl ScalarImpl {
DataType::Boolean => size_of::<u8>(),
// IntervalUnit is serialized as (i32, i32, i64)
DataType::Interval => size_of::<(i32, i32, i64)>(),
DataType::Decimal => deserializer.read_decimal_len()?,
DataType::Decimal => {
deserializer.deserialize_decimal()?;
0 // the len is not used since decimal is not a fixed length type
}
// These types are variable-length; their size can only be determined at runtime.
// TODO: need some test for this case (e.g. e2e test)
DataType::List { .. } => deserializer.read_bytes_len()?,
DataType::List { .. } => deserializer.skip_bytes()?,
DataType::Struct(t) => t
.fields
.iter()
.map(|field| Self::encoding_data_size(field, deserializer))
.try_fold(0, |a, b| b.map(|b| a + b))?,
DataType::Varchar => deserializer.read_bytes_len()?,
DataType::Varchar => deserializer.skip_bytes()?,
};

// consume offset of fixed_type
Expand Down
12 changes: 6 additions & 6 deletions src/common/src/util/ordered/serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,8 +349,8 @@ mod tests {
let encoding_data_size =
ScalarImpl::encoding_data_size(&DataType::Decimal, &mut deserializer)
.unwrap();
// [nulltag, flag, decimal_chunk, 0]
assert_eq!(18, encoding_data_size);
// [nulltag, flag, decimal_chunk]
assert_eq!(17, encoding_data_size);
}

{
Expand All @@ -362,8 +362,8 @@ mod tests {
let encoding_data_size =
ScalarImpl::encoding_data_size(&DataType::Decimal, &mut deserializer)
.unwrap();
// [nulltag, flag, decimal_chunk, 0]
assert_eq!(4, encoding_data_size);
// [nulltag, flag, decimal_chunk]
assert_eq!(3, encoding_data_size);
}

{
Expand All @@ -376,7 +376,7 @@ mod tests {
ScalarImpl::encoding_data_size(&DataType::Decimal, &mut deserializer)
.unwrap();

assert_eq!(3, encoding_data_size); // [1, 35, 0]
assert_eq!(2, encoding_data_size); // [1, 35]
}

{
Expand All @@ -388,7 +388,7 @@ mod tests {
let encoding_data_size =
ScalarImpl::encoding_data_size(&DataType::Decimal, &mut deserializer)
.unwrap();
assert_eq!(3, encoding_data_size); // [1, 6, 0]
assert_eq!(2, encoding_data_size); // [1, 6]
}

{
Expand Down
16 changes: 11 additions & 5 deletions src/utils/memcomparable/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,21 @@ repository = { workspace = true }
description = "Memcomparable format."
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[features]
decimal = ["rust_decimal"]

[dependencies]
bytes = "1"
serde = { version = "1", features = ["derive"] }
rust_decimal = { version = "1", optional = true }
serde = "1"
thiserror = "1"

[target.'cfg(not(madsim))'.dependencies]
workspace-hack = { path = "../../workspace-hack" }

[dev-dependencies]
criterion = "0.4"
rand = "0.8"
rust_decimal = "1"
rust_decimal = { version = "1", features = ["rand"] }
serde = { version = "1", features = ["derive"] }

[[bench]]
name = "serde"
harness = false
65 changes: 65 additions & 0 deletions src/utils/memcomparable/benches/serde.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Copyright 2022 Singularity Data
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use criterion::{criterion_group, criterion_main, Criterion};

// Register the `decimal` benchmark in the `benches` group and generate the
// benchmark entry point for that group.
criterion_group!(benches, decimal);
criterion_main!(benches);

/// No-op stand-in used when the `decimal` feature is disabled, so that the
/// `criterion_group!` registration above still has a `decimal` function to reference.
#[cfg(not(feature = "decimal"))]
fn decimal(_c: &mut Criterion) {}

/// Benchmarks memcomparable serialization and deserialization of `Decimal`.
///
/// A small pool of random normalized decimals is generated once; every benchmark
/// iteration processes the next element of the pool, wrapping around at the end.
/// Each closure returns the value it produced so that `Bencher::iter` can pass it
/// through its black box, which keeps the optimizer from discarding the measured work.
#[cfg(feature = "decimal")]
fn decimal(c: &mut Criterion) {
    use memcomparable::{Decimal, Deserializer, Serializer};

    // Generate the pool of input decimals.
    let decimals: Vec<Decimal> = (0..10)
        .map(|_| Decimal::Normalized(rand::random()))
        .collect();

    c.bench_function("serialize_decimal", |b| {
        let mut i = 0;
        b.iter(|| {
            let mut ser = Serializer::new(vec![]);
            ser.serialize_decimal(decimals[i]).unwrap();
            i = (i + 1) % decimals.len();
            // Hand the serializer back to criterion instead of dropping it unobserved.
            ser
        })
    });

    c.bench_function("deserialize_decimal", |b| {
        // Pre-encode every decimal outside the measured closure.
        let encodings = decimals
            .iter()
            .map(|d| {
                let mut ser = Serializer::new(vec![]);
                ser.serialize_decimal(*d).unwrap();
                ser.into_inner()
            })
            .collect::<Vec<_>>();
        let mut i = 0;
        b.iter(|| {
            let decoded = Deserializer::new(encodings[i].as_slice())
                .deserialize_decimal()
                .unwrap();
            // Wrap on the collection that is actually being indexed.
            i = (i + 1) % encodings.len();
            decoded
        })
    });
}
Loading

0 comments on commit 262eb16

Please sign in to comment.