Skip to content

Commit

Permalink
fix: fix marco example
Browse files Browse the repository at this point in the history
  • Loading branch information
KKould committed Jul 29, 2024
1 parent 6be50ce commit 856a26b
Show file tree
Hide file tree
Showing 7 changed files with 384 additions and 33 deletions.
3 changes: 2 additions & 1 deletion examples/declare.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use morseldb::morsel_record;

// Tips: must be public
#[morsel_record]
struct Post {
pub struct Post {
#[primary_key]
name: String,
}
Expand Down
148 changes: 148 additions & 0 deletions src/inmem/immutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,3 +177,151 @@ where
})
}
}

#[cfg(test)]
pub(crate) mod tests {
use std::{mem, sync::Arc};

use arrow::{
array::{
Array, BooleanArray, BooleanBufferBuilder, BooleanBuilder, PrimitiveBuilder,
RecordBatch, StringArray, StringBuilder, UInt32Array, UInt32Builder,
},
datatypes::{ArrowPrimitiveType, UInt32Type},
};
use parquet::arrow::ProjectionMask;

use super::{ArrowArrays, Builder};
use crate::{
record::Record,
tests::{Test, TestRef},
timestamp::timestamped::Timestamped,
};

#[derive(Debug)]
pub struct TestImmutableArrays {
_null: Arc<BooleanArray>,
_ts: Arc<UInt32Array>,
vstring: Arc<StringArray>,
vu32: Arc<UInt32Array>,
vbool: Arc<BooleanArray>,

record_batch: RecordBatch,
}

impl ArrowArrays for TestImmutableArrays {
type Record = Test;

type Builder = TestBuilder;

fn builder(capacity: usize) -> Self::Builder {
TestBuilder {
vstring: StringBuilder::with_capacity(capacity, 0),
vu32: PrimitiveBuilder::<UInt32Type>::with_capacity(capacity),
vobool: BooleanBuilder::with_capacity(capacity),
_null: BooleanBufferBuilder::new(capacity),
_ts: UInt32Builder::with_capacity(capacity),
}
}

fn get(
&self,
offset: u32,
projection_mask: &ProjectionMask,
) -> Option<Option<<Self::Record as Record>::Ref<'_>>> {
let offset = offset as usize;

if offset >= self.vstring.len() {
return None;
}
if self._null.value(offset) {
return Some(None);
}

let vstring = self.vstring.value(offset);
let vu32 = projection_mask
.leaf_included(3)
.then(|| self.vu32.value(offset));
let vbool = (!self.vbool.is_null(offset) && projection_mask.leaf_included(4))
.then(|| self.vbool.value(offset));

Some(Some(TestRef {
vstring,
vu32,
vbool,
}))
}

fn as_record_batch(&self) -> &RecordBatch {
&self.record_batch
}
}

pub struct TestBuilder {
vstring: StringBuilder,
vu32: PrimitiveBuilder<UInt32Type>,
vobool: BooleanBuilder,
_null: BooleanBufferBuilder,
_ts: UInt32Builder,
}

