Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/rpc #251

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
workspace = { members = ["parquet-lru", "tonbo_macros"] }
workspace = { members = [ "parquet-lru", "tonbo_macros", "tonbo_net_client", "tonbo_net_server",] }

[package]
description = "An embedded persistent KV database in Rust."
Expand Down Expand Up @@ -80,12 +80,12 @@ crc32fast = "1"
crossbeam-skiplist = "0.1"
datafusion = { version = "42.2.0", optional = true }
flume = { version = "0.11", features = ["async"] }
fusio = { git = "https://github.com/tonbo-io/fusio.git", rev = "216eb446fb0a0c6e5e85bfac51a6f6ed8e5ed606", package = "fusio", version = "0.3.3", features = [
fusio = { git = "https://github.com/tonbo-io/fusio.git", rev = "88c6134948d05bef33598a969e9292ad38a8db4b", package = "fusio", version = "0.3.3", features = [
"dyn",
"fs",
] }
fusio-dispatch = { git = "https://github.com/tonbo-io/fusio.git", rev = "216eb446fb0a0c6e5e85bfac51a6f6ed8e5ed606", package = "fusio-dispatch", version = "0.2.1" }
fusio-parquet = { git = "https://github.com/tonbo-io/fusio.git", rev = "216eb446fb0a0c6e5e85bfac51a6f6ed8e5ed606", package = "fusio-parquet", version = "0.2.1" }
fusio-dispatch = { git = "https://github.com/tonbo-io/fusio.git", rev = "88c6134948d05bef33598a969e9292ad38a8db4b", package = "fusio-dispatch", version = "0.2.1" }
fusio-parquet = { git = "https://github.com/tonbo-io/fusio.git", rev = "88c6134948d05bef33598a969e9292ad38a8db4b", package = "fusio-parquet", version = "0.2.1" }
futures-core = "0.3"
futures-io = "0.3"
futures-util = "0.3"
Expand Down
9 changes: 6 additions & 3 deletions src/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ where
pub(crate) schema: Arc<RwLock<Schema<R>>>,
pub(crate) version_set: VersionSet<R>,
pub(crate) manager: Arc<StoreManager>,
pub(crate) instance: Arc<RecordInstance>,
}

impl<R> Compactor<R>
Expand All @@ -50,12 +51,14 @@ where
option: Arc<DbOption<R>>,
version_set: VersionSet<R>,
manager: Arc<StoreManager>,
instance: Arc<RecordInstance>,
) -> Self {
Compactor::<R> {
option,
schema,
version_set,
manager,
instance,
}
}

Expand All @@ -75,7 +78,7 @@ where
&mut guard.mutable,
Mutable::new(&self.option, trigger_clone, self.manager.base_fs()).await?,
);
let (file_id, immutable) = mutable.into_immutable(&guard.record_instance).await?;
let (file_id, immutable) = mutable.into_immutable(&self.instance).await?;
guard.immutables.push((file_id, immutable));
} else if !is_manual {
return Ok(());
Expand All @@ -99,7 +102,7 @@ where
&self.option,
recover_wal_ids,
excess,
&guard.record_instance,
&self.instance,
&self.manager,
)
.await?
Expand All @@ -116,7 +119,7 @@ where
&scope.max,
&mut version_edits,
&mut delete_gens,
&guard.record_instance,
&self.instance,
&self.manager,
parquet_lru,
)
Expand Down
42 changes: 28 additions & 14 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ where
lock_map: LockMap<R::Key>,
manager: Arc<StoreManager>,
parquet_lru: ParquetLru,
instance: Arc<RecordInstance>,
_p: PhantomData<E>,
}

