Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: seperate schema trait from record #241

Merged
merged 12 commits into from
Dec 17, 2024
4 changes: 2 additions & 2 deletions benches/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ impl BenchDatabase for TonboS3BenchDataBase {
.disable_wal();

TonboS3BenchDataBase::new(
tonbo::DB::new(option, TokioExecutor::current())
tonbo::DB::new(option, TokioExecutor::current(), &CustomerSchema)
.await
.unwrap(),
)
Expand Down Expand Up @@ -324,7 +324,7 @@ impl BenchDatabase for TonboBenchDataBase {
DbOption::from(fusio::path::Path::from_filesystem_path(path.as_ref()).unwrap())
.disable_wal();

let db = tonbo::DB::new(option, TokioExecutor::current())
let db = tonbo::DB::new(option, TokioExecutor::current(), &CustomerSchema)
.await
.unwrap();
TonboBenchDataBase::new(db)
Expand Down
10 changes: 5 additions & 5 deletions bindings/python/src/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::{
};

use pyo3::{pyclass, pymethods};
use tonbo::record::{ColumnDesc, Datatype};
use tonbo::record::{Datatype, Value, ValueDesc};

use crate::datatype::DataType;

Expand Down Expand Up @@ -58,15 +58,15 @@ impl Display for Column {
}
}

impl From<Column> for ColumnDesc {
impl From<Column> for ValueDesc {
fn from(col: Column) -> Self {
let datatype = Datatype::from(col.datatype);
ColumnDesc::new(col.name, datatype, col.nullable)
ValueDesc::new(col.name, datatype, col.nullable)
}
}
impl From<Column> for tonbo::record::Column {
impl From<Column> for Value {
fn from(col: Column) -> Self {
let datatype = Datatype::from(col.datatype);
tonbo::record::Column::new(datatype, col.name, col.value, col.nullable)
Value::new(datatype, col.name, col.value, col.nullable)
}
}
21 changes: 6 additions & 15 deletions bindings/python/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use pyo3::{
use pyo3_asyncio::tokio::{future_into_py, get_runtime};
use tonbo::{
executor::tokio::TokioExecutor,
record::{ColumnDesc, DynRecord},
record::{DynRecord, DynSchema, Value, ValueDesc},
DB,
};

Expand Down Expand Up @@ -40,7 +40,6 @@ impl TonboDB {
let mut desc = vec![];
let mut cols = vec![];
let mut primary_key_index = None;
let mut primary_key_name = None;

for i in 0..values.len()? {
let value = values.get_item(i)?;
Expand All @@ -51,23 +50,15 @@ impl TonboDB {
panic!("Multiple primary keys is not allowed!")
}
primary_key_index = Some(desc.len());
primary_key_name = Some(col.name.clone());
}
cols.push(col.clone());
desc.push(ColumnDesc::from(col));
desc.push(ValueDesc::from(col));
}
}
let option = option.into_option(primary_key_index.unwrap(), primary_key_name.unwrap());
let schema = DynSchema::new(desc, primary_key_index.unwrap());
let option = option.into_option(&schema);
let db = get_runtime()
.block_on(async {
DB::with_schema(
option,
TokioExecutor::current(),
desc,
primary_key_index.unwrap(),
)
.await
})
.block_on(async { DB::new(option, TokioExecutor::current(), schema).await })
.unwrap();
Ok(Self {
db: Arc::new(db),
Expand All @@ -87,7 +78,7 @@ impl TonboDB {
for i in 0..values.len()? {
let value = values.get_item(i)?;
if let Ok(bound_col) = value.downcast::<Column>() {
let col = tonbo::record::Column::from(bound_col.extract::<Column>()?);
let col = Value::from(bound_col.extract::<Column>()?);
cols.push(col);
}
}
Expand Down
27 changes: 11 additions & 16 deletions bindings/python/src/options.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use fusio::path::Path;
use pyo3::{pyclass, pymethods, PyResult};
use tonbo::record::DynRecord;
use tonbo::record::Schema;

use crate::{ExceedsMaxLevelError, FsOptions};

Expand Down Expand Up @@ -72,21 +72,16 @@ impl DbOption {
}

impl DbOption {
pub(crate) fn into_option(
self,
primary_key_index: usize,
primary_key_name: String,
) -> tonbo::DbOption<DynRecord> {
let mut opt =
tonbo::DbOption::with_path(Path::from(self.path), primary_key_name, primary_key_index)
.clean_channel_buffer(self.clean_channel_buffer)
.immutable_chunk_num(self.immutable_chunk_num)
.level_sst_magnification(self.level_sst_magnification)
.major_default_oldest_table_num(self.major_default_oldest_table_num)
.major_threshold_with_sst_size(self.major_threshold_with_sst_size)
.max_sst_file_size(self.max_sst_file_size)
.version_log_snapshot_threshold(self.version_log_snapshot_threshold)
.base_fs(fusio_dispatch::FsOptions::from(self.base_fs));
pub(crate) fn into_option<S: Schema>(self, schema: &S) -> tonbo::DbOption {
let mut opt = tonbo::DbOption::new(Path::from(self.path), schema)
.clean_channel_buffer(self.clean_channel_buffer)
.immutable_chunk_num(self.immutable_chunk_num)
.level_sst_magnification(self.level_sst_magnification)
.major_default_oldest_table_num(self.major_default_oldest_table_num)
.major_threshold_with_sst_size(self.major_threshold_with_sst_size)
.max_sst_file_size(self.max_sst_file_size)
.version_log_snapshot_threshold(self.version_log_snapshot_threshold)
.base_fs(fusio_dispatch::FsOptions::from(self.base_fs));
for (level, path) in self.level_paths.into_iter().enumerate() {
if let Some((path, fs_options)) = path {
opt = opt
Expand Down
3 changes: 2 additions & 1 deletion bindings/python/src/range.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::ops;

use pyo3::{pyclass, FromPyObject, Py, PyAny, Python};
use tonbo::record::Value;

use crate::{utils::to_col, Column};

Expand All @@ -12,7 +13,7 @@ pub enum Bound {
}

impl Bound {
pub(crate) fn to_bound(&self, py: Python, col: &Column) -> ops::Bound<tonbo::record::Column> {
pub(crate) fn to_bound(&self, py: Python, col: &Column) -> ops::Bound<Value> {
match self {
Bound::Included { key } => ops::Bound::Included(to_col(py, col, key.clone_ref(py))),
Bound::Excluded { key } => ops::Bound::Excluded(to_col(py, col, key.clone_ref(py))),
Expand Down
8 changes: 4 additions & 4 deletions bindings/python/src/record_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,18 @@ use pyo3::{
types::{PyAnyMethods, PyMapping, PyMappingMethods},
Py, PyAny, PyResult, Python,
};
use tonbo::record::DynRecord;
use tonbo::record::{DynRecord, Value};

use crate::Column;

#[derive(Clone)]
struct Record {
columns: Vec<tonbo::record::Column>,
columns: Vec<Value>,
primary_key_index: usize,
}

impl Record {
fn new(columns: Vec<tonbo::record::Column>, primary_key_index: usize) -> Self {
fn new(columns: Vec<Value>, primary_key_index: usize) -> Self {
Self {
columns,
primary_key_index,
Expand Down Expand Up @@ -58,7 +58,7 @@ impl RecordBatch {
if col.primary_key {
primary_key_index = col_idx;
}
let col = tonbo::record::Column::from(col);
let col = Value::from(col);
cols.push(col);
col_idx += 1;
}
Expand Down
21 changes: 11 additions & 10 deletions bindings/python/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ use pyo3::{
Bound, IntoPy, Py, PyAny, PyResult, Python,
};
use pyo3_asyncio::tokio::future_into_py;
use tonbo::{record::DynRecord, transaction, Projection};
use tonbo::{
record::{DynRecord, Value},
transaction, Projection,
};

use crate::{
column::Column,
Expand Down Expand Up @@ -123,7 +126,7 @@ impl Transaction {
let tuple = x.downcast::<PyTuple>()?;
let col = tuple.get_item(1)?;
if let Ok(bound_col) = col.downcast::<Column>() {
let col = tonbo::record::Column::from(bound_col.extract::<Column>()?);
let col = Value::from(bound_col.extract::<Column>()?);
cols.push(col);
}
}
Expand Down Expand Up @@ -181,16 +184,14 @@ impl Transaction {
future_into_py(py, async move {
let mut scan = txn.scan((
unsafe {
transmute::<
std::ops::Bound<&tonbo::record::Column>,
std::ops::Bound<&'static tonbo::record::Column>,
>(lower.as_ref())
transmute::<std::ops::Bound<&Value>, std::ops::Bound<&'static Value>>(
lower.as_ref(),
)
},
unsafe {
transmute::<
std::ops::Bound<&tonbo::record::Column>,
std::ops::Bound<&'static tonbo::record::Column>,
>(high.as_ref())
transmute::<std::ops::Bound<&Value>, std::ops::Bound<&'static Value>>(
high.as_ref(),
)
},
));

Expand Down
17 changes: 5 additions & 12 deletions bindings/python/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,11 @@ use pyo3::{
types::{PyBytes, PyDict, PyDictMethods},
Bound, Py, PyAny, Python,
};
use tonbo::record::Datatype;
use tonbo::record::{Datatype, Value};

use crate::{column::Column, datatype::DataType, range};

pub(crate) fn to_dict(
py: Python,
primary_key_index: usize,
record: Vec<tonbo::record::Column>,
) -> Bound<PyDict> {
pub(crate) fn to_dict(py: Python, primary_key_index: usize, record: Vec<Value>) -> Bound<PyDict> {
let dict = PyDict::new_bound(py);
for (idx, col) in record.iter().enumerate() {
match &col.datatype {
Expand Down Expand Up @@ -185,8 +181,8 @@ pub(crate) fn to_key(
}
}

pub(crate) fn to_col(py: Python, col: &Column, key: Py<PyAny>) -> tonbo::record::Column {
tonbo::record::Column::new(
pub(crate) fn to_col(py: Python, col: &Column, key: Py<PyAny>) -> Value {
Value::new(
Datatype::from(&col.datatype),
col.name.to_owned(),
to_key(py, &col.datatype, key),
Expand All @@ -199,10 +195,7 @@ pub(crate) fn to_bound(
col: &Column,
lower: Option<Py<range::Bound>>,
high: Option<Py<range::Bound>>,
) -> (
std::ops::Bound<tonbo::record::Column>,
std::ops::Bound<tonbo::record::Column>,
) {
) -> (std::ops::Bound<Value>, std::ops::Bound<Value>) {
let lower = match lower {
Some(bound) => bound.get().to_bound(py, col),
None => std::ops::Bound::Unbounded,
Expand Down
25 changes: 18 additions & 7 deletions examples/datafusion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ use futures_core::Stream;
use futures_util::StreamExt;
use tokio::fs;
use tonbo::{
executor::tokio::TokioExecutor, inmem::immutable::ArrowArrays, record::Record, DbOption, DB,
executor::tokio::TokioExecutor,
inmem::immutable::ArrowArrays,
record::{Record, Schema},
DbOption, DB,
};
use tonbo_macros::Record;

Expand All @@ -49,7 +52,10 @@ struct MusicExec {
db: Arc<DB<Music, TokioExecutor>>,
projection: Option<Vec<usize>>,
limit: Option<usize>,
range: (Bound<<Music as Record>::Key>, Bound<<Music as Record>::Key>),
range: (
Bound<<MusicSchema as Schema>::Key>,
Bound<<MusicSchema as Schema>::Key>,
),
}

struct MusicStream {
Expand All @@ -63,7 +69,7 @@ impl TableProvider for MusicProvider {
}

fn schema(&self) -> SchemaRef {
Music::arrow_schema().clone()
MusicSchema {}.arrow_schema().clone()
}

fn table_type(&self) -> TableType {
Expand Down Expand Up @@ -96,7 +102,7 @@ impl TableProvider for MusicProvider {

impl MusicExec {
fn new(db: Arc<DB<Music, TokioExecutor>>, projection: Option<&Vec<usize>>) -> Self {
let schema = Music::arrow_schema();
let schema = MusicSchema {}.arrow_schema();
let schema = if let Some(projection) = &projection {
Arc::new(schema.project(projection).unwrap())
} else {
Expand Down Expand Up @@ -127,7 +133,7 @@ impl Stream for MusicStream {

impl RecordBatchStream for MusicStream {
fn schema(&self) -> SchemaRef {
Music::arrow_schema().clone()
MusicSchema {}.arrow_schema().clone()
}
}

Expand Down Expand Up @@ -215,9 +221,14 @@ async fn main() -> Result<()> {
// make sure the path exists
let _ = fs::create_dir_all("./db_path/music").await;

let options = DbOption::from(Path::from_filesystem_path("./db_path/music").unwrap());
let options = DbOption::new(
Path::from_filesystem_path("./db_path/music").unwrap(),
&MusicSchema,
);

let db = DB::new(options, TokioExecutor::current()).await.unwrap();
let db = DB::new(options, TokioExecutor::current(), MusicSchema)
.await
.unwrap();
for (id, name, like) in [
(0, "welcome".to_string(), 0),
(1, "tonbo".to_string(), 999),
Expand Down
9 changes: 7 additions & 2 deletions examples/declare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,14 @@ async fn main() {
// make sure the path exists
let _ = fs::create_dir_all("./db_path/users").await;

let options = DbOption::from(Path::from_filesystem_path("./db_path/users").unwrap());
let options = DbOption::new(
Path::from_filesystem_path("./db_path/users").unwrap(),
&UserSchema,
);
// pluggable async runtime and I/O
let db = DB::new(options, TokioExecutor::current()).await.unwrap();
let db = DB::new(options, TokioExecutor::current(), UserSchema)
.await
.unwrap();

// insert with owned value
db.insert(User {
Expand Down
Loading
Loading