diff --git a/tonbo_net_client/src/client.rs b/tonbo_net_client/src/client.rs index 9d5dd19..2419a3a 100644 --- a/tonbo_net_client/src/client.rs +++ b/tonbo_net_client/src/client.rs @@ -11,7 +11,8 @@ use tonic::Request; use crate::{ proto::{ - tonbo_rpc_client::TonboRpcClient, ColumnDesc, Empty, GetReq, InsertReq, RemoveReq, ScanReq, + tonbo_rpc_client::TonboRpcClient, ColumnDesc, CreateTableReq, FlushReq, GetReq, InsertReq, + RemoveReq, ScanReq, }, ClientError, }; @@ -40,26 +41,48 @@ pub struct TonboClient { conn: TonboRpcClient, #[cfg(feature = "wasm")] conn: TonboRpcClient, + table_name: String, + descs: Vec, + pk_index: usize, } impl TonboClient { - pub async fn connect(addr: String) -> Result { + pub async fn connect( + addr: String, + table_name: String, + descs: Vec, + pk_index: usize, + ) -> Result { #[cfg(not(feature = "wasm"))] - let conn = TonboRpcClient::connect(addr).await?; + let mut conn = TonboRpcClient::connect(addr).await?; #[cfg(feature = "wasm")] - let conn = { + let mut conn = { let client = tonic_web_wasm_client::Client::new(addr); TonboRpcClient::new(client) }; - Ok(TonboClient { conn }) - } - - pub async fn schema(&mut self) -> Result { - let resp = self.conn.schema(Request::new(Empty {})).await?.into_inner(); + let desc = descs + .iter() + .map(|desc| { + let ty: crate::proto::Datatype = desc.datatype.into(); + ColumnDesc { + name: desc.name.clone(), + ty: ty as i32, + is_nullable: desc.is_nullable, + } + }) + .collect::>(); + conn.create_table(CreateTableReq { + table_name: table_name.clone(), + desc, + primary_key_index: pk_index as u32, + }) + .await?; - Ok(TonboSchema { - desc: resp.desc, - primary_key_index: resp.primary_key_index as usize, + Ok(TonboClient { + conn, + table_name, + descs, + pk_index, }) } @@ -67,7 +90,13 @@ impl TonboClient { let mut bytes = Vec::new(); column.encode(&mut Cursor::new(&mut bytes)).await?; - let resp = self.conn.get(Request::new(GetReq { key: bytes })).await?; + let resp = self + .conn + .get(Request::new(GetReq { + table_name: self.table_name.clone(), + key: bytes, + })) + .await?; let Some(mut value) = resp.into_inner().record else { return Ok(None); @@ -88,6 +117,7 @@ impl TonboClient { let resp = self .conn .scan(Request::new(ScanReq { + table_name: self.table_name.clone(), min: min_bytes, max: max_bytes, })) @@ -113,7 +143,10 @@ impl TonboClient { let _ = self .conn - .insert(Request::new(InsertReq { record: bytes })) + .insert(Request::new(InsertReq { + table_name: self.table_name.clone(), + record: bytes, + })) .await?; Ok(()) @@ -125,19 +158,45 @@ impl TonboClient { column.encode(&mut Cursor::new(&mut bytes)).await?; let _ = self .conn - .remove(Request::new(RemoveReq { key: bytes })) + .remove(Request::new(RemoveReq { + table_name: self.table_name.clone(), + key: bytes, + })) .await?; Ok(()) } pub async fn flush(&mut self) -> Result<(), ClientError> { - let _ = self.conn.flush(Request::new(Empty {})).await?; + let _ = self + .conn + .flush(Request::new(FlushReq { + table_name: self.table_name.clone(), + })) + .await?; Ok(()) } } +impl Into for Datatype { + fn into(self) -> crate::proto::Datatype { + match self { + Datatype::UInt8 => crate::proto::Datatype::Uint8, + Datatype::UInt16 => crate::proto::Datatype::Uint16, + Datatype::UInt32 => crate::proto::Datatype::Uint32, + Datatype::UInt64 => crate::proto::Datatype::Uint64, + Datatype::Int8 => crate::proto::Datatype::Int8, + Datatype::Int16 => crate::proto::Datatype::Int16, + Datatype::Int32 => crate::proto::Datatype::Int32, + Datatype::Int64 => crate::proto::Datatype::Int64, + Datatype::String => crate::proto::Datatype::String, + Datatype::Boolean => crate::proto::Datatype::Boolean, + Datatype::Bytes => crate::proto::Datatype::Bytes, + } + } +} + impl From for Datatype { fn from(value: crate::proto::Datatype) -> Self { match value { diff --git a/tonbo_net_client/src/proto/tonbo.proto b/tonbo_net_client/src/proto/tonbo.proto index 0a68b7c..4c04f48 100644 --- a/tonbo_net_client/src/proto/tonbo.proto +++ b/tonbo_net_client/src/proto/tonbo.proto @@ -3,7 +3,8 @@ syntax = "proto3"; package tonbo; message GetReq { - bytes key = 1; + string table_name = 1; + bytes key = 2; } message GetResp { @@ -11,8 +12,9 @@ message GetResp { } message ScanReq { - bytes min = 1; - bytes max = 2; + string table_name = 1; + bytes min = 2; + bytes max = 3; } message ScanResp { @@ -20,16 +22,23 @@ message ScanResp { } message InsertReq { - bytes record = 1; + string table_name = 1; + bytes record = 2; } message RemoveReq { - bytes key = 1; + string table_name = 1; + bytes key = 2; +} + +message FlushReq { + string table_name = 1; } -message SchemaResp { - repeated ColumnDesc desc = 1; - uint32 primary_key_index = 2; +message CreateTableReq { + string table_name = 1; + repeated ColumnDesc desc = 2; + uint32 primary_key_index = 3; } enum Datatype { @@ -55,10 +64,10 @@ message ColumnDesc { message Empty {} service TonboRpc { - rpc Schema(Empty) returns (SchemaResp) {} + rpc CreateTable(CreateTableReq) returns (Empty) {} rpc Get (GetReq) returns (GetResp) {} rpc Scan (ScanReq) returns (stream ScanResp) {} rpc Insert (InsertReq) returns (Empty) {} rpc Remove (RemoveReq) returns (Empty) {} - rpc Flush (Empty) returns (Empty) {} + rpc Flush (FlushReq) returns (Empty) {} } diff --git a/tonbo_net_server/Cargo.toml b/tonbo_net_server/Cargo.toml index eece8ee..b12c117 100644 --- a/tonbo_net_server/Cargo.toml +++ b/tonbo_net_server/Cargo.toml @@ -20,6 +20,7 @@ tower-http = { version = "0.6.2", default-features = false, features = [ "cors", ] } tonic-web = "0.12.3" +ttl_cache = "0.5" itertools = "0.13.0" [dev-dependencies] diff --git a/tonbo_net_server/src/proto/tonbo.proto b/tonbo_net_server/src/proto/tonbo.proto index 0a68b7c..4c04f48 100644 --- a/tonbo_net_server/src/proto/tonbo.proto +++ b/tonbo_net_server/src/proto/tonbo.proto @@ -3,7 +3,8 @@ syntax = "proto3"; package tonbo; message GetReq { - bytes key = 1; + string table_name = 1; + bytes key = 2; } message GetResp { @@ -11,8 +12,9 @@ message GetResp { } message ScanReq { - bytes min = 1; - bytes max = 2; + string table_name = 1; + bytes min = 2; + bytes max = 3; } message ScanResp { @@ -20,16 +22,23 @@ message ScanResp { } message InsertReq { - bytes record = 1; + string table_name = 1; + bytes record = 2; } message RemoveReq { - bytes key = 1; + string table_name = 1; + bytes key = 2; +} + +message FlushReq { + string table_name = 1; } -message SchemaResp { - repeated ColumnDesc desc = 1; - uint32 primary_key_index = 2; +message CreateTableReq { + string table_name = 1; + repeated ColumnDesc desc = 2; + uint32 primary_key_index = 3; } enum Datatype { @@ -55,10 +64,10 @@ message ColumnDesc { message Empty {} service TonboRpc { - rpc Schema(Empty) returns (SchemaResp) {} + rpc CreateTable(CreateTableReq) returns (Empty) {} rpc Get (GetReq) returns (GetResp) {} rpc Scan (ScanReq) returns (stream ScanResp) {} rpc Insert (InsertReq) returns (Empty) {} rpc Remove (RemoveReq) returns (Empty) {} - rpc Flush (Empty) returns (Empty) {} + rpc Flush (FlushReq) returns (Empty) {} } diff --git a/tonbo_net_server/src/server.rs b/tonbo_net_server/src/server.rs index f05180b..80bf9bc 100644 --- a/tonbo_net_server/src/server.rs +++ b/tonbo_net_server/src/server.rs @@ -3,27 +3,30 @@ use std::{ ops::Bound, pin::{pin, Pin}, sync::Arc, + time::Duration, }; use ::http::HeaderName; use async_stream::stream; +use fusio::path::Path; use futures_core::Stream; use futures_util::StreamExt; -use itertools::Itertools; +use tokio::sync::RwLock; use tonbo::{ executor::tokio::TokioExecutor, record::{Column, Datatype, DynRecord}, serdes::{Decode, Encode}, - DB, + DbOption, DB, }; use tonic::{transport::Server, Code, Request, Response, Status}; use tonic_web::GrpcWebLayer; use tower_http::cors::{AllowOrigin, CorsLayer}; +use ttl_cache::TtlCache; use crate::{ proto::{ tonbo_rpc_server::{TonboRpc, TonboRpcServer}, - ColumnDesc, Empty, GetReq, GetResp, InsertReq, RemoveReq, ScanReq, ScanResp, SchemaResp, + CreateTableReq, Empty, FlushReq, GetReq, GetResp, InsertReq, RemoveReq, ScanReq, ScanResp, }, ServerError, }; @@ -33,9 +36,15 @@ const DEFAULT_EXPOSED_HEADERS: [&str; 3] = const DEFAULT_ALLOW_HEADERS: [&str; 4] = ["x-grpc-web", "content-type", "x-user-agent", "grpc-timeout"]; -pub async fn service(addr: String, db: DB) -> Result<(), ServerError> { +const DB_DURATION: Duration = Duration::from_secs(20 * 60); + +pub async fn service>( + addr: String, + base_path: P, +) -> Result<(), ServerError> { let service = TonboService { - inner: Arc::new(db), + base_path: base_path.as_ref().to_path_buf(), + inner: RwLock::new(TtlCache::new(256)), }; let addr = addr.parse()?; println!("addr: {}", addr); @@ -67,59 +76,73 @@ pub async fn service(addr: String, db: DB) -> Result<( Ok(()) } -#[derive(Clone)] struct TonboService { - inner: Arc>, -} - -impl TonboService { - fn primary_key_index(&self) -> usize { - self.inner.instance().primary_key_index::() - 2 - } + base_path: std::path::PathBuf, + inner: RwLock>>>, } #[tonic::async_trait] impl TonboRpc for TonboService { - async fn schema(&self, _: Request) -> Result, Status> { - let instance = self.inner.instance(); - let desc = instance - .dyn_columns() - .iter() - .map(|column| { - let ty: crate::proto::Datatype = column.datatype.into(); - ColumnDesc { - name: column.name.clone(), - ty: ty as i32, - is_nullable: column.is_nullable, - } - }) - .collect_vec(); + async fn create_table( + &self, + request: Request, + ) -> Result, Status> { + let req = request.into_inner(); + + let mut guard = self.inner.write().await; - Ok(Response::new(SchemaResp { - desc, - primary_key_index: instance.primary_key_index::() as u32 - 2, - })) + if !guard.contains_key(&req.table_name) { + let pk_index = req.primary_key_index as usize; + let mut descs = Vec::with_capacity(req.desc.len()); + + for desc in req.desc { + let datatype = crate::proto::Datatype::try_from(desc.ty) + .map_err(|e| Status::new(Code::Internal, e.to_string()))?; + descs.push(tonbo::record::ColumnDesc::new( + desc.name, + Datatype::from(datatype), + desc.is_nullable, + )); + } + let option = DbOption::with_path( + Path::from_absolute_path(self.base_path.join(&req.table_name)).unwrap(), + descs[pk_index].name.to_string(), + pk_index, + ); + let db: DB = + DB::with_schema(option, TokioExecutor::new(), descs, pk_index) + .await + .unwrap(); + guard.insert(req.table_name, Arc::new(db), DB_DURATION); + } + + Ok(Response::new(Empty {})) } async fn get(&self, request: Request) -> Result, Status> { let mut req = request.into_inner(); + let guard = self.inner.read().await; + let Some(db_instance) = guard.get(&req.table_name) else { + return Err(Status::new(Code::NotFound, "table not found")); + }; + let key = Column::decode(&mut Cursor::new(&mut req.key)) .await .map_err(|e| Status::new(Code::Internal, e.to_string()))?; - let tuple = self - .inner + let tuple = db_instance .get(&key, |e| Some(e.get().columns)) .await .map_err(|e| Status::new(Code::Internal, e.to_string()))?; let record = if let Some(tuple) = tuple { + let primary_key_index = db_instance.instance().primary_key_index::() - 2; let mut bytes = Vec::new(); let mut writer = Cursor::new(&mut bytes); (tuple.len() as u32) .encode(&mut writer) .await .map_err(|e| Status::new(Code::Internal, e.to_string()))?; - (self.primary_key_index() as u32) + (primary_key_index as u32) .encode(&mut writer) .await .map_err(|e| Status::new(Code::Internal, e.to_string()))?; @@ -140,8 +163,12 @@ impl TonboRpc for TonboService { async fn scan(&self, request: Request) -> Result, Status> { let mut req = request.into_inner(); - let db = self.inner.clone(); - let primary_key_index = self.primary_key_index(); + let guard = self.inner.read().await; + let Some(db_instance) = guard.get(&req.table_name) else { + return Err(Status::new(Code::NotFound, "table not found")); + }; + let primary_key_index = db_instance.instance().primary_key_index::() - 2; + let db = db_instance.clone(); let stream = stream! { let min = Bound::::decode(&mut Cursor::new(&mut req.min)).await @@ -185,7 +212,12 @@ impl TonboRpc for TonboService { .await .map_err(|e| Status::new(Code::Internal, e.to_string()))?; - self.inner + let guard = self.inner.read().await; + let Some(db_instance) = guard.get(&req.table_name) else { + return Err(Status::new(Code::NotFound, "table not found")); + }; + + db_instance .insert(record) .await .map_err(|e| Status::new(Code::Internal, e.to_string()))?; @@ -197,15 +229,26 @@ impl TonboRpc for TonboService { let column = Column::decode(&mut Cursor::new(&mut req.key)) .await .map_err(|e| Status::new(Code::Internal, e.to_string()))?; - self.inner + let guard = self.inner.read().await; + let Some(db_instance) = guard.get(&req.table_name) else { + return Err(Status::new(Code::NotFound, "table not found")); + }; + + db_instance .remove(column) .await .map_err(|e| Status::new(Code::Internal, e.to_string()))?; Ok(Response::new(Empty {})) } - async fn flush(&self, _: Request) -> Result, Status> { - self.inner + async fn flush(&self, request: Request) -> Result, Status> { + let req = request.into_inner(); + let guard = self.inner.read().await; + let Some(db_instance) = guard.get(&req.table_name) else { + return Err(Status::new(Code::NotFound, "table not found")); + }; + + db_instance .flush() .await .map_err(|e| Status::new(Code::Internal, e.to_string()))?; @@ -231,36 +274,36 @@ impl Into for Datatype { } } +impl From for Datatype { + fn from(value: crate::proto::Datatype) -> Self { + match value { + crate::proto::Datatype::Uint8 => Datatype::UInt8, + crate::proto::Datatype::Uint16 => Datatype::UInt16, + crate::proto::Datatype::Uint32 => Datatype::UInt32, + crate::proto::Datatype::Uint64 => Datatype::UInt64, + crate::proto::Datatype::Int8 => Datatype::Int8, + crate::proto::Datatype::Int16 => Datatype::Int16, + crate::proto::Datatype::Int32 => Datatype::Int32, + crate::proto::Datatype::Int64 => Datatype::Int64, + crate::proto::Datatype::String => Datatype::String, + crate::proto::Datatype::Boolean => Datatype::Boolean, + crate::proto::Datatype::Bytes => Datatype::Bytes, + } + } +} + #[cfg(test)] mod tests { - use fusio::path::Path; use tempfile::TempDir; - use tonbo::{ - executor::tokio::TokioExecutor, - record::{ColumnDesc, Datatype, DynRecord}, - DbOption, DB, - }; use crate::server::service; #[tokio::test] async fn test_service() { let temp_dir = TempDir::new().unwrap(); - let desc = vec![ - ColumnDesc::new("id".to_string(), Datatype::Int64, false), - ColumnDesc::new("name".to_string(), Datatype::String, true), - ColumnDesc::new("like".to_string(), Datatype::Int32, true), - ]; - let option2 = DbOption::with_path( - Path::from_filesystem_path(temp_dir.path()).unwrap(), - "id".to_string(), - 0, - ); - let db: DB = - DB::with_schema(option2, TokioExecutor::new(), desc, 0) - .await - .unwrap(); - service("[::1]:50051".to_string(), db).await.unwrap() + service("[::1]:50051".to_string(), temp_dir.path()) + .await + .unwrap() } }