Expand Down Expand Up @@ -241,6 +242,7 @@ where
instance: RecordInstance,
lru_cache: ParquetLru,
) -> Result<Self, DbError<R>> {
let instance = Arc::new(instance);
let manager = Arc::new(StoreManager::new(
option.base_fs.clone(),
option.level_paths.clone(),
Expand All @@ -263,13 +265,14 @@ where

let version_set = VersionSet::new(clean_sender, option.clone(), manager.clone()).await?;
let schema = Arc::new(RwLock::new(
Schema::new(option.clone(), task_tx, &version_set, instance, &manager).await?,
Schema::new(option.clone(), task_tx, &version_set, &manager).await?,
));
let mut compactor = Compactor::<R>::new(
schema.clone(),
option.clone(),
version_set.clone(),
manager.clone(),
instance.clone(),
);

executor.spawn(async move {
Expand Down Expand Up @@ -310,6 +313,7 @@ where
lock_map: Arc::new(Default::default()),
manager,
parquet_lru: lru_cache,
instance,
_p: Default::default(),
})
}
Expand All @@ -323,6 +327,7 @@ where
Snapshot::new(
self.schema.read().await,
self.version_set.current().await,
self.instance.clone(),
self.manager.clone(),
self.parquet_lru.clone(),
)
Expand Down Expand Up @@ -377,6 +382,7 @@ where
.await
.get(
&*self.version_set.current().await,
&self.instance,
&self.manager,
key,
self.version_set.load_ts(),
Expand Down Expand Up @@ -405,15 +411,18 @@ where
let mut scan = Scan::new(
&schema,
&self.manager,
&self.instance,
range,
self.version_set.load_ts(),
&*current,
Box::new(|_| None),
self.parquet_lru.clone(),
).take().await?;

while let Some(record) = scan.next().await {
yield Ok(f(TransactionEntry::Stream(record?)))
while let Some(record) = scan.next().await.transpose()? {
if record.value().is_some() {
yield Ok(f(TransactionEntry::Stream(record)))
}
}
}
}
Expand Down Expand Up @@ -462,6 +471,10 @@ where
self.schema.write().await.flush_wal().await?;
Ok(())
}

pub fn instance(&self) -> &RecordInstance {
self.instance.as_ref()
}
}

pub(crate) struct Schema<R>
Expand All @@ -473,7 +486,6 @@ where
compaction_tx: Sender<CompactTask>,
recover_wal_ids: Option<Vec<FileId>>,
trigger: Arc<Box<dyn Trigger<R> + Send + Sync>>,
record_instance: RecordInstance,
}

impl<R> Schema<R>
Expand All @@ -484,7 +496,6 @@ where
option: Arc<DbOption<R>>,
compaction_tx: Sender<CompactTask>,
version_set: &VersionSet<R>,
record_instance: RecordInstance,
manager: &StoreManager,
) -> Result<Self, DbError<R>> {
let trigger = Arc::new(TriggerFactory::create(option.trigger_type));
Expand All @@ -494,7 +505,6 @@ where
compaction_tx,
recover_wal_ids: None,
trigger,
record_instance,
};

let base_fs = manager.base_fs();
Expand Down Expand Up @@ -592,13 +602,14 @@ where
async fn get<'get>(
&'get self,
version: &'get Version<R>,
record_instance: &'get RecordInstance,
manager: &StoreManager,
key: &'get R::Key,
ts: Timestamp,
projection: Projection,
parquet_lru: ParquetLru,
) -> Result<Option<Entry<'get, R>>, DbError<R>> {
let primary_key_index = self.record_instance.primary_key_index::<R>();
let primary_key_index = record_instance.primary_key_index::<R>();

