From c3484bc1f4ee6197c091ce6c67f122fea1c6112e Mon Sep 17 00:00:00 2001 From: zhanglinwei Date: Fri, 13 Oct 2023 15:22:52 +0800 Subject: [PATCH] Replace failure with thiserror --- Cargo.toml | 2 +- src/bin/cli.rs | 4 +- src/bin/server.rs | 4 +- src/error.rs | 206 +++++++------------- src/kernel/io/buf.rs | 12 +- src/kernel/io/direct.rs | 8 +- src/kernel/io/mod.rs | 16 +- src/kernel/lsm/compactor.rs | 26 +-- src/kernel/lsm/iterator/level_iter.rs | 16 +- src/kernel/lsm/iterator/merging_iter.rs | 8 +- src/kernel/lsm/iterator/mod.rs | 8 +- src/kernel/lsm/log.rs | 32 +-- src/kernel/lsm/mem_table.rs | 36 ++-- src/kernel/lsm/mod.rs | 8 +- src/kernel/lsm/mvcc.rs | 28 +-- src/kernel/lsm/storage.rs | 31 +-- src/kernel/lsm/table/loader.rs | 22 ++- src/kernel/lsm/table/mod.rs | 8 +- src/kernel/lsm/table/scope.rs | 7 +- src/kernel/lsm/table/skip_table/iter.rs | 4 +- src/kernel/lsm/table/skip_table/mod.rs | 4 +- src/kernel/lsm/table/ss_table/block.rs | 40 ++-- src/kernel/lsm/table/ss_table/block_iter.rs | 14 +- src/kernel/lsm/table/ss_table/footer.rs | 8 +- src/kernel/lsm/table/ss_table/iter.rs | 18 +- src/kernel/lsm/table/ss_table/mod.rs | 20 +- src/kernel/lsm/version/iter.rs | 10 +- src/kernel/lsm/version/meta.rs | 4 +- src/kernel/lsm/version/mod.rs | 14 +- src/kernel/lsm/version/status.rs | 11 +- src/kernel/lsm/version/test.rs | 6 +- src/kernel/mod.rs | 22 +-- src/kernel/sled_storage.rs | 14 +- src/kernel/utils/lru_cache.rs | 25 ++- src/server/client.rs | 22 +-- tests/tests.rs | 32 +-- 36 files changed, 352 insertions(+), 398 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index e17aa96..0078539 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,7 +30,7 @@ debug = true # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -failure = { version = "0.1.5", features = ["derive"] } +thiserror = "1.0.24" # 序列化 serde = { version = "1.0.89", features = ["derive", "rc"] } bincode = "1.3.3" diff --git a/src/bin/cli.rs b/src/bin/cli.rs index 3a2e56c..6382dbc 100644 --- a/src/bin/cli.rs +++ b/src/bin/cli.rs @@ -1,7 +1,7 @@ use clap::{Parser, Subcommand}; use itertools::Itertools; +use kip_db::server::client::ConnectionResult; use kip_db::server::client::KipdbClient; -use kip_db::server::client::Result; use kip_db::DEFAULT_PORT; use serde::{Deserialize, Serialize}; use tracing::{error, info}; @@ -33,7 +33,7 @@ struct Cli { /// 就是说客户端没必要多线程,强制单线程避免产生额外线程 /// 调用方法基本:./kip-db-cli get key1 value1 #[tokio::main(flavor = "current_thread")] -async fn main() -> Result<()> { +async fn main() -> ConnectionResult<()> { // Enable logging tracing_subscriber::fmt::try_init().unwrap(); let cli: Cli = Cli::parse(); diff --git a/src/bin/server.rs b/src/bin/server.rs index 2b851b8..1c628d7 100644 --- a/src/bin/server.rs +++ b/src/bin/server.rs @@ -1,13 +1,13 @@ use clap::Parser; -use kip_db::server::client::Result; +use kip_db::server::client::ConnectionResult; use kip_db::server::server::serve; use kip_db::{DEFAULT_PORT, LOCAL_IP}; /// 服务启动方法 /// 二进制执行文件调用方法:./kip-db-cli #[tokio::main] -pub async fn main() -> Result<()> { +pub async fn main() -> ConnectionResult<()> { tracing_subscriber::fmt::try_init().unwrap(); let cli = Cli::parse(); diff --git a/src/error.rs b/src/error.rs index ba526a9..7870a3d 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,179 +1,105 @@ -use failure::Fail; +use crate::kernel::lsm::compactor::CompactTask; +use crate::kernel::lsm::version::cleaner::CleanTag; use std::io; +use thiserror::Error; use tokio::sync::mpsc::error::SendError; use tokio::sync::oneshot::error::RecvError; /// Error type for kvs -#[derive(Fail, Debug)] +#[derive(Error, Debug)] #[non_exhaustive] pub enum KernelError { /// IO error - #[fail(display = "{}", _0)] - Io(#[cause] io::Error), - #[fail(display = "{}", _0)] - Recv(#[cause] RecvError), + #[error("{0}")] + Io(#[from] io::Error), + + #[error("{0}")] + RecvError(#[from] RecvError), + + #[error("Failed to send compact task")] + SendCompactTaskError(#[from] SendError), + + #[error("Failed to send clean tag")] + SendCleanTagError(#[from] SendError), /// Serialization or deserialization error - #[fail(display = "{}", _0)] - SerdeBinCode(#[cause] Box), + #[error("{0}")] + SerdeBinCode(#[from] Box), + /// Remove no-existent key error - #[fail(display = "Key not found")] + #[error("Key not found")] KeyNotFound, - #[fail(display = "Data is empty")] + + #[error("Data is empty")] DataEmpty, - #[fail(display = "Max Level is 7")] + + #[error("Max Level is 7")] LevelOver, - #[fail(display = "Not the correct type of Cmd")] + + #[error("Not the correct type of Cmd")] NotMatchCmd, - #[fail(display = "CRC code does not match")] + + #[error("CRC code does not match")] CrcMisMatch, - #[fail(display = "{}", _0)] - SledErr(#[cause] sled::Error), - #[fail(display = "Cache size overflow")] + + #[error("{0}")] + SledErr(#[from] sled::Error), + + #[error("Cache size overflow")] CacheSizeOverFlow, - #[fail(display = "Cache sharding and size overflow")] + + #[error("Cache sharding and size overflow")] CacheShardingNotAlign, - #[fail(display = "File not found")] + + #[error("File not found")] FileNotFound, + /// 正常情况wal在内存中存在索引则表示硬盘中存在有对应的数据 /// 而错误则是内存存在索引却在硬盘中不存在这个数据 - #[fail(display = "WAL log load error")] + #[error("WAL log load error")] WalLoad, + /// Unexpected command type error. /// It indicated a corrupted log or a program bug. - #[fail(display = "Unexpected command type")] + #[error("Unexpected command type")] UnexpectedCommandType, - #[fail(display = "Process already exists")] + + #[error("Process already exists")] ProcessExists, - #[fail(display = "channel is closed")] + + #[error("channel is closed")] ChannelClose, - #[fail(display = "{}", _0)] + + #[error("{0}")] NotSupport(&'static str), + + #[error("The number of caches cannot be divisible by the number of shards")] + ShardingNotAlign, } -#[derive(Fail, Debug)] +#[derive(Error, Debug)] #[non_exhaustive] pub enum ConnectionError { - #[fail(display = "{}", _0)] - IO(#[cause] io::Error), - #[fail(display = "disconnected")] + #[error("{0}")] + IO(#[from] io::Error), + #[error("disconnected")] Disconnected, - #[fail(display = "write failed")] + #[error("write failed")] WriteFailed, - #[fail(display = "wrong instruction")] + #[error("wrong instruction")] WrongInstruction, - #[fail(display = "encode error")] + #[error("encode error")] EncodeErr, - #[fail(display = "decode error")] + #[error("decode error")] DecodeErr, - #[fail(display = "server flush error")] + #[error("server flush error")] FlushError, - #[fail(display = "{}", _0)] - StoreErr(#[cause] KernelError), - #[fail(display = "Failed to connect to server, {}", _0)] - TonicTransportErr(#[cause] tonic::transport::Error), - #[fail(display = "Failed to call server, {}", _0)] - TonicFailureStatus(#[cause] tonic::Status), - #[fail(display = "Failed to parse addr, {}", _0)] - AddrParseError(#[cause] std::net::AddrParseError), -} - -#[derive(Fail, Debug)] -#[non_exhaustive] -#[allow(missing_copy_implementations)] -pub enum CacheError { - #[fail(display = "The number of caches cannot be divisible by the number of shards")] - ShardingNotAlign, - #[fail(display = "Cache size overflow")] - CacheSizeOverFlow, - #[fail(display = "{}", _0)] - StoreErr(#[cause] KernelError), -} - -impl From> for KernelError { - #[inline] - fn from(_: SendError) -> Self { - KernelError::ChannelClose - } -} - -impl From for ConnectionError { - #[inline] - fn from(err: io::Error) -> Self { - ConnectionError::IO(err) - } -} - -impl From for KernelError { - #[inline] - fn from(err: io::Error) -> Self { - KernelError::Io(err) - } -} - -impl From for KernelError { - #[inline] - fn from(err: RecvError) -> Self { - KernelError::Recv(err) - } -} - -impl From> for KernelError { - #[inline] - fn from(err: Box) -> Self { - KernelError::SerdeBinCode(err) - } -} - -impl From for KernelError { - #[inline] - fn from(err: sled::Error) -> Self { - KernelError::SledErr(err) - } -} - -impl From for ConnectionError { - #[inline] - fn from(err: KernelError) -> Self { - ConnectionError::StoreErr(err) - } -} - -impl From for ConnectionError { - #[inline] - fn from(status: tonic::Status) -> Self { - ConnectionError::TonicFailureStatus(status) - } -} - -impl From for ConnectionError { - #[inline] - fn from(err: tonic::transport::Error) -> Self { - ConnectionError::TonicTransportErr(err) - } -} - -impl From for ConnectionError { - #[inline] - fn from(err: std::net::AddrParseError) -> Self { - ConnectionError::AddrParseError(err) - } -} - -impl From for KernelError { - #[inline] - fn from(value: CacheError) -> Self { - match value { - CacheError::StoreErr(kv_error) => kv_error, - CacheError::CacheSizeOverFlow => KernelError::CacheSizeOverFlow, - CacheError::ShardingNotAlign => KernelError::CacheShardingNotAlign, - } - } -} - -impl From for CacheError { - #[inline] - fn from(value: KernelError) -> Self { - CacheError::StoreErr(value) - } + #[error("Failed to connect to server, {0}")] + TonicTransportErr(#[from] tonic::transport::Error), + #[error("Failed to call server, {0}")] + TonicFailureStatus(#[from] tonic::Status), + #[error("Failed to parse addr, {0}")] + AddrParseError(#[from] std::net::AddrParseError), + #[error("{0}")] + KernelError(#[from] KernelError), } diff --git a/src/kernel/io/buf.rs b/src/kernel/io/buf.rs index 8406131..15d694f 100644 --- a/src/kernel/io/buf.rs +++ b/src/kernel/io/buf.rs @@ -1,5 +1,5 @@ use crate::kernel::io::{FileExtension, IoReader, IoType, IoWriter}; -use crate::kernel::Result; +use crate::kernel::KernelResult; use std::fs::{File, OpenOptions}; use std::io; use std::io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write}; @@ -22,7 +22,7 @@ impl BufIoReader { dir_path: Arc, gen: i64, extension: Arc, - ) -> Result { + ) -> KernelResult { let path = extension.path_with_gen(&dir_path, gen); let reader = BufReaderWithPos::new(File::open(path)?)?; @@ -46,7 +46,7 @@ impl BufIoWriter { dir_path: Arc, gen: i64, extension: Arc, - ) -> Result { + ) -> KernelResult { // 通过路径构造写入器 let file = OpenOptions::new() .create(true) @@ -103,7 +103,7 @@ impl Seek for BufIoWriter { } impl IoWriter for BufIoWriter { - fn current_pos(&mut self) -> Result { + fn current_pos(&mut self) -> KernelResult { Ok(self.writer.pos) } } @@ -115,7 +115,7 @@ pub(crate) struct BufReaderWithPos { } impl BufReaderWithPos { - fn new(mut inner: R) -> Result { + fn new(mut inner: R) -> KernelResult { let pos = inner.stream_position()?; Ok(BufReaderWithPos { reader: BufReader::new(inner), @@ -146,7 +146,7 @@ pub(crate) struct BufWriterWithPos { } impl BufWriterWithPos { - fn new(mut inner: W) -> Result { + fn new(mut inner: W) -> KernelResult { let pos = inner.stream_position()?; Ok(BufWriterWithPos { writer: BufWriter::new(inner), diff --git a/src/kernel/io/direct.rs b/src/kernel/io/direct.rs index af5acf9..cded891 100644 --- a/src/kernel/io/direct.rs +++ b/src/kernel/io/direct.rs @@ -1,5 +1,5 @@ use crate::kernel::io::{FileExtension, IoReader, IoType, IoWriter}; -use crate::kernel::Result; +use crate::kernel::KernelResult; use std::fs::{File, OpenOptions}; use std::io::{Read, Seek, SeekFrom, Write}; use std::path::PathBuf; @@ -23,7 +23,7 @@ impl DirectIoReader { dir_path: Arc, gen: i64, extension: Arc, - ) -> Result { + ) -> KernelResult { let path = extension.path_with_gen(&dir_path, gen); let fs = File::open(path)?; @@ -41,7 +41,7 @@ impl DirectIoWriter { dir_path: Arc, gen: i64, extension: Arc, - ) -> Result { + ) -> KernelResult { let path = extension.path_with_gen(&dir_path, gen); let fs = OpenOptions::new() .create(true) @@ -96,7 +96,7 @@ impl Seek for DirectIoWriter { } impl IoWriter for DirectIoWriter { - fn current_pos(&mut self) -> Result { + fn current_pos(&mut self) -> KernelResult { Ok(self.fs.stream_position()?) } } diff --git a/src/kernel/io/mod.rs b/src/kernel/io/mod.rs index ed74345..76368d7 100644 --- a/src/kernel/io/mod.rs +++ b/src/kernel/io/mod.rs @@ -3,7 +3,7 @@ pub(crate) mod direct; use crate::kernel::io::buf::{BufIoReader, BufIoWriter}; use crate::kernel::io::direct::{DirectIoReader, DirectIoWriter}; -use crate::kernel::Result; +use crate::kernel::KernelResult; use std::fs; use std::io::{Read, Seek, Write}; use std::path::{Path, PathBuf}; @@ -44,7 +44,7 @@ pub enum IoType { impl IoFactory { #[inline] - pub fn reader(&self, gen: i64, io_type: IoType) -> Result> { + pub fn reader(&self, gen: i64, io_type: IoType) -> KernelResult> { let dir_path = Arc::clone(&self.dir_path); let extension = Arc::clone(&self.extension); @@ -55,7 +55,7 @@ impl IoFactory { } #[inline] - pub fn writer(&self, gen: i64, io_type: IoType) -> Result> { + pub fn writer(&self, gen: i64, io_type: IoType) -> KernelResult> { let dir_path = Arc::clone(&self.dir_path); let extension = Arc::clone(&self.extension); @@ -71,7 +71,7 @@ impl IoFactory { } #[inline] - pub fn new(dir_path: impl Into, extension: FileExtension) -> Result { + pub fn new(dir_path: impl Into, extension: FileExtension) -> KernelResult { let path_buf = dir_path.into(); // 创建文件夹(如果他们缺失) fs::create_dir_all(&path_buf)?; @@ -85,13 +85,13 @@ impl IoFactory { } #[inline] - pub fn clean(&self, gen: i64) -> Result<()> { + pub fn clean(&self, gen: i64) -> KernelResult<()> { fs::remove_file(self.extension.path_with_gen(&self.dir_path, gen))?; Ok(()) } #[inline] - pub fn exists(&self, gen: i64) -> Result { + pub fn exists(&self, gen: i64) -> KernelResult { let path = self.extension.path_with_gen(&self.dir_path, gen); Ok(fs::try_exists(path)?) } @@ -103,7 +103,7 @@ pub trait IoReader: Send + Sync + 'static + Read + Seek { fn get_path(&self) -> PathBuf; #[inline] - fn file_size(&self) -> Result { + fn file_size(&self) -> KernelResult { let path_buf = self.get_path(); Ok(fs::metadata(path_buf)?.len()) } @@ -112,5 +112,5 @@ pub trait IoReader: Send + Sync + 'static + Read + Seek { } pub trait IoWriter: Send + Sync + 'static + Write + Seek { - fn current_pos(&mut self) -> Result; + fn current_pos(&mut self) -> KernelResult; } diff --git a/src/kernel/lsm/compactor.rs b/src/kernel/lsm/compactor.rs index 89e2290..8e95f8b 100644 --- a/src/kernel/lsm/compactor.rs +++ b/src/kernel/lsm/compactor.rs @@ -6,7 +6,7 @@ use crate::kernel::lsm::table::scope::Scope; use crate::kernel::lsm::table::{collect_gen, Table}; use crate::kernel::lsm::version::edit::VersionEdit; use crate::kernel::lsm::version::status::VersionStatus; -use crate::kernel::Result; +use crate::kernel::KernelResult; use crate::KernelError; use bytes::Bytes; use futures::future; @@ -56,7 +56,7 @@ impl Compactor { pub(crate) async fn check_then_compaction( &mut self, option_tx: Option>, - ) -> Result<()> { + ) -> KernelResult<()> { let is_force = option_tx.is_some(); if let Some((gen, values)) = self.mem_table().try_swap(is_force)? { @@ -79,7 +79,11 @@ impl Compactor { /// 持久化immutable_table为SSTable /// /// 请注意:vec_values必须是依照key值有序的 - pub(crate) async fn minor_compaction(&self, gen: i64, values: Vec) -> Result<()> { + pub(crate) async fn minor_compaction( + &self, + gen: i64, + values: Vec, + ) -> KernelResult<()> { if !values.is_empty() { let (scope, meta) = self.ver_status().loader().create( gen, @@ -120,7 +124,7 @@ impl Compactor { scope: Scope, mut vec_ver_edit: Vec, mut is_skip_sized: bool, - ) -> Result<()> { + ) -> KernelResult<()> { let config = self.config(); let mut is_over = false; @@ -188,7 +192,7 @@ impl Compactor { level: usize, target: &Scope, is_skip_sized: bool, - ) -> Result> { + ) -> KernelResult> { let version = self.ver_status().current().await; let config = self.config(); let next_level = level + 1; @@ -238,7 +242,7 @@ impl Compactor { tables_l: Vec<&dyn Table>, tables_ll: Vec<&dyn Table>, file_size: usize, - ) -> Result { + ) -> KernelResult { // SSTables的Gen会基于时间有序生成,所有以此作为SSTables的排序依据 let map_futures_l = tables_l .iter() @@ -274,7 +278,7 @@ impl Compactor { Ok(data_sharding(vec_cmd_data, file_size)) } - fn table_load_data(table: &&dyn Table, fn_is_filter: F) -> Result> + fn table_load_data(table: &&dyn Table, fn_is_filter: F) -> KernelResult> where F: Fn(&Bytes) -> bool, { @@ -314,7 +318,7 @@ mod tests { use crate::kernel::lsm::version::edit::VersionEdit; use crate::kernel::lsm::version::DEFAULT_SS_TABLE_PATH; use crate::kernel::utils::lru_cache::ShardingLruCache; - use crate::kernel::{Result, Storage}; + use crate::kernel::{KernelResult, Storage}; use bytes::Bytes; use itertools::Itertools; use std::collections::hash_map::RandomState; @@ -324,7 +328,7 @@ mod tests { use tempfile::TempDir; #[test] - fn test_lsm_major_compactor() -> Result<()> { + fn test_lsm_major_compactor() -> KernelResult<()> { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); tokio_test::block_on(async move { @@ -409,7 +413,7 @@ mod tests { } #[test] - fn test_data_merge() -> Result<()> { + fn test_data_merge() -> KernelResult<()> { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); let config = Config::new(temp_dir.into_path()); @@ -503,7 +507,7 @@ mod tests { /// Level 1: [1,2] /// Level 2: [1,2],[3,4,5,6] #[test] - fn test_seek_compaction() -> Result<()> { + fn test_seek_compaction() -> KernelResult<()> { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); let config = Config::new(temp_dir.into_path()); diff --git a/src/kernel/lsm/iterator/level_iter.rs b/src/kernel/lsm/iterator/level_iter.rs index 3d8b728..48f87da 100644 --- a/src/kernel/lsm/iterator/level_iter.rs +++ b/src/kernel/lsm/iterator/level_iter.rs @@ -2,7 +2,7 @@ use crate::kernel::lsm::compactor::LEVEL_0; use crate::kernel::lsm::iterator::{Iter, Seek}; use crate::kernel::lsm::mem_table::KeyValue; use crate::kernel::lsm::version::Version; -use crate::kernel::Result; +use crate::kernel::KernelResult; use crate::KernelError; const LEVEL_0_SEEK_MESSAGE: &str = "level 0 cannot seek"; @@ -18,7 +18,7 @@ pub(crate) struct LevelIter<'a> { impl<'a> LevelIter<'a> { #[allow(dead_code)] - pub(crate) fn new(version: &'a Version, level: usize) -> Result> { + pub(crate) fn new(version: &'a Version, level: usize) -> KernelResult> { let table = version.table(level, 0).ok_or(KernelError::DataEmpty)?; let child_iter = table.iter()?; let level_len = version.level_len(level); @@ -32,7 +32,7 @@ impl<'a> LevelIter<'a> { }) } - fn child_iter_seek(&mut self, seek: Seek<'_>, offset: usize) -> Result> { + fn child_iter_seek(&mut self, seek: Seek<'_>, offset: usize) -> KernelResult> { self.offset = offset; if self.is_valid() { if let Some(table) = self.version.table(self.level, offset) { @@ -44,7 +44,7 @@ impl<'a> LevelIter<'a> { Ok(None) } - fn seek_ward(&mut self, key: &[u8], seek: Seek<'_>) -> Result> { + fn seek_ward(&mut self, key: &[u8], seek: Seek<'_>) -> KernelResult> { let level = self.level; if level == LEVEL_0 { @@ -57,7 +57,7 @@ impl<'a> LevelIter<'a> { impl<'a> Iter<'a> for LevelIter<'a> { type Item = KeyValue; - fn try_next(&mut self) -> Result> { + fn try_next(&mut self) -> KernelResult> { match self.child_iter.try_next()? { None => self.child_iter_seek(Seek::First, self.offset + 1), Some(item) => Ok(Some(item)), @@ -70,7 +70,7 @@ impl<'a> Iter<'a> for LevelIter<'a> { /// Tips: Level 0的LevelIter不支持Seek /// 因为Level 0中的SSTable并非有序排列,其中数据范围是可能交错的 - fn seek(&mut self, seek: Seek<'_>) -> Result> { + fn seek(&mut self, seek: Seek<'_>) -> KernelResult> { match seek { Seek::First => self.child_iter_seek(Seek::First, 0), Seek::Last => self.child_iter_seek(Seek::Last, self.level_len - 1), @@ -91,13 +91,13 @@ mod tests { use crate::kernel::lsm::table::TableType; use crate::kernel::lsm::version::edit::VersionEdit; use crate::kernel::lsm::version::status::VersionStatus; - use crate::kernel::Result; + use crate::kernel::KernelResult; use bincode::Options; use bytes::Bytes; use tempfile::TempDir; #[test] - fn test_iterator() -> Result<()> { + fn test_iterator() -> KernelResult<()> { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); tokio_test::block_on(async move { diff --git a/src/kernel/lsm/iterator/merging_iter.rs b/src/kernel/lsm/iterator/merging_iter.rs index eb52c98..d8203bf 100644 --- a/src/kernel/lsm/iterator/merging_iter.rs +++ b/src/kernel/lsm/iterator/merging_iter.rs @@ -1,6 +1,6 @@ use crate::kernel::lsm::iterator::{Iter, Seek}; use crate::kernel::lsm::mem_table::KeyValue; -use crate::kernel::Result; +use crate::kernel::KernelResult; use bytes::Bytes; use std::cmp::Ordering; use std::collections::BTreeMap; @@ -38,7 +38,7 @@ impl<'a> MergingIter<'a> { #[allow(dead_code, clippy::mutable_key_type)] pub(crate) fn new( mut vec_iter: Vec + 'a + Send + Sync>>, - ) -> Result { + ) -> KernelResult { let mut map_buf = BTreeMap::new(); for (num, iter) in vec_iter.iter_mut().enumerate() { @@ -58,7 +58,7 @@ impl<'a> MergingIter<'a> { impl<'a> Iter<'a> for MergingIter<'a> { type Item = KeyValue; - fn try_next(&mut self) -> Result> { + fn try_next(&mut self) -> KernelResult> { while let Some((IterKey { num, .. }, old_item)) = self.map_buf.pop_first() { if let Some(item) = self.vec_iter[num].try_next()? { Self::buf_map_insert(&mut self.map_buf, num, item); @@ -86,7 +86,7 @@ impl<'a> Iter<'a> for MergingIter<'a> { } #[allow(clippy::mutable_key_type)] - fn seek(&mut self, seek: Seek<'_>) -> Result> { + fn seek(&mut self, seek: Seek<'_>) -> KernelResult> { let mut seek_map = BTreeMap::new(); for (num, iter) in self.vec_iter.iter_mut().enumerate() { diff --git a/src/kernel/lsm/iterator/mod.rs b/src/kernel/lsm/iterator/mod.rs index 1afbff8..9b71c0d 100644 --- a/src/kernel/lsm/iterator/mod.rs +++ b/src/kernel/lsm/iterator/mod.rs @@ -1,7 +1,7 @@ pub(crate) mod level_iter; pub(crate) mod merging_iter; -use crate::kernel::Result; +use crate::kernel::KernelResult; #[derive(Clone, Copy)] #[allow(dead_code)] @@ -18,14 +18,14 @@ pub enum Seek<'s> { pub trait Iter<'a> { type Item; - fn try_next(&mut self) -> Result>; + fn try_next(&mut self) -> KernelResult>; fn is_valid(&self) -> bool; - fn seek(&mut self, seek: Seek<'_>) -> Result>; + fn seek(&mut self, seek: Seek<'_>) -> KernelResult>; } /// 向前迭代器 pub(crate) trait ForwardIter<'a>: Iter<'a> { - fn try_prev(&mut self) -> Result>; + fn try_prev(&mut self) -> KernelResult>; } diff --git a/src/kernel/lsm/log.rs b/src/kernel/lsm/log.rs index 2ebb684..a7dfe6c 100644 --- a/src/kernel/lsm/log.rs +++ b/src/kernel/lsm/log.rs @@ -1,6 +1,6 @@ use crate::kernel::io::{FileExtension, IoFactory, IoType, IoWriter}; use crate::kernel::lsm::storage::Gen; -use crate::kernel::{sorted_gen_list, Result}; +use crate::kernel::{sorted_gen_list, KernelResult}; use crate::KernelError; use integer_encoding::FixedInt; use std::cmp::min; @@ -26,9 +26,9 @@ impl LogLoader { path_name: (&str, Option), io_type: IoType, fn_decode: F, - ) -> Result<(Self, Vec, i64)> + ) -> KernelResult<(Self, Vec, i64)> where - F: Fn(&mut Vec) -> Result, + F: Fn(&mut Vec) -> KernelResult, { let (loader, log_gen) = Self::_reload(wal_dir_path, path_name, io_type)?; let reload_data = loader.load(log_gen, fn_decode).unwrap_or(Vec::new()); @@ -40,7 +40,7 @@ impl LogLoader { wal_dir_path: &Path, path_name: (&str, Option), io_type: IoType, - ) -> Result<(Self, i64)> { + ) -> KernelResult<(Self, i64)> { let (path, name) = path_name; let wal_path = wal_dir_path.join(path); @@ -58,9 +58,9 @@ impl LogLoader { } /// 通过Gen载入数据进行读取 - pub(crate) fn load(&self, gen: i64, fn_decode: F) -> Result> + pub(crate) fn load(&self, gen: i64, fn_decode: F) -> KernelResult> where - F: Fn(&mut Vec) -> Result, + F: Fn(&mut Vec) -> KernelResult, { let mut reader = LogReader::new(self.factory.reader(gen, self.io_type)?); let mut vec_data = Vec::new(); @@ -75,11 +75,11 @@ impl LogLoader { } #[allow(dead_code)] - pub(crate) fn clean(&self, gen: i64) -> Result<()> { + pub(crate) fn clean(&self, gen: i64) -> KernelResult<()> { self.factory.clean(gen) } - pub(crate) fn writer(&self, gen: i64) -> Result>> { + pub(crate) fn writer(&self, gen: i64) -> KernelResult>> { let new_fs = self.factory.writer(gen, self.io_type)?; Ok(LogWriter::new(new_fs)) } @@ -129,11 +129,11 @@ impl LogWriter { w } - pub(crate) fn seek_end(&mut self) -> Result { + pub(crate) fn seek_end(&mut self) -> KernelResult { Ok(self.dst.seek(SeekFrom::End(0))?) } - pub(crate) fn add_record(&mut self, r: &[u8]) -> Result { + pub(crate) fn add_record(&mut self, r: &[u8]) -> KernelResult { let mut record = r; let mut len = 0; @@ -171,7 +171,7 @@ impl LogWriter { Ok(len) } - fn emit_record(&mut self, t: RecordType, data: &[u8], len: usize) -> Result { + fn emit_record(&mut self, t: RecordType, data: &[u8], len: usize) -> KernelResult { let crc = crc32fast::hash(&data[0..len]); let mut header_bytes = crc.encode_fixed_vec(); @@ -187,7 +187,7 @@ impl LogWriter { } #[allow(dead_code)] - pub(crate) fn flush(&mut self) -> Result<()> { + pub(crate) fn flush(&mut self) -> KernelResult<()> { self.dst.flush()?; Ok(()) } @@ -211,7 +211,7 @@ impl LogReader { } /// EOF is signalled by Ok(0) - pub(crate) fn read(&mut self, dst: &mut Vec) -> Result { + pub(crate) fn read(&mut self, dst: &mut Vec) -> KernelResult { let mut dst_offset = 0; let mut head_pos = 0; @@ -267,7 +267,7 @@ mod tests { use crate::kernel::lsm::log::{LogLoader, LogReader, LogWriter, HEADER_SIZE}; use crate::kernel::lsm::mem_table::DEFAULT_WAL_PATH; use crate::kernel::lsm::storage::Config; - use crate::kernel::Result; + use crate::kernel::KernelResult; use std::fs::{File, OpenOptions}; use std::io::Cursor; use std::mem; @@ -324,7 +324,7 @@ mod tests { } #[test] - fn test_reader() -> Result<()> { + fn test_reader() -> KernelResult<()> { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); let file_path = temp_dir.path().join("test.txt"); @@ -370,7 +370,7 @@ mod tests { } #[test] - fn test_log_loader() -> Result<()> { + fn test_log_loader() -> KernelResult<()> { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); let config = Config::new(temp_dir.into_path()); diff --git a/src/kernel/lsm/mem_table.rs b/src/kernel/lsm/mem_table.rs index 721bafa..45573be 100644 --- a/src/kernel/lsm/mem_table.rs +++ b/src/kernel/lsm/mem_table.rs @@ -4,7 +4,7 @@ use crate::kernel::lsm::log::{LogLoader, LogWriter}; use crate::kernel::lsm::storage::{Config, Gen, Sequence}; use crate::kernel::lsm::table::ss_table::block::{Entry, Value}; use crate::kernel::lsm::trigger::{Trigger, TriggerFactory}; -use crate::kernel::Result; +use crate::kernel::KernelResult; use bytes::Bytes; use itertools::Itertools; use parking_lot::Mutex; @@ -92,7 +92,7 @@ impl<'a> MemMapIter<'a> { impl<'a> Iter<'a> for MemMapIter<'a> { type Item = KeyValue; - fn try_next(&mut self) -> Result> { + fn try_next(&mut self) -> KernelResult> { if let Some(iter) = &mut self.iter { for (InternalKey { key, .. }, value) in iter.by_ref() { if let Some(prev_item) = &self.prev_item { @@ -116,7 +116,7 @@ impl<'a> Iter<'a> for MemMapIter<'a> { true } - fn seek(&mut self, seek: Seek<'_>) -> Result> { + fn seek(&mut self, seek: Seek<'_>) -> KernelResult> { self.prev_item = None; self.iter = match seek { Seek::First => Some(self.mem_map.iter()), @@ -173,7 +173,7 @@ macro_rules! check_count { } impl MemTable { - pub(crate) fn new(config: &Config) -> Result { + pub(crate) fn new(config: &Config) -> KernelResult { let (log_loader, log_bytes, log_gen) = LogLoader::reload( config.path(), (DEFAULT_WAL_PATH, None), @@ -204,7 +204,7 @@ impl MemTable { /// 插入并判断是否溢出 /// /// 插入时不会去除重复键值,而是进行追加 - pub(crate) fn insert_data(&self, data: KeyValue) -> Result { + pub(crate) fn insert_data(&self, data: KeyValue) -> KernelResult { let (key, value) = data.clone(); let mut inner = self.inner.lock(); @@ -217,7 +217,11 @@ impl MemTable { } /// Tips: 当数据在插入mem_table中停机,则不会存入日志中 - pub(crate) fn insert_batch_data(&self, vec_data: Vec, seq_id: i64) -> Result { + pub(crate) fn insert_batch_data( + &self, + vec_data: Vec, + seq_id: i64, + ) -> KernelResult { let mut inner = self.inner.lock(); let mut buf = Vec::new(); @@ -248,7 +252,7 @@ impl MemTable { } /// MemTable将数据弹出并转移到immut table中 (弹出数据为转移至immut table中数据的迭代器) - pub(crate) fn try_swap(&self, is_force: bool) -> Result)>> { + pub(crate) fn try_swap(&self, is_force: bool) -> KernelResult)>> { let count = &self.tx_count; loop { @@ -389,7 +393,9 @@ impl MemTable { } } -pub(crate) fn logs_decode(log_bytes: Vec>) -> Result)>> { +pub(crate) fn logs_decode( + log_bytes: Vec>, +) -> KernelResult)>> { let flatten_bytes = log_bytes.into_iter().flatten().collect_vec(); Entry::::batch_decode(&mut Cursor::new(flatten_bytes)).map(|vec| { vec.into_iter() @@ -400,7 +406,7 @@ pub(crate) fn logs_decode(log_bytes: Vec>) -> Result Result> { +pub(crate) fn data_to_bytes(data: KeyValue) -> KernelResult> { let (key, value) = data; Entry::new(0, key.len(), key, Value::from(value)).encode() } @@ -412,13 +418,13 @@ mod tests { data_to_bytes, InternalKey, KeyValue, MemMap, MemMapIter, MemTable, }; use crate::kernel::lsm::storage::{Config, Sequence}; - use crate::kernel::Result; + use crate::kernel::KernelResult; use bytes::Bytes; use std::collections::Bound; use tempfile::TempDir; impl MemTable { - pub(crate) fn insert_data_with_seq(&self, data: KeyValue, seq: i64) -> Result { + pub(crate) fn insert_data_with_seq(&self, data: KeyValue, seq: i64) -> KernelResult { let (key, value) = data.clone(); let mut inner = self.inner.lock(); @@ -432,7 +438,7 @@ mod tests { } #[test] - fn test_mem_table_find() -> Result<()> { + fn test_mem_table_find() -> KernelResult<()> { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); let mem_table = MemTable::new(&Config::new(temp_dir.path()))?; @@ -472,7 +478,7 @@ mod tests { } #[test] - fn test_mem_table_swap() -> Result<()> { + fn test_mem_table_swap() -> KernelResult<()> { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); let mem_table = MemTable::new(&Config::new(temp_dir.path()))?; @@ -501,7 +507,7 @@ mod tests { } #[test] - fn test_mem_table_range_scan() -> Result<()> { + fn test_mem_table_range_scan() -> KernelResult<()> { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); let mem_table = MemTable::new(&Config::new(temp_dir.path()))?; @@ -594,7 +600,7 @@ mod tests { } #[test] - fn test_mem_map_iter() -> Result<()> { + fn test_mem_map_iter() -> KernelResult<()> { let mut map = MemMap::new(); let key_1_1 = InternalKey::new(Bytes::from(vec![b'1'])); diff --git a/src/kernel/lsm/mod.rs b/src/kernel/lsm/mod.rs index a4feca9..04bb891 100644 --- a/src/kernel/lsm/mod.rs +++ b/src/kernel/lsm/mod.rs @@ -2,12 +2,12 @@ use crate::kernel::lsm::compactor::{CompactTask, MergeShardingVec}; use crate::kernel::lsm::mem_table::{key_value_bytes_len, KeyValue}; use crate::kernel::lsm::storage::Gen; use crate::kernel::lsm::version::Version; -use crate::kernel::Result; +use crate::kernel::KernelResult; use crate::KernelError; use tokio::sync::mpsc::error::TrySendError; use tokio::sync::mpsc::Sender; -mod compactor; +pub mod compactor; pub mod iterator; mod log; mod mem_table; @@ -15,7 +15,7 @@ pub mod mvcc; pub mod storage; mod table; mod trigger; -mod version; +pub mod version; /// KeyValue数据分片,尽可能将数据按给定的分片大小:file_size,填满一片(可能会溢出一些) /// 保持原有数据的顺序进行分片,所有第一片分片中最后的值肯定会比其他分片开始的值Key排序较前(如果vec_data是以Key从小到大排序的话) @@ -56,7 +56,7 @@ fn query_and_compaction( key: &[u8], version: &Version, compactor_tx: &Sender, -) -> Result> { +) -> KernelResult> { let (value_option, miss_option) = version.query(key)?; if let Some(miss_scope) = miss_option { diff --git a/src/kernel/lsm/mvcc.rs b/src/kernel/lsm/mvcc.rs index 5af654a..c3d7a8d 100644 --- a/src/kernel/lsm/mvcc.rs +++ b/src/kernel/lsm/mvcc.rs @@ -6,7 +6,7 @@ use crate::kernel::lsm::query_and_compaction; use crate::kernel::lsm::storage::{Sequence, StoreInner}; use crate::kernel::lsm::version::iter::VersionIter; use crate::kernel::lsm::version::Version; -use crate::kernel::Result; +use crate::kernel::KernelResult; use crate::KernelError; use bytes::Bytes; use itertools::Itertools; @@ -47,7 +47,7 @@ impl Transaction { /// /// 此处不需要等待压缩,因为在Transaction存活时不会触发Compaction #[inline] - pub fn get(&self, key: &[u8]) -> Result> { + pub fn get(&self, key: &[u8]) -> KernelResult> { if let Some(value) = self.write_buf.as_ref().and_then(|buf| buf.get(key)) { return Ok(value.clone()); } @@ -69,7 +69,7 @@ impl Transaction { } #[inline] - pub fn remove(&mut self, key: &[u8]) -> Result<()> { + pub fn remove(&mut self, key: &[u8]) -> KernelResult<()> { let _ = self.get(key)?.ok_or(KernelError::KeyNotFound)?; let bytes = Bytes::copy_from_slice(key); let _ignore = self.write_buf_or_init().insert(bytes, None); @@ -78,7 +78,7 @@ impl Transaction { } #[inline] - pub async fn commit(mut self) -> Result<()> { + pub async fn commit(mut self) -> KernelResult<()> { if let Some(buf) = self.write_buf.take() { let batch_data = buf .into_iter() @@ -136,12 +136,16 @@ impl Transaction { } #[inline] - pub fn disk_iter(&self) -> Result { + pub fn disk_iter(&self) -> KernelResult { VersionIter::new(&self.version) } #[inline] - pub fn iter<'a>(&'a self, min: Bound<&[u8]>, max: Bound<&[u8]>) -> Result { + pub fn iter<'a>( + &'a self, + min: Bound<&[u8]>, + max: Bound<&[u8]>, + ) -> KernelResult { let range_buf = self.mem_range(min, max); let ptr = BufPtr(Box::leak(Box::new(range_buf)).into()); @@ -212,7 +216,7 @@ impl<'a> Iter<'a> for TransactionIter<'a> { type Item = KeyValue; #[inline] - fn try_next(&mut self) -> Result> { + fn try_next(&mut self) -> KernelResult> { if let Some(item) = self.seek_buf.take() { return Ok(Some(item)); } @@ -238,7 +242,7 @@ impl<'a> Iter<'a> for TransactionIter<'a> { } #[inline] - fn seek(&mut self, seek: Seek<'_>) -> Result> { + fn seek(&mut self, seek: Seek<'_>) -> KernelResult> { self.inner.seek(seek) } } @@ -258,7 +262,7 @@ struct BufIter<'a> { impl<'a> Iter<'a> for BufIter<'a> { type Item = KeyValue; - fn try_next(&mut self) -> Result> { + fn try_next(&mut self) -> KernelResult> { Ok(self.is_valid().then(|| { let item = self.inner[self.pos].clone(); self.pos += 1; @@ -270,7 +274,7 @@ impl<'a> Iter<'a> for BufIter<'a> { self.pos < self.inner.len() } - fn seek(&mut self, seek: Seek<'_>) -> Result> { + fn seek(&mut self, seek: Seek<'_>) -> KernelResult> { match seek { Seek::First => self.pos = 0, Seek::Last => self.pos = self.inner.len() - 1, @@ -291,7 +295,7 @@ impl<'a> Iter<'a> for BufIter<'a> { mod tests { use crate::kernel::lsm::iterator::Iter; use crate::kernel::lsm::storage::{Config, KipStorage}; - use crate::kernel::{Result, Storage}; + use crate::kernel::{KernelResult, Storage}; use bincode::Options; use bytes::Bytes; use itertools::Itertools; @@ -299,7 +303,7 @@ mod tests { use tempfile::TempDir; #[test] - fn test_transaction() -> Result<()> { + fn test_transaction() -> KernelResult<()> { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); tokio_test::block_on(async move { diff --git a/src/kernel/lsm/storage.rs b/src/kernel/lsm/storage.rs index 1a41ec3..bf23864 100644 --- a/src/kernel/lsm/storage.rs +++ b/src/kernel/lsm/storage.rs @@ -9,7 +9,7 @@ use crate::kernel::lsm::trigger::TriggerType; use crate::kernel::lsm::version::status::VersionStatus; use crate::kernel::lsm::version::Version; use crate::kernel::lsm::{query_and_compaction, version}; -use crate::kernel::Result; +use crate::kernel::KernelResult; use crate::kernel::{lock_or_time_out, Storage, DEFAULT_LOCK_FILE}; use crate::KernelError; use async_trait::async_trait; @@ -83,7 +83,7 @@ pub(crate) struct StoreInner { } impl StoreInner { - pub(crate) async fn new(config: Config) -> Result { + pub(crate) async fn new(config: Config) -> KernelResult { let mem_table = MemTable::new(&config)?; let ver_status = VersionStatus::load_with_path(config.clone(), mem_table.log_loader_clone())?; @@ -107,12 +107,12 @@ impl Storage for KipStorage { } #[inline] - async fn open(path: impl Into + Send) -> Result { + async fn open(path: impl Into + Send) -> KernelResult { KipStorage::open_with_config(Config::new(path.into())).await } #[inline] - async fn flush(&self) -> Result<()> { + async fn flush(&self) -> KernelResult<()> { let (tx, rx) = oneshot::channel(); self.compactor_tx.send(CompactTask::Flush(Some(tx))).await?; @@ -123,12 +123,12 @@ impl Storage for KipStorage { } #[inline] - async fn set(&self, key: Bytes, value: Bytes) -> Result<()> { + async fn set(&self, key: Bytes, value: Bytes) -> KernelResult<()> { self.append_cmd_data((key, Some(value))).await } #[inline] - async fn get(&self, key: &[u8]) -> Result> { + async fn get(&self, key: &[u8]) -> KernelResult> { if let Some((_, value)) = self.mem_table().find(key) { return Ok(value); } @@ -142,7 +142,7 @@ impl Storage for KipStorage { } #[inline] - async fn remove(&self, key: &[u8]) -> Result<()> { + async fn remove(&self, key: &[u8]) -> KernelResult<()> { match self.get(key).await? { Some(_) => { self.append_cmd_data((Bytes::copy_from_slice(key), None)) @@ -153,12 +153,12 @@ impl Storage for KipStorage { } #[inline] - async fn size_of_disk(&self) -> Result { + async fn size_of_disk(&self) -> KernelResult { Ok(self.current_version().await.size_of_disk()) } #[inline] - async fn len(&self) -> Result { + async fn len(&self) -> KernelResult { Ok(self.current_version().await.len() + self.mem_table().len()) } @@ -180,7 +180,7 @@ impl Drop for KipStorage { impl KipStorage { /// 追加数据 - async fn append_cmd_data(&self, data: KeyValue) -> Result<()> { + async fn append_cmd_data(&self, data: KeyValue) -> KernelResult<()> { if self.mem_table().insert_data(data)? { if let Err(TrySendError::Closed(_)) = self.compactor_tx.try_send(CompactTask::Flush(None)) @@ -194,7 +194,7 @@ impl KipStorage { /// 使用Config进行LsmStore初始化 #[inline] - pub async fn open_with_config(config: Config) -> Result + pub async fn open_with_config(config: Config) -> KernelResult where Self: Sized, { @@ -257,7 +257,12 @@ impl KipStorage { } #[inline] - pub async fn manual_compaction(&self, min: Bytes, max: Bytes, level: usize) -> Result<()> { + pub async fn manual_compaction( + &self, + min: Bytes, + max: Bytes, + level: usize, + ) -> KernelResult<()> { if min <= max { self.compactor_tx .send(CompactTask::Seek((Scope::from_range(0, min, max), level))) @@ -268,7 +273,7 @@ impl KipStorage { } #[allow(dead_code)] - async fn flush_background(&self) -> Result<()> { + async fn flush_background(&self) -> KernelResult<()> { self.compactor_tx.send(CompactTask::Flush(None)).await?; Ok(()) diff --git a/src/kernel/lsm/table/loader.rs b/src/kernel/lsm/table/loader.rs index 82fe2ec..949c6d2 100644 --- a/src/kernel/lsm/table/loader.rs +++ b/src/kernel/lsm/table/loader.rs @@ -9,7 +9,7 @@ use crate::kernel::lsm::table::ss_table::block::BlockCache; use crate::kernel::lsm::table::ss_table::SSTable; use crate::kernel::lsm::table::{BoxTable, Table, TableType}; use crate::kernel::utils::lru_cache::ShardingLruCache; -use crate::kernel::Result; +use crate::kernel::KernelResult; use bytes::Bytes; use itertools::Itertools; use std::collections::hash_map::RandomState; @@ -27,7 +27,11 @@ pub(crate) struct TableLoader { } impl TableLoader { - pub(crate) fn new(config: Config, factory: Arc, wal: LogLoader) -> Result { + pub(crate) fn new( + config: Config, + factory: Arc, + wal: LogLoader, + ) -> KernelResult { let inner = Arc::new(ShardingLruCache::new( config.table_cache_size, 16, @@ -54,7 +58,7 @@ impl TableLoader { vec_data: Vec, level: usize, table_type: TableType, - ) -> Result<(Scope, TableMeta)> { + ) -> KernelResult<(Scope, TableMeta)> { // 获取数据的Key涵盖范围 let scope = Scope::from_sorted_vec_data(gen, &vec_data)?; let table: Box = match table_type { @@ -103,7 +107,7 @@ impl TableLoader { gen: i64, reload_data: Vec<(Bytes, Option)>, level: usize, - ) -> Result { + ) -> KernelResult { SSTable::new( &self.factory, &self.config, @@ -124,7 +128,7 @@ impl TableLoader { self.inner.is_empty() } - pub(crate) fn clean(&self, gen: i64) -> Result<()> { + pub(crate) fn clean(&self, gen: i64) -> KernelResult<()> { let _ = self.remove(&gen); self.factory.clean(gen)?; self.wal.clean(gen)?; @@ -134,7 +138,7 @@ impl TableLoader { // Tips: 仅仅对持久化Table有效,SkipTable类内存Table始终为false #[allow(dead_code)] - pub(crate) fn is_table_file_exist(&self, gen: i64) -> Result { + pub(crate) fn is_table_file_exist(&self, gen: i64) -> KernelResult { self.factory.exists(gen) } } @@ -147,14 +151,14 @@ mod tests { use crate::kernel::lsm::storage::Config; use crate::kernel::lsm::table::loader::{TableLoader, TableType}; use crate::kernel::lsm::version::DEFAULT_SS_TABLE_PATH; - use crate::kernel::Result; + use crate::kernel::KernelResult; use bincode::Options; use bytes::Bytes; use std::sync::Arc; use tempfile::TempDir; #[test] - fn test_ss_table_loader() -> Result<()> { + fn test_ss_table_loader() -> KernelResult<()> { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); let value = Bytes::copy_from_slice( @@ -233,7 +237,7 @@ mod tests { Ok(()) } - fn clean_sst(gen: i64, loader: &TableLoader) -> Result<()> { + fn clean_sst(gen: i64, loader: &TableLoader) -> KernelResult<()> { loader.factory.clean(gen)?; Ok(()) diff --git a/src/kernel/lsm/table/mod.rs b/src/kernel/lsm/table/mod.rs index e1485de..eddf5cd 100644 --- a/src/kernel/lsm/table/mod.rs +++ b/src/kernel/lsm/table/mod.rs @@ -1,7 +1,7 @@ use crate::kernel::lsm::iterator::Iter; use crate::kernel::lsm::mem_table::KeyValue; use crate::kernel::lsm::table::meta::TableMeta; -use crate::kernel::Result; +use crate::kernel::KernelResult; use itertools::Itertools; pub(crate) mod loader; @@ -20,7 +20,7 @@ pub enum TableType { pub(crate) type BoxTable = Box; pub(crate) trait Table: Sync + Send { - fn query(&self, key: &[u8]) -> Result>; + fn query(&self, key: &[u8]) -> KernelResult>; fn len(&self) -> usize; @@ -30,11 +30,11 @@ pub(crate) trait Table: Sync + Send { fn level(&self) -> usize; - fn iter<'a>(&'a self) -> Result + 'a + Sync + Send>>; + fn iter<'a>(&'a self) -> KernelResult + 'a + Sync + Send>>; } /// 通过一组SSTable收集对应的Gen -pub(crate) fn collect_gen(vec_table: &[&dyn Table]) -> Result<(Vec, TableMeta)> { +pub(crate) fn collect_gen(vec_table: &[&dyn Table]) -> KernelResult<(Vec, TableMeta)> { let meta = TableMeta::from(vec_table); Ok(( diff --git a/src/kernel/lsm/table/scope.rs b/src/kernel/lsm/table/scope.rs index 7e1bbdd..16f1b7e 100644 --- a/src/kernel/lsm/table/scope.rs +++ b/src/kernel/lsm/table/scope.rs @@ -1,5 +1,5 @@ use crate::kernel::lsm::mem_table::KeyValue; -use crate::kernel::Result; +use crate::kernel::KernelResult; use crate::KernelError; use bytes::Bytes; use serde::{Deserialize, Serialize}; @@ -120,7 +120,10 @@ impl Scope { /// 由一组有序KeyValue组成一个scope #[allow(clippy::pattern_type_mismatch)] - pub(crate) fn from_sorted_vec_data(gen: i64, vec_mem_data: &Vec) -> Result { + pub(crate) fn from_sorted_vec_data( + gen: i64, + vec_mem_data: &Vec, + ) -> KernelResult { match vec_mem_data.as_slice() { [first, .., last] => Ok(Self::from_range(gen, first.0.clone(), last.0.clone())), [one] => Ok(Self::from_range(gen, one.0.clone(), one.0.clone())), diff --git a/src/kernel/lsm/table/skip_table/iter.rs b/src/kernel/lsm/table/skip_table/iter.rs index c7c1221..577b890 100644 --- a/src/kernel/lsm/table/skip_table/iter.rs +++ b/src/kernel/lsm/table/skip_table/iter.rs @@ -33,7 +33,7 @@ impl<'a> SkipTableIter<'a> { impl<'a> Iter<'a> for SkipTableIter<'a> { type Item = KeyValue; - fn try_next(&mut self) -> crate::kernel::Result> { + fn try_next(&mut self) -> crate::kernel::KernelResult> { Ok(self .inner .as_mut() @@ -45,7 +45,7 @@ impl<'a> Iter<'a> for SkipTableIter<'a> { true } - fn seek(&mut self, seek: Seek<'_>) -> crate::kernel::Result> { + fn seek(&mut self, seek: Seek<'_>) -> crate::kernel::KernelResult> { self._seek(seek); if let Seek::Last = seek { diff --git a/src/kernel/lsm/table/skip_table/mod.rs b/src/kernel/lsm/table/skip_table/mod.rs index f2e0c33..ad28666 100644 --- a/src/kernel/lsm/table/skip_table/mod.rs +++ b/src/kernel/lsm/table/skip_table/mod.rs @@ -32,7 +32,7 @@ impl SkipTable { } impl Table for SkipTable { - fn query(&self, key: &[u8]) -> crate::kernel::Result> { + fn query(&self, key: &[u8]) -> crate::kernel::KernelResult> { Ok(self.inner.get(key).cloned()) } @@ -55,7 +55,7 @@ impl Table for SkipTable { #[allow(clippy::todo)] fn iter<'a>( &'a self, - ) -> crate::kernel::Result + 'a + Send + Sync>> { + ) -> crate::kernel::KernelResult + 'a + Send + Sync>> { todo!("skiplist cannot support") } } diff --git a/src/kernel/lsm/table/ss_table/block.rs b/src/kernel/lsm/table/ss_table/block.rs index 00e4480..1dddc02 100644 --- a/src/kernel/lsm/table/ss_table/block.rs +++ b/src/kernel/lsm/table/ss_table/block.rs @@ -1,6 +1,6 @@ use crate::kernel::lsm::storage::Config; use crate::kernel::utils::lru_cache::ShardingLruCache; -use crate::kernel::Result; +use crate::kernel::KernelResult; use crate::KernelError; use bytes::{Buf, BufMut, Bytes}; use growable_bloom_filter::GrowableBloom; @@ -58,7 +58,7 @@ where } } - pub(crate) fn encode(&self) -> Result> { + pub(crate) fn encode(&self) -> KernelResult> { let mut buf = Vec::new(); let _ignore = buf.write_varint(self.unshared_len as u32)?; @@ -69,7 +69,7 @@ where Ok(buf) } - pub(crate) fn batch_decode(cursor: &mut Cursor>) -> Result> { + pub(crate) fn batch_decode(cursor: &mut Cursor>) -> KernelResult> { let mut vec_entry = Vec::new(); let mut index = 0; @@ -81,7 +81,7 @@ where Ok(vec_entry) } - pub(crate) fn decode(reader: &mut R) -> Result> { + pub(crate) fn decode(reader: &mut R) -> KernelResult> { let unshared_len = reader.read_varint::()? as usize; let shared_len = reader.read_varint::()? as usize; @@ -134,15 +134,15 @@ impl Index { pub(crate) trait BlockItem: Sized + Clone { /// 由于需要直接连续序列化,因此使用Read进行Bytes读取 - fn decode(reader: &mut T) -> Result + fn decode(reader: &mut T) -> KernelResult where T: Read + ?Sized; - fn encode(&self) -> Result>; + fn encode(&self) -> KernelResult>; } impl BlockItem for Value { - fn decode(mut reader: &mut T) -> Result + fn decode(mut reader: &mut T) -> KernelResult where T: Read + ?Sized, { @@ -158,7 +158,7 @@ impl BlockItem for Value { Ok(Value { value_len, bytes }) } - fn encode(&self) -> Result> { + fn encode(&self) -> KernelResult> { let mut buf = Vec::new(); let _ = buf.write_varint(self.value_len as u32)?; if let Some(value) = &self.bytes { @@ -169,7 +169,7 @@ impl BlockItem for Value { } impl BlockItem for Index { - fn decode(mut reader: &mut T) -> Result + fn decode(mut reader: &mut T) -> KernelResult where T: Read + ?Sized, { @@ -179,7 +179,7 @@ impl BlockItem for Index { Ok(Index { offset, len }) } - fn encode(&self) -> Result> { + fn encode(&self) -> KernelResult> { let mut buf = Vec::new(); let _ = buf.write_varint(self.offset)?; let _ = buf.write_varint(self.len as u32)?; @@ -359,7 +359,7 @@ impl BlockBuilder { } /// 构建多个Block连续序列化组合成的两个Bytes 前者为多个DataBlock,后者为单个IndexBlock - pub(crate) fn build(mut self) -> Result<(Vec, Vec)> { + pub(crate) fn build(mut self) -> KernelResult<(Vec, Vec)> { self.build_(); let mut offset = 0; @@ -534,7 +534,7 @@ where /// 序列化后进行压缩 /// /// 可选LZ4与不压缩 - pub(crate) fn encode(&self, compress_type: CompressType) -> Result> { + pub(crate) fn encode(&self, compress_type: CompressType) -> KernelResult> { let buf = self.to_raw()?; Ok(match compress_type { CompressType::None => buf, @@ -558,7 +558,7 @@ where buf: Vec, compress_type: CompressType, restart_interval: usize, - ) -> Result { + ) -> KernelResult { let buf = match compress_type { CompressType::None => buf, CompressType::LZ4 => { @@ -572,7 +572,7 @@ where } /// 读取Bytes进行Block的反序列化 - pub(crate) fn from_raw(mut buf: Vec, restart_interval: usize) -> Result { + pub(crate) fn from_raw(mut buf: Vec, restart_interval: usize) -> KernelResult { let date_bytes_len = buf.len() - CRC_SIZE; if crc32fast::hash(&buf) == u32::decode_fixed(&buf[date_bytes_len..]) { return Err(KernelError::CrcMisMatch); @@ -590,7 +590,7 @@ where /// 序列化该Block /// /// 与from_raw对应,序列化时会生成crc_code用于反序列化时校验 - pub(crate) fn to_raw(&self) -> Result> { + pub(crate) fn to_raw(&self) -> KernelResult> { let mut bytes_block = Vec::with_capacity(DEFAULT_BLOCK_SIZE); bytes_block.append( @@ -666,7 +666,7 @@ mod tests { Block, BlockBuilder, BlockItem, BlockOptions, CompressType, Entry, Index, Value, }; use crate::kernel::utils::lru_cache::LruCache; - use crate::kernel::Result; + use crate::kernel::KernelResult; use bincode::Options; use bytes::Bytes; use itertools::Itertools; @@ -676,7 +676,7 @@ mod tests { use std::io::Cursor; #[test] - fn test_entry_serialization() -> Result<()> { + fn test_entry_serialization() -> KernelResult<()> { let entry1 = Entry::new( 0, 1, @@ -704,7 +704,7 @@ mod tests { } #[test] - fn test_block() -> Result<()> { + fn test_block() -> KernelResult<()> { let value = Bytes::from_static(b"Let life be beautiful like summer flowers"); let mut vec_data = Vec::new(); @@ -767,7 +767,7 @@ mod tests { Ok(()) } - fn test_block_range(block: &Block) -> Result<()> { + fn test_block_range(block: &Block) -> KernelResult<()> { let all_item = block .vec_entry .iter() @@ -814,7 +814,7 @@ mod tests { block: Block, compress_type: CompressType, restart_interval: usize, - ) -> Result<()> { + ) -> KernelResult<()> { let de_block = Block::decode( block.encode(compress_type)?, compress_type, diff --git a/src/kernel/lsm/table/ss_table/block_iter.rs b/src/kernel/lsm/table/ss_table/block_iter.rs index b4b2f54..59da158 100644 --- a/src/kernel/lsm/table/ss_table/block_iter.rs +++ b/src/kernel/lsm/table/ss_table/block_iter.rs @@ -1,6 +1,6 @@ use crate::kernel::lsm::iterator::{ForwardIter, Iter, Seek}; use crate::kernel::lsm::table::ss_table::block::{Block, BlockItem, Entry}; -use crate::kernel::Result; +use crate::kernel::KernelResult; use bytes::Bytes; /// Block迭代器 @@ -63,7 +63,7 @@ impl<'a, V> ForwardIter<'a> for BlockIter<'a, V> where V: Sync + Send + BlockItem, { - fn try_prev(&mut self) -> Result> { + fn try_prev(&mut self) -> KernelResult> { Ok((self.is_valid() || self.offset == self.entry_len) .then(|| self.offset_move(self.offset - 1)) .flatten()) @@ -76,7 +76,7 @@ where { type Item = (Bytes, V); - fn try_next(&mut self) -> Result> { + fn try_next(&mut self) -> KernelResult> { Ok((self.is_valid() || self.offset == 0) .then(|| self.offset_move(self.offset + 1)) .flatten()) @@ -86,7 +86,7 @@ where self.offset > 0 && self.offset < self.entry_len } - fn seek(&mut self, seek: Seek<'_>) -> Result> { + fn seek(&mut self, seek: Seek<'_>) -> KernelResult> { Ok(match seek { Seek::First => Some(0), Seek::Last => Some(self.entry_len - 1), @@ -104,13 +104,13 @@ mod tests { use crate::kernel::lsm::iterator::{ForwardIter, Iter, Seek}; use crate::kernel::lsm::table::ss_table::block::{Block, Value, DEFAULT_DATA_RESTART_INTERVAL}; use crate::kernel::lsm::table::ss_table::block_iter::BlockIter; - use crate::kernel::Result; + use crate::kernel::KernelResult; use bincode::Options; use bytes::Bytes; use std::vec; #[test] - fn test_iterator() -> Result<()> { + fn test_iterator() -> KernelResult<()> { let data = vec![ (Bytes::from(vec![b'1']), Value::from(None)), ( @@ -187,7 +187,7 @@ mod tests { } #[test] - fn test_iterator_1000() -> Result<()> { + fn test_iterator_1000() -> KernelResult<()> { let mut vec_data = Vec::new(); let value = Bytes::from_static(b"What you are you do not see, what you see is your shadow."); diff --git a/src/kernel/lsm/table/ss_table/footer.rs b/src/kernel/lsm/table/ss_table/footer.rs index da28824..f66153b 100644 --- a/src/kernel/lsm/table/ss_table/footer.rs +++ b/src/kernel/lsm/table/ss_table/footer.rs @@ -1,5 +1,5 @@ use crate::kernel::io::IoReader; -use crate::kernel::Result; +use crate::kernel::KernelResult; use serde::{Deserialize, Serialize}; use std::io::SeekFrom; @@ -20,7 +20,7 @@ pub(crate) struct Footer { impl Footer { /// 从对应文件的IOHandler中将Footer读取出来 - pub(crate) fn read_to_file(reader: &mut dyn IoReader) -> Result { + pub(crate) fn read_to_file(reader: &mut dyn IoReader) -> KernelResult { let mut buf = [0; TABLE_FOOTER_SIZE]; let _ = reader.seek(SeekFrom::End(-(TABLE_FOOTER_SIZE as i64)))?; @@ -33,10 +33,10 @@ impl Footer { #[cfg(test)] mod test { use crate::kernel::lsm::table::ss_table::footer::{Footer, TABLE_FOOTER_SIZE}; - use crate::kernel::Result; + use crate::kernel::KernelResult; #[test] - fn test_footer() -> Result<()> { + fn test_footer() -> KernelResult<()> { let info = Footer { level: 0, index_offset: 0, diff --git a/src/kernel/lsm/table/ss_table/iter.rs b/src/kernel/lsm/table/ss_table/iter.rs index e0d4e83..85ee4e4 100644 --- a/src/kernel/lsm/table/ss_table/iter.rs +++ b/src/kernel/lsm/table/ss_table/iter.rs @@ -4,7 +4,7 @@ use crate::kernel::lsm::table::ss_table::block::{BlockType, Index, Value}; use crate::kernel::lsm::table::ss_table::block_iter::BlockIter; use crate::kernel::lsm::table::ss_table::SSTable; use crate::kernel::lsm::table::Table; -use crate::kernel::Result; +use crate::kernel::KernelResult; use crate::KernelError; pub(crate) struct SSTableIter<'a> { @@ -14,7 +14,7 @@ pub(crate) struct SSTableIter<'a> { } impl<'a> SSTableIter<'a> { - pub(crate) fn new(ss_table: &'a SSTable) -> Result> { + pub(crate) fn new(ss_table: &'a SSTable) -> KernelResult> { let mut index_iter = BlockIter::new(ss_table.index_block()?); let index = index_iter.try_next()?.ok_or(KernelError::DataEmpty)?.1; let data_iter = Self::data_iter_init(ss_table, index)?; @@ -26,7 +26,7 @@ impl<'a> SSTableIter<'a> { }) } - fn data_iter_init(ss_table: &'a SSTable, index: Index) -> Result> { + fn data_iter_init(ss_table: &'a SSTable, index: Index) -> KernelResult> { let block = { ss_table .cache @@ -44,7 +44,7 @@ impl<'a> SSTableIter<'a> { Ok(BlockIter::new(block)) } - fn data_iter_seek(&mut self, seek: Seek<'_>, index: Index) -> Result> { + fn data_iter_seek(&mut self, seek: Seek<'_>, index: Index) -> KernelResult> { self.data_iter = Self::data_iter_init(self.ss_table, index)?; Ok(self .data_iter @@ -54,7 +54,7 @@ impl<'a> SSTableIter<'a> { } impl<'a> ForwardIter<'a> for SSTableIter<'a> { - fn try_prev(&mut self) -> Result> { + fn try_prev(&mut self) -> KernelResult> { match self.data_iter.try_prev()? { None => { if let Some((_, index)) = self.index_iter.try_prev()? { @@ -71,7 +71,7 @@ impl<'a> ForwardIter<'a> for SSTableIter<'a> { impl<'a> Iter<'a> for SSTableIter<'a> { type Item = KeyValue; - fn try_next(&mut self) -> Result> { + fn try_next(&mut self) -> KernelResult> { match self.data_iter.try_next()? { None => { if let Some((_, index)) = self.index_iter.try_next()? { @@ -88,7 +88,7 @@ impl<'a> Iter<'a> for SSTableIter<'a> { self.data_iter.is_valid() } - fn seek(&mut self, seek: Seek<'_>) -> Result> { + fn seek(&mut self, seek: Seek<'_>) -> KernelResult> { if let Some((_, index)) = self.index_iter.seek(seek)? { self.data_iter_seek(seek, index) } else { @@ -106,7 +106,7 @@ mod tests { use crate::kernel::lsm::table::ss_table::SSTable; use crate::kernel::lsm::version::DEFAULT_SS_TABLE_PATH; use crate::kernel::utils::lru_cache::ShardingLruCache; - use crate::kernel::Result; + use crate::kernel::KernelResult; use bincode::Options; use bytes::Bytes; use std::collections::hash_map::RandomState; @@ -114,7 +114,7 @@ mod tests { use tempfile::TempDir; #[test] - fn test_iterator() -> Result<()> { + fn test_iterator() -> KernelResult<()> { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); let config = Config::new(temp_dir.into_path()); diff --git a/src/kernel/lsm/table/ss_table/mod.rs b/src/kernel/lsm/table/ss_table/mod.rs index a988cc4..ed11e3c 100644 --- a/src/kernel/lsm/table/ss_table/mod.rs +++ b/src/kernel/lsm/table/ss_table/mod.rs @@ -9,7 +9,7 @@ use crate::kernel::lsm::table::ss_table::block::{ use crate::kernel::lsm::table::ss_table::footer::{Footer, TABLE_FOOTER_SIZE}; use crate::kernel::lsm::table::ss_table::iter::SSTableIter; use crate::kernel::lsm::table::Table; -use crate::kernel::Result; +use crate::kernel::KernelResult; use crate::KernelError; use bytes::Bytes; use growable_bloom_filter::GrowableBloom; @@ -49,7 +49,7 @@ impl SSTable { vec_data: Vec, level: usize, io_type: IoType, - ) -> Result { + ) -> KernelResult { let len = vec_data.len(); let data_restart_interval = config.data_restart_interval; let index_restart_interval = config.index_restart_interval; @@ -115,7 +115,7 @@ impl SSTable { pub(crate) fn load_from_file( mut reader: Box, cache: Arc, - ) -> Result { + ) -> KernelResult { let gen = reader.get_gen(); let footer = Footer::read_to_file(reader.as_mut())?; let Footer { @@ -145,7 +145,7 @@ impl SSTable { }) } - pub(crate) fn data_block(&self, index: Index) -> Result { + pub(crate) fn data_block(&self, index: Index) -> KernelResult { Ok(BlockType::Data(Self::loading_block( self.reader.lock().as_mut(), index.offset(), @@ -155,7 +155,7 @@ impl SSTable { )?)) } - pub(crate) fn index_block(&self) -> Result<&Block> { + pub(crate) fn index_block(&self) -> KernelResult<&Block> { self.cache .get_or_insert((self.gen(), None), |_| { let Footer { @@ -184,7 +184,7 @@ impl SSTable { len: usize, compress_type: CompressType, restart_interval: usize, - ) -> Result> + ) -> KernelResult> where T: BlockItem, { @@ -197,7 +197,7 @@ impl SSTable { } impl Table for SSTable { - fn query(&self, key: &[u8]) -> Result> { + fn query(&self, key: &[u8]) -> KernelResult> { if self.meta.filter.contains(key) { let index_block = self.index_block()?; @@ -233,7 +233,7 @@ impl Table for SSTable { self.footer.level as usize } - fn iter<'a>(&'a self) -> Result + 'a + Send + Sync>> { + fn iter<'a>(&'a self) -> KernelResult + 'a + Send + Sync>> { Ok(SSTableIter::new(self).map(Box::new)?) } } @@ -249,7 +249,7 @@ mod tests { use crate::kernel::lsm::table::{Table, TableType}; use crate::kernel::lsm::version::DEFAULT_SS_TABLE_PATH; use crate::kernel::utils::lru_cache::ShardingLruCache; - use crate::kernel::Result; + use crate::kernel::KernelResult; use bincode::Options; use bytes::Bytes; use std::collections::hash_map::RandomState; @@ -257,7 +257,7 @@ mod tests { use tempfile::TempDir; #[test] - fn test_ss_table() -> Result<()> { + fn test_ss_table() -> KernelResult<()> { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); let value = Bytes::copy_from_slice( diff --git a/src/kernel/lsm/version/iter.rs b/src/kernel/lsm/version/iter.rs index 6f15045..5385450 100644 --- a/src/kernel/lsm/version/iter.rs +++ b/src/kernel/lsm/version/iter.rs @@ -3,7 +3,7 @@ use crate::kernel::lsm::iterator::merging_iter::MergingIter; use crate::kernel::lsm::iterator::{Iter, Seek}; use crate::kernel::lsm::mem_table::KeyValue; use crate::kernel::lsm::version::Version; -use crate::kernel::Result; +use crate::kernel::KernelResult; /// Version键值对迭代器 pub struct VersionIter<'a> { @@ -11,7 +11,7 @@ pub struct VersionIter<'a> { } impl<'a> VersionIter<'a> { - pub(crate) fn new(version: &'a Version) -> Result> { + pub(crate) fn new(version: &'a Version) -> KernelResult> { let vec_iter = Self::merging_with_version(version)?; Ok(Self { @@ -21,7 +21,7 @@ impl<'a> VersionIter<'a> { pub(crate) fn merging_with_version( version: &'a Version, - ) -> Result + 'a + Send + Sync>>> { + ) -> KernelResult + 'a + Send + Sync>>> { let mut vec_iter: Vec + 'a + Send + Sync>> = Vec::new(); for table in version.tables_by_level_0() { @@ -41,7 +41,7 @@ impl<'a> VersionIter<'a> { impl<'a> Iter<'a> for VersionIter<'a> { type Item = KeyValue; - fn try_next(&mut self) -> Result> { + fn try_next(&mut self) -> KernelResult> { self.merge_iter.try_next() } @@ -49,7 +49,7 @@ impl<'a> Iter<'a> for VersionIter<'a> { self.merge_iter.is_valid() } - fn seek(&mut self, seek: Seek<'_>) -> Result> { + fn seek(&mut self, seek: Seek<'_>) -> KernelResult> { self.merge_iter.seek(seek) } } diff --git a/src/kernel/lsm/version/meta.rs b/src/kernel/lsm/version/meta.rs index bedd21f..e8487b4 100644 --- a/src/kernel/lsm/version/meta.rs +++ b/src/kernel/lsm/version/meta.rs @@ -1,5 +1,5 @@ use crate::kernel::lsm::version::edit::EditType; -use crate::kernel::Result; +use crate::kernel::KernelResult; use itertools::Itertools; #[derive(Clone, Copy, Eq, PartialEq, Debug)] @@ -15,7 +15,7 @@ impl VersionMeta { pub(crate) fn statistical_process( &mut self, vec_statistics_sst_meta: Vec, - ) -> Result<()> { + ) -> KernelResult<()> { // 优先对新增数据进行统计再统一减去对应的数值避免删除动作聚集在前部分导致数值溢出 for event_type in vec_statistics_sst_meta.into_iter().sorted() { match event_type { diff --git a/src/kernel/lsm/version/mod.rs b/src/kernel/lsm/version/mod.rs index bab433e..f49c714 100644 --- a/src/kernel/lsm/version/mod.rs +++ b/src/kernel/lsm/version/mod.rs @@ -9,14 +9,14 @@ use crate::kernel::lsm::table::Table; use crate::kernel::lsm::version::cleaner::CleanTag; use crate::kernel::lsm::version::edit::{EditType, VersionEdit}; use crate::kernel::lsm::version::meta::VersionMeta; -use crate::kernel::{sorted_gen_list, Result}; +use crate::kernel::{sorted_gen_list, KernelResult}; use itertools::Itertools; use std::fmt; use std::sync::Arc; use tokio::sync::mpsc::UnboundedSender; use tracing::info; -mod cleaner; +pub(crate) mod cleaner; pub(crate) mod edit; pub(crate) mod iter; mod meta; @@ -35,7 +35,7 @@ pub(crate) enum SeekOption { Miss(Option), } -fn snapshot_gen(factory: &IoFactory) -> Result { +fn snapshot_gen(factory: &IoFactory) -> KernelResult { if let Ok(gen_list) = sorted_gen_list(factory.get_path(), FileExtension::Log) { return Ok(match *gen_list.as_slice() { [.., old_snapshot, new_snapshot] => { @@ -89,7 +89,7 @@ impl Version { vec_log: Vec, ss_table_loader: &Arc, clean_tx: UnboundedSender, - ) -> Result { + ) -> KernelResult { let mut version = Self { version_num: 0, table_loader: Arc::clone(ss_table_loader), @@ -114,7 +114,7 @@ impl Version { /// 也就是可能存在一次Version的冗余Table /// 可能是个确定,但是Minor Compactor比较起来更加频繁,也就是大多数情况不会冗余,因此我觉得影响较小 /// 也可以算作是一种Major Compaction异常时的备份? - pub(crate) fn apply(&mut self, vec_version_edit: Vec) -> Result<()> { + pub(crate) fn apply(&mut self, vec_version_edit: Vec) -> KernelResult<()> { let mut del_gens = Vec::new(); let mut vec_statistics_sst_meta = Vec::new(); @@ -236,7 +236,7 @@ impl Version { } /// 使用Key从现有Tables中获取对应的数据 - pub(crate) fn query(&self, key: &[u8]) -> Result<(Option, Option)> { + pub(crate) fn query(&self, key: &[u8]) -> KernelResult<(Option, Option)> { let table_loader = &self.table_loader; // Level 0的Table是无序且Table间的数据是可能重复的,因此需要遍历 for scope in self.level_slice[LEVEL_0].iter().rev() { @@ -271,7 +271,7 @@ impl Version { table_loader: &Arc, scope: &Scope, level: usize, - ) -> Result> { + ) -> KernelResult> { if scope.meet_by_key(key) { if let Some(ss_table) = table_loader.get(scope.gen()) { if let Some(value) = ss_table.query(key)? { diff --git a/src/kernel/lsm/version/status.rs b/src/kernel/lsm/version/status.rs index 43c7d38..7f18a15 100644 --- a/src/kernel/lsm/version/status.rs +++ b/src/kernel/lsm/version/status.rs @@ -7,7 +7,7 @@ use crate::kernel::lsm::version::edit::VersionEdit; use crate::kernel::lsm::version::{ snapshot_gen, Version, DEFAULT_SS_TABLE_PATH, DEFAULT_VERSION_PATH, }; -use crate::kernel::Result; +use crate::kernel::KernelResult; use itertools::Itertools; use std::mem; use std::sync::atomic::{AtomicUsize, Ordering}; @@ -30,7 +30,7 @@ pub(crate) struct VersionStatus { } impl VersionStatus { - pub(crate) fn load_with_path(config: Config, wal: LogLoader) -> Result { + pub(crate) fn load_with_path(config: Config, wal: LogLoader) -> KernelResult { let sst_path = config.path().join(DEFAULT_SS_TABLE_PATH); let sst_factory = Arc::new(IoFactory::new(sst_path, FileExtension::SSTable)?); let ss_table_loader = Arc::new(TableLoader::new( @@ -81,7 +81,7 @@ impl VersionStatus { &self, vec_version_edit: Vec, snapshot_threshold: usize, - ) -> Result<()> { + ) -> KernelResult<()> { let mut new_version = Version::clone(self.current().await.as_ref()); let mut inner = self.inner.write().await; info!("[Version Status][log_and_apply]: {new_version}"); @@ -103,7 +103,10 @@ impl VersionStatus { Ok(()) } - async fn write_snap_shot(inner: &mut VersionInner, log_factory: &IoFactory) -> Result<()> { + async fn write_snap_shot( + inner: &mut VersionInner, + log_factory: &IoFactory, + ) -> KernelResult<()> { let version = &inner.version; info!( "[Version: {}][write_snap_shot]: Start Snapshot!", diff --git a/src/kernel/lsm/version/test.rs b/src/kernel/lsm/version/test.rs index bdec87c..149528d 100644 --- a/src/kernel/lsm/version/test.rs +++ b/src/kernel/lsm/version/test.rs @@ -6,7 +6,7 @@ use crate::kernel::lsm::version::edit::VersionEdit; use crate::kernel::lsm::version::status::VersionStatus; use crate::kernel::lsm::version::Version; use crate::kernel::lsm::version::DEFAULT_VERSION_PATH; -use crate::kernel::Result; +use crate::kernel::KernelResult; use bytes::Bytes; use std::sync::Arc; use std::time::Duration; @@ -14,7 +14,7 @@ use tempfile::TempDir; use tokio::time; #[test] -fn test_version_clean() -> Result<()> { +fn test_version_clean() -> KernelResult<()> { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); tokio_test::block_on(async move { @@ -108,7 +108,7 @@ fn test_version_clean() -> Result<()> { } #[test] -fn test_version_apply_and_log() -> Result<()> { +fn test_version_apply_and_log() -> KernelResult<()> { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); tokio_test::block_on(async move { diff --git a/src/kernel/mod.rs b/src/kernel/mod.rs index 6e98016..50d7ee1 100644 --- a/src/kernel/mod.rs +++ b/src/kernel/mod.rs @@ -16,7 +16,7 @@ pub mod lsm; pub mod sled_storage; pub mod utils; -pub type Result = std::result::Result; +pub type KernelResult = std::result::Result; pub(crate) const DEFAULT_LOCK_FILE: &str = "KipDB.lock"; @@ -29,23 +29,23 @@ pub trait Storage: Send + Sync + 'static + Sized { Self: Sized; /// 通过数据目录路径开启数据库 - async fn open(path: impl Into + Send) -> Result; + async fn open(path: impl Into + Send) -> KernelResult; /// 强制将数据刷入硬盘 - async fn flush(&self) -> Result<()>; + async fn flush(&self) -> KernelResult<()>; /// 设置键值对 - async fn set(&self, key: Bytes, value: Bytes) -> Result<()>; + async fn set(&self, key: Bytes, value: Bytes) -> KernelResult<()>; /// 通过键获取对应的值 - async fn get(&self, key: &[u8]) -> Result>; + async fn get(&self, key: &[u8]) -> KernelResult>; /// 通过键删除键值对 - async fn remove(&self, key: &[u8]) -> Result<()>; + async fn remove(&self, key: &[u8]) -> KernelResult<()>; - async fn size_of_disk(&self) -> Result; + async fn size_of_disk(&self) -> KernelResult; - async fn len(&self) -> Result; + async fn len(&self) -> KernelResult; async fn is_empty(&self) -> bool; } @@ -126,9 +126,9 @@ impl CommandData { } /// 现有日志文件序号排序 -fn sorted_gen_list(file_path: &Path, extension: FileExtension) -> Result> { +fn sorted_gen_list(file_path: &Path, extension: FileExtension) -> KernelResult> { let mut gen_list: Vec = fs::read_dir(file_path)? - .flat_map(|res| -> Result<_> { Ok(res?.path()) }) + .flat_map(|res| -> KernelResult<_> { Ok(res?.path()) }) .filter(|path| { path.is_file() && path.extension() == Some(extension.extension_str().as_ref()) }) @@ -147,7 +147,7 @@ fn sorted_gen_list(file_path: &Path, extension: FileExtension) -> Result Result { +async fn lock_or_time_out(path: &PathBuf) -> KernelResult { let mut lock_file = LockFile::open(path)?; let mut backoff = 1; diff --git a/src/kernel/sled_storage.rs b/src/kernel/sled_storage.rs index c983cc6..ccaee34 100644 --- a/src/kernel/sled_storage.rs +++ b/src/kernel/sled_storage.rs @@ -23,26 +23,26 @@ impl Storage for SledStorage { } #[inline] - async fn open(path: impl Into + Send) -> crate::kernel::Result { + async fn open(path: impl Into + Send) -> crate::kernel::KernelResult { let db = Arc::new(sled::open(path.into())?); Ok(SledStorage { data_base: db }) } #[inline] - async fn flush(&self) -> crate::kernel::Result<()> { + async fn flush(&self) -> crate::kernel::KernelResult<()> { let _ignore = self.data_base.flush_async().await?; Ok(()) } #[inline] - async fn set(&self, key: Bytes, value: Bytes) -> crate::kernel::Result<()> { + async fn set(&self, key: Bytes, value: Bytes) -> crate::kernel::KernelResult<()> { let _ignore = self.data_base.insert(key.as_slice(), value.to_vec())?; Ok(()) } #[inline] - async fn get(&self, key: &[u8]) -> crate::kernel::Result> { + async fn get(&self, key: &[u8]) -> crate::kernel::KernelResult> { match self.data_base.get(key)? { None => Ok(None), Some(i_vec) => Ok(Some(Bytes::from(i_vec.to_vec()))), @@ -50,7 +50,7 @@ impl Storage for SledStorage { } #[inline] - async fn remove(&self, key: &[u8]) -> crate::kernel::Result<()> { + async fn remove(&self, key: &[u8]) -> crate::kernel::KernelResult<()> { match self.data_base.remove(key) { Ok(Some(_)) => Ok(()), Ok(None) => Err(KernelError::KeyNotFound), @@ -59,12 +59,12 @@ impl Storage for SledStorage { } #[inline] - async fn size_of_disk(&self) -> crate::kernel::Result { + async fn size_of_disk(&self) -> crate::kernel::KernelResult { Ok(self.data_base.size_on_disk()?) } #[inline] - async fn len(&self) -> crate::kernel::Result { + async fn len(&self) -> crate::kernel::KernelResult { Ok(self.data_base.len()) } diff --git a/src/kernel/utils/lru_cache.rs b/src/kernel/utils/lru_cache.rs index 5ce5c7f..7b9d45d 100644 --- a/src/kernel/utils/lru_cache.rs +++ b/src/kernel/utils/lru_cache.rs @@ -1,4 +1,5 @@ -use crate::error::CacheError; +use crate::kernel::KernelResult; +use crate::KernelError; use parking_lot::Mutex; use std::borrow::Borrow; use std::cmp::Ordering; @@ -10,8 +11,6 @@ use std::ops::{Deref, DerefMut}; use std::ptr::NonNull; use std::sync::Arc; -pub type Result = std::result::Result; - // 只读Node操作裸指针 // https://course.rs/advance/concurrency-with-threads/send-sync.html#:~:text=%E5%AE%89%E5%85%A8%E7%9A%84%E4%BD%BF%E7%94%A8%E3%80%82-,%E4%B8%BA%E8%A3%B8%E6%8C%87%E9%92%88%E5%AE%9E%E7%8E%B0Send,-%E4%B8%8A%E9%9D%A2%E6%88%91%E4%BB%AC%E6%8F%90%E5%88%B0 // 通过只读数据已保证线程安全 @@ -115,10 +114,10 @@ impl Node { impl ShardingLruCache { #[inline] - pub fn new(cap: usize, sharding_size: usize, hasher: S) -> Result { + pub fn new(cap: usize, sharding_size: usize, hasher: S) -> KernelResult { let mut sharding_vec = Vec::with_capacity(sharding_size); if cap % sharding_size != 0 { - return Err(CacheError::ShardingNotAlign); + return Err(KernelError::ShardingNotAlign); } let sharding_cap = cap / sharding_size; for _ in 0..sharding_size { @@ -160,9 +159,9 @@ impl ShardingLruCache { } #[inline] - pub fn get_or_insert(&self, key: K, fn_once: F) -> Result<&V> + pub fn get_or_insert(&self, key: K, fn_once: F) -> KernelResult<&V> where - F: FnOnce(&K) -> Result, + F: FnOnce(&K) -> KernelResult, { self.shard(&key) .lock() @@ -184,9 +183,9 @@ impl ShardingLruCache { impl LruCache { #[inline] - pub fn new(cap: usize) -> Result { + pub fn new(cap: usize) -> KernelResult { if cap < 1 { - return Err(CacheError::CacheSizeOverFlow); + return Err(KernelError::CacheSizeOverFlow); } Ok(Self { @@ -306,9 +305,9 @@ impl LruCache { }) } - fn get_or_insert_node(&mut self, key: K, fn_once: F) -> Result> + fn get_or_insert_node(&mut self, key: K, fn_once: F) -> KernelResult> where - F: FnOnce(&K) -> Result, + F: FnOnce(&K) -> KernelResult, { if let Some(node) = self.inner.get(&key) { let node = *node; @@ -330,9 +329,9 @@ impl LruCache { } #[inline] - pub fn get_or_insert(&mut self, key: K, fn_once: F) -> Result<&V> + pub fn get_or_insert(&mut self, key: K, fn_once: F) -> KernelResult<&V> where - F: FnOnce(&K) -> Result, + F: FnOnce(&K) -> KernelResult, { self.get_or_insert_node(key, fn_once) .map(|node| unsafe { &node.as_ref().value }) diff --git a/src/server/client.rs b/src/server/client.rs index 8458a9b..1a41224 100644 --- a/src/server/client.rs +++ b/src/server/client.rs @@ -5,7 +5,7 @@ use crate::proto::{ }; use tonic::transport::Channel; -pub type Result = std::result::Result; +pub type ConnectionResult = Result; type Key = Vec; type Value = Vec; type KV = (Key, Value); @@ -15,13 +15,13 @@ pub struct KipdbClient { } impl KipdbClient { - pub async fn connect(addr: String) -> Result { + pub async fn connect(addr: String) -> ConnectionResult { let conn = KipdbRpcClient::connect(addr).await?; Ok(Self { conn }) } #[inline] - pub async fn set(&mut self, key: Key, value: Value) -> Result<()> { + pub async fn set(&mut self, key: Key, value: Value) -> ConnectionResult<()> { let req = tonic::Request::new(SetReq { key, value }); let resp = self.conn.set(req).await?; if resp.into_inner().success { @@ -32,7 +32,7 @@ impl KipdbClient { } #[inline] - pub async fn remove(&mut self, key: Key) -> Result<()> { + pub async fn remove(&mut self, key: Key) -> ConnectionResult<()> { let req = tonic::Request::new(RemoveReq { key }); let resp = self.conn.remove(req).await?; if resp.into_inner().success { @@ -43,14 +43,14 @@ impl KipdbClient { } #[inline] - pub async fn get(&mut self, key: Key) -> Result> { + pub async fn get(&mut self, key: Key) -> ConnectionResult> { let req = tonic::Request::new(GetReq { key }); let resp = self.conn.get(req).await?; Ok(resp.into_inner().value) } #[inline] - pub async fn batch_set(&mut self, kvs: Vec) -> Result> { + pub async fn batch_set(&mut self, kvs: Vec) -> ConnectionResult> { let req = tonic::Request::new(BatchSetReq { kvs: kvs .into_iter() @@ -67,21 +67,21 @@ impl KipdbClient { } #[inline] - pub async fn batch_remove(&mut self, keys: Vec) -> Result> { + pub async fn batch_remove(&mut self, keys: Vec) -> ConnectionResult> { let req = tonic::Request::new(BatchRemoveReq { keys }); let resp = self.conn.batch_remove(req).await?; Ok(resp.into_inner().failure) } #[inline] - pub async fn batch_get(&mut self, keys: Vec) -> Result> { + pub async fn batch_get(&mut self, keys: Vec) -> ConnectionResult> { let req = tonic::Request::new(BatchGetReq { keys }); let resp = self.conn.batch_get(req).await?; Ok(resp.into_inner().values) } #[inline] - pub async fn flush(&mut self) -> Result<()> { + pub async fn flush(&mut self) -> ConnectionResult<()> { let req = tonic::Request::new(Empty {}); let resp = self.conn.flush(req).await?; if resp.into_inner().success { @@ -92,14 +92,14 @@ impl KipdbClient { } #[inline] - pub async fn size_of_disk(&mut self) -> Result { + pub async fn size_of_disk(&mut self) -> ConnectionResult { let req = tonic::Request::new(Empty {}); let resp = self.conn.size_of_disk(req).await?; Ok(resp.into_inner().size) } #[inline] - pub async fn len(&mut self) -> Result { + pub async fn len(&mut self) -> ConnectionResult { let req = tonic::Request::new(Empty {}); let resp = self.conn.len(req).await?; Ok(resp.into_inner().len as usize) diff --git a/tests/tests.rs b/tests/tests.rs index 7724474..e3ccdd7 100644 --- a/tests/tests.rs +++ b/tests/tests.rs @@ -2,20 +2,20 @@ use bytes::Bytes; use kip_db::kernel::io::{FileExtension, IoFactory, IoType}; use kip_db::kernel::lsm::storage::KipStorage; use kip_db::kernel::sled_storage::SledStorage; -use kip_db::kernel::Result; +use kip_db::kernel::KernelResult; use kip_db::kernel::Storage; use std::io::{Read, Seek, SeekFrom, Write}; use tempfile::TempDir; use walkdir::WalkDir; #[test] -fn get_stored_value() -> Result<()> { +fn get_stored_value() -> KernelResult<()> { get_stored_value_with_kv_store::()?; get_stored_value_with_kv_store::()?; Ok(()) } -fn get_stored_value_with_kv_store() -> Result<()> { +fn get_stored_value_with_kv_store() -> KernelResult<()> { tokio_test::block_on(async move { let key1: Vec = encode_key("key1")?; let key2: Vec = encode_key("key2")?; @@ -48,14 +48,14 @@ fn get_stored_value_with_kv_store() -> Result<()> { // Should overwrite existent value. #[test] -fn overwrite_value() -> Result<()> { +fn overwrite_value() -> KernelResult<()> { overwrite_value_with_kv_store::()?; overwrite_value_with_kv_store::()?; Ok(()) } -fn overwrite_value_with_kv_store() -> Result<()> { +fn overwrite_value_with_kv_store() -> KernelResult<()> { tokio_test::block_on(async move { let key1: Vec = encode_key("key1")?; let value1: Vec = encode_key("value1")?; @@ -103,14 +103,14 @@ fn overwrite_value_with_kv_store() -> Result<()> { // Should get `None` when getting a non-existent key. #[test] -fn get_non_existent_value() -> Result<()> { +fn get_non_existent_value() -> KernelResult<()> { get_non_existent_value_with_kv_store::()?; get_non_existent_value_with_kv_store::()?; Ok(()) } -fn get_non_existent_value_with_kv_store() -> Result<()> { +fn get_non_existent_value_with_kv_store() -> KernelResult<()> { tokio_test::block_on(async move { let key1: Vec = encode_key("key1")?; let key2: Vec = encode_key("key2")?; @@ -135,13 +135,13 @@ fn get_non_existent_value_with_kv_store() -> Result<()> { } #[test] -fn remove_non_existent_key() -> Result<()> { +fn remove_non_existent_key() -> KernelResult<()> { remove_non_existent_key_with_kv_store::()?; remove_non_existent_key_with_kv_store::()?; Ok(()) } -fn remove_non_existent_key_with_kv_store() -> Result<()> { +fn remove_non_existent_key_with_kv_store() -> KernelResult<()> { tokio_test::block_on(async move { let key1: Vec = encode_key("key1")?; @@ -154,14 +154,14 @@ fn remove_non_existent_key_with_kv_store() -> Result<()> { } #[test] -fn remove_key() -> Result<()> { +fn remove_key() -> KernelResult<()> { remove_key_with_kv_store::()?; remove_key_with_kv_store::()?; Ok(()) } -fn remove_key_with_kv_store() -> Result<()> { +fn remove_key_with_kv_store() -> KernelResult<()> { tokio_test::block_on(async move { let key1: Vec = encode_key("key1")?; let value1: Vec = encode_key("value1")?; @@ -181,7 +181,7 @@ fn remove_key_with_kv_store() -> Result<()> { // Insert data until total size of the directory decreases. // Test data correctness after compaction. #[test] -fn compaction() -> Result<()> { +fn compaction() -> KernelResult<()> { compaction_with_kv_store::()?; compaction_with_kv_store::()?; @@ -189,7 +189,7 @@ fn compaction() -> Result<()> { } // 如果此处出现异常,可以尝试降低压缩阈值或者提高检测时间 -fn compaction_with_kv_store() -> Result<()> { +fn compaction_with_kv_store() -> KernelResult<()> { tokio_test::block_on(async move { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); let kv_store = T::open(temp_dir.path()).await?; @@ -243,7 +243,7 @@ fn compaction_with_kv_store() -> Result<()> { } #[test] -fn test_io() -> Result<()> { +fn test_io() -> KernelResult<()> { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); let factory = IoFactory::new(temp_dir.path(), FileExtension::Log).unwrap(); @@ -253,7 +253,7 @@ fn test_io() -> Result<()> { Ok(()) } -fn io_type_test(factory: &IoFactory, io_type: IoType) -> Result<()> { +fn io_type_test(factory: &IoFactory, io_type: IoType) -> KernelResult<()> { let mut writer = factory.writer(1, io_type)?; let data_write1 = vec![b'1', b'2', b'3']; let data_write2 = vec![b'4', b'5', b'6']; @@ -286,6 +286,6 @@ fn io_type_test(factory: &IoFactory, io_type: IoType) -> Result<()> { Ok(()) } -fn encode_key(key: &str) -> Result> { +fn encode_key(key: &str) -> KernelResult> { Ok(bincode::serialize(key)?) }