From 35e2c64609acc045774847e8f1c5aac98fec1bc8 Mon Sep 17 00:00:00 2001 From: Kould Date: Thu, 25 Jul 2024 15:55:42 +0800 Subject: [PATCH] feat: impl Encode + Decode on marco `morsel_record` --- src/compaction/mod.rs | 4 +-- src/lib.rs | 5 ++++ src/morseldb_marco/src/lib.rs | 51 +++++++++++++++++++++++++++++++++++ src/serdes/mod.rs | 2 +- 4 files changed, 59 insertions(+), 3 deletions(-) diff --git a/src/compaction/mod.rs b/src/compaction/mod.rs index 4dac16b5..aad842c4 100644 --- a/src/compaction/mod.rs +++ b/src/compaction/mod.rs @@ -19,6 +19,7 @@ use crate::{ ondisk::sstable::SsTable, record::{KeyRef, Record}, scope::Scope, + serdes::Encode, stream::{level::LevelStream, merge::MergeStream, ScanStream}, version::{ edit::VersionEdit, @@ -329,8 +330,7 @@ where } max = Some(key.value.to_key()); - // FIXIT: it is not the real size in arrows; - written_size += key.size(); + written_size += key.size() + entry.value().size(); builder.push(key, entry.value()); if written_size >= option.max_sst_file_size { diff --git a/src/lib.rs b/src/lib.rs index 7b06304b..64ad2e9f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -309,6 +309,7 @@ pub(crate) mod tests { datatypes::{DataType, Field, Schema, UInt32Type}, }; use async_lock::RwLock; + use futures_util::io; use morseldb_marco::morsel_record; use once_cell::sync::Lazy; use parquet::arrow::ProjectionMask; @@ -321,6 +322,10 @@ pub(crate) mod tests { mutable::Mutable, }, record::{internal::InternalRecordRef, Key, RecordRef}, + serdes::{ + option::{DecodeError, EncodeError}, + Decode, Encode, + }, timestamp::Timestamped, version::{cleaner::Cleaner, set::tests::build_version_set, Version}, DbOption, Immutable, Record, WriteError, DB, diff --git a/src/morseldb_marco/src/lib.rs b/src/morseldb_marco/src/lib.rs index e0db6771..f4f14a78 100644 --- a/src/morseldb_marco/src/lib.rs +++ b/src/morseldb_marco/src/lib.rs @@ -28,6 +28,10 @@ pub fn morsel_record(_args: TokenStream, input: TokenStream) -> TokenStream { let mut primary_key_definitions = None; + let mut encode_method_fields: Vec = Vec::new(); + let mut encode_size_fields: Vec = Vec::new(); + let mut decode_method_fields: Vec = Vec::new(); + let mut to_ref_init_fields: Vec = Vec::new(); let mut schema_fields: Vec = Vec::new(); let mut ref_fields: Vec = Vec::new(); @@ -194,6 +198,12 @@ pub fn morsel_record(_args: TokenStream, input: TokenStream) -> TokenStream { builder_as_any_fields.push(quote! { Arc::clone(&#field_name) as Arc, }); + encode_method_fields.push(quote! { + self.#field_name.encode(writer).await?; + }); + encode_size_fields.push(quote! { + + self.#field_name.size() + }); match ModelAttributes::parse_field(field) { Ok(false) => { @@ -246,6 +256,9 @@ pub fn morsel_record(_args: TokenStream, input: TokenStream) -> TokenStream { builder_push_none_fields.push(quote! { self.#field_name.append_null(); }); + decode_method_fields.push(quote! { + let #field_name = Option::<#field_ty>::decode(reader).await?; + }); } else { from_record_batch_fields.push(quote! { let mut #field_name = None; @@ -271,6 +284,9 @@ pub fn morsel_record(_args: TokenStream, input: TokenStream) -> TokenStream { builder_push_none_fields.push(quote! { self.#field_name.append_value(#default); }); + decode_method_fields.push(quote! { + let #field_name = Option::<#field_ty>::decode(reader).await?.unwrap(); + }); } } Ok(true) => { @@ -308,6 +324,9 @@ pub fn morsel_record(_args: TokenStream, input: TokenStream) -> TokenStream { arrays_get_fields.push(quote! { let #field_name = self.#field_name.value(offset); }); + decode_method_fields.push(quote! { + let #field_name = #field_ty::decode(reader).await?; + }); } Err(err) => return TokenStream::from(err.to_compile_error()), } @@ -372,6 +391,21 @@ pub fn morsel_record(_args: TokenStream, input: TokenStream) -> TokenStream { } } + impl Decode for #struct_name { + type Error = DecodeError; + + async fn decode(reader: &mut R) -> Result + where + R: futures_io::AsyncRead + Unpin, + { + #(#decode_method_fields)* + + Ok(Self { + #(#field_names)* + }) + } + } + #[derive(Debug, PartialEq, Eq, Clone, Copy)] pub struct #struct_ref_name<'r> { #(#ref_fields)* @@ -407,6 +441,23 @@ pub fn morsel_record(_args: TokenStream, input: TokenStream) -> TokenStream { } } + impl<'r> Encode for #struct_ref_name<'r> { + type Error = EncodeError; + + async fn encode(&self, writer: &mut W) -> Result<(), Self::Error> + where + W: io::AsyncWrite + Unpin, + { + #(#encode_method_fields)* + + Ok(()) + } + + fn size(&self) -> usize { + 0 #(#encode_size_fields)* + } + } + #[derive(Debug)] pub struct #struct_arrays_name { _null: Arc, diff --git a/src/serdes/mod.rs b/src/serdes/mod.rs index 0c02f6a8..6ee5a4a1 100644 --- a/src/serdes/mod.rs +++ b/src/serdes/mod.rs @@ -1,7 +1,7 @@ mod arc; mod boolean; mod num; -mod option; +pub(crate) mod option; mod string; use std::{future::Future, io};