From 9dcb157253566be577ca0b4a3c7979df4a6c939d Mon Sep 17 00:00:00 2001 From: Kould Date: Tue, 16 Jul 2024 16:40:08 +0800 Subject: [PATCH 01/10] feat: impl `Version` & `VersionSet` --- Cargo.toml | 4 + src/executor.rs | 16 ++-- src/fs/mod.rs | 34 ++++++- src/fs/tokio.rs | 16 ++-- src/inmem/immutable.rs | 8 +- src/inmem/mutable.rs | 2 +- src/lib.rs | 110 +++++++++++++++++++---- src/ondisk/scan.rs | 18 ++-- src/ondisk/sstable.rs | 36 ++++---- src/record/mod.rs | 7 +- src/record/str.rs | 4 +- src/scope.rs | 124 ++++++++++++++++++++++++++ src/serdes/arc.rs | 37 ++++++++ src/serdes/boolean.rs | 34 +++++++ src/serdes/mod.rs | 42 +++++++++ src/serdes/num.rs | 49 +++++++++++ src/serdes/option.rs | 73 +++++++++++++++ src/serdes/string.rs | 42 +++++++++ src/stream/merge.rs | 33 ++++--- src/stream/mod.rs | 23 +++-- src/transaction.rs | 12 +-- src/version/cleaner.rs | 66 ++++++++++++++ src/version/edit.rs | 152 ++++++++++++++++++++++++++++++++ src/version/mod.rs | 196 +++++++++++++++++++++++++++++++++++++++++ src/version/set.rs | 139 +++++++++++++++++++++++++++++ 25 files changed, 1187 insertions(+), 90 deletions(-) create mode 100644 src/scope.rs create mode 100644 src/serdes/arc.rs create mode 100644 src/serdes/boolean.rs create mode 100644 src/serdes/mod.rs create mode 100644 src/serdes/num.rs create mode 100644 src/serdes/option.rs create mode 100644 src/serdes/string.rs create mode 100644 src/version/cleaner.rs create mode 100644 src/version/edit.rs create mode 100644 src/version/mod.rs create mode 100644 src/version/set.rs diff --git a/Cargo.toml b/Cargo.toml index 7fe966ad..449eaf72 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,6 +12,8 @@ arrow = "52" async-lock = "3" crossbeam-skiplist = "0.1" futures-core = "0.3" +futures-channel = "0.3" +futures-executor = "0.3" futures-io = "0.3" futures-util = "0.3" once_cell = "1" @@ -20,6 +22,8 @@ pin-project-lite = "0.2" thiserror = "1" tokio = { version = "1", optional = true } tokio-util = { version = "0.7", features = ["compat"] } +tracing = "0.1" +ulid = "1" [dev-dependencies] tempfile = "3" diff --git a/src/executor.rs b/src/executor.rs index 1f26139d..886130b2 100644 --- a/src/executor.rs +++ b/src/executor.rs @@ -1,6 +1,8 @@ use std::future::Future; -pub trait Executor { +use crate::fs::Fs; + +pub trait Executor: Fs { fn spawn(&self, future: F) where F: Future + Send + 'static; @@ -10,15 +12,19 @@ pub trait Executor { pub mod tokio { use std::future::Future; + use tokio::runtime::Handle; + use super::Executor; pub struct TokioExecutor { - tokio: tokio::runtime::Runtime, + handle: Handle, } impl TokioExecutor { - pub fn new(tokio: tokio::runtime::Runtime) -> Self { - Self { tokio } + pub fn new() -> Self { + Self { + handle: Handle::current(), + } } } @@ -27,7 +33,7 @@ pub mod tokio { where F: Future + Send + 'static, { - self.tokio.spawn(future); + self.handle.spawn(future); } } } diff --git a/src/fs/mod.rs b/src/fs/mod.rs index a3a23757..51adde4e 100644 --- a/src/fs/mod.rs +++ b/src/fs/mod.rs @@ -1,16 +1,42 @@ #[cfg(any(feature = "tokio", test))] pub mod tokio; -use std::{future::Future, io, path::Path}; +use std::{ + fmt::{Display, Formatter}, + future::Future, + io, + path::Path, +}; use futures_io::{AsyncRead, AsyncSeek, AsyncWrite}; +use ulid::Ulid; -pub trait AsyncFile: AsyncRead + AsyncWrite + AsyncSeek + Send + Unpin + 'static {} +pub(crate) type FileId = Ulid; -impl AsyncFile for T where T: AsyncRead + AsyncWrite + AsyncSeek + Send + Unpin + 'static {} +pub enum FileType { + WAL, + PARQUET, + LOG, +} + +pub trait AsyncFile: AsyncRead + AsyncWrite + AsyncSeek + Send + Sync + Unpin + 'static {} + +impl AsyncFile for T where T: AsyncRead + AsyncWrite + AsyncSeek + Send + Sync + Unpin + 'static {} pub trait Fs { type File: AsyncFile; - fn open(&self, path: impl AsRef) -> impl Future>; + fn open(path: impl AsRef) -> impl Future>; + + fn remove(path: impl AsRef) -> impl Future>; +} + +impl Display for FileType { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + FileType::WAL => write!(f, "wal"), + FileType::PARQUET => write!(f, "parquet"), + FileType::LOG => write!(f, "log"), + } + } } diff --git a/src/fs/tokio.rs b/src/fs/tokio.rs index 9c41afe7..e7fc7bdd 100644 --- a/src/fs/tokio.rs +++ b/src/fs/tokio.rs @@ -1,17 +1,21 @@ use std::{io, path::Path}; +use tokio::fs::{remove_file, File}; use tokio_util::compat::{Compat, TokioAsyncReadCompatExt}; use super::Fs; +use crate::executor::tokio::TokioExecutor; -pub struct TokioFs; +impl Fs for TokioExecutor { + type File = Compat; -impl Fs for TokioFs { - type File = Compat; - - async fn open(&self, path: impl AsRef) -> io::Result { - tokio::fs::File::create_new(path) + async fn open(path: impl AsRef) -> io::Result { + File::create_new(path) .await .map(TokioAsyncReadCompatExt::compat) } + + async fn remove(path: impl AsRef) -> io::Result<()> { + remove_file(path).await + } } diff --git a/src/inmem/immutable.rs b/src/inmem/immutable.rs index 509a69a6..56725dfd 100644 --- a/src/inmem/immutable.rs +++ b/src/inmem/immutable.rs @@ -19,9 +19,9 @@ use crate::{ pub trait ArrowArrays: Sized { type Record: Record; - type Buider: Builder; + type Builder: Builder; - fn builder(capacity: usize) -> Self::Buider; + fn builder(capacity: usize) -> Self::Builder; fn get(&self, offset: u32) -> Option::Ref<'_>>>; @@ -167,9 +167,9 @@ pub(crate) mod tests { impl ArrowArrays for TestImmutableArrays { type Record = Test; - type Buider = TestBuilder; + type Builder = TestBuilder; - fn builder(capacity: usize) -> Self::Buider { + fn builder(capacity: usize) -> Self::Builder { TestBuilder { vstring: StringBuilder::with_capacity(capacity, 0), vu32: PrimitiveBuilder::::with_capacity(capacity), diff --git a/src/inmem/mutable.rs b/src/inmem/mutable.rs index 9e19ea16..7a5b8a83 100644 --- a/src/inmem/mutable.rs +++ b/src/inmem/mutable.rs @@ -114,7 +114,7 @@ where #[cfg(test)] mod tests { - use std::collections::Bound; + use std::ops::Bound; use super::Mutable; use crate::{ diff --git a/src/lib.rs b/src/lib.rs index d1f22fe6..c9fc95ba 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,15 +1,20 @@ #![allow(dead_code)] pub(crate) mod arrows; -mod executor; +pub mod executor; pub mod fs; mod inmem; mod ondisk; mod oracle; mod record; +mod scope; +pub mod serdes; mod stream; mod transaction; +mod version; -use std::{collections::VecDeque, io, mem, ops::Bound, sync::Arc}; +use std::{ + collections::VecDeque, io, marker::PhantomData, mem, ops::Bound, path::PathBuf, sync::Arc, +}; use async_lock::{RwLock, RwLockReadGuard}; use futures_core::Stream; @@ -18,31 +23,99 @@ use inmem::{immutable::Immutable, mutable::Mutable}; use oracle::Timestamp; use parquet::errors::ParquetError; use record::Record; -use stream::{merge::MergeStream, Entry, ScanStream}; -pub struct DB +use crate::{ + executor::Executor, + fs::{FileId, FileType}, + stream::{merge::MergeStream, Entry, ScanStream}, + version::Version, +}; + +#[derive(Debug)] +pub struct DbOption { + pub path: PathBuf, + pub max_mem_table_size: usize, + pub immutable_chunk_num: usize, + pub major_threshold_with_sst_size: usize, + pub level_sst_magnification: usize, + pub max_sst_file_size: usize, + pub clean_channel_buffer: usize, +} + +pub struct DB where R: Record, + E: Executor, { schema: Arc>>, + _p: PhantomData, } -impl Default for DB +impl DbOption { + pub fn new(path: impl Into + Send) -> Self { + DbOption { + path: path.into(), + max_mem_table_size: 8 * 1024 * 1024, + immutable_chunk_num: 3, + major_threshold_with_sst_size: 10, + level_sst_magnification: 10, + max_sst_file_size: 24 * 1024 * 1024, + clean_channel_buffer: 10, + } + } + + pub(crate) fn table_path(&self, gen: &FileId) -> PathBuf { + self.path.join(format!("{}.{}", gen, FileType::PARQUET)) + } + + pub(crate) fn wal_path(&self, gen: &FileId) -> PathBuf { + self.path.join(format!("{}.{}", gen, FileType::WAL)) + } + + pub(crate) fn version_path(&self) -> PathBuf { + self.path.join(format!("version.{}", FileType::LOG)) + } + + pub(crate) fn is_threshold_exceeded_major( + &self, + version: &Version, + level: usize, + ) -> bool + where + R: Record, + E: Executor, + { + Version::::tables_len(version, level) + >= (self.major_threshold_with_sst_size * self.level_sst_magnification.pow(level as u32)) + } +} + +impl Default for DB where R: Record, + E: Executor, { fn default() -> Self { Self { schema: Arc::new(RwLock::new(Schema::default())), + _p: Default::default(), } } } -impl DB +impl DB where R: Record + Send + Sync, - R::Key: Send, + R::Key: Send + Sync, + E: Executor, { + pub fn empty() -> Self { + Self { + schema: Arc::new(RwLock::new(Schema::default())), + _p: Default::default(), + } + } + pub(crate) async fn write(&self, record: R, ts: Timestamp) -> io::Result<()> { let columns = self.schema.read().await; columns.write(record, ts).await @@ -95,25 +168,31 @@ where Ok(()) } - async fn get<'get>( + async fn get<'get, E>( &'get self, key: &'get R::Key, ts: Timestamp, - ) -> Result>, ParquetError> { - self.scan(Bound::Included(key), Bound::Unbounded, ts) + ) -> Result>, ParquetError> + where + E: Executor, + { + self.scan::(Bound::Included(key), Bound::Unbounded, ts) .await? .next() .await .transpose() } - async fn scan<'scan>( + async fn scan<'scan, E>( &'scan self, lower: Bound<&'scan R::Key>, uppwer: Bound<&'scan R::Key>, ts: Timestamp, - ) -> Result, ParquetError>>, ParquetError> { - let mut streams = Vec::>::with_capacity(self.immutables.len() + 1); + ) -> Result, ParquetError>>, ParquetError> + where + E: Executor, + { + let mut streams = Vec::>::with_capacity(self.immutables.len() + 1); streams.push(self.mutable.scan((lower, uppwer), ts).into()); for immutable in &self.immutables { streams.push(immutable.scan((lower, uppwer), ts).into()); @@ -141,6 +220,7 @@ pub(crate) mod tests { use once_cell::sync::Lazy; use crate::{ + executor::Executor, inmem::immutable::tests::TestImmutableArrays, record::{internal::InternalRecordRef, RecordRef}, Record, DB, @@ -227,8 +307,8 @@ pub(crate) mod tests { } } - pub(crate) async fn get_test_record_batch() -> RecordBatch { - let db = DB::default(); + pub(crate) async fn get_test_record_batch() -> RecordBatch { + let db: DB = DB::empty(); db.write( Test { diff --git a/src/ondisk/scan.rs b/src/ondisk/scan.rs index 5e5abc40..4508be6e 100644 --- a/src/ondisk/scan.rs +++ b/src/ondisk/scan.rs @@ -9,32 +9,36 @@ use pin_project_lite::pin_project; use tokio_util::compat::Compat; use crate::{ - fs::AsyncFile, + executor::Executor, record::Record, stream::record_batch::{RecordBatchEntry, RecordBatchIterator}, }; pin_project! { #[derive(Debug)] - pub struct SsTableScan { + pub struct SsTableScan + where + E: Executor + { #[pin] - stream: ParquetRecordBatchStream>>, + stream: ParquetRecordBatchStream>, iter: Option>, } } -impl SsTableScan +impl SsTableScan where - R: Record, + E: Executor, { - pub fn new(stream: ParquetRecordBatchStream>>) -> Self { + pub fn new(stream: ParquetRecordBatchStream>) -> Self { SsTableScan { stream, iter: None } } } -impl Stream for SsTableScan +impl Stream for SsTableScan where R: Record, + E: Executor, { type Item = Result, parquet::errors::ParquetError>; diff --git a/src/ondisk/sstable.rs b/src/ondisk/sstable.rs index 70e9c42b..334281e5 100644 --- a/src/ondisk/sstable.rs +++ b/src/ondisk/sstable.rs @@ -17,32 +17,34 @@ use tokio_util::compat::{Compat, FuturesAsyncReadCompatExt}; use super::scan::SsTableScan; use crate::{ arrows::get_range_filter, - fs::AsyncFile, + executor::Executor, oracle::{timestamp::TimestampedRef, Timestamp}, record::Record, stream::record_batch::RecordBatchEntry, }; -struct SsTable +pub(crate) struct SsTable where R: Record, + E: Executor, { - file: Box, - _marker: PhantomData, + file: E::File, + _marker: PhantomData<(R, E)>, } -impl SsTable +impl SsTable where R: Record, + E: Executor, { - fn open(file: Box) -> Self { + pub(crate) fn open(file: E::File) -> Self { SsTable { file, _marker: PhantomData, } } - fn create_writer(&mut self) -> AsyncArrowWriter>> { + fn create_writer(&mut self) -> AsyncArrowWriter> { // TODO: expose writer options let options = ArrowWriterOptions::new().with_properties( WriterProperties::builder() @@ -73,7 +75,7 @@ where async fn into_parquet_builder( self, limit: usize, - ) -> parquet::errors::Result>>>> { + ) -> parquet::errors::Result>>> { Ok(ParquetRecordBatchStreamBuilder::new_with_options( self.file.compat(), ArrowReaderOptions::default().with_page_index(true), @@ -82,7 +84,7 @@ where .with_limit(limit)) } - async fn get( + pub(crate) async fn get( self, key: &TimestampedRef, ) -> parquet::errors::Result>> { @@ -97,7 +99,7 @@ where self, range: (Bound<&'scan R::Key>, Bound<&'scan R::Key>), ts: Timestamp, - ) -> Result, parquet::errors::ParquetError> { + ) -> Result, parquet::errors::ParquetError> { let builder = self.into_parquet_builder(1).await?; let schema_descriptor = builder.metadata().file_metadata().schema_descr(); @@ -113,7 +115,8 @@ mod tests { use super::SsTable; use crate::{ - fs::{tokio::TokioFs, AsyncFile, Fs}, + executor::tokio::TokioExecutor, + fs::Fs, oracle::timestamp::Timestamped, tests::{get_test_record_batch, Test}, }; @@ -121,12 +124,11 @@ mod tests { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn write_sstable() { let temp_dir = tempfile::tempdir().unwrap(); - let file = temp_dir.path().join("test.parquet"); - - let file = TokioFs {}.open(&file).await.unwrap(); - let mut sstable = SsTable::::open(Box::new(file) as Box); - - let record_batch = get_test_record_batch().await; + let record_batch = get_test_record_batch::().await; + let file = TokioExecutor::open(&temp_dir.path().join("test.parquet")) + .await + .unwrap(); + let mut sstable = SsTable::::open(file); sstable.write(record_batch).await.unwrap(); diff --git a/src/record/mod.rs b/src/record/mod.rs index 0ceff140..79fc7eed 100644 --- a/src/record/mod.rs +++ b/src/record/mod.rs @@ -9,9 +9,12 @@ use arrow::{ }; use internal::InternalRecordRef; -use crate::inmem::immutable::ArrowArrays; +use crate::{ + inmem::immutable::ArrowArrays, + serdes::{Decode, Encode}, +}; -pub trait Key: 'static + Ord + Send + Sync { +pub trait Key: 'static + Encode + Decode + Ord + Clone + Send + Sync { type Ref<'r>: KeyRef<'r, Key = Self> + Copy where Self: 'r; diff --git a/src/record/str.rs b/src/record/str.rs index a50411da..deea23cd 100644 --- a/src/record/str.rs +++ b/src/record/str.rs @@ -97,9 +97,9 @@ pub struct StringColumns { impl ArrowArrays for StringColumns { type Record = String; - type Buider = StringColumnsBuilder; + type Builder = StringColumnsBuilder; - fn builder(capacity: usize) -> Self::Buider { + fn builder(capacity: usize) -> Self::Builder { StringColumnsBuilder { _null: BooleanBufferBuilder::new(capacity), _ts: UInt32Builder::with_capacity(capacity), diff --git a/src/scope.rs b/src/scope.rs new file mode 100644 index 00000000..e6bf93aa --- /dev/null +++ b/src/scope.rs @@ -0,0 +1,124 @@ +use futures_io::{AsyncRead, AsyncWrite}; +use futures_util::{AsyncReadExt, AsyncWriteExt}; + +use crate::{ + fs::FileId, + serdes::{Decode, Encode}, +}; + +#[derive(Debug, Eq, PartialEq)] +pub struct Scope +where + K: Encode + Decode + Ord + Clone, +{ + pub(crate) min: K, + pub(crate) max: K, + pub(crate) gen: FileId, + pub(crate) wal_ids: Option>, +} + +impl Clone for Scope +where + K: Encode + Decode + Ord + Clone, +{ + fn clone(&self) -> Self { + Scope { + min: self.min.clone(), + max: self.max.clone(), + gen: self.gen, + wal_ids: self.wal_ids.clone(), + } + } +} + +impl Scope +where + K: Encode + Decode + Ord + Clone, +{ + pub(crate) fn is_between(&self, key: &K) -> bool { + self.min.le(key) && self.max.ge(key) + } + + pub(crate) fn is_meet(&self, target: &Scope) -> bool { + (self.min.le(&target.min) && self.max.ge(&target.min)) + || (self.min.le(&target.max) && self.max.ge(&target.max)) + || (self.min.le(&target.min)) && self.max.ge(&target.max) + || (self.min.ge(&target.min)) && self.max.le(&target.max) + } +} + +impl Encode for Scope +where + K: Encode + Decode + Ord + Clone, +{ + type Error = ::Error; + + async fn encode( + &self, + writer: &mut W, + ) -> Result<(), Self::Error> { + self.min.encode(writer).await?; + self.max.encode(writer).await?; + + writer.write_all(&self.gen.to_bytes()).await?; + + match &self.wal_ids { + None => { + 0u8.encode(writer).await?; + } + Some(ids) => { + 1u8.encode(writer).await?; + (ids.len() as u32).encode(writer).await?; + for id in ids { + writer.write_all(&id.to_bytes()).await?; + } + } + } + Ok(()) + } + + fn size(&self) -> usize { + // ProcessUniqueId: usize + u64 + self.min.size() + self.max.size() + 16 + } +} + +impl Decode for Scope +where + K: Encode + Decode + Ord + Clone, +{ + type Error = ::Error; + + async fn decode(reader: &mut R) -> Result { + let min = K::decode(reader).await?; + let max = K::decode(reader).await?; + + let gen = { + let mut slice = [0; 16]; + reader.read_exact(&mut slice).await?; + FileId::from_bytes(slice) + }; + let wal_ids = match u8::decode(reader).await? { + 0 => None, + 1 => { + let len = u32::decode(reader).await? as usize; + let mut ids = Vec::with_capacity(len); + + for _ in 0..len { + let mut slice = [0; 16]; + reader.read_exact(&mut slice).await?; + ids.push(FileId::from_bytes(slice)); + } + Some(ids) + } + _ => unreachable!(), + }; + + Ok(Scope { + min, + max, + gen, + wal_ids, + }) + } +} diff --git a/src/serdes/arc.rs b/src/serdes/arc.rs new file mode 100644 index 00000000..8d45bc84 --- /dev/null +++ b/src/serdes/arc.rs @@ -0,0 +1,37 @@ +use std::sync::Arc; + +use futures_io::{AsyncRead, AsyncWrite}; + +use super::{Decode, Encode}; + +impl Decode for Arc +where + T: Decode, +{ + type Error = T::Error; + + async fn decode(reader: &mut R) -> Result + where + R: AsyncRead + Unpin, + { + Ok(Arc::from(T::decode(reader).await?)) + } +} + +impl Encode for Arc +where + T: Encode + Send + Sync, +{ + type Error = T::Error; + + async fn encode(&self, writer: &mut W) -> Result<(), Self::Error> + where + W: AsyncWrite + Unpin + Send + Sync, + { + self.as_ref().encode(writer).await + } + + fn size(&self) -> usize { + Encode::size(self.as_ref()) + } +} diff --git a/src/serdes/boolean.rs b/src/serdes/boolean.rs new file mode 100644 index 00000000..f8cc9e43 --- /dev/null +++ b/src/serdes/boolean.rs @@ -0,0 +1,34 @@ +use std::{io, mem::size_of}; + +use futures_io::{AsyncRead, AsyncWrite}; +use futures_util::{AsyncReadExt, AsyncWriteExt}; + +use crate::serdes::{Decode, Encode}; + +impl Encode for bool { + type Error = io::Error; + + async fn encode(&self, writer: &mut W) -> Result<(), Self::Error> { + writer + .write_all(&if *self { 1u8 } else { 0u8 }.to_le_bytes()) + .await + } + + fn size(&self) -> usize { + size_of::() + } +} + +impl Decode for bool { + type Error = io::Error; + + async fn decode(reader: &mut R) -> Result { + let buf = { + let mut buf = [0; size_of::()]; + reader.read_exact(&mut buf).await?; + buf + }; + + Ok(u8::from_le_bytes(buf) == 1u8) + } +} diff --git a/src/serdes/mod.rs b/src/serdes/mod.rs new file mode 100644 index 00000000..beb9e6b0 --- /dev/null +++ b/src/serdes/mod.rs @@ -0,0 +1,42 @@ +mod arc; +mod boolean; +mod num; +mod option; +mod string; + +use std::{future::Future, io}; + +use futures_io::{AsyncRead, AsyncWrite}; + +pub trait Encode: Send + Sync { + type Error: From + std::error::Error + Send + Sync + 'static; + + fn encode(&self, writer: &mut W) -> impl Future> + Send + where + W: AsyncWrite + Unpin + Send + Sync; + + fn size(&self) -> usize; +} + +impl Encode for &T { + type Error = T::Error; + + async fn encode(&self, writer: &mut W) -> Result<(), Self::Error> + where + W: AsyncWrite + Unpin + Send + Sync, + { + Encode::encode(*self, writer).await + } + + fn size(&self) -> usize { + Encode::size(*self) + } +} + +pub trait Decode: Sized { + type Error: From + std::error::Error + Send + Sync + 'static; + + fn decode(reader: &mut R) -> impl Future> + where + R: AsyncRead + Unpin; +} diff --git a/src/serdes/num.rs b/src/serdes/num.rs new file mode 100644 index 00000000..de206920 --- /dev/null +++ b/src/serdes/num.rs @@ -0,0 +1,49 @@ +use std::{io, mem::size_of}; + +use futures_io::{AsyncRead, AsyncWrite}; +use futures_util::{AsyncReadExt, AsyncWriteExt}; + +use super::{Decode, Encode}; + +#[macro_export] +macro_rules! implement_encode_decode { + ($struct_name:ident) => { + impl Encode for $struct_name { + type Error = io::Error; + + async fn encode( + &self, + writer: &mut W, + ) -> Result<(), Self::Error> { + writer.write_all(&self.to_le_bytes()).await + } + + fn size(&self) -> usize { + size_of::() + } + } + + impl Decode for $struct_name { + type Error = io::Error; + + async fn decode(reader: &mut R) -> Result { + let buf = { + let mut buf = [0; size_of::()]; + reader.read_exact(&mut buf).await?; + buf + }; + + Ok(Self::from_le_bytes(buf)) + } + } + }; +} + +implement_encode_decode!(i8); +implement_encode_decode!(i16); +implement_encode_decode!(i32); +implement_encode_decode!(i64); +implement_encode_decode!(u8); +implement_encode_decode!(u16); +implement_encode_decode!(u32); +implement_encode_decode!(u64); diff --git a/src/serdes/option.rs b/src/serdes/option.rs new file mode 100644 index 00000000..397e4397 --- /dev/null +++ b/src/serdes/option.rs @@ -0,0 +1,73 @@ +use std::io; + +use futures_io::{AsyncRead, AsyncWrite}; +use futures_util::{AsyncReadExt, AsyncWriteExt}; +use thiserror::Error; + +use super::{Decode, Encode}; + +#[derive(Debug, Error)] +#[error("option encode error")] +pub enum EncodeError +where + E: std::error::Error, +{ + #[error("io error: {0}")] + Io(#[from] io::Error), + #[error("inner error: {0}")] + Inner(#[source] E), +} + +#[derive(Debug, Error)] +#[error("option decode error")] +pub enum DecodeError +where + E: std::error::Error, +{ + #[error("io error: {0}")] + Io(#[from] io::Error), + #[error("inner error: {0}")] + Inner(#[source] E), +} + +impl Encode for Option +where + V: Encode + Sync, +{ + type Error = EncodeError; + + async fn encode( + &self, + writer: &mut W, + ) -> Result<(), Self::Error> { + match self { + None => writer.write_all(&[0]).await?, + Some(v) => { + writer.write_all(&[1]).await?; + v.encode(writer).await.map_err(EncodeError::Inner)?; + } + } + Ok(()) + } + + fn size(&self) -> usize { + 1 + } +} + +impl Decode for Option +where + V: Decode, +{ + type Error = DecodeError; + + async fn decode(reader: &mut R) -> Result { + let mut o = [0]; + reader.read_exact(&mut o).await?; + match o[0] { + 0 => Ok(None), + 1 => Ok(Some(V::decode(reader).await.map_err(DecodeError::Inner)?)), + _ => panic!("invalid option tag"), + } + } +} diff --git a/src/serdes/string.rs b/src/serdes/string.rs new file mode 100644 index 00000000..3b9ea2da --- /dev/null +++ b/src/serdes/string.rs @@ -0,0 +1,42 @@ +use std::{io, mem::size_of}; + +use futures_io::{AsyncRead, AsyncWrite}; +use futures_util::{AsyncReadExt, AsyncWriteExt}; + +use super::{Decode, Encode}; + +impl Encode for String { + type Error = io::Error; + + async fn encode( + &self, + writer: &mut W, + ) -> Result<(), Self::Error> { + writer.write_all(&(self.len() as u16).to_le_bytes()).await?; + writer.write_all(self.as_bytes()).await + } + + fn size(&self) -> usize { + size_of::() + self.len() + } +} + +impl Decode for String { + type Error = io::Error; + + async fn decode(reader: &mut R) -> Result { + let len = { + let mut len = [0; size_of::()]; + reader.read_exact(&mut len).await?; + u16::from_le_bytes(len) as usize + }; + + let vec = { + let mut vec = vec![0; len]; + reader.read_exact(&mut vec).await?; + vec + }; + + Ok(unsafe { String::from_utf8_unchecked(vec) }) + } +} diff --git a/src/stream/merge.rs b/src/stream/merge.rs index bb1d8de3..2734572b 100644 --- a/src/stream/merge.rs +++ b/src/stream/merge.rs @@ -10,25 +10,27 @@ use futures_util::stream::StreamExt; use pin_project_lite::pin_project; use super::{Entry, ScanStream}; -use crate::record::Record; +use crate::{executor::Executor, record::Record}; pin_project! { - pub(crate) struct MergeStream<'merge, R> + pub(crate) struct MergeStream<'merge, R, E> where R: Record, + E: Executor, { - streams: Vec>, + streams: Vec>, peeked: BinaryHeap>, buf: Option>, } } -impl<'merge, R> MergeStream<'merge, R> +impl<'merge, R, E> MergeStream<'merge, R, E> where R: Record, + E: Executor, { pub(crate) async fn from_vec( - mut streams: Vec>, + mut streams: Vec>, ) -> Result { let mut peeked = BinaryHeap::with_capacity(streams.len()); @@ -49,9 +51,10 @@ where } } -impl<'merge, R> Stream for MergeStream<'merge, R> +impl<'merge, R, E> Stream for MergeStream<'merge, R, E> where R: Record, + E: Executor, { type Item = Result, parquet::errors::ParquetError>; @@ -135,7 +138,7 @@ mod tests { use futures_util::StreamExt; use super::MergeStream; - use crate::inmem::mutable::Mutable; + use crate::{executor::tokio::TokioExecutor, inmem::mutable::Mutable}; #[tokio::test] async fn merge_mutable() { @@ -155,7 +158,7 @@ mod tests { let lower = "a".to_string(); let upper = "e".to_string(); let bound = (Bound::Included(&lower), Bound::Included(&upper)); - let mut merge = MergeStream::from_vec(vec![ + let mut merge = MergeStream::::from_vec(vec![ m1.scan(bound, 6.into()).into(), m2.scan(bound, 6.into()).into(), m3.scan(bound, 6.into()).into(), @@ -183,9 +186,10 @@ mod tests { let lower = "1".to_string(); let upper = "4".to_string(); let bound = (Bound::Included(&lower), Bound::Included(&upper)); - let mut merge = MergeStream::from_vec(vec![m1.scan(bound, 0.into()).into()]) - .await - .unwrap(); + let mut merge = + MergeStream::::from_vec(vec![m1.scan(bound, 0.into()).into()]) + .await + .unwrap(); dbg!(merge.next().await); dbg!(merge.next().await); @@ -194,9 +198,10 @@ mod tests { let lower = "1".to_string(); let upper = "4".to_string(); let bound = (Bound::Included(&lower), Bound::Included(&upper)); - let mut merge = MergeStream::from_vec(vec![m1.scan(bound, 1.into()).into()]) - .await - .unwrap(); + let mut merge = + MergeStream::::from_vec(vec![m1.scan(bound, 1.into()).into()]) + .await + .unwrap(); dbg!(merge.next().await); dbg!(merge.next().await); diff --git a/src/stream/mod.rs b/src/stream/mod.rs index 819601aa..75c1bcf6 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -14,6 +14,7 @@ use pin_project_lite::pin_project; use record_batch::RecordBatchEntry; use crate::{ + executor::Executor, inmem::{immutable::ImmutableScan, mutable::MutableScan}, ondisk::scan::SsTableScan, oracle::timestamp::Timestamped, @@ -65,9 +66,10 @@ where pin_project! { #[project = ScanStreamProject] - pub enum ScanStream<'scan, R> + pub enum ScanStream<'scan, R, E> where R: Record, + E: Executor, { Mutable { #[pin] @@ -79,14 +81,15 @@ pin_project! { }, SsTable { #[pin] - inner: SsTableScan, + inner: SsTableScan, }, } } -impl<'scan, R> From> for ScanStream<'scan, R> +impl<'scan, R, E> From> for ScanStream<'scan, R, E> where R: Record, + E: Executor, { fn from(inner: MutableScan<'scan, R>) -> Self { ScanStream::Mutable { @@ -95,9 +98,10 @@ where } } -impl<'scan, R> From> for ScanStream<'scan, R> +impl<'scan, R, E> From> for ScanStream<'scan, R, E> where R: Record, + E: Executor, { fn from(inner: ImmutableScan<'scan, R>) -> Self { ScanStream::Immutable { @@ -106,18 +110,20 @@ where } } -impl<'scan, R> From> for ScanStream<'scan, R> +impl<'scan, R, E> From> for ScanStream<'scan, R, E> where R: Record, + E: Executor, { - fn from(inner: SsTableScan) -> Self { + fn from(inner: SsTableScan) -> Self { ScanStream::SsTable { inner } } } -impl fmt::Debug for ScanStream<'_, R> +impl fmt::Debug for ScanStream<'_, R, E> where R: Record, + E: Executor, { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { match self { @@ -128,9 +134,10 @@ where } } -impl<'scan, R> Stream for ScanStream<'scan, R> +impl<'scan, R, E> Stream for ScanStream<'scan, R, E> where R: Record, + E: Executor, { type Item = Result, parquet::errors::ParquetError>; diff --git a/src/transaction.rs b/src/transaction.rs index f7054b83..20fdd367 100644 --- a/src/transaction.rs +++ b/src/transaction.rs @@ -1,21 +1,23 @@ use std::{collections::BTreeMap, io, sync::Arc}; -use crate::{oracle::Timestamp, Record, DB}; +use crate::{executor::Executor, oracle::Timestamp, Record, DB}; -pub struct Transaction +pub struct Transaction where R: Record, + E: Executor, { - db: Arc>, + db: Arc>, read_at: Timestamp, local: BTreeMap>, } -impl Transaction +impl Transaction where R: Record, + E: Executor, { - pub(crate) fn new(db: Arc>, read_at: Timestamp) -> Self { + pub(crate) fn new(db: Arc>, read_at: Timestamp) -> Self { Self { db, read_at, diff --git a/src/version/cleaner.rs b/src/version/cleaner.rs new file mode 100644 index 00000000..daeb8bb9 --- /dev/null +++ b/src/version/cleaner.rs @@ -0,0 +1,66 @@ +use std::{collections::BTreeMap, fs, io, sync::Arc}; + +use futures_channel::mpsc::{channel, Receiver, Sender}; +use futures_util::StreamExt; + +use crate::{fs::FileId, DbOption}; + +pub(crate) enum CleanTag { + Add { + version_num: usize, + gens: Vec, + }, + Clean { + version_num: usize, + }, +} + +pub(crate) struct Cleaner { + tag_recv: Receiver, + gens_map: BTreeMap, bool)>, + option: Arc, +} + +impl Cleaner { + pub(crate) fn new(option: Arc) -> (Self, Sender) { + let (tag_send, tag_recv) = channel(option.clean_channel_buffer); + + ( + Cleaner { + tag_recv, + gens_map: Default::default(), + option, + }, + tag_send, + ) + } + + pub(crate) async fn listen(&mut self) -> Result<(), io::Error> { + loop { + match self.tag_recv.next().await { + None => break, + Some(CleanTag::Add { version_num, gens }) => { + let _ = self.gens_map.insert(version_num, (gens, false)); + } + Some(CleanTag::Clean { version_num }) => { + if let Some((_, dropped)) = self.gens_map.get_mut(&version_num) { + *dropped = true; + } + while let Some((first_version, (gens, dropped))) = self.gens_map.pop_first() { + if !dropped { + let _ = self.gens_map.insert(first_version, (gens, false)); + continue; + } + for gen in gens { + fs::remove_file(self.option.table_path(&gen))?; + } + } + } + } + } + + Ok(()) + } +} + +// TODO: TestCase diff --git a/src/version/edit.rs b/src/version/edit.rs new file mode 100644 index 00000000..2133864f --- /dev/null +++ b/src/version/edit.rs @@ -0,0 +1,152 @@ +use std::mem::size_of; + +use futures_io::{AsyncRead, AsyncWrite}; +use futures_util::{AsyncReadExt, AsyncWriteExt}; + +use crate::{ + fs::FileId, + scope::Scope, + serdes::{Decode, Encode}, +}; + +#[derive(Debug, Clone, Eq, PartialEq)] +pub(crate) enum VersionEdit +where + K: Encode + Decode + Ord + Clone, +{ + Add { level: u8, scope: Scope }, + Remove { level: u8, gen: FileId }, +} + +impl VersionEdit +where + K: Encode + Decode + Ord + Clone, +{ + pub(crate) async fn recover(reader: &mut R) -> Vec> { + let mut edits = Vec::new(); + + while let Ok(edit) = VersionEdit::decode(reader).await { + edits.push(edit) + } + edits + } +} + +impl Encode for VersionEdit +where + K: Encode + Decode + Ord + Clone, +{ + type Error = ::Error; + + async fn encode( + &self, + writer: &mut W, + ) -> Result<(), Self::Error> { + match self { + VersionEdit::Add { scope, level } => { + writer.write_all(&0u8.to_le_bytes()).await?; + writer.write_all(&level.to_le_bytes()).await?; + scope.encode(writer).await?; + } + VersionEdit::Remove { gen, level } => { + writer.write_all(&1u8.to_le_bytes()).await?; + writer.write_all(&level.to_le_bytes()).await?; + writer.write_all(&gen.to_bytes()).await?; + } + } + + Ok(()) + } + + fn size(&self) -> usize { + size_of::() + + size_of::() + + match self { + VersionEdit::Add { scope, .. } => scope.size(), + VersionEdit::Remove { .. } => 16, + } + } +} + +impl Decode for VersionEdit +where + K: Encode + Decode + Ord + Clone, +{ + type Error = ::Error; + + async fn decode(reader: &mut R) -> Result { + let edit_type = { + let mut len = [0; size_of::()]; + reader.read_exact(&mut len).await?; + u8::from_le_bytes(len) as usize + }; + let level = { + let mut level = [0; size_of::()]; + reader.read_exact(&mut level).await?; + u8::from_le_bytes(level) + }; + + Ok(match edit_type { + 0 => { + let scope = Scope::::decode(reader).await?; + + VersionEdit::Add { level, scope } + } + 1 => { + let gen = { + let mut slice = [0; 16]; + reader.read_exact(&mut slice).await?; + FileId::from_bytes(slice) + }; + VersionEdit::Remove { level, gen } + } + _ => todo!(), + }) + } +} + +#[cfg(test)] +mod tests { + use futures_executor::block_on; + use futures_util::io::Cursor; + + use crate::{fs::FileId, scope::Scope, serdes::Encode, version::edit::VersionEdit}; + + #[test] + fn encode_and_decode() { + block_on(async { + let edits = vec![ + VersionEdit::Add { + level: 0, + scope: Scope { + min: "Min".to_string(), + max: "Max".to_string(), + gen: Default::default(), + wal_ids: Some(vec![FileId::new(), FileId::new()]), + }, + }, + VersionEdit::Remove { + level: 1, + gen: Default::default(), + }, + ]; + + let bytes = { + let mut cursor = Cursor::new(vec![]); + + for edit in edits.clone() { + edit.encode(&mut cursor).await.unwrap(); + } + cursor.into_inner() + }; + + let decode_edits = { + let mut cursor = Cursor::new(bytes); + + VersionEdit::::recover(&mut cursor).await + }; + + assert_eq!(edits, decode_edits); + }) + } +} diff --git a/src/version/mod.rs b/src/version/mod.rs new file mode 100644 index 00000000..6dd364be --- /dev/null +++ b/src/version/mod.rs @@ -0,0 +1,196 @@ +mod cleaner; +mod edit; +mod set; + +use std::{marker::PhantomData, ops::Bound, sync::Arc}; + +use futures_channel::mpsc::{SendError, Sender}; +use futures_executor::block_on; +use futures_util::SinkExt; +use thiserror::Error; +use tracing::error; + +use crate::{ + executor::Executor, + fs::FileId, + ondisk::sstable::SsTable, + oracle::{timestamp::TimestampedRef, Timestamp}, + record::Record, + scope::Scope, + serdes::Encode, + stream::{record_batch::RecordBatchEntry, ScanStream}, + version::cleaner::CleanTag, + DbOption, +}; + +pub const MAX_LEVEL: usize = 7; + +pub(crate) type VersionRef = Arc>; + +pub(crate) struct Version +where + R: Record, + E: Executor, +{ + pub(crate) num: usize, + pub(crate) level_slice: [Vec>; MAX_LEVEL], + pub(crate) clean_sender: Sender, + pub(crate) option: Arc, + _p: PhantomData, +} + +impl Clone for Version +where + R: Record, + E: Executor, +{ + fn clone(&self) -> Self { + let mut level_slice = Version::::level_slice_new(); + + for (level, scopes) in self.level_slice.iter().enumerate() { + for scope in scopes { + level_slice[level].push(scope.clone()); + } + } + + Self { + num: self.num, + level_slice, + clean_sender: self.clean_sender.clone(), + option: self.option.clone(), + _p: Default::default(), + } + } +} + +impl Version +where + R: Record, + E: Executor, +{ + pub(crate) async fn query( + &self, + key: &TimestampedRef, + ) -> Result>, VersionError> { + for scope in self.level_slice[0].iter().rev() { + if !scope.is_between(key.value()) { + continue; + } + if let Some(entry) = Self::table_query(self, key, &scope.gen).await? { + return Ok(Some(entry)); + } + } + for level in self.level_slice[1..6].iter() { + if level.is_empty() { + continue; + } + let index = Self::scope_search(key.value(), level); + if !level[index].is_between(key.value()) { + continue; + } + if let Some(entry) = Self::table_query(self, key, &level[index].gen).await? { + return Ok(Some(entry)); + } + } + + Ok(None) + } + + async fn table_query( + &self, + key: &TimestampedRef<::Key>, + gen: &FileId, + ) -> Result>, VersionError> { + let file = E::open(self.option.table_path(gen)) + .await + .map_err(VersionError::Io)?; + let table = SsTable::::open(file); + table.get(key).await.map_err(VersionError::Parquet) + } + + pub(crate) fn scope_search(key: &R::Key, level: &[Scope]) -> usize { + level + .binary_search_by(|scope| scope.min.cmp(key)) + .unwrap_or_else(|index| index.saturating_sub(1)) + } + + pub(crate) fn tables_len(&self, level: usize) -> usize { + self.level_slice[level].len() + } + + pub(crate) fn level_slice_new() -> [Vec>; 7] { + [ + Vec::new(), + Vec::new(), + Vec::new(), + Vec::new(), + Vec::new(), + Vec::new(), + Vec::new(), + ] + } + + pub(crate) async fn iters<'a>( + &self, + iters: &mut Vec>, + range: (Bound<&'a R::Key>, Bound<&'a R::Key>), + ts: Timestamp, + ) -> Result<(), VersionError> { + for scope in self.level_slice[0].iter() { + let file = E::open(self.option.table_path(&scope.gen)) + .await + .map_err(VersionError::Io)?; + let table = SsTable::::open(file); + + iters.push(ScanStream::SsTable { + inner: table.scan(range, ts).await.map_err(VersionError::Parquet)?, + }) + } + for scopes in self.level_slice[1..].iter() { + if scopes.is_empty() { + continue; + } + let _gens = scopes.iter().map(|scope| scope.gen).collect::>(); + todo!("level stream") + // iters.push(EStreamImpl::Level( + // LevelStream::new(option, gens, lower, upper).await?, + // )); + } + Ok(()) + } +} + +impl Drop for Version +where + R: Record, + E: Executor, +{ + fn drop(&mut self) { + block_on(async { + if let Err(err) = self + .clean_sender + .send(CleanTag::Clean { + version_num: self.num, + }) + .await + { + error!("[Version Drop Error]: {}", err) + } + }); + } +} + +#[derive(Debug, Error)] +pub enum VersionError +where + R: Record, +{ + #[error("version encode error: {0}")] + Encode(#[source] ::Error), + #[error("version io error: {0}")] + Io(#[source] std::io::Error), + #[error("version parquet error: {0}")] + Parquet(#[source] parquet::errors::ParquetError), + #[error("version send error: {0}")] + Send(#[source] SendError), +} diff --git a/src/version/set.rs b/src/version/set.rs new file mode 100644 index 00000000..96fdcc6c --- /dev/null +++ b/src/version/set.rs @@ -0,0 +1,139 @@ +use std::{io::SeekFrom, sync::Arc}; + +use async_lock::RwLock; +use futures_channel::mpsc::Sender; +use futures_util::{AsyncSeekExt, AsyncWriteExt, SinkExt}; + +use crate::{ + executor::Executor, + fs::FileId, + record::Record, + serdes::Encode, + version::{cleaner::CleanTag, edit::VersionEdit, Version, VersionError, VersionRef}, + DbOption, +}; + +pub(crate) struct VersionSetInner +where + R: Record, + E: Executor, +{ + current: VersionRef, + log: E::File, +} + +pub(crate) struct VersionSet +where + R: Record, + E: Executor, +{ + inner: Arc>>, + clean_sender: Sender, + option: Arc, +} + +impl Clone for VersionSet +where + R: Record, + E: Executor, +{ + fn clone(&self) -> Self { + VersionSet { + inner: self.inner.clone(), + clean_sender: self.clean_sender.clone(), + option: self.option.clone(), + } + } +} + +impl VersionSet +where + R: Record, + E: Executor, +{ + pub(crate) async fn new( + clean_sender: Sender, + option: Arc, + ) -> Result> { + let mut log = E::open(option.version_path()) + .await + .map_err(VersionError::Io)?; + let edits = VersionEdit::recover(&mut log).await; + log.seek(SeekFrom::End(0)).await.map_err(VersionError::Io)?; + + let set = VersionSet:: { + inner: Arc::new(RwLock::new(VersionSetInner { + current: Arc::new(Version:: { + num: 0, + level_slice: Version::::level_slice_new(), + clean_sender: clean_sender.clone(), + option: option.clone(), + _p: Default::default(), + }), + log, + })), + clean_sender, + option, + }; + set.apply_edits(edits, None, true).await?; + + Ok(set) + } + + pub(crate) async fn current(&self) -> VersionRef { + self.inner.read().await.current.clone() + } + + pub(crate) async fn apply_edits( + &self, + version_edits: Vec>, + delete_gens: Option>, + is_recover: bool, + ) -> Result<(), VersionError> { + let mut guard = self.inner.write().await; + + let mut new_version = Version::::clone(&guard.current); + + for version_edit in version_edits { + if !is_recover { + version_edit + .encode(&mut guard.log) + .await + .map_err(VersionError::Encode)?; + } + match version_edit { + VersionEdit::Add { mut scope, level } => { + if let Some(wal_ids) = scope.wal_ids.take() { + for wal_id in wal_ids { + E::remove(self.option.wal_path(&wal_id)) + .await + .map_err(VersionError::Io)?; + } + } + new_version.level_slice[level as usize].push(scope); + } + VersionEdit::Remove { gen, level } => { + if let Some(i) = new_version.level_slice[level as usize] + .iter() + .position(|scope| scope.gen == gen) + { + new_version.level_slice[level as usize].remove(i); + } + } + } + } + if let Some(delete_gens) = delete_gens { + new_version + .clean_sender + .send(CleanTag::Add { + version_num: new_version.num, + gens: delete_gens, + }) + .await + .map_err(VersionError::Send)?; + } + guard.log.flush().await.map_err(VersionError::Io)?; + guard.current = Arc::new(new_version); + Ok(()) + } +} From b0f18dc46041762dc535636b78970a43ad8f7154 Mon Sep 17 00:00:00 2001 From: Gwo Tzu-Hsing Date: Tue, 16 Jul 2024 20:36:19 +0800 Subject: [PATCH 02/10] chore: make tokio-util optional --- Cargo.toml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 449eaf72..bbd5b556 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,14 +5,14 @@ resolver = "2" version = "0.1.0" [features] -tokio = ["dep:tokio"] +tokio = ["dep:tokio", "dep:tokio-util"] [dependencies] arrow = "52" async-lock = "3" crossbeam-skiplist = "0.1" -futures-core = "0.3" futures-channel = "0.3" +futures-core = "0.3" futures-executor = "0.3" futures-io = "0.3" futures-util = "0.3" @@ -21,7 +21,7 @@ parquet = { version = "52", features = ["async"] } pin-project-lite = "0.2" thiserror = "1" tokio = { version = "1", optional = true } -tokio-util = { version = "0.7", features = ["compat"] } +tokio-util = { version = "0.7", features = ["compat"], optional = true } tracing = "0.1" ulid = "1" From 4d66fbe46cde52fd1a82b3f450133b40fd6eaad6 Mon Sep 17 00:00:00 2001 From: Gwo Tzu-Hsing Date: Tue, 16 Jul 2024 20:52:59 +0800 Subject: [PATCH 03/10] use file as trait object --- Cargo.toml | 4 ++-- src/executor.rs | 6 ++++++ src/fs/mod.rs | 9 ++++++++- src/lib.rs | 24 +++++++++--------------- src/ondisk/scan.rs | 18 ++++++------------ src/ondisk/sstable.rs | 29 ++++++++++++++--------------- src/stream/merge.rs | 33 ++++++++++++++------------------- src/stream/mod.rs | 23 ++++++++--------------- src/version/mod.rs | 14 ++++++++------ 9 files changed, 75 insertions(+), 85 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index bbd5b556..8b398acd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,7 +5,7 @@ resolver = "2" version = "0.1.0" [features] -tokio = ["dep:tokio", "dep:tokio-util"] +tokio = ["dep:tokio"] [dependencies] arrow = "52" @@ -21,7 +21,7 @@ parquet = { version = "52", features = ["async"] } pin-project-lite = "0.2" thiserror = "1" tokio = { version = "1", optional = true } -tokio-util = { version = "0.7", features = ["compat"], optional = true } +tokio-util = { version = "0.7", features = ["compat"] } tracing = "0.1" ulid = "1" diff --git a/src/executor.rs b/src/executor.rs index 886130b2..07836790 100644 --- a/src/executor.rs +++ b/src/executor.rs @@ -20,6 +20,12 @@ pub mod tokio { handle: Handle, } + impl Default for TokioExecutor { + fn default() -> Self { + Self::new() + } + } + impl TokioExecutor { pub fn new() -> Self { Self { diff --git a/src/fs/mod.rs b/src/fs/mod.rs index 51adde4e..7525879f 100644 --- a/src/fs/mod.rs +++ b/src/fs/mod.rs @@ -19,7 +19,14 @@ pub enum FileType { LOG, } -pub trait AsyncFile: AsyncRead + AsyncWrite + AsyncSeek + Send + Sync + Unpin + 'static {} +pub trait AsyncFile: AsyncRead + AsyncWrite + AsyncSeek + Send + Sync + Unpin + 'static { + fn to_file(self) -> Box + where + Self: Sized, + { + Box::new(self) as Box + } +} impl AsyncFile for T where T: AsyncRead + AsyncWrite + AsyncSeek + Send + Sync + Unpin + 'static {} diff --git a/src/lib.rs b/src/lib.rs index c9fc95ba..4dfa96a7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -27,7 +27,7 @@ use record::Record; use crate::{ executor::Executor, fs::{FileId, FileType}, - stream::{merge::MergeStream, Entry, ScanStream}, + stream::{merge::MergeStream, Entry}, version::Version, }; @@ -106,7 +106,7 @@ where impl DB where R: Record + Send + Sync, - R::Key: Send + Sync, + R::Key: Send, E: Executor, { pub fn empty() -> Self { @@ -161,38 +161,32 @@ where impl Schema where R: Record + Send + Sync, - R::Key: Send + Sync, + R::Key: Send, { async fn write(&self, record: R, ts: Timestamp) -> io::Result<()> { self.mutable.insert(record, ts); Ok(()) } - async fn get<'get, E>( + async fn get<'get>( &'get self, key: &'get R::Key, ts: Timestamp, - ) -> Result>, ParquetError> - where - E: Executor, - { - self.scan::(Bound::Included(key), Bound::Unbounded, ts) + ) -> Result>, ParquetError> { + self.scan(Bound::Included(key), Bound::Unbounded, ts) .await? .next() .await .transpose() } - async fn scan<'scan, E>( + async fn scan<'scan>( &'scan self, lower: Bound<&'scan R::Key>, uppwer: Bound<&'scan R::Key>, ts: Timestamp, - ) -> Result, ParquetError>>, ParquetError> - where - E: Executor, - { - let mut streams = Vec::>::with_capacity(self.immutables.len() + 1); + ) -> Result, ParquetError>>, ParquetError> { + let mut streams = Vec::with_capacity(self.immutables.len() + 1); streams.push(self.mutable.scan((lower, uppwer), ts).into()); for immutable in &self.immutables { streams.push(immutable.scan((lower, uppwer), ts).into()); diff --git a/src/ondisk/scan.rs b/src/ondisk/scan.rs index 4508be6e..0705c232 100644 --- a/src/ondisk/scan.rs +++ b/src/ondisk/scan.rs @@ -9,36 +9,30 @@ use pin_project_lite::pin_project; use tokio_util::compat::Compat; use crate::{ - executor::Executor, + fs::AsyncFile, record::Record, stream::record_batch::{RecordBatchEntry, RecordBatchIterator}, }; pin_project! { #[derive(Debug)] - pub struct SsTableScan - where - E: Executor + pub struct SsTableScan { #[pin] - stream: ParquetRecordBatchStream>, + stream: ParquetRecordBatchStream>>, iter: Option>, } } -impl SsTableScan -where - E: Executor, -{ - pub fn new(stream: ParquetRecordBatchStream>) -> Self { +impl SsTableScan { + pub fn new(stream: ParquetRecordBatchStream>>) -> Self { SsTableScan { stream, iter: None } } } -impl Stream for SsTableScan +impl Stream for SsTableScan where R: Record, - E: Executor, { type Item = Result, parquet::errors::ParquetError>; diff --git a/src/ondisk/sstable.rs b/src/ondisk/sstable.rs index 334281e5..8bc8bd80 100644 --- a/src/ondisk/sstable.rs +++ b/src/ondisk/sstable.rs @@ -17,34 +17,32 @@ use tokio_util::compat::{Compat, FuturesAsyncReadCompatExt}; use super::scan::SsTableScan; use crate::{ arrows::get_range_filter, - executor::Executor, + fs::AsyncFile, oracle::{timestamp::TimestampedRef, Timestamp}, record::Record, stream::record_batch::RecordBatchEntry, }; -pub(crate) struct SsTable +pub(crate) struct SsTable where R: Record, - E: Executor, { - file: E::File, - _marker: PhantomData<(R, E)>, + file: Box, + _marker: PhantomData, } -impl SsTable +impl SsTable where R: Record, - E: Executor, { - pub(crate) fn open(file: E::File) -> Self { + pub(crate) fn open(file: Box) -> Self { SsTable { file, _marker: PhantomData, } } - fn create_writer(&mut self) -> AsyncArrowWriter> { + fn create_writer(&mut self) -> AsyncArrowWriter> { // TODO: expose writer options let options = ArrowWriterOptions::new().with_properties( WriterProperties::builder() @@ -53,7 +51,7 @@ where .build(), ); AsyncArrowWriter::try_new_with_options( - (&mut self.file).compat(), + (&mut self.file as &mut dyn AsyncFile).compat(), R::arrow_schema().clone(), options, ) @@ -75,7 +73,7 @@ where async fn into_parquet_builder( self, limit: usize, - ) -> parquet::errors::Result>>> { + ) -> parquet::errors::Result>>>> { Ok(ParquetRecordBatchStreamBuilder::new_with_options( self.file.compat(), ArrowReaderOptions::default().with_page_index(true), @@ -99,7 +97,7 @@ where self, range: (Bound<&'scan R::Key>, Bound<&'scan R::Key>), ts: Timestamp, - ) -> Result, parquet::errors::ParquetError> { + ) -> Result, parquet::errors::ParquetError> { let builder = self.into_parquet_builder(1).await?; let schema_descriptor = builder.metadata().file_metadata().schema_descr(); @@ -116,7 +114,7 @@ mod tests { use super::SsTable; use crate::{ executor::tokio::TokioExecutor, - fs::Fs, + fs::{AsyncFile, Fs}, oracle::timestamp::Timestamped, tests::{get_test_record_batch, Test}, }; @@ -127,8 +125,9 @@ mod tests { let record_batch = get_test_record_batch::().await; let file = TokioExecutor::open(&temp_dir.path().join("test.parquet")) .await - .unwrap(); - let mut sstable = SsTable::::open(file); + .unwrap() + .to_file(); + let mut sstable = SsTable::::open(file); sstable.write(record_batch).await.unwrap(); diff --git a/src/stream/merge.rs b/src/stream/merge.rs index 2734572b..bb1d8de3 100644 --- a/src/stream/merge.rs +++ b/src/stream/merge.rs @@ -10,27 +10,25 @@ use futures_util::stream::StreamExt; use pin_project_lite::pin_project; use super::{Entry, ScanStream}; -use crate::{executor::Executor, record::Record}; +use crate::record::Record; pin_project! { - pub(crate) struct MergeStream<'merge, R, E> + pub(crate) struct MergeStream<'merge, R> where R: Record, - E: Executor, { - streams: Vec>, + streams: Vec>, peeked: BinaryHeap>, buf: Option>, } } -impl<'merge, R, E> MergeStream<'merge, R, E> +impl<'merge, R> MergeStream<'merge, R> where R: Record, - E: Executor, { pub(crate) async fn from_vec( - mut streams: Vec>, + mut streams: Vec>, ) -> Result { let mut peeked = BinaryHeap::with_capacity(streams.len()); @@ -51,10 +49,9 @@ where } } -impl<'merge, R, E> Stream for MergeStream<'merge, R, E> +impl<'merge, R> Stream for MergeStream<'merge, R> where R: Record, - E: Executor, { type Item = Result, parquet::errors::ParquetError>; @@ -138,7 +135,7 @@ mod tests { use futures_util::StreamExt; use super::MergeStream; - use crate::{executor::tokio::TokioExecutor, inmem::mutable::Mutable}; + use crate::inmem::mutable::Mutable; #[tokio::test] async fn merge_mutable() { @@ -158,7 +155,7 @@ mod tests { let lower = "a".to_string(); let upper = "e".to_string(); let bound = (Bound::Included(&lower), Bound::Included(&upper)); - let mut merge = MergeStream::::from_vec(vec![ + let mut merge = MergeStream::from_vec(vec![ m1.scan(bound, 6.into()).into(), m2.scan(bound, 6.into()).into(), m3.scan(bound, 6.into()).into(), @@ -186,10 +183,9 @@ mod tests { let lower = "1".to_string(); let upper = "4".to_string(); let bound = (Bound::Included(&lower), Bound::Included(&upper)); - let mut merge = - MergeStream::::from_vec(vec![m1.scan(bound, 0.into()).into()]) - .await - .unwrap(); + let mut merge = MergeStream::from_vec(vec![m1.scan(bound, 0.into()).into()]) + .await + .unwrap(); dbg!(merge.next().await); dbg!(merge.next().await); @@ -198,10 +194,9 @@ mod tests { let lower = "1".to_string(); let upper = "4".to_string(); let bound = (Bound::Included(&lower), Bound::Included(&upper)); - let mut merge = - MergeStream::::from_vec(vec![m1.scan(bound, 1.into()).into()]) - .await - .unwrap(); + let mut merge = MergeStream::from_vec(vec![m1.scan(bound, 1.into()).into()]) + .await + .unwrap(); dbg!(merge.next().await); dbg!(merge.next().await); diff --git a/src/stream/mod.rs b/src/stream/mod.rs index 75c1bcf6..819601aa 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -14,7 +14,6 @@ use pin_project_lite::pin_project; use record_batch::RecordBatchEntry; use crate::{ - executor::Executor, inmem::{immutable::ImmutableScan, mutable::MutableScan}, ondisk::scan::SsTableScan, oracle::timestamp::Timestamped, @@ -66,10 +65,9 @@ where pin_project! { #[project = ScanStreamProject] - pub enum ScanStream<'scan, R, E> + pub enum ScanStream<'scan, R> where R: Record, - E: Executor, { Mutable { #[pin] @@ -81,15 +79,14 @@ pin_project! { }, SsTable { #[pin] - inner: SsTableScan, + inner: SsTableScan, }, } } -impl<'scan, R, E> From> for ScanStream<'scan, R, E> +impl<'scan, R> From> for ScanStream<'scan, R> where R: Record, - E: Executor, { fn from(inner: MutableScan<'scan, R>) -> Self { ScanStream::Mutable { @@ -98,10 +95,9 @@ where } } -impl<'scan, R, E> From> for ScanStream<'scan, R, E> +impl<'scan, R> From> for ScanStream<'scan, R> where R: Record, - E: Executor, { fn from(inner: ImmutableScan<'scan, R>) -> Self { ScanStream::Immutable { @@ -110,20 +106,18 @@ where } } -impl<'scan, R, E> From> for ScanStream<'scan, R, E> +impl<'scan, R> From> for ScanStream<'scan, R> where R: Record, - E: Executor, { - fn from(inner: SsTableScan) -> Self { + fn from(inner: SsTableScan) -> Self { ScanStream::SsTable { inner } } } -impl fmt::Debug for ScanStream<'_, R, E> +impl fmt::Debug for ScanStream<'_, R> where R: Record, - E: Executor, { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { match self { @@ -134,10 +128,9 @@ where } } -impl<'scan, R, E> Stream for ScanStream<'scan, R, E> +impl<'scan, R> Stream for ScanStream<'scan, R> where R: Record, - E: Executor, { type Item = Result, parquet::errors::ParquetError>; diff --git a/src/version/mod.rs b/src/version/mod.rs index 6dd364be..2fd89cc7 100644 --- a/src/version/mod.rs +++ b/src/version/mod.rs @@ -12,7 +12,7 @@ use tracing::error; use crate::{ executor::Executor, - fs::FileId, + fs::{AsyncFile, FileId}, ondisk::sstable::SsTable, oracle::{timestamp::TimestampedRef, Timestamp}, record::Record, @@ -103,8 +103,9 @@ where ) -> Result>, VersionError> { let file = E::open(self.option.table_path(gen)) .await - .map_err(VersionError::Io)?; - let table = SsTable::::open(file); + .map_err(VersionError::Io)? + .to_file(); + let table = SsTable::open(file); table.get(key).await.map_err(VersionError::Parquet) } @@ -132,15 +133,16 @@ where pub(crate) async fn iters<'a>( &self, - iters: &mut Vec>, + iters: &mut Vec>, range: (Bound<&'a R::Key>, Bound<&'a R::Key>), ts: Timestamp, ) -> Result<(), VersionError> { for scope in self.level_slice[0].iter() { let file = E::open(self.option.table_path(&scope.gen)) .await - .map_err(VersionError::Io)?; - let table = SsTable::::open(file); + .map_err(VersionError::Io)? + .to_file(); + let table = SsTable::open(file); iters.push(ScanStream::SsTable { inner: table.scan(range, ts).await.map_err(VersionError::Parquet)?, From a211a1e33b2dd65f5a90a58c417e2d50f9f2abaf Mon Sep 17 00:00:00 2001 From: Kould Date: Wed, 17 Jul 2024 11:31:37 +0800 Subject: [PATCH 04/10] chore: replace channel to `flume` --- Cargo.toml | 3 +-- src/version/cleaner.rs | 14 +++++----- src/version/edit.rs | 61 ++++++++++++++++++++---------------------- src/version/mod.rs | 22 +++++---------- src/version/set.rs | 6 ++--- 5 files changed, 46 insertions(+), 60 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 8b398acd..944c6bf1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,9 +11,8 @@ tokio = ["dep:tokio"] arrow = "52" async-lock = "3" crossbeam-skiplist = "0.1" -futures-channel = "0.3" +flume = { version = "0.11.0", features = ["async"] } futures-core = "0.3" -futures-executor = "0.3" futures-io = "0.3" futures-util = "0.3" once_cell = "1" diff --git a/src/version/cleaner.rs b/src/version/cleaner.rs index daeb8bb9..169da40e 100644 --- a/src/version/cleaner.rs +++ b/src/version/cleaner.rs @@ -1,7 +1,6 @@ use std::{collections::BTreeMap, fs, io, sync::Arc}; -use futures_channel::mpsc::{channel, Receiver, Sender}; -use futures_util::StreamExt; +use flume::{Receiver, Sender}; use crate::{fs::FileId, DbOption}; @@ -23,7 +22,7 @@ pub(crate) struct Cleaner { impl Cleaner { pub(crate) fn new(option: Arc) -> (Self, Sender) { - let (tag_send, tag_recv) = channel(option.clean_channel_buffer); + let (tag_send, tag_recv) = flume::bounded(option.clean_channel_buffer); ( Cleaner { @@ -36,13 +35,12 @@ impl Cleaner { } pub(crate) async fn listen(&mut self) -> Result<(), io::Error> { - loop { - match self.tag_recv.next().await { - None => break, - Some(CleanTag::Add { version_num, gens }) => { + while let Ok(tag) = self.tag_recv.recv_async().await { + match tag { + CleanTag::Add { version_num, gens } => { let _ = self.gens_map.insert(version_num, (gens, false)); } - Some(CleanTag::Clean { version_num }) => { + CleanTag::Clean { version_num } => { if let Some((_, dropped)) = self.gens_map.get_mut(&version_num) { *dropped = true; } diff --git a/src/version/edit.rs b/src/version/edit.rs index 2133864f..ef07e09e 100644 --- a/src/version/edit.rs +++ b/src/version/edit.rs @@ -107,46 +107,43 @@ where #[cfg(test)] mod tests { - use futures_executor::block_on; use futures_util::io::Cursor; use crate::{fs::FileId, scope::Scope, serdes::Encode, version::edit::VersionEdit}; - #[test] - fn encode_and_decode() { - block_on(async { - let edits = vec![ - VersionEdit::Add { - level: 0, - scope: Scope { - min: "Min".to_string(), - max: "Max".to_string(), - gen: Default::default(), - wal_ids: Some(vec![FileId::new(), FileId::new()]), - }, - }, - VersionEdit::Remove { - level: 1, + #[tokio::test] + async fn encode_and_decode() { + let edits = vec![ + VersionEdit::Add { + level: 0, + scope: Scope { + min: "Min".to_string(), + max: "Max".to_string(), gen: Default::default(), + wal_ids: Some(vec![FileId::new(), FileId::new()]), }, - ]; - - let bytes = { - let mut cursor = Cursor::new(vec![]); - - for edit in edits.clone() { - edit.encode(&mut cursor).await.unwrap(); - } - cursor.into_inner() - }; + }, + VersionEdit::Remove { + level: 1, + gen: Default::default(), + }, + ]; + + let bytes = { + let mut cursor = Cursor::new(vec![]); + + for edit in edits.clone() { + edit.encode(&mut cursor).await.unwrap(); + } + cursor.into_inner() + }; - let decode_edits = { - let mut cursor = Cursor::new(bytes); + let decode_edits = { + let mut cursor = Cursor::new(bytes); - VersionEdit::::recover(&mut cursor).await - }; + VersionEdit::::recover(&mut cursor).await + }; - assert_eq!(edits, decode_edits); - }) + assert_eq!(edits, decode_edits); } } diff --git a/src/version/mod.rs b/src/version/mod.rs index 2fd89cc7..4e8f1bf7 100644 --- a/src/version/mod.rs +++ b/src/version/mod.rs @@ -4,9 +4,7 @@ mod set; use std::{marker::PhantomData, ops::Bound, sync::Arc}; -use futures_channel::mpsc::{SendError, Sender}; -use futures_executor::block_on; -use futures_util::SinkExt; +use flume::{SendError, Sender}; use thiserror::Error; use tracing::error; @@ -168,17 +166,11 @@ where E: Executor, { fn drop(&mut self) { - block_on(async { - if let Err(err) = self - .clean_sender - .send(CleanTag::Clean { - version_num: self.num, - }) - .await - { - error!("[Version Drop Error]: {}", err) - } - }); + if let Err(err) = self.clean_sender.send(CleanTag::Clean { + version_num: self.num, + }) { + error!("[Version Drop Error]: {}", err) + } } } @@ -194,5 +186,5 @@ where #[error("version parquet error: {0}")] Parquet(#[source] parquet::errors::ParquetError), #[error("version send error: {0}")] - Send(#[source] SendError), + Send(#[source] SendError), } diff --git a/src/version/set.rs b/src/version/set.rs index 96fdcc6c..dda5cf3c 100644 --- a/src/version/set.rs +++ b/src/version/set.rs @@ -1,8 +1,8 @@ use std::{io::SeekFrom, sync::Arc}; use async_lock::RwLock; -use futures_channel::mpsc::Sender; -use futures_util::{AsyncSeekExt, AsyncWriteExt, SinkExt}; +use flume::Sender; +use futures_util::{AsyncSeekExt, AsyncWriteExt}; use crate::{ executor::Executor, @@ -125,7 +125,7 @@ where if let Some(delete_gens) = delete_gens { new_version .clean_sender - .send(CleanTag::Add { + .send_async(CleanTag::Add { version_num: new_version.num, gens: delete_gens, }) From 7bf33138f29e3f7539e081812217ecd04300a473 Mon Sep 17 00:00:00 2001 From: Gwo Tzu-Hsing Date: Wed, 17 Jul 2024 13:10:34 +0800 Subject: [PATCH 05/10] chore: refactor --- src/inmem/immutable.rs | 2 +- src/inmem/mutable.rs | 3 +-- src/lib.rs | 6 ++---- src/record/mod.rs | 2 +- src/scope.rs | 13 +++++-------- src/version/edit.rs | 11 ++++------- src/version/mod.rs | 22 +++------------------- src/version/set.rs | 5 +++-- 8 files changed, 20 insertions(+), 44 deletions(-) diff --git a/src/inmem/immutable.rs b/src/inmem/immutable.rs index 56725dfd..659ef9f1 100644 --- a/src/inmem/immutable.rs +++ b/src/inmem/immutable.rs @@ -48,7 +48,7 @@ where impl From> for Immutable where A: ArrowArrays, - A::Record: Send + Sync, + A::Record: Send, { fn from(mutable: Mutable) -> Self { let mut index = BTreeMap::new(); diff --git a/src/inmem/mutable.rs b/src/inmem/mutable.rs index 7a5b8a83..319900ff 100644 --- a/src/inmem/mutable.rs +++ b/src/inmem/mutable.rs @@ -54,8 +54,7 @@ where impl Mutable where - R: Record + Send + Sync, - R::Key: Send, + R: Record + Send, { pub(crate) fn insert(&self, record: R, ts: Timestamp) { self.data diff --git a/src/lib.rs b/src/lib.rs index 4dfa96a7..ba3701f7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -105,8 +105,7 @@ where impl DB where - R: Record + Send + Sync, - R::Key: Send, + R: Record + Send, E: Executor, { pub fn empty() -> Self { @@ -160,8 +159,7 @@ where impl Schema where - R: Record + Send + Sync, - R::Key: Send, + R: Record + Send, { async fn write(&self, record: R, ts: Timestamp) -> io::Result<()> { self.mutable.insert(record, ts); diff --git a/src/record/mod.rs b/src/record/mod.rs index 79fc7eed..fa805c19 100644 --- a/src/record/mod.rs +++ b/src/record/mod.rs @@ -14,7 +14,7 @@ use crate::{ serdes::{Decode, Encode}, }; -pub trait Key: 'static + Encode + Decode + Ord + Clone + Send + Sync { +pub trait Key: 'static + Encode + Decode + Ord + Clone + Send { type Ref<'r>: KeyRef<'r, Key = Self> + Copy where Self: 'r; diff --git a/src/scope.rs b/src/scope.rs index e6bf93aa..081f1e54 100644 --- a/src/scope.rs +++ b/src/scope.rs @@ -7,10 +7,7 @@ use crate::{ }; #[derive(Debug, Eq, PartialEq)] -pub struct Scope -where - K: Encode + Decode + Ord + Clone, -{ +pub struct Scope { pub(crate) min: K, pub(crate) max: K, pub(crate) gen: FileId, @@ -19,7 +16,7 @@ where impl Clone for Scope where - K: Encode + Decode + Ord + Clone, + K: Clone, { fn clone(&self) -> Self { Scope { @@ -33,7 +30,7 @@ where impl Scope where - K: Encode + Decode + Ord + Clone, + K: Ord, { pub(crate) fn is_between(&self, key: &K) -> bool { self.min.le(key) && self.max.ge(key) @@ -49,7 +46,7 @@ where impl Encode for Scope where - K: Encode + Decode + Ord + Clone, + K: Encode, { type Error = ::Error; @@ -85,7 +82,7 @@ where impl Decode for Scope where - K: Encode + Decode + Ord + Clone, + K: Decode, { type Error = ::Error; diff --git a/src/version/edit.rs b/src/version/edit.rs index ef07e09e..e89021b3 100644 --- a/src/version/edit.rs +++ b/src/version/edit.rs @@ -10,17 +10,14 @@ use crate::{ }; #[derive(Debug, Clone, Eq, PartialEq)] -pub(crate) enum VersionEdit -where - K: Encode + Decode + Ord + Clone, -{ +pub(crate) enum VersionEdit { Add { level: u8, scope: Scope }, Remove { level: u8, gen: FileId }, } impl VersionEdit where - K: Encode + Decode + Ord + Clone, + K: Decode, { pub(crate) async fn recover(reader: &mut R) -> Vec> { let mut edits = Vec::new(); @@ -34,7 +31,7 @@ where impl Encode for VersionEdit where - K: Encode + Decode + Ord + Clone, + K: Encode, { type Error = ::Error; @@ -70,7 +67,7 @@ where impl Decode for VersionEdit where - K: Encode + Decode + Ord + Clone, + K: Decode, { type Error = ::Error; diff --git a/src/version/mod.rs b/src/version/mod.rs index 4e8f1bf7..6d3dbce5 100644 --- a/src/version/mod.rs +++ b/src/version/mod.rs @@ -21,14 +21,13 @@ use crate::{ DbOption, }; -pub const MAX_LEVEL: usize = 7; +const MAX_LEVEL: usize = 7; pub(crate) type VersionRef = Arc>; pub(crate) struct Version where R: Record, - E: Executor, { pub(crate) num: usize, pub(crate) level_slice: [Vec>; MAX_LEVEL], @@ -43,12 +42,10 @@ where E: Executor, { fn clone(&self) -> Self { - let mut level_slice = Version::::level_slice_new(); + let mut level_slice = [const { Vec::new() }; MAX_LEVEL]; for (level, scopes) in self.level_slice.iter().enumerate() { - for scope in scopes { - level_slice[level].push(scope.clone()); - } + level_slice[level].clone_from(scopes); } Self { @@ -117,18 +114,6 @@ where self.level_slice[level].len() } - pub(crate) fn level_slice_new() -> [Vec>; 7] { - [ - Vec::new(), - Vec::new(), - Vec::new(), - Vec::new(), - Vec::new(), - Vec::new(), - Vec::new(), - ] - } - pub(crate) async fn iters<'a>( &self, iters: &mut Vec>, @@ -163,7 +148,6 @@ where impl Drop for Version where R: Record, - E: Executor, { fn drop(&mut self) { if let Err(err) = self.clean_sender.send(CleanTag::Clean { diff --git a/src/version/set.rs b/src/version/set.rs index dda5cf3c..c5c26f42 100644 --- a/src/version/set.rs +++ b/src/version/set.rs @@ -4,6 +4,7 @@ use async_lock::RwLock; use flume::Sender; use futures_util::{AsyncSeekExt, AsyncWriteExt}; +use super::MAX_LEVEL; use crate::{ executor::Executor, fs::FileId, @@ -65,7 +66,7 @@ where inner: Arc::new(RwLock::new(VersionSetInner { current: Arc::new(Version:: { num: 0, - level_slice: Version::::level_slice_new(), + level_slice: [const { Vec::new() }; MAX_LEVEL], clean_sender: clean_sender.clone(), option: option.clone(), _p: Default::default(), @@ -92,7 +93,7 @@ where ) -> Result<(), VersionError> { let mut guard = self.inner.write().await; - let mut new_version = Version::::clone(&guard.current); + let mut new_version = Version::clone(&guard.current); for version_edit in version_edits { if !is_recover { From 69b5cd26a0aeca4d77ad3caa780f5bb00ef10a12 Mon Sep 17 00:00:00 2001 From: Gwo Tzu-Hsing Date: Wed, 17 Jul 2024 13:25:26 +0800 Subject: [PATCH 06/10] chore: rename AsyncFile::to_file to AsyncFile::boxed --- src/fs/mod.rs | 2 +- src/ondisk/sstable.rs | 2 +- src/version/mod.rs | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/fs/mod.rs b/src/fs/mod.rs index 7525879f..7a8c635f 100644 --- a/src/fs/mod.rs +++ b/src/fs/mod.rs @@ -20,7 +20,7 @@ pub enum FileType { } pub trait AsyncFile: AsyncRead + AsyncWrite + AsyncSeek + Send + Sync + Unpin + 'static { - fn to_file(self) -> Box + fn boxed(self) -> Box where Self: Sized, { diff --git a/src/ondisk/sstable.rs b/src/ondisk/sstable.rs index 8bc8bd80..c3b203d6 100644 --- a/src/ondisk/sstable.rs +++ b/src/ondisk/sstable.rs @@ -126,7 +126,7 @@ mod tests { let file = TokioExecutor::open(&temp_dir.path().join("test.parquet")) .await .unwrap() - .to_file(); + .boxed(); let mut sstable = SsTable::::open(file); sstable.write(record_batch).await.unwrap(); diff --git a/src/version/mod.rs b/src/version/mod.rs index 6d3dbce5..0ce60eac 100644 --- a/src/version/mod.rs +++ b/src/version/mod.rs @@ -99,7 +99,7 @@ where let file = E::open(self.option.table_path(gen)) .await .map_err(VersionError::Io)? - .to_file(); + .boxed(); let table = SsTable::open(file); table.get(key).await.map_err(VersionError::Parquet) } @@ -124,7 +124,7 @@ where let file = E::open(self.option.table_path(&scope.gen)) .await .map_err(VersionError::Io)? - .to_file(); + .boxed(); let table = SsTable::open(file); iters.push(ScanStream::SsTable { From 5246344c95a6159ef142dc6e9e97328282043dcd Mon Sep 17 00:00:00 2001 From: Kould Date: Wed, 17 Jul 2024 14:06:24 +0800 Subject: [PATCH 07/10] chore: pub -> pub (crate) --- Cargo.toml | 2 +- src/fs/mod.rs | 2 +- src/scope.rs | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 944c6bf1..fd031d83 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,7 +11,7 @@ tokio = ["dep:tokio"] arrow = "52" async-lock = "3" crossbeam-skiplist = "0.1" -flume = { version = "0.11.0", features = ["async"] } +flume = { version = "0.11", features = ["async"] } futures-core = "0.3" futures-io = "0.3" futures-util = "0.3" diff --git a/src/fs/mod.rs b/src/fs/mod.rs index 7a8c635f..5b06c725 100644 --- a/src/fs/mod.rs +++ b/src/fs/mod.rs @@ -13,7 +13,7 @@ use ulid::Ulid; pub(crate) type FileId = Ulid; -pub enum FileType { +pub(crate) enum FileType { WAL, PARQUET, LOG, diff --git a/src/scope.rs b/src/scope.rs index 081f1e54..0f1baf02 100644 --- a/src/scope.rs +++ b/src/scope.rs @@ -7,7 +7,7 @@ use crate::{ }; #[derive(Debug, Eq, PartialEq)] -pub struct Scope { +pub(crate) struct Scope { pub(crate) min: K, pub(crate) max: K, pub(crate) gen: FileId, From 3074a8954810f38621babd1f98cad78951b13636 Mon Sep 17 00:00:00 2001 From: Gwo Tzu-Hsing Date: Wed, 17 Jul 2024 14:08:22 +0800 Subject: [PATCH 08/10] refactor: scope compare --- src/scope.rs | 11 ++++------- src/version/mod.rs | 4 ++-- 2 files changed, 6 insertions(+), 9 deletions(-) diff --git a/src/scope.rs b/src/scope.rs index 0f1baf02..020c71db 100644 --- a/src/scope.rs +++ b/src/scope.rs @@ -32,15 +32,12 @@ impl Scope where K: Ord, { - pub(crate) fn is_between(&self, key: &K) -> bool { - self.min.le(key) && self.max.ge(key) + pub(crate) fn contains(&self, key: &K) -> bool { + &self.min <= key && key <= &self.max } - pub(crate) fn is_meet(&self, target: &Scope) -> bool { - (self.min.le(&target.min) && self.max.ge(&target.min)) - || (self.min.le(&target.max) && self.max.ge(&target.max)) - || (self.min.le(&target.min)) && self.max.ge(&target.max) - || (self.min.ge(&target.min)) && self.max.le(&target.max) + pub(crate) fn meets(&self, target: &Self) -> bool { + self.contains(&target.min) || self.contains(&target.max) } } diff --git a/src/version/mod.rs b/src/version/mod.rs index 0ce60eac..3e045679 100644 --- a/src/version/mod.rs +++ b/src/version/mod.rs @@ -68,7 +68,7 @@ where key: &TimestampedRef, ) -> Result>, VersionError> { for scope in self.level_slice[0].iter().rev() { - if !scope.is_between(key.value()) { + if !scope.contains(key.value()) { continue; } if let Some(entry) = Self::table_query(self, key, &scope.gen).await? { @@ -80,7 +80,7 @@ where continue; } let index = Self::scope_search(key.value(), level); - if !level[index].is_between(key.value()) { + if !level[index].contains(key.value()) { continue; } if let Some(entry) = Self::table_query(self, key, &level[index].gen).await? { From 9b41e5436062a5349366c1a20f13ec20823293ed Mon Sep 17 00:00:00 2001 From: Gwo Tzu-Hsing Date: Wed, 17 Jul 2024 14:22:15 +0800 Subject: [PATCH 09/10] refactor: rename lifetime --- src/version/mod.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/version/mod.rs b/src/version/mod.rs index 3e045679..8499c509 100644 --- a/src/version/mod.rs +++ b/src/version/mod.rs @@ -114,10 +114,10 @@ where self.level_slice[level].len() } - pub(crate) async fn iters<'a>( + pub(crate) async fn iters<'iters>( &self, - iters: &mut Vec>, - range: (Bound<&'a R::Key>, Bound<&'a R::Key>), + iters: &mut Vec>, + range: (Bound<&'iters R::Key>, Bound<&'iters R::Key>), ts: Timestamp, ) -> Result<(), VersionError> { for scope in self.level_slice[0].iter() { From 6be446a4ed70a43c7a9baac44209fa607c87d53b Mon Sep 17 00:00:00 2001 From: Gwo Tzu-Hsing Date: Wed, 17 Jul 2024 14:26:01 +0800 Subject: [PATCH 10/10] refactor: use self method instead --- src/version/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/version/mod.rs b/src/version/mod.rs index 8499c509..8fbd2945 100644 --- a/src/version/mod.rs +++ b/src/version/mod.rs @@ -71,7 +71,7 @@ where if !scope.contains(key.value()) { continue; } - if let Some(entry) = Self::table_query(self, key, &scope.gen).await? { + if let Some(entry) = self.table_query(key, &scope.gen).await? { return Ok(Some(entry)); } } @@ -83,7 +83,7 @@ where if !level[index].contains(key.value()) { continue; } - if let Some(entry) = Self::table_query(self, key, &level[index].gen).await? { + if let Some(entry) = self.table_query(key, &level[index].gen).await? { return Ok(Some(entry)); } }