From 3c0b73ee4530ae1df990618512929b702e47ad93 Mon Sep 17 00:00:00 2001 From: adolph liu Date: Sun, 2 Jun 2024 13:40:42 -0700 Subject: [PATCH] Implement the Buf to avoid extra memory allocation #4065 --- Cargo.lock | 1 + src/mito2/Cargo.toml | 5 +++-- src/mito2/src/wal/entry_reader.rs | 7 ++++--- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 35fabee98b6c..f95d876eb95e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5842,6 +5842,7 @@ dependencies = [ "memcomparable", "moka", "object-store", + "opendal", "parquet", "paste", "pin-project", diff --git a/src/mito2/Cargo.toml b/src/mito2/Cargo.toml index f9fdb5b574a4..26bed663debb 100644 --- a/src/mito2/Cargo.toml +++ b/src/mito2/Cargo.toml @@ -33,9 +33,9 @@ common-time.workspace = true common-wal.workspace = true crc32fast = "1" crossbeam-utils.workspace = true -datafusion.workspace = true datafusion-common.workspace = true datafusion-expr.workspace = true +datafusion.workspace = true datatypes.workspace = true futures.workspace = true humantime-serde.workspace = true @@ -46,6 +46,7 @@ log-store = { workspace = true, optional = true } memcomparable = "0.2" moka = { workspace = true, features = ["sync", "future"] } object-store.workspace = true +opendal = { version = "*" } parquet = { workspace = true, features = ["async"] } paste.workspace = true pin-project.workspace = true @@ -62,9 +63,9 @@ snafu.workspace = true store-api.workspace = true strum.workspace = true table.workspace = true -tokio.workspace = true tokio-stream.workspace = true tokio-util.workspace = true +tokio.workspace = true uuid.workspace = true [dev-dependencies] diff --git a/src/mito2/src/wal/entry_reader.rs b/src/mito2/src/wal/entry_reader.rs index c29a5e629d5c..25dcb38cb128 100644 --- a/src/mito2/src/wal/entry_reader.rs +++ b/src/mito2/src/wal/entry_reader.rs @@ -16,6 +16,7 @@ use api::v1::WalEntry; use async_stream::stream; use common_telemetry::info; use futures::StreamExt; +use opendal::Buffer; use prost::Message; use snafu::{ensure, ResultExt}; use store_api::logstore::entry::Entry; @@ -30,9 +31,9 @@ pub(crate) fn decode_raw_entry(raw_entry: Entry) -> Result<(EntryId, WalEntry)> let entry_id = raw_entry.entry_id(); let region_id = raw_entry.region_id(); ensure!(raw_entry.is_complete(), CorruptedEntrySnafu { region_id }); - // TODO(weny): implement the [Buf] for return value, avoid extra memory allocation. - let bytes = raw_entry.into_bytes(); - let wal_entry = WalEntry::decode(bytes.as_slice()).context(DecodeWalSnafu { region_id })?; + // TODO(yuliu): implement the [Buf] for return value, avoid extra memory allocation. + let buffer: Buffer = raw_entry.into_bytes().into(); + let wal_entry = WalEntry::decode(buffer).context(DecodeWalSnafu { region_id })?; Ok((entry_id, wal_entry)) }