Skip to content

Commit

Permalink
feat: impl Encode + Decode on marco morsel_record
Browse files Browse the repository at this point in the history
  • Loading branch information
KKould committed Jul 25, 2024
1 parent 0643c48 commit 35e2c64
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 3 deletions.
4 changes: 2 additions & 2 deletions src/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use crate::{
ondisk::sstable::SsTable,
record::{KeyRef, Record},
scope::Scope,
serdes::Encode,
stream::{level::LevelStream, merge::MergeStream, ScanStream},
version::{
edit::VersionEdit,
Expand Down Expand Up @@ -329,8 +330,7 @@ where
}
max = Some(key.value.to_key());

// FIXIT: it is not the real size in arrows;
written_size += key.size();
written_size += key.size() + entry.value().size();
builder.push(key, entry.value());

if written_size >= option.max_sst_file_size {
Expand Down
5 changes: 5 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@ pub(crate) mod tests {
datatypes::{DataType, Field, Schema, UInt32Type},
};
use async_lock::RwLock;
use futures_util::io;
use morseldb_marco::morsel_record;
use once_cell::sync::Lazy;
use parquet::arrow::ProjectionMask;
Expand All @@ -321,6 +322,10 @@ pub(crate) mod tests {
mutable::Mutable,
},
record::{internal::InternalRecordRef, Key, RecordRef},
serdes::{
option::{DecodeError, EncodeError},
Decode, Encode,
},
timestamp::Timestamped,
version::{cleaner::Cleaner, set::tests::build_version_set, Version},
DbOption, Immutable, Record, WriteError, DB,
Expand Down
51 changes: 51 additions & 0 deletions src/morseldb_marco/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ pub fn morsel_record(_args: TokenStream, input: TokenStream) -> TokenStream {

let mut primary_key_definitions = None;

let mut encode_method_fields: Vec<proc_macro2::TokenStream> = Vec::new();
let mut encode_size_fields: Vec<proc_macro2::TokenStream> = Vec::new();
let mut decode_method_fields: Vec<proc_macro2::TokenStream> = Vec::new();

let mut to_ref_init_fields: Vec<proc_macro2::TokenStream> = Vec::new();
let mut schema_fields: Vec<proc_macro2::TokenStream> = Vec::new();
let mut ref_fields: Vec<proc_macro2::TokenStream> = Vec::new();
Expand Down Expand Up @@ -194,6 +198,12 @@ pub fn morsel_record(_args: TokenStream, input: TokenStream) -> TokenStream {
builder_as_any_fields.push(quote! {
Arc::clone(&#field_name) as Arc<dyn Array>,
});
encode_method_fields.push(quote! {
self.#field_name.encode(writer).await?;
});
encode_size_fields.push(quote! {
+ self.#field_name.size()
});

match ModelAttributes::parse_field(field) {
Ok(false) => {
Expand Down Expand Up @@ -246,6 +256,9 @@ pub fn morsel_record(_args: TokenStream, input: TokenStream) -> TokenStream {
builder_push_none_fields.push(quote! {
self.#field_name.append_null();
});
decode_method_fields.push(quote! {
let #field_name = Option::<#field_ty>::decode(reader).await?;
});
} else {
from_record_batch_fields.push(quote! {
let mut #field_name = None;
Expand All @@ -271,6 +284,9 @@ pub fn morsel_record(_args: TokenStream, input: TokenStream) -> TokenStream {
builder_push_none_fields.push(quote! {
self.#field_name.append_value(#default);
});
decode_method_fields.push(quote! {
let #field_name = Option::<#field_ty>::decode(reader).await?.unwrap();
});
}
}
Ok(true) => {
Expand Down Expand Up @@ -308,6 +324,9 @@ pub fn morsel_record(_args: TokenStream, input: TokenStream) -> TokenStream {
arrays_get_fields.push(quote! {
let #field_name = self.#field_name.value(offset);
});
decode_method_fields.push(quote! {
let #field_name = #field_ty::decode(reader).await?;
});
}
Err(err) => return TokenStream::from(err.to_compile_error()),
}
Expand Down Expand Up @@ -372,6 +391,21 @@ pub fn morsel_record(_args: TokenStream, input: TokenStream) -> TokenStream {
}
}

impl Decode for #struct_name {
type Error = DecodeError<io::Error>;

async fn decode<R>(reader: &mut R) -> Result<Self, Self::Error>
where
R: futures_io::AsyncRead + Unpin,
{
#(#decode_method_fields)*

Ok(Self {
#(#field_names)*
})
}
}

#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub struct #struct_ref_name<'r> {
#(#ref_fields)*
Expand Down Expand Up @@ -407,6 +441,23 @@ pub fn morsel_record(_args: TokenStream, input: TokenStream) -> TokenStream {
}
}

impl<'r> Encode for #struct_ref_name<'r> {
type Error = EncodeError<io::Error>;

async fn encode<W>(&self, writer: &mut W) -> Result<(), Self::Error>
where
W: io::AsyncWrite + Unpin,
{
#(#encode_method_fields)*

Ok(())
}

fn size(&self) -> usize {
0 #(#encode_size_fields)*
}
}

#[derive(Debug)]
pub struct #struct_arrays_name {
_null: Arc<BooleanArray>,
Expand Down
2 changes: 1 addition & 1 deletion src/serdes/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
mod arc;
mod boolean;
mod num;
mod option;
pub(crate) mod option;
mod string;

use std::{future::Future, io};
Expand Down

0 comments on commit 35e2c64

Please sign in to comment.