Skip to content

Commit

Permalink
fix future
Browse files Browse the repository at this point in the history
Signed-off-by: Little-Wallace <[email protected]>
  • Loading branch information
Little-Wallace committed Sep 27, 2020
1 parent 2017cdb commit 5639121
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 51 deletions.
117 changes: 71 additions & 46 deletions src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use crate::purge::PurgeManager;
use crate::util::{HandyRwLock, HashMap, Worker};
use crate::wal::{LogMsg, WalRunner, WriteTask};
use crate::{codec, CacheStats, GlobalStats, Result};
use futures::future::{err, ok, BoxFuture};

const SLOTS_COUNT: usize = 128;

Expand Down Expand Up @@ -131,44 +132,62 @@ where
workers: Arc<RwLock<Workers>>,
}

fn apply_item<E, W>(
memtables: &MemTableAccessor<E, W>,
item: LogItem<E>,
queue: LogQueue,
file_num: u64,
) where
E: Message + Clone,
W: EntryExt<E>,
{
let memtable = memtables.get_or_insert(item.raft_group_id);
match item.content {
LogItemContent::Entries(entries_to_add) => {
let entries = entries_to_add.entries;
let entries_index = entries_to_add.entries_index;
if queue == LogQueue::Rewrite {
memtable.wl().append_rewrite(entries, entries_index);
} else {
memtable.wl().append(entries, entries_index);
}
}
LogItemContent::Command(Command::Clean) => {
memtables.remove(item.raft_group_id);
}
LogItemContent::Command(Command::Compact { index }) => {
memtable.wl().compact_to(index);
}
LogItemContent::Kv(kv) => match kv.op_type {
OpType::Put => {
let (key, value) = (kv.key, kv.value.unwrap());
match queue {
LogQueue::Append => memtable.wl().put(key, value, file_num),
LogQueue::Rewrite => memtable.wl().put_rewrite(key, value, file_num),
}
}
OpType::Del => memtable.wl().delete(kv.key.as_slice()),
},
}
}

impl<E, W, P> Engine<E, W, P>
where
E: Message + Clone,
W: EntryExt<E> + 'static,
P: GenericPipeLog,
{
fn apply_to_memtable(&self, item: LogItem<E>, queue: LogQueue, file_num: u64) {
let memtable = self.memtables.get_or_insert(item.raft_group_id);
match item.content {
LogItemContent::Entries(entries_to_add) => {
let entries = entries_to_add.entries;
let entries_index = entries_to_add.entries_index;
if queue == LogQueue::Rewrite {
memtable.wl().append_rewrite(entries, entries_index);
} else {
memtable.wl().append(entries, entries_index);
}
}
LogItemContent::Command(Command::Clean) => {
self.memtables.remove(item.raft_group_id);
}
LogItemContent::Command(Command::Compact { index }) => {
memtable.wl().compact_to(index);
}
LogItemContent::Kv(kv) => match kv.op_type {
OpType::Put => {
let (key, value) = (kv.key, kv.value.unwrap());
match queue {
LogQueue::Append => memtable.wl().put(key, value, file_num),
LogQueue::Rewrite => memtable.wl().put_rewrite(key, value, file_num),
}
}
OpType::Del => memtable.wl().delete(kv.key.as_slice()),
},
fn apply_to_memtable(&self, log_batch: &mut LogBatch<E, W>, queue: LogQueue, file_num: u64) {
for item in log_batch.items.drain(..) {
apply_item(&self.memtables, item, queue, file_num);
}
}

async fn write_impl(&self, log_batch: &mut LogBatch<E, W>, sync: bool) -> Result<usize> {
fn write_impl(
&self,
log_batch: &mut LogBatch<E, W>,
sync: bool,
) -> BoxFuture<'static, Result<usize>> {
let mut entries_size = 0;
if let Some(content) = log_batch.encode_to_bytes(&mut entries_size) {
let (sender, r) = future_channel::oneshot::channel();
Expand All @@ -179,21 +198,25 @@ where
entries_size,
sender,
};
self.wal_sender
.send(LogMsg::Write(task))
.map_err(|_| Error::Stop)?;
let (file_num, offset, tracker) = r.await?;
if file_num > 0 {
for mut item in log_batch.items.drain(..) {
if let LogItemContent::Entries(entries) = &mut item.content {
entries.update_position(LogQueue::Append, file_num, offset, &tracker);
if let Err(_) = self.wal_sender.send(LogMsg::Write(task)) {
return Box::pin(err(Error::Stop));
}
let memtables = self.memtables.clone();
let items = std::mem::replace(&mut log_batch.items, vec![]);
return Box::pin(async move {
let (file_num, offset, tracker) = r.await?;
if file_num > 0 {
for mut item in items {
if let LogItemContent::Entries(entries) = &mut item.content {
entries.update_position(LogQueue::Append, file_num, offset, &tracker);
}
apply_item(&memtables, item, LogQueue::Append, file_num);
}
self.apply_to_memtable(item, LogQueue::Append, file_num);
}
}
return Ok(bytes);
return Ok(bytes);
});
}
Ok(0)
return Box::pin(ok(0));
}
}

Expand Down Expand Up @@ -344,9 +367,7 @@ where
}
}
cache_submitor.fill_chunk(encoded_size);
for item in log_batch.items.drain(..) {
self.apply_to_memtable(item, queue, file_num);
}
self.apply_to_memtable(&mut log_batch, queue, file_num);
offset = (buf.as_ptr() as usize - start_ptr as usize) as u64;
}
Ok(None) => {
Expand Down Expand Up @@ -504,8 +525,12 @@ where
block_on(self.write_impl(log_batch, sync))
}

pub async fn async_write(&self, log_batch: &mut LogBatch<E, W>, sync: bool) -> Result<usize> {
self.write_impl(log_batch, sync).await
pub fn async_write(
&self,
log_batch: &mut LogBatch<E, W>,
sync: bool,
) -> BoxFuture<'static, Result<usize>> {
self.write_impl(log_batch, sync)
}

/// Flush stats about EntryCache.
Expand Down
5 changes: 1 addition & 4 deletions src/log_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,10 +145,7 @@ impl<E: Message> Entries<E> {
pub fn new(entries: Vec<E>, entries_index: Option<Vec<EntryIndex>>) -> Entries<E> {
let len = entries.len();
let (encoded_size, entries_index) = match entries_index {
Some(index) => (
index.iter().fold(0, |acc, x| acc + x.len as usize),
index,
),
Some(index) => (index.iter().fold(0, |acc, x| acc + x.len as usize), index),
None => (0, vec![EntryIndex::default(); len]),
};
Entries {
Expand Down
4 changes: 3 additions & 1 deletion src/purge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,9 @@ fn rewrite_to_memtable<E, W>(
let memtable = memtables.get_or_insert(item.raft_group_id);
match item.content {
LogItemContent::Entries(entries_to_add) => {
memtable.wl().rewrite(entries_to_add.entries_index, latest_rewrite);
memtable
.wl()
.rewrite(entries_to_add.entries_index, latest_rewrite);
}
LogItemContent::Kv(kv) => match kv.op_type {
OpType::Put => memtable.wl().rewrite_key(kv.key, latest_rewrite, file_num),
Expand Down

0 comments on commit 5639121

Please sign in to comment.