Skip to content

Commit

Permalink
fix: grpc transport data format
Browse files Browse the repository at this point in the history
  • Loading branch information
crwen committed Dec 13, 2024
1 parent 174d48a commit eeb3109
Show file tree
Hide file tree
Showing 5 changed files with 160 additions and 70 deletions.
8 changes: 5 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -418,9 +418,11 @@ where
Box::new(|_| None),
self.parquet_lru.clone(),
).take().await?;

while let Some(record) = scan.next().await {
yield Ok(f(TransactionEntry::Stream(record?)))

while let Some(record) = scan.next().await.transpose()? {
if record.value().is_some() {
yield Ok(f(TransactionEntry::Stream(record)))
}
}
}
}
Expand Down
19 changes: 16 additions & 3 deletions tonbo_net_client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@ name = "tonbo_net_client"
version = "0.1.0"
edition = "2021"

[features]
default = ["transport", "tonbo/tokio"]
transport = ["tonic/transport", "tonic-build/transport"]
wasm = ["dep:tonic-web-wasm-client", "tonbo/wasm"]

[dependencies]
async-stream = "0.3"
fusio = { path = "../../fusio/fusio", package = "fusio", version = "0.3.3", features = [
Expand All @@ -13,8 +18,16 @@ futures-core = "0.3"
futures-util = "0.3"
prost = { version = "0.13" }
thiserror = "2"
tonbo = { path = "../", package = "tonbo" }
tonic = { version = "0.12" }
tonbo = { path = "../", package = "tonbo", default-features = false}
tonic = { version = "0.12", default-features = false, features = [
"prost",
"codegen",
] }
tonic-web-wasm-client = {version = "0.6", optional = true}


[build-dependencies]
tonic-build = {version = "0.12"}
tonic-build = { version = "0.12", default-features = false, features = [
"prost",
] }

71 changes: 52 additions & 19 deletions tonbo_net_client/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
use std::io::Cursor;
use std::ops::Bound;
use std::{io::Cursor, ops::Bound};

use async_stream::stream;
use futures_core::Stream;
use futures_util::StreamExt;
use tonbo::{
record::{Column, Datatype, DynRecord, Record},
serdes::{Decode, Encode},
};
use tonic::Request;
use crate::proto::tonbo_rpc_client::TonboRpcClient;
use tonic::transport::Channel;
use crate::proto::{ColumnDesc, Empty, GetReq, InsertReq, RemoveReq, ScanReq};
use tonbo::record::{Column, Datatype, DynRecord, Record};
use tonbo::serdes::{Decode, Encode};
use crate::ClientError;

use crate::{
proto::{
tonbo_rpc_client::TonboRpcClient, ColumnDesc, Empty, GetReq, InsertReq, RemoveReq, ScanReq,
},
ClientError,
};

pub struct TonboSchema {
pub desc: Vec<ColumnDesc>,
Expand All @@ -31,19 +36,27 @@ impl TonboSchema {
}

pub struct TonboClient {
conn: TonboRpcClient<Channel>,
#[cfg(not(feature = "wasm"))]
conn: TonboRpcClient<tonic::transport::Channel>,
#[cfg(feature = "wasm")]
conn: TonboRpcClient<tonic_web_wasm_client::Client>,
}

impl TonboClient {
// #[cfg(not(feature = "wasm"))]
pub async fn connect(addr: String) -> Result<TonboClient, ClientError> {
#[cfg(not(feature = "wasm"))]
let conn = TonboRpcClient::connect(addr).await?;
#[cfg(feature = "wasm")]
let conn = {
let client = tonic_web_wasm_client::Client::new(addr);
TonboRpcClient::new(client)
};
Ok(TonboClient { conn })
}

pub async fn schema(&mut self) -> Result<TonboSchema, ClientError> {
let resp = self.conn.schema(Request::new(Empty {})).await?
.into_inner();

let resp = self.conn.schema(Request::new(Empty {})).await?.into_inner();

Ok(TonboSchema {
desc: resp.desc,
Expand All @@ -57,17 +70,29 @@ impl TonboClient {
column.encode(&mut Cursor::new(&mut bytes)).await?;
let resp = self.conn.get(Request::new(GetReq { key: bytes })).await?;

let Some(mut value) = resp.into_inner().record else { return Ok(None); };
let Some(mut value) = resp.into_inner().record else {
return Ok(None);
};
Ok(Some(DynRecord::decode(&mut Cursor::new(&mut value)).await?))
}

pub async fn scan(&mut self, min: Bound<Column>, max: Bound<Column>) -> Result<impl Stream<Item = Result<DynRecord, ClientError>>, ClientError> {
pub async fn scan(
&mut self,
min: Bound<Column>,
max: Bound<Column>,
) -> Result<impl Stream<Item = Result<DynRecord, ClientError>>, ClientError> {
let mut min_bytes = Vec::new();
let mut max_bytes = Vec::new();
min.encode(&mut Cursor::new(&mut min_bytes)).await?;
max.encode(&mut Cursor::new(&mut max_bytes)).await?;

let resp = self.conn.scan(Request::new(ScanReq { min: min_bytes, max: max_bytes })).await?;
let resp = self
.conn
.scan(Request::new(ScanReq {
min: min_bytes,
max: max_bytes,
}))
.await?;

Ok(Box::pin(stream! {
let mut stream = resp.into_inner();
Expand All @@ -82,11 +107,16 @@ impl TonboClient {
pub async fn insert(&mut self, record: DynRecord) -> Result<(), ClientError> {
let mut bytes = Vec::new();

record.as_record_ref().encode(&mut Cursor::new(&mut bytes)).await?;
record
.as_record_ref()
.encode(&mut Cursor::new(&mut bytes))
.await?;

let record_ref = record.as_record_ref();
println!("{:#?}", record_ref.columns);
let _ = self.conn.insert(Request::new(InsertReq { record: bytes})).await?;
let _ = self
.conn
.insert(Request::new(InsertReq { record: bytes }))
.await?;

Ok(())
}
Expand All @@ -95,7 +125,10 @@ impl TonboClient {
let mut bytes = Vec::new();

column.encode(&mut Cursor::new(&mut bytes)).await?;
let _ = self.conn.remove(Request::new(RemoveReq { key: bytes })).await?;
let _ = self
.conn
.remove(Request::new(RemoveReq { key: bytes }))
.await?;

Ok(())
}
Expand Down
3 changes: 2 additions & 1 deletion tonbo_net_client/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
use thiserror::Error;
use tonbo::record::{RecordDecodeError, RecordEncodeError};

pub mod proto;
pub mod client;
pub mod proto;

#[derive(Debug, Error)]
pub enum ClientError {
#[error("Failed to parse addr: {0}")]
AddrParseError(#[from] std::net::AddrParseError),

#[cfg(not(feature = "wasm"))]
#[error("Failed to connect to server: {0}")]
TonicTransportErr(#[from] tonic::transport::Error),

Expand Down
Loading

0 comments on commit eeb3109

Please sign in to comment.