From a211a1e33b2dd65f5a90a58c417e2d50f9f2abaf Mon Sep 17 00:00:00 2001 From: Kould Date: Wed, 17 Jul 2024 11:31:37 +0800 Subject: [PATCH] chore: replace channel to `flume` --- Cargo.toml | 3 +-- src/version/cleaner.rs | 14 +++++----- src/version/edit.rs | 61 ++++++++++++++++++++---------------------- src/version/mod.rs | 22 +++++---------- src/version/set.rs | 6 ++--- 5 files changed, 46 insertions(+), 60 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 8b398acd..944c6bf1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,9 +11,8 @@ tokio = ["dep:tokio"] arrow = "52" async-lock = "3" crossbeam-skiplist = "0.1" -futures-channel = "0.3" +flume = { version = "0.11.0", features = ["async"] } futures-core = "0.3" -futures-executor = "0.3" futures-io = "0.3" futures-util = "0.3" once_cell = "1" diff --git a/src/version/cleaner.rs b/src/version/cleaner.rs index daeb8bb9..169da40e 100644 --- a/src/version/cleaner.rs +++ b/src/version/cleaner.rs @@ -1,7 +1,6 @@ use std::{collections::BTreeMap, fs, io, sync::Arc}; -use futures_channel::mpsc::{channel, Receiver, Sender}; -use futures_util::StreamExt; +use flume::{Receiver, Sender}; use crate::{fs::FileId, DbOption}; @@ -23,7 +22,7 @@ pub(crate) struct Cleaner { impl Cleaner { pub(crate) fn new(option: Arc) -> (Self, Sender) { - let (tag_send, tag_recv) = channel(option.clean_channel_buffer); + let (tag_send, tag_recv) = flume::bounded(option.clean_channel_buffer); ( Cleaner { @@ -36,13 +35,12 @@ impl Cleaner { } pub(crate) async fn listen(&mut self) -> Result<(), io::Error> { - loop { - match self.tag_recv.next().await { - None => break, - Some(CleanTag::Add { version_num, gens }) => { + while let Ok(tag) = self.tag_recv.recv_async().await { + match tag { + CleanTag::Add { version_num, gens } => { let _ = self.gens_map.insert(version_num, (gens, false)); } - Some(CleanTag::Clean { version_num }) => { + CleanTag::Clean { version_num } => { if let Some((_, dropped)) = self.gens_map.get_mut(&version_num) { *dropped = true; } diff --git a/src/version/edit.rs b/src/version/edit.rs index 2133864f..ef07e09e 100644 --- a/src/version/edit.rs +++ b/src/version/edit.rs @@ -107,46 +107,43 @@ where #[cfg(test)] mod tests { - use futures_executor::block_on; use futures_util::io::Cursor; use crate::{fs::FileId, scope::Scope, serdes::Encode, version::edit::VersionEdit}; - #[test] - fn encode_and_decode() { - block_on(async { - let edits = vec![ - VersionEdit::Add { - level: 0, - scope: Scope { - min: "Min".to_string(), - max: "Max".to_string(), - gen: Default::default(), - wal_ids: Some(vec![FileId::new(), FileId::new()]), - }, - }, - VersionEdit::Remove { - level: 1, + #[tokio::test] + async fn encode_and_decode() { + let edits = vec![ + VersionEdit::Add { + level: 0, + scope: Scope { + min: "Min".to_string(), + max: "Max".to_string(), gen: Default::default(), + wal_ids: Some(vec![FileId::new(), FileId::new()]), }, - ]; - - let bytes = { - let mut cursor = Cursor::new(vec![]); - - for edit in edits.clone() { - edit.encode(&mut cursor).await.unwrap(); - } - cursor.into_inner() - }; + }, + VersionEdit::Remove { + level: 1, + gen: Default::default(), + }, + ]; + + let bytes = { + let mut cursor = Cursor::new(vec![]); + + for edit in edits.clone() { + edit.encode(&mut cursor).await.unwrap(); + } + cursor.into_inner() + }; - let decode_edits = { - let mut cursor = Cursor::new(bytes); + let decode_edits = { + let mut cursor = Cursor::new(bytes); - VersionEdit::::recover(&mut cursor).await - }; + VersionEdit::::recover(&mut cursor).await + }; - assert_eq!(edits, decode_edits); - }) + assert_eq!(edits, decode_edits); } } diff --git a/src/version/mod.rs b/src/version/mod.rs index 2fd89cc7..4e8f1bf7 100644 --- a/src/version/mod.rs +++ b/src/version/mod.rs @@ -4,9 +4,7 @@ mod set; use std::{marker::PhantomData, ops::Bound, sync::Arc}; -use futures_channel::mpsc::{SendError, Sender}; -use futures_executor::block_on; -use futures_util::SinkExt; +use flume::{SendError, Sender}; use thiserror::Error; use tracing::error; @@ -168,17 +166,11 @@ where E: Executor, { fn drop(&mut self) { - block_on(async { - if let Err(err) = self - .clean_sender - .send(CleanTag::Clean { - version_num: self.num, - }) - .await - { - error!("[Version Drop Error]: {}", err) - } - }); + if let Err(err) = self.clean_sender.send(CleanTag::Clean { + version_num: self.num, + }) { + error!("[Version Drop Error]: {}", err) + } } } @@ -194,5 +186,5 @@ where #[error("version parquet error: {0}")] Parquet(#[source] parquet::errors::ParquetError), #[error("version send error: {0}")] - Send(#[source] SendError), + Send(#[source] SendError), } diff --git a/src/version/set.rs b/src/version/set.rs index 96fdcc6c..dda5cf3c 100644 --- a/src/version/set.rs +++ b/src/version/set.rs @@ -1,8 +1,8 @@ use std::{io::SeekFrom, sync::Arc}; use async_lock::RwLock; -use futures_channel::mpsc::Sender; -use futures_util::{AsyncSeekExt, AsyncWriteExt, SinkExt}; +use flume::Sender; +use futures_util::{AsyncSeekExt, AsyncWriteExt}; use crate::{ executor::Executor, @@ -125,7 +125,7 @@ where if let Some(delete_gens) = delete_gens { new_version .clean_sender - .send(CleanTag::Add { + .send_async(CleanTag::Add { version_num: new_version.num, gens: delete_gens, })