let projection = match projection {
Projection::All => ProjectionMask::all(),
Expand All @@ -610,7 +621,7 @@ where
fixed_projection.dedup();

ProjectionMask::roots(
&arrow_to_parquet_schema(&self.record_instance.arrow_schema::<R>()).unwrap(),
&arrow_to_parquet_schema(&record_instance.arrow_schema::<R>()).unwrap(),
fixed_projection,
)
}
Expand Down Expand Up @@ -663,6 +674,7 @@ where
{
schema: &'scan Schema<R>,
manager: &'scan StoreManager,
instance: &'scan RecordInstance,
lower: Bound<&'range R::Key>,
upper: Bound<&'range R::Key>,
ts: Timestamp,
Expand All @@ -685,6 +697,7 @@ where
fn new(
schema: &'scan Schema<R>,
manager: &'scan StoreManager,
instance: &'scan RecordInstance,
(lower, upper): (Bound<&'range R::Key>, Bound<&'range R::Key>),
ts: Timestamp,
version: &'scan Version<R>,
Expand All @@ -696,6 +709,7 @@ where
Self {
schema,
manager,
instance,
lower,
upper,
ts,
Expand All @@ -722,13 +736,13 @@ where
for p in &mut projection {
*p += 2;
}
let primary_key_index = self.schema.record_instance.primary_key_index::<R>();
let primary_key_index = self.instance.primary_key_index::<R>();
let mut fixed_projection = vec![0, 1, primary_key_index];
fixed_projection.append(&mut projection);
fixed_projection.dedup();

let mask = ProjectionMask::roots(
&arrow_to_parquet_schema(&self.schema.record_instance.arrow_schema::<R>()).unwrap(),
&arrow_to_parquet_schema(&self.instance.arrow_schema::<R>()).unwrap(),
fixed_projection.clone(),
);

Expand Down Expand Up @@ -842,7 +856,7 @@ where
batch_size,
merge_stream,
self.projection_indices,
&self.schema.record_instance,
self.instance,
))
}
}
Expand Down Expand Up @@ -1279,7 +1293,6 @@ pub(crate) mod tests {
compaction_tx,
recover_wal_ids: None,
trigger,
record_instance: RecordInstance::Normal,
},
compaction_rx,
))
Expand All @@ -1290,6 +1303,7 @@ pub(crate) mod tests {
compaction_rx: Receiver<CompactTask>,
executor: E,
schema: crate::Schema<R>,
instance: Arc<RecordInstance>,
version: Version<R>,
manager: Arc<StoreManager>,
) -> Result<DB<R, E>, DbError<R>>
Expand All @@ -1315,6 +1329,7 @@ pub(crate) mod tests {
option.clone(),
version_set.clone(),
manager.clone(),
instance.clone(),
);

executor.spawn(async move {
Expand Down Expand Up @@ -1355,6 +1370,7 @@ pub(crate) mod tests {
lock_map: Arc::new(Default::default()),
manager,
parquet_lru: Arc::new(NoCache::default()),
instance,
_p: Default::default(),
})
}
Expand Down Expand Up @@ -1656,7 +1672,6 @@ pub(crate) mod tests {
compaction_tx: task_tx.clone(),
recover_wal_ids: None,
trigger,
record_instance: RecordInstance::Normal,
};

for (i, item) in test_items().into_iter().enumerate() {
Expand Down Expand Up @@ -1723,7 +1738,6 @@ pub(crate) mod tests {
compaction_tx: task_tx.clone(),
recover_wal_ids: None,
trigger,
record_instance: RecordInstance::Normal,
};

for item in test_dyn_items().into_iter() {
Expand Down
13 changes: 10 additions & 3 deletions src/record/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ use crate::{
};

#[allow(unused)]
pub(crate) enum RecordInstance {
pub enum RecordInstance {
Normal,
Runtime(DynRecord),
}

#[allow(unused)]
impl RecordInstance {
pub(crate) fn primary_key_index<R>(&self) -> usize
pub fn primary_key_index<R>(&self) -> usize
where
R: Record,
{
Expand All @@ -36,7 +36,7 @@ impl RecordInstance {
}
}

pub(crate) fn arrow_schema<R>(&self) -> Arc<Schema>
pub fn arrow_schema<R>(&self) -> Arc<Schema>
where
R: Record,
{
Expand All @@ -45,6 +45,13 @@ impl RecordInstance {
RecordInstance::Runtime(record) => record.arrow_schema(),
}
}

pub fn dyn_columns(&self) -> &[Column] {
match self {
RecordInstance::Runtime(record) => record.columns(),
RecordInstance::Normal => &[],
}
}
}

pub trait Record: 'static + Sized + Decode + Debug + Send + Sync {
Expand Down
8 changes: 8 additions & 0 deletions src/record/runtime/record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@ impl DynRecord {
);
Arc::new(Schema::new_with_metadata(fields, metadata))
}

pub fn columns(&self) -> &[Column] {
self.columns.as_slice()
}

pub fn primary_column(&self) -> &Column {
&self.columns[self.primary_index]
}
}

impl DynRecord {
Expand Down
Loading
Loading