diff --git a/Cargo.toml b/Cargo.toml index ece9fb4..d5b9866 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,4 +1,4 @@ -workspace = { members = ["parquet-lru", "tonbo_macros"] } +workspace = { members = [ "parquet-lru", "tonbo_macros", "tonbo_net_client", "tonbo_net_server",] } [package] description = "An embedded persistent KV database in Rust." @@ -80,12 +80,12 @@ crc32fast = "1" crossbeam-skiplist = "0.1" datafusion = { version = "42.2.0", optional = true } flume = { version = "0.11", features = ["async"] } -fusio = { git = "https://github.com/tonbo-io/fusio.git", rev = "216eb446fb0a0c6e5e85bfac51a6f6ed8e5ed606", package = "fusio", version = "0.3.3", features = [ +fusio = { git = "https://github.com/tonbo-io/fusio.git", rev = "88c6134948d05bef33598a969e9292ad38a8db4b", package = "fusio", version = "0.3.3", features = [ "dyn", "fs", ] } -fusio-dispatch = { git = "https://github.com/tonbo-io/fusio.git", rev = "216eb446fb0a0c6e5e85bfac51a6f6ed8e5ed606", package = "fusio-dispatch", version = "0.2.1" } -fusio-parquet = { git = "https://github.com/tonbo-io/fusio.git", rev = "216eb446fb0a0c6e5e85bfac51a6f6ed8e5ed606", package = "fusio-parquet", version = "0.2.1" } +fusio-dispatch = { git = "https://github.com/tonbo-io/fusio.git", rev = "88c6134948d05bef33598a969e9292ad38a8db4b", package = "fusio-dispatch", version = "0.2.1" } +fusio-parquet = { git = "https://github.com/tonbo-io/fusio.git", rev = "88c6134948d05bef33598a969e9292ad38a8db4b", package = "fusio-parquet", version = "0.2.1" } futures-core = "0.3" futures-io = "0.3" futures-util = "0.3" diff --git a/src/compaction/mod.rs b/src/compaction/mod.rs index c5fb593..12a0406 100644 --- a/src/compaction/mod.rs +++ b/src/compaction/mod.rs @@ -39,6 +39,7 @@ where pub(crate) schema: Arc>>, pub(crate) version_set: VersionSet, pub(crate) manager: Arc, + pub(crate) instance: Arc, } impl Compactor @@ -50,12 +51,14 @@ where option: Arc>, version_set: VersionSet, manager: Arc, + instance: Arc, ) -> Self { Compactor:: { option, schema, version_set, manager, + instance, } } @@ -75,7 +78,7 @@ where &mut guard.mutable, Mutable::new(&self.option, trigger_clone, self.manager.base_fs()).await?, ); - let (file_id, immutable) = mutable.into_immutable(&guard.record_instance).await?; + let (file_id, immutable) = mutable.into_immutable(&self.instance).await?; guard.immutables.push((file_id, immutable)); } else if !is_manual { return Ok(()); @@ -99,7 +102,7 @@ where &self.option, recover_wal_ids, excess, - &guard.record_instance, + &self.instance, &self.manager, ) .await? @@ -116,7 +119,7 @@ where &scope.max, &mut version_edits, &mut delete_gens, - &guard.record_instance, + &self.instance, &self.manager, parquet_lru, ) diff --git a/src/lib.rs b/src/lib.rs index dbbe59c..1f18e46 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -184,6 +184,7 @@ where lock_map: LockMap, manager: Arc, parquet_lru: ParquetLru, + instance: Arc, _p: PhantomData, } @@ -241,6 +242,7 @@ where instance: RecordInstance, lru_cache: ParquetLru, ) -> Result> { + let instance = Arc::new(instance); let manager = Arc::new(StoreManager::new( option.base_fs.clone(), option.level_paths.clone(), @@ -263,13 +265,14 @@ where let version_set = VersionSet::new(clean_sender, option.clone(), manager.clone()).await?; let schema = Arc::new(RwLock::new( - Schema::new(option.clone(), task_tx, &version_set, instance, &manager).await?, + Schema::new(option.clone(), task_tx, &version_set, &manager).await?, )); let mut compactor = Compactor::::new( schema.clone(), option.clone(), version_set.clone(), manager.clone(), + instance.clone(), ); executor.spawn(async move { @@ -310,6 +313,7 @@ where lock_map: Arc::new(Default::default()), manager, parquet_lru: lru_cache, + instance, _p: Default::default(), }) } @@ -323,6 +327,7 @@ where Snapshot::new( self.schema.read().await, self.version_set.current().await, + self.instance.clone(), self.manager.clone(), self.parquet_lru.clone(), ) @@ -377,6 +382,7 @@ where .await .get( &*self.version_set.current().await, + &self.instance, &self.manager, key, self.version_set.load_ts(), @@ -405,6 +411,7 @@ where let mut scan = Scan::new( &schema, &self.manager, + &self.instance, range, self.version_set.load_ts(), &*current, @@ -412,8 +419,10 @@ where self.parquet_lru.clone(), ).take().await?; - while let Some(record) = scan.next().await { - yield Ok(f(TransactionEntry::Stream(record?))) + while let Some(record) = scan.next().await.transpose()? { + if record.value().is_some() { + yield Ok(f(TransactionEntry::Stream(record))) + } } } } @@ -462,6 +471,10 @@ where self.schema.write().await.flush_wal().await?; Ok(()) } + + pub fn instance(&self) -> &RecordInstance { + self.instance.as_ref() + } } pub(crate) struct Schema @@ -473,7 +486,6 @@ where compaction_tx: Sender, recover_wal_ids: Option>, trigger: Arc + Send + Sync>>, - record_instance: RecordInstance, } impl Schema @@ -484,7 +496,6 @@ where option: Arc>, compaction_tx: Sender, version_set: &VersionSet, - record_instance: RecordInstance, manager: &StoreManager, ) -> Result> { let trigger = Arc::new(TriggerFactory::create(option.trigger_type)); @@ -494,7 +505,6 @@ where compaction_tx, recover_wal_ids: None, trigger, - record_instance, }; let base_fs = manager.base_fs(); @@ -592,13 +602,14 @@ where async fn get<'get>( &'get self, version: &'get Version, + record_instance: &'get RecordInstance, manager: &StoreManager, key: &'get R::Key, ts: Timestamp, projection: Projection, parquet_lru: ParquetLru, ) -> Result>, DbError> { - let primary_key_index = self.record_instance.primary_key_index::(); + let primary_key_index = record_instance.primary_key_index::(); let projection = match projection { Projection::All => ProjectionMask::all(), @@ -610,7 +621,7 @@ where fixed_projection.dedup(); ProjectionMask::roots( - &arrow_to_parquet_schema(&self.record_instance.arrow_schema::()).unwrap(), + &arrow_to_parquet_schema(&record_instance.arrow_schema::()).unwrap(), fixed_projection, ) } @@ -663,6 +674,7 @@ where { schema: &'scan Schema, manager: &'scan StoreManager, + instance: &'scan RecordInstance, lower: Bound<&'range R::Key>, upper: Bound<&'range R::Key>, ts: Timestamp, @@ -685,6 +697,7 @@ where fn new( schema: &'scan Schema, manager: &'scan StoreManager, + instance: &'scan RecordInstance, (lower, upper): (Bound<&'range R::Key>, Bound<&'range R::Key>), ts: Timestamp, version: &'scan Version, @@ -696,6 +709,7 @@ where Self { schema, manager, + instance, lower, upper, ts, @@ -722,13 +736,13 @@ where for p in &mut projection { *p += 2; } - let primary_key_index = self.schema.record_instance.primary_key_index::(); + let primary_key_index = self.instance.primary_key_index::(); let mut fixed_projection = vec![0, 1, primary_key_index]; fixed_projection.append(&mut projection); fixed_projection.dedup(); let mask = ProjectionMask::roots( - &arrow_to_parquet_schema(&self.schema.record_instance.arrow_schema::()).unwrap(), + &arrow_to_parquet_schema(&self.instance.arrow_schema::()).unwrap(), fixed_projection.clone(), ); @@ -842,7 +856,7 @@ where batch_size, merge_stream, self.projection_indices, - &self.schema.record_instance, + self.instance, )) } } @@ -1279,7 +1293,6 @@ pub(crate) mod tests { compaction_tx, recover_wal_ids: None, trigger, - record_instance: RecordInstance::Normal, }, compaction_rx, )) @@ -1290,6 +1303,7 @@ pub(crate) mod tests { compaction_rx: Receiver, executor: E, schema: crate::Schema, + instance: Arc, version: Version, manager: Arc, ) -> Result, DbError> @@ -1315,6 +1329,7 @@ pub(crate) mod tests { option.clone(), version_set.clone(), manager.clone(), + instance.clone(), ); executor.spawn(async move { @@ -1355,6 +1370,7 @@ pub(crate) mod tests { lock_map: Arc::new(Default::default()), manager, parquet_lru: Arc::new(NoCache::default()), + instance, _p: Default::default(), }) } @@ -1656,7 +1672,6 @@ pub(crate) mod tests { compaction_tx: task_tx.clone(), recover_wal_ids: None, trigger, - record_instance: RecordInstance::Normal, }; for (i, item) in test_items().into_iter().enumerate() { @@ -1723,7 +1738,6 @@ pub(crate) mod tests { compaction_tx: task_tx.clone(), recover_wal_ids: None, trigger, - record_instance: RecordInstance::Normal, }; for item in test_dyn_items().into_iter() { diff --git a/src/record/mod.rs b/src/record/mod.rs index 4712fa5..f51fb0a 100644 --- a/src/record/mod.rs +++ b/src/record/mod.rs @@ -19,14 +19,14 @@ use crate::{ }; #[allow(unused)] -pub(crate) enum RecordInstance { +pub enum RecordInstance { Normal, Runtime(DynRecord), } #[allow(unused)] impl RecordInstance { - pub(crate) fn primary_key_index(&self) -> usize + pub fn primary_key_index(&self) -> usize where R: Record, { @@ -36,7 +36,7 @@ impl RecordInstance { } } - pub(crate) fn arrow_schema(&self) -> Arc + pub fn arrow_schema(&self) -> Arc where R: Record, { @@ -45,6 +45,13 @@ impl RecordInstance { RecordInstance::Runtime(record) => record.arrow_schema(), } } + + pub fn dyn_columns(&self) -> &[Column] { + match self { + RecordInstance::Runtime(record) => record.columns(), + RecordInstance::Normal => &[], + } + } } pub trait Record: 'static + Sized + Decode + Debug + Send + Sync { diff --git a/src/record/runtime/record.rs b/src/record/runtime/record.rs index 3c11eb6..3a45932 100644 --- a/src/record/runtime/record.rs +++ b/src/record/runtime/record.rs @@ -49,6 +49,14 @@ impl DynRecord { ); Arc::new(Schema::new_with_metadata(fields, metadata)) } + + pub fn columns(&self) -> &[Column] { + self.columns.as_slice() + } + + pub fn primary_column(&self) -> &Column { + &self.columns[self.primary_index] + } } impl DynRecord { diff --git a/src/serdes/bound.rs b/src/serdes/bound.rs new file mode 100644 index 0000000..009318d --- /dev/null +++ b/src/serdes/bound.rs @@ -0,0 +1,60 @@ +use std::ops::Bound; + +use fusio::{SeqRead, Write}; + +use crate::serdes::{Decode, Encode}; + +impl Decode for Bound +where + T: Decode, +{ + type Error = T::Error; + + async fn decode(reader: &mut R) -> Result + where + R: SeqRead, + { + Ok(match u8::decode(reader).await? { + 0 => Bound::Included(T::decode(reader).await?), + 1 => Bound::Excluded(T::decode(reader).await?), + 2 => Bound::Unbounded, + _ => unreachable!(), + }) + } +} + +impl Encode for Bound +where + T: Encode + Send + Sync, +{ + type Error = T::Error; + + async fn encode(&self, writer: &mut W) -> Result<(), Self::Error> + where + W: Write, + { + match self { + Bound::Included(value) => { + 0u8.encode(writer).await?; + value.encode(writer).await?; + } + Bound::Excluded(value) => { + 1u8.encode(writer).await?; + value.encode(writer).await?; + } + Bound::Unbounded => { + 2u8.encode(writer).await?; + } + } + + Ok(()) + } + + fn size(&self) -> usize { + size_of::() + + match self { + Bound::Included(value) | Bound::Excluded(value) => value.size(), + Bound::Unbounded => 0, + } + } +} diff --git a/src/serdes/mod.rs b/src/serdes/mod.rs index 6962ae5..2ca3307 100644 --- a/src/serdes/mod.rs +++ b/src/serdes/mod.rs @@ -1,11 +1,12 @@ mod arc; mod boolean; +mod bound; #[cfg(feature = "bytes")] mod bytes; -mod list; mod num; pub(crate) mod option; mod string; +mod vec; use std::future::Future; diff --git a/src/serdes/list.rs b/src/serdes/vec.rs similarity index 61% rename from src/serdes/list.rs rename to src/serdes/vec.rs index 039cf82..9fde76e 100644 --- a/src/serdes/list.rs +++ b/src/serdes/vec.rs @@ -1,40 +1,47 @@ use fusio::{SeqRead, Write}; -use super::{Decode, Encode}; +use crate::serdes::{Decode, Encode}; -impl Decode for Vec { - type Error = fusio::Error; +impl Decode for Vec +where + T: Decode, +{ + type Error = T::Error; async fn decode(reader: &mut R) -> Result where R: SeqRead, { - let len = u32::decode(reader).await?; - let (result, buf) = reader - .read_exact(vec![0u8; len as usize * size_of::()]) - .await; - result?; + let len = u32::decode(reader).await? as usize; + let mut items = Vec::with_capacity(len); - Ok(buf) + for _ in 0..len { + items.push(T::decode(reader).await?); + } + Ok(items) } } -impl Encode for Vec { - type Error = fusio::Error; +impl Encode for Vec +where + T: Encode + Send + Sync, +{ + type Error = T::Error; async fn encode(&self, writer: &mut W) -> Result<(), Self::Error> where W: Write, { (self.len() as u32).encode(writer).await?; - let (result, _) = writer.write_all(self.as_slice()).await; - result?; + for item in self { + item.encode(writer).await?; + } Ok(()) } fn size(&self) -> usize { - size_of::() + size_of::() * self.len() + self.iter().map(|item| item.size()).sum::() + size_of::() } } diff --git a/src/snapshot.rs b/src/snapshot.rs index c548d01..51add6c 100644 --- a/src/snapshot.rs +++ b/src/snapshot.rs @@ -5,7 +5,7 @@ use parquet::arrow::ProjectionMask; use crate::{ fs::manager::StoreManager, - record::Record, + record::{Record, RecordInstance}, stream, stream::ScanStream, timestamp::Timestamp, @@ -20,6 +20,7 @@ where ts: Timestamp, share: RwLockReadGuard<'s, Schema>, version: VersionRef, + instance: Arc, manager: Arc, parquet_lru: ParquetLru, } @@ -37,6 +38,7 @@ where .share .get( &self.version, + &self.instance, &self.manager, key, self.ts, @@ -60,6 +62,7 @@ where Scan::new( &self.share, &self.manager, + &self.instance, range, self.ts, &self.version, @@ -71,6 +74,7 @@ where pub(crate) fn new( share: RwLockReadGuard<'s, Schema>, version: VersionRef, + instance: Arc, manager: Arc, parquet_lru: ParquetLru, ) -> Self { @@ -78,6 +82,7 @@ where ts: version.load_ts(), share, version, + instance, manager, parquet_lru, } @@ -105,6 +110,7 @@ where Scan::new( &self.share, &self.manager, + &self.instance, range, self.ts, &self.version, @@ -127,6 +133,7 @@ mod tests { compaction::tests::build_version, executor::tokio::TokioExecutor, fs::manager::StoreManager, + record::RecordInstance, tests::{build_db, build_schema}, version::TransactionTs, DbOption, @@ -160,6 +167,7 @@ mod tests { compaction_rx, TokioExecutor::new(), schema, + Arc::new(RecordInstance::Normal), version, manager, ) diff --git a/src/transaction.rs b/src/transaction.rs index 0293d76..c345032 100644 --- a/src/transaction.rs +++ b/src/transaction.rs @@ -247,7 +247,7 @@ mod tests { fs::manager::StoreManager, record::{ runtime::{Column, Datatype, DynRecord}, - ColumnDesc, + ColumnDesc, RecordInstance, }, tests::{build_db, build_schema, Test}, transaction::CommitError, @@ -319,6 +319,7 @@ mod tests { compaction_rx, TokioExecutor::new(), schema, + Arc::new(RecordInstance::Normal), version, manager, ) @@ -480,6 +481,7 @@ mod tests { compaction_rx, TokioExecutor::new(), schema, + Arc::new(RecordInstance::Normal), version, manager, ) @@ -575,6 +577,7 @@ mod tests { compaction_rx, TokioExecutor::new(), schema, + Arc::new(RecordInstance::Normal), version, manager, ) @@ -751,6 +754,7 @@ mod tests { compaction_rx, TokioExecutor::new(), schema, + Arc::new(RecordInstance::Normal), version, manager, ) diff --git a/tonbo_net_client/Cargo.toml b/tonbo_net_client/Cargo.toml new file mode 100644 index 0000000..8d935ea --- /dev/null +++ b/tonbo_net_client/Cargo.toml @@ -0,0 +1,33 @@ +[package] +name = "tonbo_net_client" +version = "0.1.0" +edition = "2021" + +[features] +default = ["transport", "tonbo/tokio"] +transport = ["tonic/transport", "tonic-build/transport"] +wasm = ["dep:tonic-web-wasm-client", "tonbo/wasm"] + +[dependencies] +async-stream = "0.3" +fusio = { git = "https://github.com/tonbo-io/fusio.git", rev = "88c6134948d05bef33598a969e9292ad38a8db4b", package = "fusio", version = "0.3.3", features = [ + "dyn", + "fs", +] } +futures-core = "0.3" +futures-util = "0.3" +prost = { version = "0.13" } +thiserror = "2" +tonbo = { path = "../", package = "tonbo", default-features = false} +tonic = { version = "0.12", default-features = false, features = [ + "prost", + "codegen", +] } +tonic-web-wasm-client = {version = "0.6", optional = true} + + +[build-dependencies] +tonic-build = { version = "0.12", default-features = false, features = [ + "prost", +] } + diff --git a/tonbo_net_client/build.rs b/tonbo_net_client/build.rs new file mode 100644 index 0000000..095e35b --- /dev/null +++ b/tonbo_net_client/build.rs @@ -0,0 +1,6 @@ +fn main() -> Result<(), Box> { + tonic_build::configure() + .protoc_arg("--experimental_allow_proto3_optional") + .compile_protos(&["src/proto/tonbo.proto"], &["src/proto"])?; + Ok(()) +} diff --git a/tonbo_net_client/src/client.rs b/tonbo_net_client/src/client.rs new file mode 100644 index 0000000..13188bc --- /dev/null +++ b/tonbo_net_client/src/client.rs @@ -0,0 +1,197 @@ +use std::{io::Cursor, ops::Bound}; + +use async_stream::stream; +use futures_core::Stream; +use futures_util::StreamExt; +use tonbo::{ + record::{Column, Datatype, DynRecord, Record}, + serdes::{Decode, Encode}, +}; +use tonic::Request; + +use crate::{ + proto::{ + tonbo_rpc_client::TonboRpcClient, ColumnDesc, CreateTableReq, FlushReq, GetReq, InsertReq, + RemoveReq, ScanReq, + }, + ClientError, +}; + +pub struct TonboClient { + #[cfg(not(feature = "wasm"))] + conn: TonboRpcClient, + #[cfg(feature = "wasm")] + conn: TonboRpcClient, + table_name: String, + descs: Vec, + pk_index: usize, +} + +impl TonboClient { + pub async fn connect( + addr: String, + table_name: String, + descs: Vec, + pk_index: usize, + ) -> Result { + #[cfg(not(feature = "wasm"))] + let mut conn = TonboRpcClient::connect(addr).await?; + #[cfg(feature = "wasm")] + let mut conn = { + let client = tonic_web_wasm_client::Client::new(addr); + TonboRpcClient::new(client) + }; + let desc = descs + .iter() + .map(|desc| { + let ty: crate::proto::Datatype = desc.datatype.into(); + ColumnDesc { + name: desc.name.clone(), + ty: ty as i32, + is_nullable: desc.is_nullable, + } + }) + .collect::>(); + conn.create_table(CreateTableReq { + table_name: table_name.clone(), + desc, + primary_key_index: pk_index as u32, + }) + .await?; + + Ok(TonboClient { + conn, + table_name, + descs, + pk_index, + }) + } + + pub async fn get(&mut self, column: Column) -> Result, ClientError> { + let mut bytes = Vec::new(); + + column.encode(&mut Cursor::new(&mut bytes)).await?; + let resp = self + .conn + .get(Request::new(GetReq { + table_name: self.table_name.clone(), + key: bytes, + })) + .await?; + + let Some(mut value) = resp.into_inner().record else { + return Ok(None); + }; + Ok(Some(DynRecord::decode(&mut Cursor::new(&mut value)).await?)) + } + + pub async fn scan( + &mut self, + min: Bound, + max: Bound, + ) -> Result>, ClientError> { + let mut min_bytes = Vec::new(); + let mut max_bytes = Vec::new(); + min.encode(&mut Cursor::new(&mut min_bytes)).await?; + max.encode(&mut Cursor::new(&mut max_bytes)).await?; + + let resp = self + .conn + .scan(Request::new(ScanReq { + table_name: self.table_name.clone(), + min: min_bytes, + max: max_bytes, + })) + .await?; + + Ok(Box::pin(stream! { + let mut stream = resp.into_inner(); + + while let Some(result) = stream.next().await { + let mut record = result?; + yield Ok(DynRecord::decode(&mut Cursor::new(&mut record.record)).await?) + } + })) + } + + pub async fn insert(&mut self, record: DynRecord) -> Result<(), ClientError> { + let mut bytes = Vec::new(); + + record + .as_record_ref() + .encode(&mut Cursor::new(&mut bytes)) + .await?; + + let _ = self + .conn + .insert(Request::new(InsertReq { + table_name: self.table_name.clone(), + record: bytes, + })) + .await?; + + Ok(()) + } + + pub async fn remove(&mut self, column: Column) -> Result<(), ClientError> { + let mut bytes = Vec::new(); + + column.encode(&mut Cursor::new(&mut bytes)).await?; + let _ = self + .conn + .remove(Request::new(RemoveReq { + table_name: self.table_name.clone(), + key: bytes, + })) + .await?; + + Ok(()) + } + + pub async fn flush(&mut self) -> Result<(), ClientError> { + let _ = self + .conn + .flush(Request::new(FlushReq { + table_name: self.table_name.clone(), + })) + .await?; + + Ok(()) + } +} + +impl Into for Datatype { + fn into(self) -> crate::proto::Datatype { + match self { + Datatype::UInt8 => crate::proto::Datatype::Uint8, + Datatype::UInt16 => crate::proto::Datatype::Uint16, + Datatype::UInt32 => crate::proto::Datatype::Uint32, + Datatype::UInt64 => crate::proto::Datatype::Uint64, + Datatype::Int8 => crate::proto::Datatype::Int8, + Datatype::Int16 => crate::proto::Datatype::Int16, + Datatype::Int32 => crate::proto::Datatype::Int32, + Datatype::Int64 => crate::proto::Datatype::Int64, + Datatype::String => crate::proto::Datatype::String, + Datatype::Boolean => crate::proto::Datatype::Boolean, + Datatype::Bytes => crate::proto::Datatype::Bytes, + } + } +} + +impl From for Datatype { + fn from(value: crate::proto::Datatype) -> Self { + match value { + crate::proto::Datatype::Uint8 => Datatype::UInt8, + crate::proto::Datatype::Uint16 => Datatype::UInt16, + crate::proto::Datatype::Uint32 => Datatype::UInt32, + crate::proto::Datatype::Uint64 => Datatype::UInt64, + crate::proto::Datatype::Int8 => Datatype::Int8, + crate::proto::Datatype::Int16 => Datatype::Int16, + crate::proto::Datatype::Int32 => Datatype::Int32, + crate::proto::Datatype::Int64 => Datatype::Int64, + crate::proto::Datatype::String => Datatype::String, + crate::proto::Datatype::Boolean => Datatype::Boolean, + crate::proto::Datatype::Bytes => Datatype::Bytes, + } + } +} diff --git a/tonbo_net_client/src/lib.rs b/tonbo_net_client/src/lib.rs new file mode 100644 index 0000000..3597b17 --- /dev/null +++ b/tonbo_net_client/src/lib.rs @@ -0,0 +1,27 @@ +use thiserror::Error; +use tonbo::record::{RecordDecodeError, RecordEncodeError}; + +pub mod client; +pub mod proto; + +#[derive(Debug, Error)] +pub enum ClientError { + #[error("Failed to parse addr: {0}")] + AddrParseError(#[from] std::net::AddrParseError), + + #[cfg(not(feature = "wasm"))] + #[error("Failed to connect to server: {0}")] + TonicTransportErr(#[from] tonic::transport::Error), + + #[error("Failed to call server: {0}")] + TonicFailureStatus(#[from] tonic::Status), + + #[error("Failed to encode record: {0}")] + RecordEncode(#[from] RecordEncodeError), + + #[error("Failed to decode record: {0}")] + RecordDecode(#[from] RecordDecodeError), + + #[error("fusio error: {0}")] + Fusio(#[from] fusio::Error), +} diff --git a/tonbo_net_client/src/proto/mod.rs b/tonbo_net_client/src/proto/mod.rs new file mode 100644 index 0000000..f674aab --- /dev/null +++ b/tonbo_net_client/src/proto/mod.rs @@ -0,0 +1 @@ +tonic::include_proto!("tonbo"); diff --git a/tonbo_net_client/src/proto/tonbo.proto b/tonbo_net_client/src/proto/tonbo.proto new file mode 100644 index 0000000..4c04f48 --- /dev/null +++ b/tonbo_net_client/src/proto/tonbo.proto @@ -0,0 +1,73 @@ +syntax = "proto3"; + +package tonbo; + +message GetReq { + string table_name = 1; + bytes key = 2; +} + +message GetResp { + optional bytes record = 1; +} + +message ScanReq { + string table_name = 1; + bytes min = 2; + bytes max = 3; +} + +message ScanResp { + bytes record = 1; +} + +message InsertReq { + string table_name = 1; + bytes record = 2; +} + +message RemoveReq { + string table_name = 1; + bytes key = 2; +} + +message FlushReq { + string table_name = 1; +} + +message CreateTableReq { + string table_name = 1; + repeated ColumnDesc desc = 2; + uint32 primary_key_index = 3; +} + +enum Datatype { + UINT8 = 0; + UINT16 = 1; + UINT32 = 2; + UINT64 = 3; + INT8 = 4; + INT16 = 5; + INT32 = 6; + INT64 = 7; + STRING = 8; + BOOLEAN = 9; + BYTES = 10; +} + +message ColumnDesc { + string name = 1; + Datatype ty = 2; + bool is_nullable = 3; +} + +message Empty {} + +service TonboRpc { + rpc CreateTable(CreateTableReq) returns (Empty) {} + rpc Get (GetReq) returns (GetResp) {} + rpc Scan (ScanReq) returns (stream ScanResp) {} + rpc Insert (InsertReq) returns (Empty) {} + rpc Remove (RemoveReq) returns (Empty) {} + rpc Flush (FlushReq) returns (Empty) {} +} diff --git a/tonbo_net_server/Cargo.toml b/tonbo_net_server/Cargo.toml new file mode 100644 index 0000000..1f92b07 --- /dev/null +++ b/tonbo_net_server/Cargo.toml @@ -0,0 +1,31 @@ +[package] +name = "tonbo_net_server" +version = "0.1.0" +edition = "2021" + +[dependencies] +async-stream = "0.3" +fusio = { git = "https://github.com/tonbo-io/fusio.git", rev = "88c6134948d05bef33598a969e9292ad38a8db4b", package = "fusio", version = "0.3.3", features = [ + "dyn", + "fs", +] } +futures-core = "0.3" +futures-util = "0.3" +http = "1.2.0" +prost = { version = "0.13" } +thiserror = "2" +tonbo = { path = "../", package = "tonbo" } +tonic = { version = "0.12" } +tower-http = { version = "0.6.2", default-features = false, features = [ + "cors", +] } +tonic-web = "0.12.3" +ttl_cache = "0.5" +itertools = "0.13.0" + +[dev-dependencies] +tempfile = "3" +tokio = { version = "1", features = ["full"] } + +[build-dependencies] +tonic-build = {version = "0.12"} diff --git a/tonbo_net_server/build.rs b/tonbo_net_server/build.rs new file mode 100644 index 0000000..095e35b --- /dev/null +++ b/tonbo_net_server/build.rs @@ -0,0 +1,6 @@ +fn main() -> Result<(), Box> { + tonic_build::configure() + .protoc_arg("--experimental_allow_proto3_optional") + .compile_protos(&["src/proto/tonbo.proto"], &["src/proto"])?; + Ok(()) +} diff --git a/tonbo_net_server/src/lib.rs b/tonbo_net_server/src/lib.rs new file mode 100644 index 0000000..413795e --- /dev/null +++ b/tonbo_net_server/src/lib.rs @@ -0,0 +1,26 @@ +use thiserror::Error; +use tonbo::record::{RecordDecodeError, RecordEncodeError}; + +pub mod proto; +pub mod server; + +#[derive(Debug, Error)] +pub enum ServerError { + #[error("Failed to parse addr: {0}")] + AddrParseError(#[from] std::net::AddrParseError), + + #[error("Failed to connect to server: {0}")] + TonicTransportErr(#[from] tonic::transport::Error), + + #[error("Failed to call server: {0}")] + TonicFailureStatus(#[from] tonic::Status), + + #[error("Failed to encode record: {0}")] + RecordEncode(#[from] RecordEncodeError), + + #[error("Failed to decode record: {0}")] + RecordDecode(#[from] RecordDecodeError), + + #[error("fusio error: {0}")] + Fusio(#[from] fusio::Error), +} diff --git a/tonbo_net_server/src/proto/mod.rs b/tonbo_net_server/src/proto/mod.rs new file mode 100644 index 0000000..f674aab --- /dev/null +++ b/tonbo_net_server/src/proto/mod.rs @@ -0,0 +1 @@ +tonic::include_proto!("tonbo"); diff --git a/tonbo_net_server/src/proto/tonbo.proto b/tonbo_net_server/src/proto/tonbo.proto new file mode 100644 index 0000000..4c04f48 --- /dev/null +++ b/tonbo_net_server/src/proto/tonbo.proto @@ -0,0 +1,73 @@ +syntax = "proto3"; + +package tonbo; + +message GetReq { + string table_name = 1; + bytes key = 2; +} + +message GetResp { + optional bytes record = 1; +} + +message ScanReq { + string table_name = 1; + bytes min = 2; + bytes max = 3; +} + +message ScanResp { + bytes record = 1; +} + +message InsertReq { + string table_name = 1; + bytes record = 2; +} + +message RemoveReq { + string table_name = 1; + bytes key = 2; +} + +message FlushReq { + string table_name = 1; +} + +message CreateTableReq { + string table_name = 1; + repeated ColumnDesc desc = 2; + uint32 primary_key_index = 3; +} + +enum Datatype { + UINT8 = 0; + UINT16 = 1; + UINT32 = 2; + UINT64 = 3; + INT8 = 4; + INT16 = 5; + INT32 = 6; + INT64 = 7; + STRING = 8; + BOOLEAN = 9; + BYTES = 10; +} + +message ColumnDesc { + string name = 1; + Datatype ty = 2; + bool is_nullable = 3; +} + +message Empty {} + +service TonboRpc { + rpc CreateTable(CreateTableReq) returns (Empty) {} + rpc Get (GetReq) returns (GetResp) {} + rpc Scan (ScanReq) returns (stream ScanResp) {} + rpc Insert (InsertReq) returns (Empty) {} + rpc Remove (RemoveReq) returns (Empty) {} + rpc Flush (FlushReq) returns (Empty) {} +} diff --git a/tonbo_net_server/src/server.rs b/tonbo_net_server/src/server.rs new file mode 100644 index 0000000..19c8a91 --- /dev/null +++ b/tonbo_net_server/src/server.rs @@ -0,0 +1,351 @@ +use std::{ + io::Cursor, + ops::Bound, + pin::{pin, Pin}, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, + time::Duration, +}; + +use ::http::HeaderName; +use async_stream::stream; +use fusio::path::Path; +use futures_core::Stream; +use futures_util::StreamExt; +use tokio::sync::RwLock; +use tonbo::{ + executor::tokio::TokioExecutor, + record::{Column, Datatype, DynRecord, Record}, + serdes::{Decode, Encode}, + DbOption, DB, +}; +use tonic::{transport::Server, Code, Request, Response, Status}; +use tonic_web::GrpcWebLayer; +use tower_http::cors::{AllowOrigin, CorsLayer}; +use ttl_cache::TtlCache; + +use crate::{ + proto::{ + tonbo_rpc_server::{TonboRpc, TonboRpcServer}, + CreateTableReq, Empty, FlushReq, GetReq, GetResp, InsertReq, RemoveReq, ScanReq, ScanResp, + }, + ServerError, +}; + +const DEFAULT_EXPOSED_HEADERS: [&str; 3] = + ["grpc-status", "grpc-message", "grpc-status-details-bin"]; +const DEFAULT_ALLOW_HEADERS: [&str; 4] = + ["x-grpc-web", "content-type", "x-user-agent", "grpc-timeout"]; + +const DB_DURATION: Duration = Duration::from_secs(20 * 60); +const DB_WRITE_LIMIT: usize = 100 * 1024 * 1024; + +pub async fn service>( + addr: String, + base_path: P, +) -> Result<(), ServerError> { + let service = TonboService { + base_path: base_path.as_ref().to_path_buf(), + inner: RwLock::new(TtlCache::new(256)), + write_limit: Default::default(), + }; + let addr = addr.parse()?; + println!("addr: {}", addr); + + Server::builder() + .accept_http1(true) + .layer( + CorsLayer::new() + .allow_origin(AllowOrigin::mirror_request()) + .expose_headers( + DEFAULT_EXPOSED_HEADERS + .iter() + .cloned() + .map(HeaderName::from_static) + .collect::>(), + ) + .allow_headers( + DEFAULT_ALLOW_HEADERS + .iter() + .cloned() + .map(HeaderName::from_static) + .collect::>(), + ), + ) + .layer(GrpcWebLayer::new()) + .add_service(TonboRpcServer::new(service)) + .serve(addr) + .await?; + Ok(()) +} + +struct TonboService { + base_path: std::path::PathBuf, + inner: RwLock>>>, + write_limit: AtomicUsize, +} + +#[tonic::async_trait] +impl TonboRpc for TonboService { + async fn create_table( + &self, + request: Request, + ) -> Result, Status> { + let req = request.into_inner(); + let mut guard = self.inner.write().await; + + let pk_index = req.primary_key_index as usize; + let mut descs = Vec::with_capacity(req.desc.len()); + + for desc in req.desc { + let datatype = crate::proto::Datatype::try_from(desc.ty) + .map_err(|e| Status::new(Code::Internal, e.to_string()))?; + descs.push(tonbo::record::ColumnDesc::new( + desc.name, + Datatype::from(datatype), + desc.is_nullable, + )); + } + if let Some(db_instance) = guard.get(&req.table_name) { + let instance = db_instance.instance(); + + if instance.primary_key_index::() != req.primary_key_index as usize { + return Err(Status::new( + Code::Internal, + "`primary_key_index` does not match the existing schema".to_string(), + )); + } + // FIXME: why `dyn_columns` returns not ColumnDesc + for (column, column_desc) in instance.dyn_columns().iter().zip(descs.iter()) { + if column.name != column_desc.name + || column.datatype != column_desc.datatype + || column.is_nullable != column_desc.is_nullable + { + return Err(Status::new( + Code::Internal, + "`descs` does not match the existing schema".to_string(), + )); + } + } + } else { + let option = DbOption::with_path( + Path::from_absolute_path(self.base_path.join(&req.table_name)).unwrap(), + descs[pk_index].name.to_string(), + pk_index, + ); + let db: DB = + DB::with_schema(option, TokioExecutor::new(), descs, pk_index) + .await + .unwrap(); + guard.insert(req.table_name, Arc::new(db), DB_DURATION); + } + + Ok(Response::new(Empty {})) + } + + async fn get(&self, request: Request) -> Result, Status> { + let mut req = request.into_inner(); + let guard = self.inner.read().await; + let Some(db_instance) = guard.get(&req.table_name) else { + return Err(Status::new(Code::NotFound, "table not found")); + }; + + let key = Column::decode(&mut Cursor::new(&mut req.key)) + .await + .map_err(|e| Status::new(Code::Internal, e.to_string()))?; + + let tuple = db_instance + .get(&key, |e| Some(e.get().columns)) + .await + .map_err(|e| Status::new(Code::Internal, e.to_string()))?; + let record = if let Some(tuple) = tuple { + let primary_key_index = db_instance.instance().primary_key_index::() - 2; + let mut bytes = Vec::new(); + let mut writer = Cursor::new(&mut bytes); + (tuple.len() as u32) + .encode(&mut writer) + .await + .map_err(|e| Status::new(Code::Internal, e.to_string()))?; + (primary_key_index as u32) + .encode(&mut writer) + .await + .map_err(|e| Status::new(Code::Internal, e.to_string()))?; + + for col in tuple.iter() { + col.encode(&mut writer) + .await + .map_err(|e| Status::new(Code::Internal, e.to_string()))?; + } + Some(bytes) + } else { + None + }; + Ok(Response::new(GetResp { record })) + } + + type ScanStream = Pin> + Send>>; + + async fn scan(&self, request: Request) -> Result, Status> { + let mut req = request.into_inner(); + let guard = self.inner.read().await; + let Some(db_instance) = guard.get(&req.table_name) else { + return Err(Status::new(Code::NotFound, "table not found")); + }; + let primary_key_index = db_instance.instance().primary_key_index::() - 2; + let db = db_instance.clone(); + + let stream = stream! { + let min = Bound::::decode(&mut Cursor::new(&mut req.min)).await + .map_err(|e| Status::new(Code::Internal, e.to_string()))?; + let max = Bound::::decode(&mut Cursor::new(&mut req.max)).await + .map_err(|e| Status::new(Code::Internal, e.to_string()))?; + + let mut stream = pin!(db + .scan((min.as_ref(), max.as_ref()), |e| Some(e.get().columns)) + .await); + while let Some(entry) = stream.next().await { + let Some(columns) = entry.map_err(|e| Status::new(Code::Internal, e.to_string()))? else { continue }; + let mut bytes = Vec::new(); + + let mut writer = Cursor::new(&mut bytes); + (columns.len() as u32) + .encode(&mut writer) + .await + .map_err(|e| Status::new(Code::Internal, e.to_string()))?; + (primary_key_index as u32) + .encode(&mut writer) + .await + .map_err(|e| Status::new(Code::Internal, e.to_string()))?; + + for col in columns.iter() { + col.encode(&mut writer) + .await + .map_err(|e| Status::new(Code::Internal, e.to_string()))?; + } + + yield Ok::(ScanResp { record: bytes }); + } + }; + + Ok(Response::new(Box::pin(stream))) + } + + async fn insert(&self, request: Request) -> Result, Status> { + let mut req = request.into_inner(); + let record = DynRecord::decode(&mut Cursor::new(&mut req.record)) + .await + .map_err(|e| Status::new(Code::Internal, e.to_string()))?; + if self.write_limit.load(Ordering::Relaxed) > DB_WRITE_LIMIT { + return Err(Status::new( + Code::Internal, + "write limit exceeded".to_string(), + )); + } + let guard = self.inner.read().await; + let Some(db_instance) = guard.get(&req.table_name) else { + return Err(Status::new(Code::NotFound, "table not found")); + }; + + let record_size = record.size(); + db_instance + .insert(record) + .await + .map_err(|e| Status::new(Code::Internal, e.to_string()))?; + self.write_limit.fetch_add(record_size, Ordering::SeqCst); + Ok(Response::new(Empty {})) + } + + async fn remove(&self, request: Request) -> Result, Status> { + let mut req = request.into_inner(); + let column = Column::decode(&mut Cursor::new(&mut req.key)) + .await + .map_err(|e| Status::new(Code::Internal, e.to_string()))?; + if self.write_limit.load(Ordering::Relaxed) > DB_WRITE_LIMIT { + return Err(Status::new( + Code::Internal, + "write limit exceeded".to_string(), + )); + } + let guard = self.inner.read().await; + + let Some(db_instance) = guard.get(&req.table_name) else { + return Err(Status::new(Code::NotFound, "table not found")); + }; + + let column_size = column.size(); + db_instance + .remove(column) + .await + .map_err(|e| Status::new(Code::Internal, e.to_string()))?; + self.write_limit.fetch_add(column_size, Ordering::SeqCst); + Ok(Response::new(Empty {})) + } + + async fn flush(&self, request: Request) -> Result, Status> { + let req = request.into_inner(); + let guard = self.inner.read().await; + let Some(db_instance) = guard.get(&req.table_name) else { + return Err(Status::new(Code::NotFound, "table not found")); + }; + + db_instance + .flush() + .await + .map_err(|e| Status::new(Code::Internal, e.to_string()))?; + Ok(Response::new(Empty {})) + } +} + +impl Into for Datatype { + fn into(self) -> crate::proto::Datatype { + match self { + Datatype::UInt8 => crate::proto::Datatype::Uint8, + Datatype::UInt16 => crate::proto::Datatype::Uint16, + Datatype::UInt32 => crate::proto::Datatype::Uint32, + Datatype::UInt64 => crate::proto::Datatype::Uint64, + Datatype::Int8 => crate::proto::Datatype::Int8, + Datatype::Int16 => crate::proto::Datatype::Int16, + Datatype::Int32 => crate::proto::Datatype::Int32, + Datatype::Int64 => crate::proto::Datatype::Int64, + Datatype::String => crate::proto::Datatype::String, + Datatype::Boolean => crate::proto::Datatype::Boolean, + Datatype::Bytes => crate::proto::Datatype::Bytes, + } + } +} + +impl From for Datatype { + fn from(value: crate::proto::Datatype) -> Self { + match value { + crate::proto::Datatype::Uint8 => Datatype::UInt8, + crate::proto::Datatype::Uint16 => Datatype::UInt16, + crate::proto::Datatype::Uint32 => Datatype::UInt32, + crate::proto::Datatype::Uint64 => Datatype::UInt64, + crate::proto::Datatype::Int8 => Datatype::Int8, + crate::proto::Datatype::Int16 => Datatype::Int16, + crate::proto::Datatype::Int32 => Datatype::Int32, + crate::proto::Datatype::Int64 => Datatype::Int64, + crate::proto::Datatype::String => Datatype::String, + crate::proto::Datatype::Boolean => Datatype::Boolean, + crate::proto::Datatype::Bytes => Datatype::Bytes, + } + } +} + +#[cfg(test)] +mod tests { + use tempfile::TempDir; + + use crate::server::service; + + #[tokio::test] + async fn test_service() { + let temp_dir = TempDir::new().unwrap(); + + service("[::1]:50051".to_string(), temp_dir.path()) + .await + .unwrap() + } +}