From 563912125c7f6da75e7d1e1a5f5f6091958f0bd3 Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Sun, 27 Sep 2020 14:08:23 +0800 Subject: [PATCH] fix future Signed-off-by: Little-Wallace --- src/engine.rs | 117 ++++++++++++++++++++++++++++------------------- src/log_batch.rs | 5 +- src/purge.rs | 4 +- 3 files changed, 75 insertions(+), 51 deletions(-) diff --git a/src/engine.rs b/src/engine.rs index e1cea450..ca22807f 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -26,6 +26,7 @@ use crate::purge::PurgeManager; use crate::util::{HandyRwLock, HashMap, Worker}; use crate::wal::{LogMsg, WalRunner, WriteTask}; use crate::{codec, CacheStats, GlobalStats, Result}; +use futures::future::{err, ok, BoxFuture}; const SLOTS_COUNT: usize = 128; @@ -131,44 +132,62 @@ where workers: Arc>, } +fn apply_item( + memtables: &MemTableAccessor, + item: LogItem, + queue: LogQueue, + file_num: u64, +) where + E: Message + Clone, + W: EntryExt, +{ + let memtable = memtables.get_or_insert(item.raft_group_id); + match item.content { + LogItemContent::Entries(entries_to_add) => { + let entries = entries_to_add.entries; + let entries_index = entries_to_add.entries_index; + if queue == LogQueue::Rewrite { + memtable.wl().append_rewrite(entries, entries_index); + } else { + memtable.wl().append(entries, entries_index); + } + } + LogItemContent::Command(Command::Clean) => { + memtables.remove(item.raft_group_id); + } + LogItemContent::Command(Command::Compact { index }) => { + memtable.wl().compact_to(index); + } + LogItemContent::Kv(kv) => match kv.op_type { + OpType::Put => { + let (key, value) = (kv.key, kv.value.unwrap()); + match queue { + LogQueue::Append => memtable.wl().put(key, value, file_num), + LogQueue::Rewrite => memtable.wl().put_rewrite(key, value, file_num), + } + } + OpType::Del => memtable.wl().delete(kv.key.as_slice()), + }, + } +} + impl Engine where E: Message + Clone, W: EntryExt + 'static, P: GenericPipeLog, { - fn apply_to_memtable(&self, item: LogItem, queue: LogQueue, file_num: u64) { - let memtable = self.memtables.get_or_insert(item.raft_group_id); - match item.content { - LogItemContent::Entries(entries_to_add) => { - let entries = entries_to_add.entries; - let entries_index = entries_to_add.entries_index; - if queue == LogQueue::Rewrite { - memtable.wl().append_rewrite(entries, entries_index); - } else { - memtable.wl().append(entries, entries_index); - } - } - LogItemContent::Command(Command::Clean) => { - self.memtables.remove(item.raft_group_id); - } - LogItemContent::Command(Command::Compact { index }) => { - memtable.wl().compact_to(index); - } - LogItemContent::Kv(kv) => match kv.op_type { - OpType::Put => { - let (key, value) = (kv.key, kv.value.unwrap()); - match queue { - LogQueue::Append => memtable.wl().put(key, value, file_num), - LogQueue::Rewrite => memtable.wl().put_rewrite(key, value, file_num), - } - } - OpType::Del => memtable.wl().delete(kv.key.as_slice()), - }, + fn apply_to_memtable(&self, log_batch: &mut LogBatch, queue: LogQueue, file_num: u64) { + for item in log_batch.items.drain(..) { + apply_item(&self.memtables, item, queue, file_num); } } - async fn write_impl(&self, log_batch: &mut LogBatch, sync: bool) -> Result { + fn write_impl( + &self, + log_batch: &mut LogBatch, + sync: bool, + ) -> BoxFuture<'static, Result> { let mut entries_size = 0; if let Some(content) = log_batch.encode_to_bytes(&mut entries_size) { let (sender, r) = future_channel::oneshot::channel(); @@ -179,21 +198,25 @@ where entries_size, sender, }; - self.wal_sender - .send(LogMsg::Write(task)) - .map_err(|_| Error::Stop)?; - let (file_num, offset, tracker) = r.await?; - if file_num > 0 { - for mut item in log_batch.items.drain(..) { - if let LogItemContent::Entries(entries) = &mut item.content { - entries.update_position(LogQueue::Append, file_num, offset, &tracker); + if let Err(_) = self.wal_sender.send(LogMsg::Write(task)) { + return Box::pin(err(Error::Stop)); + } + let memtables = self.memtables.clone(); + let items = std::mem::replace(&mut log_batch.items, vec![]); + return Box::pin(async move { + let (file_num, offset, tracker) = r.await?; + if file_num > 0 { + for mut item in items { + if let LogItemContent::Entries(entries) = &mut item.content { + entries.update_position(LogQueue::Append, file_num, offset, &tracker); + } + apply_item(&memtables, item, LogQueue::Append, file_num); } - self.apply_to_memtable(item, LogQueue::Append, file_num); } - } - return Ok(bytes); + return Ok(bytes); + }); } - Ok(0) + return Box::pin(ok(0)); } } @@ -344,9 +367,7 @@ where } } cache_submitor.fill_chunk(encoded_size); - for item in log_batch.items.drain(..) { - self.apply_to_memtable(item, queue, file_num); - } + self.apply_to_memtable(&mut log_batch, queue, file_num); offset = (buf.as_ptr() as usize - start_ptr as usize) as u64; } Ok(None) => { @@ -504,8 +525,12 @@ where block_on(self.write_impl(log_batch, sync)) } - pub async fn async_write(&self, log_batch: &mut LogBatch, sync: bool) -> Result { - self.write_impl(log_batch, sync).await + pub fn async_write( + &self, + log_batch: &mut LogBatch, + sync: bool, + ) -> BoxFuture<'static, Result> { + self.write_impl(log_batch, sync) } /// Flush stats about EntryCache. diff --git a/src/log_batch.rs b/src/log_batch.rs index 7509e8ce..9fc2eae3 100644 --- a/src/log_batch.rs +++ b/src/log_batch.rs @@ -145,10 +145,7 @@ impl Entries { pub fn new(entries: Vec, entries_index: Option>) -> Entries { let len = entries.len(); let (encoded_size, entries_index) = match entries_index { - Some(index) => ( - index.iter().fold(0, |acc, x| acc + x.len as usize), - index, - ), + Some(index) => (index.iter().fold(0, |acc, x| acc + x.len as usize), index), None => (0, vec![EntryIndex::default(); len]), }; Entries { diff --git a/src/purge.rs b/src/purge.rs index 1ff95b19..ec3a145b 100644 --- a/src/purge.rs +++ b/src/purge.rs @@ -163,7 +163,9 @@ fn rewrite_to_memtable( let memtable = memtables.get_or_insert(item.raft_group_id); match item.content { LogItemContent::Entries(entries_to_add) => { - memtable.wl().rewrite(entries_to_add.entries_index, latest_rewrite); + memtable + .wl() + .rewrite(entries_to_add.entries_index, latest_rewrite); } LogItemContent::Kv(kv) => match kv.op_type { OpType::Put => memtable.wl().rewrite_key(kv.key, latest_rewrite, file_num),