Skip to content

Commit

Permalink
chore: fix Schema
Browse files Browse the repository at this point in the history
  • Loading branch information
KKould committed Nov 27, 2024
1 parent 8959b07 commit bd68bab
Show file tree
Hide file tree
Showing 24 changed files with 691 additions and 397 deletions.
25 changes: 18 additions & 7 deletions examples/datafusion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ use futures_core::Stream;
use futures_util::StreamExt;
use tokio::fs;
use tonbo::{
executor::tokio::TokioExecutor, inmem::immutable::ArrowArrays, record::Record, DbOption, DB,
executor::tokio::TokioExecutor,
inmem::immutable::ArrowArrays,
record::{Record, Schema},
DbOption, DB,
};
use tonbo_macros::Record;

Expand All @@ -49,7 +52,10 @@ struct MusicExec {
db: Arc<DB<Music, TokioExecutor>>,
projection: Option<Vec<usize>>,
limit: Option<usize>,
range: (Bound<<Music as Record>::Key>, Bound<<Music as Record>::Key>),
range: (
Bound<<MusicSchema as Schema>::Key>,
Bound<<MusicSchema as Schema>::Key>,
),
}

struct MusicStream {
Expand All @@ -63,7 +69,7 @@ impl TableProvider for MusicProvider {
}

fn schema(&self) -> SchemaRef {
Music::arrow_schema().clone()
MusicSchema {}.arrow_schema().clone()
}

fn table_type(&self) -> TableType {
Expand Down Expand Up @@ -96,7 +102,7 @@ impl TableProvider for MusicProvider {

impl MusicExec {
fn new(db: Arc<DB<Music, TokioExecutor>>, projection: Option<&Vec<usize>>) -> Self {
let schema = Music::arrow_schema();
let schema = MusicSchema {}.arrow_schema();
let schema = if let Some(projection) = &projection {
Arc::new(schema.project(projection).unwrap())
} else {
Expand Down Expand Up @@ -127,7 +133,7 @@ impl Stream for MusicStream {

impl RecordBatchStream for MusicStream {
fn schema(&self) -> SchemaRef {
Music::arrow_schema().clone()
MusicSchema {}.arrow_schema().clone()
}
}

Expand Down Expand Up @@ -215,9 +221,14 @@ async fn main() -> Result<()> {
// make sure the path exists
let _ = fs::create_dir_all("./db_path/music").await;

let options = DbOption::from(Path::from_filesystem_path("./db_path/music").unwrap());
let options = DbOption::from((
Path::from_filesystem_path("./db_path/music").unwrap(),
&MusicSchema,
));

let db = DB::new(options, TokioExecutor::default()).await.unwrap();
let db = DB::new(options, TokioExecutor::default(), MusicSchema)
.await
.unwrap();
for (id, name, like) in [
(0, "welcome".to_string(), 0),
(1, "tonbo".to_string(), 999),
Expand Down
9 changes: 7 additions & 2 deletions examples/declare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,14 @@ async fn main() {
// make sure the path exists
let _ = fs::create_dir_all("./db_path/users").await;

let options = DbOption::from(Path::from_filesystem_path("./db_path/users").unwrap());
let options = DbOption::from((
Path::from_filesystem_path("./db_path/users").unwrap(),
&UserSchema,
));
// pluggable async runtime and I/O
let db = DB::new(options, TokioExecutor::default()).await.unwrap();
let db = DB::new(options, TokioExecutor::default(), UserSchema)
.await
.unwrap();

// insert with owned value
db.insert(User {
Expand Down
Loading

0 comments on commit bd68bab

Please sign in to comment.