impl Builder<TestImmutableArrays> for TestBuilder {
fn push(&mut self, key: Timestamped<&str>, row: Option<TestRef>) {
self.vstring.append_value(key.value);
match row {
Some(row) => {
self.vu32.append_value(row.vu32.unwrap());
match row.vbool {
Some(vobool) => self.vobool.append_value(vobool),
None => self.vobool.append_null(),
}
self._null.append(false);
self._ts.append_value(key.ts.into());
}
None => {
self.vu32
.append_value(<UInt32Type as ArrowPrimitiveType>::Native::default());
self.vobool.append_null();
self._null.append(true);
self._ts.append_value(key.ts.into());
}
}
}

fn written_size(&self) -> usize {
mem::size_of_val(self.vstring.values_slice())
+ mem::size_of_val(self.vu32.values_slice())
+ mem::size_of_val(self.vobool.values_slice())
}

fn finish(&mut self) -> TestImmutableArrays {
let vstring = Arc::new(self.vstring.finish());
let vu32 = Arc::new(self.vu32.finish());
let vbool = Arc::new(self.vobool.finish());
let _null = Arc::new(BooleanArray::new(self._null.finish(), None));
let _ts = Arc::new(self._ts.finish());
let record_batch = RecordBatch::try_new(
Arc::clone(
<<TestImmutableArrays as ArrowArrays>::Record as Record>::arrow_schema(),
),
vec![
Arc::clone(&_null) as Arc<dyn Array>,
Arc::clone(&_ts) as Arc<dyn Array>,
Arc::clone(&vstring) as Arc<dyn Array>,
Arc::clone(&vu32) as Arc<dyn Array>,
Arc::clone(&vbool) as Arc<dyn Array>,
],
)
.expect("create record batch must be successful");

TestImmutableArrays {
vstring,
vu32,
vbool,
_null,
_ts,
record_batch,
}
}
}
}
203 changes: 185 additions & 18 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ pub mod record;
mod scope;
pub mod serdes;
mod stream;
mod timestamp;
pub mod timestamp;
mod transaction;
mod version;
mod wal;
Expand Down Expand Up @@ -303,43 +303,210 @@ pub(crate) mod tests {
use std::{collections::VecDeque, sync::Arc};

use arrow::{
array::{
Array, AsArray, BooleanArray, BooleanBufferBuilder, BooleanBuilder, PrimitiveBuilder,
RecordBatch, StringArray, StringBuilder, UInt32Array, UInt32Builder,
},
array::{Array, AsArray, RecordBatch},
datatypes::{DataType, Field, Schema, UInt32Type},
};
use async_lock::RwLock;
use futures_util::io;
use morseldb_marco::morsel_record;
use once_cell::sync::Lazy;
use parquet::arrow::ProjectionMask;
use tracing::error;

use crate::{
executor::{tokio::TokioExecutor, Executor},
inmem::{
immutable::{ArrowArrays, Builder},
mutable::Mutable,
},
record::{internal::InternalRecordRef, Key, RecordRef},
serdes::{
option::{DecodeError, EncodeError},
Decode, Encode,
},
timestamp::Timestamped,
inmem::{immutable::tests::TestImmutableArrays, mutable::Mutable},
record::{internal::InternalRecordRef, RecordDecodeError, RecordEncodeError, RecordRef},
serdes::{Decode, Encode},
version::{cleaner::Cleaner, set::tests::build_version_set, Version},
DbOption, Immutable, Record, WriteError, DB,
};

#[morsel_record]
#[derive(Debug, PartialEq, Eq)]
pub struct Test {
#[primary_key]
pub vstring: String,
pub vu32: u32,
pub vbool: Option<bool>,
}

impl Decode for Test {
type Error = RecordDecodeError;

async fn decode<R>(reader: &mut R) -> Result<Self, Self::Error>
where
R: futures_io::AsyncRead + Unpin,
{
let vstring =
String::decode(reader)
.await
.map_err(|err| RecordDecodeError::Decode {
field_name: "vstring".to_string(),
error: Box::new(err),
})?;
let vu32 = Option::<u32>::decode(reader)
.await
.map_err(|err| RecordDecodeError::Decode {
field_name: "vu32".to_string(),
error: Box::new(err),
})?
.unwrap();
let vbool =
Option::<bool>::decode(reader)
.await
.map_err(|err| RecordDecodeError::Decode {
field_name: "vbool".to_string(),
error: Box::new(err),
})?;

Ok(Self {
vstring,
vu32,
vbool,
})
}
}

impl Record for Test {
type Columns = TestImmutableArrays;

type Key = String;

type Ref<'r> = TestRef<'r>
where
Self: 'r;

fn key(&self) -> &str {
&self.vstring
}

fn primary_key_index() -> usize {
2
}

fn as_record_ref(&self) -> Self::Ref<'_> {
TestRef {
vstring: &self.vstring,
vu32: Some(self.vu32),
vbool: self.vbool,
}
}

fn arrow_schema() -> &'static Arc<Schema> {
static SCHEMA: Lazy<Arc<Schema>> = Lazy::new(|| {
Arc::new(Schema::new(vec![
Field::new("_null", DataType::Boolean, false),
Field::new("_ts", DataType::UInt32, false),
Field::new("vstring", DataType::Utf8, false),
Field::new("vu32", DataType::UInt32, false),
Field::new("vbool", DataType::Boolean, true),
]))
});

&SCHEMA
}
}

#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub struct TestRef<'r> {
pub vstring: &'r str,
pub vu32: Option<u32>,
pub vbool: Option<bool>,
}

impl<'r> Encode for TestRef<'r> {
type Error = RecordEncodeError;

async fn encode<W>(&self, writer: &mut W) -> Result<(), Self::Error>
where
W: io::AsyncWrite + Unpin,
{
self.vstring
.encode(writer)
.await
.map_err(|err| RecordEncodeError::Encode {
field_name: "vstring".to_string(),
error: Box::new(err),
})?;
self.vu32
.encode(writer)
.await
.map_err(|err| RecordEncodeError::Encode {
field_name: "vu32".to_string(),
error: Box::new(err),
})?;
self.vbool
.encode(writer)
.await
.map_err(|err| RecordEncodeError::Encode {
field_name: "vbool".to_string(),
error: Box::new(err),
})?;

Ok(())
}

fn size(&self) -> usize {
self.vstring.size() + self.vu32.size() + self.vbool.size()
}
}

impl<'r> RecordRef<'r> for TestRef<'r> {
type Record = Test;

fn key(self) -> <<Self::Record as Record>::Key as crate::record::Key>::Ref<'r> {
self.vstring
}

fn from_record_batch(
record_batch: &'r RecordBatch,
offset: usize,
projection_mask: &'r ProjectionMask,
) -> InternalRecordRef<'r, Self> {
let mut column_i = 2;
let null = record_batch.column(0).as_boolean().value(offset);

let ts = record_batch
.column(1)
.as_primitive::<UInt32Type>()
.value(offset)
.into();

let vstring = record_batch
.column(column_i)
.as_string::<i32>()
.value(offset);
column_i += 1;

let mut vu32 = None;

if projection_mask.leaf_included(3) {
vu32 = Some(
record_batch
.column(column_i)
.as_primitive::<UInt32Type>()
.value(offset),
);
column_i += 1;
}

let mut vbool = None;

if projection_mask.leaf_included(4) {
let vbool_array = record_batch.column(column_i).as_boolean();

if !vbool_array.is_null(offset) {
vbool = Some(vbool_array.value(offset));
}
}

let record = TestRef {
vstring,
vu32,
vbool,
};
InternalRecordRef::new(ts, record, null)
}
}

pub(crate) async fn get_test_record_batch<E: Executor>(
option: Arc<DbOption>,
executor: E,
Expand Down
Loading

0 comments on commit 856a26b

Please sign in to comment.