tmp: rpc server
KKould committed Dec 12, 2024
1 parent 74f62e2 commit 174d48a
Showing 22 changed files with 717 additions and 38 deletions.
8 changes: 4 additions & 4 deletions Cargo.toml
@@ -1,4 +1,4 @@
-workspace = { members = ["parquet-lru", "tonbo_macros"] }
+workspace = { members = [ "parquet-lru", "tonbo_macros", "tonbo_net_client", "tonbo_net_server",] }
 
 [package]
 description = "An embedded persistent KV database in Rust."
@@ -80,12 +80,12 @@ crc32fast = "1"
 crossbeam-skiplist = "0.1"
 datafusion = { version = "42.2.0", optional = true }
 flume = { version = "0.11", features = ["async"] }
-fusio = { git = "https://github.com/tonbo-io/fusio.git", rev = "216eb446fb0a0c6e5e85bfac51a6f6ed8e5ed606", package = "fusio", version = "0.3.3", features = [
+fusio = { path = "../fusio/fusio", package = "fusio", version = "0.3.3", features = [
     "dyn",
     "fs",
 ] }
-fusio-dispatch = { git = "https://github.com/tonbo-io/fusio.git", rev = "216eb446fb0a0c6e5e85bfac51a6f6ed8e5ed606", package = "fusio-dispatch", version = "0.2.1" }
-fusio-parquet = { git = "https://github.com/tonbo-io/fusio.git", rev = "216eb446fb0a0c6e5e85bfac51a6f6ed8e5ed606", package = "fusio-parquet", version = "0.2.1" }
+fusio-dispatch = { path = "../fusio/fusio-dispatch", package = "fusio-dispatch" }
+fusio-parquet = { path = "../fusio/fusio-parquet", package = "fusio-parquet" }
 futures-core = "0.3"
 futures-io = "0.3"
 futures-util = "0.3"
9 changes: 6 additions & 3 deletions src/compaction/mod.rs
@@ -39,6 +39,7 @@ where
     pub(crate) schema: Arc<RwLock<Schema<R>>>,
     pub(crate) version_set: VersionSet<R>,
     pub(crate) manager: Arc<StoreManager>,
+    pub(crate) instance: Arc<RecordInstance>,
 }
 
 impl<R> Compactor<R>
@@ -50,12 +51,14 @@ where
         option: Arc<DbOption<R>>,
         version_set: VersionSet<R>,
         manager: Arc<StoreManager>,
+        instance: Arc<RecordInstance>,
     ) -> Self {
         Compactor::<R> {
             option,
             schema,
             version_set,
             manager,
+            instance,
         }
     }
 
@@ -76,7 +79,7 @@ where
             &mut guard.mutable,
             Mutable::new(&self.option, trigger_clone, self.manager.base_fs()).await?,
         );
-        let (file_id, immutable) = mutable.into_immutable(&guard.record_instance).await?;
+        let (file_id, immutable) = mutable.into_immutable(&self.instance).await?;
 
         guard.immutables.push((file_id, immutable));
         if guard.immutables.len() > self.option.immutable_chunk_max_num {
@@ -91,7 +94,7 @@ where
                 &self.option,
                 recover_wal_ids,
                 excess,
-                &guard.record_instance,
+                &self.instance,
                 &self.manager,
             )
             .await?
@@ -108,7 +111,7 @@ where
                 &scope.max,
                 &mut version_edits,
                 &mut delete_gens,
-                &guard.record_instance,
+                &self.instance,
                 &self.manager,
                 parquet_lru,
             )
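The pattern behind this file's changes: RecordInstance is effectively immutable after startup, so it moves out of the lock-guarded Schema and into a shared Arc held directly by the Compactor, letting compaction read record metadata without taking the schema RwLock. A minimal runnable sketch of the idea, using invented stand-in types (Meta, State) rather than Tonbo's real API:

use std::sync::Arc;
use tokio::sync::RwLock;

// Stand-in for RecordInstance: fixed after startup, safe to share via Arc.
struct Meta {
    primary_key_index: usize,
}

// Stand-in for Schema: genuinely mutable state that still needs the lock.
struct State {
    rows: Vec<String>,
}

struct Compactor {
    state: Arc<RwLock<State>>,
    meta: Arc<Meta>, // cloned Arc; readable without any lock
}

impl Compactor {
    async fn compact(&self) {
        // Metadata is read without (and independent of) the RwLock guard.
        let pk = self.meta.primary_key_index;
        let guard = self.state.read().await;
        println!("compacting {} rows, pk column {}", guard.rows.len(), pk);
    }
}

#[tokio::main]
async fn main() {
    let compactor = Compactor {
        state: Arc::new(RwLock::new(State { rows: vec!["a".into()] })),
        meta: Arc::new(Meta { primary_key_index: 0 }),
    };
    compactor.compact().await;
}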
36 changes: 24 additions & 12 deletions src/lib.rs
Expand Up @@ -184,6 +184,7 @@ where
lock_map: LockMap<R::Key>,
manager: Arc<StoreManager>,
parquet_lru: ParquetLru,
instance: Arc<RecordInstance>,
_p: PhantomData<E>,
}

Expand Down Expand Up @@ -241,6 +242,7 @@ where
instance: RecordInstance,
lru_cache: ParquetLru,
) -> Result<Self, DbError<R>> {
let instance = Arc::new(instance);
let manager = Arc::new(StoreManager::new(
option.base_fs.clone(),
option.level_paths.clone(),
Expand All @@ -263,13 +265,14 @@ where

let version_set = VersionSet::new(clean_sender, option.clone(), manager.clone()).await?;
let schema = Arc::new(RwLock::new(
Schema::new(option.clone(), task_tx, &version_set, instance, &manager).await?,
Schema::new(option.clone(), task_tx, &version_set, &manager).await?,
));
let mut compactor = Compactor::<R>::new(
schema.clone(),
option.clone(),
version_set.clone(),
manager.clone(),
instance.clone(),
);

executor.spawn(async move {
Expand Down Expand Up @@ -310,6 +313,7 @@ where
lock_map: Arc::new(Default::default()),
manager,
parquet_lru: lru_cache,
instance,
_p: Default::default(),
})
}
Expand All @@ -323,6 +327,7 @@ where
Snapshot::new(
self.schema.read().await,
self.version_set.current().await,
self.instance.clone(),
self.manager.clone(),
self.parquet_lru.clone(),
)
Expand Down Expand Up @@ -377,6 +382,7 @@ where
.await
.get(
&*self.version_set.current().await,
&self.instance,
&self.manager,
key,
self.version_set.load_ts(),
Expand Down Expand Up @@ -405,6 +411,7 @@ where
let mut scan = Scan::new(
&schema,
&self.manager,
&self.instance,
range,
self.version_set.load_ts(),
&*current,
Expand Down Expand Up @@ -462,6 +469,10 @@ where
self.schema.write().await.flush_wal().await?;
Ok(())
}

pub fn instance(&self) -> &RecordInstance {
self.instance.as_ref()
}
}

pub(crate) struct Schema<R>
Expand All @@ -473,7 +484,6 @@ where
compaction_tx: Sender<CompactTask>,
recover_wal_ids: Option<Vec<FileId>>,
trigger: Arc<Box<dyn Trigger<R> + Send + Sync>>,
record_instance: RecordInstance,
}

impl<R> Schema<R>
Expand All @@ -484,7 +494,6 @@ where
option: Arc<DbOption<R>>,
compaction_tx: Sender<CompactTask>,
version_set: &VersionSet<R>,
record_instance: RecordInstance,
manager: &StoreManager,
) -> Result<Self, DbError<R>> {
let trigger = Arc::new(TriggerFactory::create(option.trigger_type));
Expand All @@ -494,7 +503,6 @@ where
compaction_tx,
recover_wal_ids: None,
trigger,
record_instance,
};

let base_fs = manager.base_fs();
Expand Down Expand Up @@ -592,13 +600,14 @@ where
async fn get<'get>(
&'get self,
version: &'get Version<R>,
record_instance: &'get RecordInstance,
manager: &StoreManager,
key: &'get R::Key,
ts: Timestamp,
projection: Projection,
parquet_lru: ParquetLru,
) -> Result<Option<Entry<'get, R>>, DbError<R>> {
let primary_key_index = self.record_instance.primary_key_index::<R>();
let primary_key_index = record_instance.primary_key_index::<R>();

let projection = match projection {
Projection::All => ProjectionMask::all(),
Expand All @@ -610,7 +619,7 @@ where
fixed_projection.dedup();

ProjectionMask::roots(
&arrow_to_parquet_schema(&self.record_instance.arrow_schema::<R>()).unwrap(),
&arrow_to_parquet_schema(&record_instance.arrow_schema::<R>()).unwrap(),
fixed_projection,
)
}
Expand Down Expand Up @@ -663,6 +672,7 @@ where
{
schema: &'scan Schema<R>,
manager: &'scan StoreManager,
instance: &'scan RecordInstance,
lower: Bound<&'range R::Key>,
upper: Bound<&'range R::Key>,
ts: Timestamp,
Expand All @@ -685,6 +695,7 @@ where
fn new(
schema: &'scan Schema<R>,
manager: &'scan StoreManager,
instance: &'scan RecordInstance,
(lower, upper): (Bound<&'range R::Key>, Bound<&'range R::Key>),
ts: Timestamp,
version: &'scan Version<R>,
Expand All @@ -696,6 +707,7 @@ where
Self {
schema,
manager,
instance,
lower,
upper,
ts,
Expand All @@ -722,13 +734,13 @@ where
for p in &mut projection {
*p += 2;
}
let primary_key_index = self.schema.record_instance.primary_key_index::<R>();
let primary_key_index = self.instance.primary_key_index::<R>();
let mut fixed_projection = vec![0, 1, primary_key_index];
fixed_projection.append(&mut projection);
fixed_projection.dedup();

let mask = ProjectionMask::roots(
&arrow_to_parquet_schema(&self.schema.record_instance.arrow_schema::<R>()).unwrap(),
&arrow_to_parquet_schema(&self.instance.arrow_schema::<R>()).unwrap(),
fixed_projection.clone(),
);

Expand Down Expand Up @@ -842,7 +854,7 @@ where
batch_size,
merge_stream,
self.projection_indices,
&self.schema.record_instance,
&self.instance,
))
}
}
Expand Down Expand Up @@ -1279,7 +1291,6 @@ pub(crate) mod tests {
compaction_tx,
recover_wal_ids: None,
trigger,
record_instance: RecordInstance::Normal,
},
compaction_rx,
))
Expand All @@ -1290,6 +1301,7 @@ pub(crate) mod tests {
compaction_rx: Receiver<CompactTask>,
executor: E,
schema: crate::Schema<R>,
instance: Arc<RecordInstance>,
version: Version<R>,
manager: Arc<StoreManager>,
) -> Result<DB<R, E>, DbError<R>>
Expand All @@ -1315,6 +1327,7 @@ pub(crate) mod tests {
option.clone(),
version_set.clone(),
manager.clone(),
instance.clone(),
);

executor.spawn(async move {
Expand Down Expand Up @@ -1355,6 +1368,7 @@ pub(crate) mod tests {
lock_map: Arc::new(Default::default()),
manager,
parquet_lru: Arc::new(NoCache::default()),
instance,
_p: Default::default(),
})
}
Expand Down Expand Up @@ -1656,7 +1670,6 @@ pub(crate) mod tests {
compaction_tx: task_tx.clone(),
recover_wal_ids: None,
trigger,
record_instance: RecordInstance::Normal,
};

for (i, item) in test_items().into_iter().enumerate() {
Expand Down Expand Up @@ -1723,7 +1736,6 @@ pub(crate) mod tests {
compaction_tx: task_tx.clone(),
recover_wal_ids: None,
trigger,
record_instance: RecordInstance::Normal,
};

for item in test_dyn_items().into_iter() {
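The new pub fn instance(&self) accessor is what lets code outside the crate, presumably the tonbo_net_client/tonbo_net_server crates added to the workspace, ask a running DB about its record layout. A hedged sketch of such a caller; the helper itself is invented for illustration, and only DB::instance(), RecordInstance::primary_key_index, and RecordInstance::arrow_schema come from this commit:

use tonbo::{executor::Executor, record::Record, DB};

// Hypothetical RPC-side helper: describe the table served by this DB.
fn describe<R, E>(db: &DB<R, E>) -> String
where
    R: Record,
    E: Executor,
{
    let instance = db.instance();
    let pk = instance.primary_key_index::<R>();
    let schema = instance.arrow_schema::<R>();
    format!("primary key column {pk}; arrow fields: {:?}", schema.fields())
}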
18 changes: 15 additions & 3 deletions src/record/mod.rs
@@ -19,14 +19,14 @@ use crate::{
 };
 
 #[allow(unused)]
-pub(crate) enum RecordInstance {
+pub enum RecordInstance {
     Normal,
     Runtime(DynRecord),
 }
 
 #[allow(unused)]
 impl RecordInstance {
-    pub(crate) fn primary_key_index<R>(&self) -> usize
+    pub fn primary_key_index<R>(&self) -> usize
     where
         R: Record,
     {
@@ -36,7 +36,7 @@ impl RecordInstance {
         }
     }
 
-    pub(crate) fn arrow_schema<R>(&self) -> Arc<Schema>
+    pub fn arrow_schema<R>(&self) -> Arc<Schema>
     where
         R: Record,
     {
@@ -45,6 +45,18 @@
             RecordInstance::Runtime(record) => record.arrow_schema(),
         }
     }
+
+    pub fn dyn_columns(&self) -> &[Column] {
+        match self {
+            RecordInstance::Runtime(record) => {
+                record.columns()
+            }
+            RecordInstance::Normal => {
+                &[]
+            }
+        }
+    }
+
 }
 
 pub trait Record: 'static + Sized + Decode + Debug + Send + Sync {
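With RecordInstance promoted from pub(crate) to pub, callers outside the crate can also branch on whether a database was opened with a static or a runtime (dynamic) schema. A short sketch of what dyn_columns enables; the name field on Column is an assumption about the runtime column type, so treat this as illustrative:

use tonbo::record::RecordInstance;

// Illustrative: list dynamic column names; empty for statically-typed records.
fn column_names(instance: &RecordInstance) -> Vec<String> {
    instance
        .dyn_columns()
        .iter()
        .map(|col| col.name.clone()) // assumes Column exposes a pub name: String
        .collect()
}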
8 changes: 8 additions & 0 deletions src/record/runtime/record.rs
@@ -49,6 +49,14 @@ impl DynRecord {
         );
         Arc::new(Schema::new_with_metadata(fields, metadata))
     }
+
+    pub fn columns(&self) -> &[Column] {
+        self.columns.as_slice()
+    }
+
+    pub fn primary_column(&self) -> &Column {
+        &self.columns[self.primary_index]
+    }
 }
 
 impl DynRecord {
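Together with dyn_columns above, these accessors let a dynamic record be introspected after construction. A rough usage sketch: DynRecord::new, Column::new, and the Datatype variants are assumptions about the existing runtime module, so verify the signatures against the source before relying on them.

use std::sync::Arc;
use tonbo::record::{Column, Datatype, DynRecord};

fn main() {
    // Assumed constructor shapes (not part of this commit).
    let columns = vec![
        Column::new(Datatype::Int64, "id".to_owned(), Arc::new(1_i64), false),
        Column::new(
            Datatype::String,
            "name".to_owned(),
            Arc::new("alice".to_owned()),
            true,
        ),
    ];
    let record = DynRecord::new(columns, 0); // primary key at index 0

    // New in this commit:
    assert_eq!(record.columns().len(), 2);
    println!("primary key column: {}", record.primary_column().name);
}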