Skip to content

Commit

Permalink
fix: absolute path use in macro
Browse files Browse the repository at this point in the history
  • Loading branch information
ethe committed Jul 26, 2024
1 parent 31d1650 commit 9e5ae4b
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 67 deletions.
11 changes: 11 additions & 0 deletions examples/declare.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
use morseldb::morsel_record;

#[morsel_record]
struct Post {
#[primary_key]
name: String,
}

fn main() {
println!("Hello, world!");
}
2 changes: 1 addition & 1 deletion src/inmem/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
pub(crate) mod immutable;
pub mod immutable;
pub(crate) mod mutable;
5 changes: 3 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ mod arrows;
mod compaction;
pub mod executor;
pub mod fs;
mod inmem;
pub mod inmem;
mod ondisk;
pub mod option;
mod record;
pub mod record;
mod scope;
pub mod serdes;
mod stream;
Expand All @@ -23,6 +23,7 @@ use futures_core::Stream;
use futures_util::StreamExt;
use inmem::{immutable::Immutable, mutable::Mutable};
use lockable::LockableHashMap;
pub use morseldb_marco::morsel_record;
use parquet::{
arrow::{arrow_to_parquet_schema, ProjectionMask},
errors::ParquetError,
Expand Down
128 changes: 67 additions & 61 deletions src/morseldb_marco/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ pub fn morsel_record(_args: TokenStream, input: TokenStream) -> TokenStream {
Some((DataType::UInt8, is_nullable)) => (
is_nullable,
quote!(u8),
quote!(DataType::UInt8),
quote!(::arrow::datatypes::DataType::UInt8),
quote!(UInt8Array),
quote!(as_primitive::<UInt8Type>()),
quote!(PrimitiveBuilder::<UInt8Type>::with_capacity(capacity)),
Expand All @@ -86,7 +86,7 @@ pub fn morsel_record(_args: TokenStream, input: TokenStream) -> TokenStream {
Some((DataType::UInt16, is_nullable)) => (
is_nullable,
quote!(u16),
quote!(DataType::UInt16),
quote!(::arrow::datatypes::DataType::UInt16),
quote!(UInt16Array),
quote!(as_primitive::<UInt16Type>()),
quote!(PrimitiveBuilder::<UInt16Type>::with_capacity(capacity)),
Expand All @@ -97,18 +97,22 @@ pub fn morsel_record(_args: TokenStream, input: TokenStream) -> TokenStream {
Some((DataType::UInt32, is_nullable)) => (
is_nullable,
quote!(u32),
quote!(DataType::UInt32),
quote!(UInt32Array),
quote!(as_primitive::<UInt32Type>()),
quote!(PrimitiveBuilder::<UInt32Type>::with_capacity(capacity)),
quote!(PrimitiveBuilder<UInt32Type>),
quote!(::arrow::datatypes::DataType::UInt32),
quote!(::arrow::array::UInt32Array),
quote!(as_primitive::<::arrow::datatypes::UInt32Type>()),
quote!(
PrimitiveBuilder::<::arrow::datatypes::UInt32Type>::with_capacity(
capacity
)
),
quote!(PrimitiveBuilder<::arrow::datatypes::UInt32Type>),
quote!(0),
quote!(std::mem::size_of_val(self.#field_name.values_slice())),
),
Some((DataType::UInt64, is_nullable)) => (
is_nullable,
quote!(u64),
quote!(DataType::UInt64),
quote!(::arrow::datatypes::DataType::UInt64),
quote!(UInt64Array),
quote!(as_primitive::<UInt64Type>()),
quote!(PrimitiveBuilder::<UInt64Type>::with_capacity(capacity)),
Expand All @@ -120,7 +124,7 @@ pub fn morsel_record(_args: TokenStream, input: TokenStream) -> TokenStream {
Some((DataType::Int8, is_nullable)) => (
is_nullable,
quote!(i8),
quote!(DataType::Int8),
quote!(::arrow::datatypes::DataType::Int8),
quote!(Int8Array),
quote!(as_primitive::<Int8Type>()),
quote!(PrimitiveBuilder::<Int8Type>::with_capacity(capacity)),
Expand All @@ -131,7 +135,7 @@ pub fn morsel_record(_args: TokenStream, input: TokenStream) -> TokenStream {
Some((DataType::Int16, is_nullable)) => (
is_nullable,
quote!(i16),
quote!(DataType::Int16),
quote!(::arrow::datatypes::DataType::Int16),
quote!(Int16Array),
quote!(as_primitive::<Int16Type>()),
quote!(PrimitiveBuilder::<Int16Type>::with_capacity(capacity)),
Expand All @@ -142,7 +146,7 @@ pub fn morsel_record(_args: TokenStream, input: TokenStream) -> TokenStream {
Some((DataType::Int32, is_nullable)) => (
is_nullable,
quote!(i32),
quote!(DataType::Int32),
quote!(::arrow::datatypes::DataType::Int32),
quote!(Int32Array),
quote!(as_primitive::<Int32Type>()),
quote!(PrimitiveBuilder::<Int32Type>::with_capacity(capacity)),
Expand All @@ -153,7 +157,7 @@ pub fn morsel_record(_args: TokenStream, input: TokenStream) -> TokenStream {
Some((DataType::Int64, is_nullable)) => (
is_nullable,
quote!(i64),
quote!(DataType::Int64),
quote!(::arrow::datatypes::DataType::Int64),
quote!(Int64Array),
quote!(as_primitive::<Int64Type>()),
quote!(PrimitiveBuilder::<Int64Type>::with_capacity(capacity)),
Expand All @@ -167,20 +171,20 @@ pub fn morsel_record(_args: TokenStream, input: TokenStream) -> TokenStream {
(
is_nullable,
quote!(String),
quote!(DataType::Utf8),
quote!(StringArray),
quote!(::arrow::datatypes::DataType::Utf8),
quote!(::arrow::array::StringArray),
quote!(as_string::<i32>()),
quote!(StringBuilder::with_capacity(capacity, 0)),
quote!(StringBuilder),
quote!(::arrow::array::StringBuilder::with_capacity(capacity, 0)),
quote!(::arrow::array::StringBuilder),
quote!(""),
quote!(self.#field_name.values_slice().len()),
)
}
Some((DataType::Boolean, is_nullable)) => (
is_nullable,
quote!(bool),
quote!(DataType::Boolean),
quote!(BooleanArray),
quote!(::arrow::datatypes::DataType::Boolean),
quote!(::arrow::array::BooleanArray),
quote!(as_boolean()),
quote!(BooleanBuilder::with_capacity(capacity)),
quote!(BooleanBuilder),
Expand All @@ -192,11 +196,11 @@ pub fn morsel_record(_args: TokenStream, input: TokenStream) -> TokenStream {
};

schema_fields.push(quote! {
Field::new(stringify!(#field_name), #mapped_type, #is_nullable),
::arrow::datatypes::Field::new(stringify!(#field_name), #mapped_type, #is_nullable),
});
field_names.push(quote! (#field_name,));
arrays_init_fields.push(quote! {
#field_name: Arc<#array_ty>,
#field_name: ::std::sync::Arc<#array_ty>,
});
builder_init_fields.push(quote! {
#field_name: #builder_with_capacity_method,
Expand All @@ -205,10 +209,10 @@ pub fn morsel_record(_args: TokenStream, input: TokenStream) -> TokenStream {
#field_name: #builder,
});
builder_finish_fields.push(quote! {
let #field_name = Arc::new(self.#field_name.finish());
let #field_name = ::std::sync::Arc::new(self.#field_name.finish());
});
builder_as_any_fields.push(quote! {
Arc::clone(&#field_name) as Arc<dyn Array>,
::std::sync::Arc::clone(&#field_name) as ::std::sync::Arc<dyn ::arrow::array::Array>,
});
encode_method_fields.push(quote! {
self.#field_name.encode(writer).await?;
Expand Down Expand Up @@ -370,7 +374,7 @@ pub fn morsel_record(_args: TokenStream, input: TokenStream) -> TokenStream {
#[derive(morseldb_marco::KeyAttributes, Debug, PartialEq, Eq, Clone)]
#ast

impl Record for #struct_name {
impl ::morseldb::record::Record for #struct_name {
type Columns = #struct_arrays_name;

type Key = #base_ty;
Expand All @@ -379,7 +383,7 @@ pub fn morsel_record(_args: TokenStream, input: TokenStream) -> TokenStream {
where
Self: 'r;

fn key(&self) -> <<Self as Record>::Key as Key>::Ref<'_> {
fn key(&self) -> <<Self as ::morseldb::record::Record>::Key as ::morseldb::record::Key>::Ref<'_> {
&self.#primary_key_name
}

Expand All @@ -393,11 +397,11 @@ pub fn morsel_record(_args: TokenStream, input: TokenStream) -> TokenStream {
}
}

fn arrow_schema() -> &'static Arc<Schema> {
static SCHEMA: Lazy<Arc<Schema>> = Lazy::new(|| {
Arc::new(Schema::new(vec![
Field::new("_null", DataType::Boolean, false),
Field::new("_ts", DataType::UInt32, false),
fn arrow_schema() -> &'static ::std::sync::Arc<::arrow::datatypes::Schema> {
static SCHEMA: ::once_cell::sync::Lazy<::std::sync::Arc<::arrow::datatypes::Schema>> = ::once_cell::sync::Lazy::new(|| {
::std::sync::Arc::new(::arrow::datatypes::Schema::new(vec![
::arrow::datatypes::Field::new("_null", ::arrow::datatypes::DataType::Boolean, false),
::arrow::datatypes::Field::new("_ts", ::arrow::datatypes::DataType::UInt32, false),
#(#schema_fields)*
]))
});
Expand All @@ -406,12 +410,12 @@ pub fn morsel_record(_args: TokenStream, input: TokenStream) -> TokenStream {
}
}

impl Decode for #struct_name {
type Error = DecodeError<io::Error>;
impl ::morseldb::serdes::Decode for #struct_name {
type Error = ::std::io::Error;

async fn decode<R>(reader: &mut R) -> Result<Self, Self::Error>
where
R: futures_io::AsyncRead + Unpin,
R: ::futures_io::AsyncRead + Unpin,
{
#(#decode_method_fields)*

Expand All @@ -426,24 +430,26 @@ pub fn morsel_record(_args: TokenStream, input: TokenStream) -> TokenStream {
#(#ref_fields)*
}

impl<'r> RecordRef<'r> for #struct_ref_name<'r> {
impl<'r> ::morseldb::record::RecordRef<'r> for #struct_ref_name<'r> {
type Record = #struct_name;

fn key(self) -> <<Self::Record as Record>::Key as crate::record::Key>::Ref<'r> {
fn key(self) -> <<Self::Record as ::morseldb::record::Record>::Key as ::morseldb::record::Key>::Ref<'r> {
self.#primary_key_name
}

fn from_record_batch(
record_batch: &'r RecordBatch,
record_batch: &'r ::arrow::record_batch::RecordBatch,
offset: usize,
projection_mask: &'r ProjectionMask,
) -> InternalRecordRef<'r, Self> {
projection_mask: &'r ::parquet::arrow::ProjectionMask,
) -> ::morseldb::record::internal::InternalRecordRef<'r, Self> {
use arrow::array::AsArray;

let mut column_i = 2;
let null = record_batch.column(0).as_boolean().value(offset);

let ts = record_batch
.column(1)
.as_primitive::<UInt32Type>()
.as_primitive::<::arrow::datatypes::UInt32Type>()
.value(offset)
.into();

Expand All @@ -452,16 +458,16 @@ pub fn morsel_record(_args: TokenStream, input: TokenStream) -> TokenStream {
let record = TestRef {
#(#field_names)*
};
InternalRecordRef::new(ts, record, null)
::morseldb::record::internal::InternalRecordRef::new(ts, record, null)
}
}

impl<'r> Encode for #struct_ref_name<'r> {
type Error = EncodeError<io::Error>;
impl<'r> ::morseldb::serdes::Encode for #struct_ref_name<'r> {
type Error = ::std::io::Error;

async fn encode<W>(&self, writer: &mut W) -> Result<(), Self::Error>
where
W: io::AsyncWrite + Unpin,
W: ::futures_io::AsyncWrite + Unpin,
{
#(#encode_method_fields)*

Expand All @@ -475,15 +481,15 @@ pub fn morsel_record(_args: TokenStream, input: TokenStream) -> TokenStream {

#[derive(Debug)]
pub struct #struct_arrays_name {
_null: Arc<BooleanArray>,
_ts: Arc<UInt32Array>,
_null: ::std::sync::Arc<::arrow::array::BooleanArray>,
_ts: ::std::sync::Arc<::arrow::array::UInt32Array>,

#(#arrays_init_fields)*

record_batch: RecordBatch,
record_batch: ::arrow::record_batch::RecordBatch,
}

impl ArrowArrays for #struct_arrays_name {
impl ::morseldb::inmem::immutable::ArrowArrays for #struct_arrays_name {
type Record = #struct_name;

type Builder = #struct_builder_name;
Expand All @@ -492,16 +498,16 @@ pub fn morsel_record(_args: TokenStream, input: TokenStream) -> TokenStream {
TestBuilder {
#(#builder_init_fields)*

_null: BooleanBufferBuilder::new(capacity),
_ts: UInt32Builder::with_capacity(capacity),
_null: ::arrow::array::BooleanBufferBuilder::new(capacity),
_ts: ::arrow::array::UInt32Builder::with_capacity(capacity),
}
}

fn get(
&self,
offset: u32,
projection_mask: &ProjectionMask,
) -> Option<Option<<Self::Record as Record>::Ref<'_>>> {
projection_mask: &::parquet::arrow::ProjectionMask,
) -> Option<Option<<Self::Record as ::morseldb::record::Record>::Ref<'_>>> {
let offset = offset as usize;

if offset >= self.vstring.len() {
Expand All @@ -518,19 +524,19 @@ pub fn morsel_record(_args: TokenStream, input: TokenStream) -> TokenStream {
}))
}

fn as_record_batch(&self) -> &RecordBatch {
fn as_record_batch(&self) -> &::arrow::record_batch::RecordBatch {
&self.record_batch
}
}

pub struct #struct_builder_name {
#(#builder_fields)*

_null: BooleanBufferBuilder,
_ts: UInt32Builder,
_null: ::arrow::array::BooleanBufferBuilder,
_ts: ::arrow::array::UInt32Builder,
}

impl Builder<TestImmutableArrays> for #struct_builder_name {
impl ::morseldb::inmem::immutable::Builder<TestImmutableArrays> for #struct_builder_name {
fn push(&mut self, key: Timestamped<&str>, row: Option<TestRef>) {
#builder_append_primary_key
match row {
Expand All @@ -556,15 +562,15 @@ pub fn morsel_record(_args: TokenStream, input: TokenStream) -> TokenStream {
fn finish(&mut self) -> #struct_arrays_name {
#(#builder_finish_fields)*

let _null = Arc::new(BooleanArray::new(self._null.finish(), None));
let _ts = Arc::new(self._ts.finish());
let record_batch = RecordBatch::try_new(
Arc::clone(
<<#struct_arrays_name as ArrowArrays>::Record as Record>::arrow_schema(),
let _null = ::std::sync::Arc::new(::arrow::array::BooleanArray::new(self._null.finish(), None));
let _ts = ::std::sync::Arc::new(self._ts.finish());
let record_batch = ::arrow::record_batch::RecordBatch::try_new(
::std::sync::Arc::clone(
<<#struct_arrays_name as ::morseldb::inmem::immutable::ArrowArrays>::Record as ::morseldb::record::Record>::arrow_schema(),
),
vec![
Arc::clone(&_null) as Arc<dyn Array>,
Arc::clone(&_ts) as Arc<dyn Array>,
::std::sync::Arc::clone(&_null) as ::std::sync::Arc<dyn ::arrow::array::Array>,
::std::sync::Arc::clone(&_ts) as ::std::sync::Arc<dyn ::arrow::array::Array>,

#(#builder_as_any_fields)*
],
Expand Down
4 changes: 2 additions & 2 deletions src/record/internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use super::{Key, Record, RecordRef};
use crate::timestamp::{Timestamp, Timestamped};

#[derive(Debug)]
pub(crate) struct InternalRecordRef<'r, R>
pub struct InternalRecordRef<'r, R>
where
R: RecordRef<'r>,
{
Expand All @@ -18,7 +18,7 @@ impl<'r, R> InternalRecordRef<'r, R>
where
R: RecordRef<'r>,
{
pub(crate) fn new(ts: Timestamp, record: R, null: bool) -> Self {
pub fn new(ts: Timestamp, record: R, null: bool) -> Self {
Self {
ts,
record,
Expand Down
2 changes: 1 addition & 1 deletion src/record/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
pub(crate) mod internal;
pub mod internal;
mod str;

use std::{fmt::Debug, hash::Hash, sync::Arc};
Expand Down

0 comments on commit 9e5ae4b

Please sign in to comment.