Skip to content
This repository has been archived by the owner on Aug 4, 2024. It is now read-only.

Commit

Permalink
Replace failure with thiserror (#52)
Browse files Browse the repository at this point in the history
* Replace failure with thiserror

* Mark transparent for some errors
  • Loading branch information
lewiszlw authored Oct 13, 2023
1 parent 0dff115 commit 9399f64
Show file tree
Hide file tree
Showing 36 changed files with 358 additions and 394 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ debug = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
failure = { version = "0.1.5", features = ["derive"] }
thiserror = "1.0.24"
# 序列化
serde = { version = "1.0.89", features = ["derive", "rc"] }
bincode = "1.3.3"
Expand Down
4 changes: 2 additions & 2 deletions src/bin/cli.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use clap::{Parser, Subcommand};
use itertools::Itertools;
use kip_db::server::client::ConnectionResult;
use kip_db::server::client::KipdbClient;
use kip_db::server::client::Result;
use kip_db::DEFAULT_PORT;
use serde::{Deserialize, Serialize};
use tracing::{error, info};
Expand Down Expand Up @@ -33,7 +33,7 @@ struct Cli {
/// 就是说客户端没必要多线程,强制单线程避免产生额外线程
/// 调用方法基本:./kip-db-cli get key1 value1
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<()> {
async fn main() -> ConnectionResult<()> {
// Enable logging
tracing_subscriber::fmt::try_init().unwrap();
let cli: Cli = Cli::parse();
Expand Down
4 changes: 2 additions & 2 deletions src/bin/server.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use clap::Parser;

use kip_db::server::client::Result;
use kip_db::server::client::ConnectionResult;
use kip_db::server::server::serve;
use kip_db::{DEFAULT_PORT, LOCAL_IP};

/// 服务启动方法
/// 二进制执行文件调用方法:./kip-db-cli
#[tokio::main]
pub async fn main() -> Result<()> {
pub async fn main() -> ConnectionResult<()> {
tracing_subscriber::fmt::try_init().unwrap();

let cli = Cli::parse();
Expand Down
208 changes: 72 additions & 136 deletions src/error.rs
Original file line number Diff line number Diff line change
@@ -1,179 +1,115 @@
use failure::Fail;
use crate::kernel::lsm::compactor::CompactTask;
use crate::kernel::lsm::version::cleaner::CleanTag;
use std::io;
use thiserror::Error;
use tokio::sync::mpsc::error::SendError;
use tokio::sync::oneshot::error::RecvError;

/// Error type for kvs
#[derive(Fail, Debug)]
#[derive(Error, Debug)]
#[non_exhaustive]
pub enum KernelError {
/// IO error
#[fail(display = "{}", _0)]
Io(#[cause] io::Error),
#[fail(display = "{}", _0)]
Recv(#[cause] RecvError),
#[error(transparent)]
Io(#[from] io::Error),

#[error(transparent)]
RecvError(#[from] RecvError),

#[error("Failed to send compact task")]
SendCompactTaskError(#[from] SendError<CompactTask>),

#[error("Failed to send clean tag")]
SendCleanTagError(#[from] SendError<CleanTag>),

/// Serialization or deserialization error
#[fail(display = "{}", _0)]
SerdeBinCode(#[cause] Box<bincode::ErrorKind>),
#[error(transparent)]
SerdeBinCode(#[from] Box<bincode::ErrorKind>),

/// Remove no-existent key error
#[fail(display = "Key not found")]
#[error("Key not found")]
KeyNotFound,
#[fail(display = "Data is empty")]

#[error("Data is empty")]
DataEmpty,
#[fail(display = "Max Level is 7")]

#[error("Max Level is 7")]
LevelOver,
#[fail(display = "Not the correct type of Cmd")]

#[error("Not the correct type of Cmd")]
NotMatchCmd,
#[fail(display = "CRC code does not match")]

#[error("CRC code does not match")]
CrcMisMatch,
#[fail(display = "{}", _0)]
SledErr(#[cause] sled::Error),
#[fail(display = "Cache size overflow")]

#[error(transparent)]
SledErr(#[from] sled::Error),

#[error("Cache size overflow")]
CacheSizeOverFlow,
#[fail(display = "Cache sharding and size overflow")]

#[error("Cache sharding and size overflow")]
CacheShardingNotAlign,
#[fail(display = "File not found")]

#[error("File not found")]
FileNotFound,

/// 正常情况wal在内存中存在索引则表示硬盘中存在有对应的数据
/// 而错误则是内存存在索引却在硬盘中不存在这个数据
#[fail(display = "WAL log load error")]
#[error("WAL log load error")]
WalLoad,

/// Unexpected command type error.
/// It indicated a corrupted log or a program bug.
#[fail(display = "Unexpected command type")]
#[error("Unexpected command type")]
UnexpectedCommandType,
#[fail(display = "Process already exists")]

#[error("Process already exists")]
ProcessExists,
#[fail(display = "channel is closed")]

#[error("channel is closed")]
ChannelClose,
#[fail(display = "{}", _0)]
NotSupport(&'static str),
}

#[derive(Fail, Debug)]
#[non_exhaustive]
pub enum ConnectionError {
#[fail(display = "{}", _0)]
IO(#[cause] io::Error),
#[fail(display = "disconnected")]
Disconnected,
#[fail(display = "write failed")]
WriteFailed,
#[fail(display = "wrong instruction")]
WrongInstruction,
#[fail(display = "encode error")]
EncodeErr,
#[fail(display = "decode error")]
DecodeErr,
#[fail(display = "server flush error")]
FlushError,
#[fail(display = "{}", _0)]
StoreErr(#[cause] KernelError),
#[fail(display = "Failed to connect to server, {}", _0)]
TonicTransportErr(#[cause] tonic::transport::Error),
#[fail(display = "Failed to call server, {}", _0)]
TonicFailureStatus(#[cause] tonic::Status),
#[fail(display = "Failed to parse addr, {}", _0)]
AddrParseError(#[cause] std::net::AddrParseError),
}
#[error("{0}")]
NotSupport(&'static str),

#[derive(Fail, Debug)]
#[non_exhaustive]
#[allow(missing_copy_implementations)]
pub enum CacheError {
#[fail(display = "The number of caches cannot be divisible by the number of shards")]
#[error("The number of caches cannot be divisible by the number of shards")]
ShardingNotAlign,
#[fail(display = "Cache size overflow")]
CacheSizeOverFlow,
#[fail(display = "{}", _0)]
StoreErr(#[cause] KernelError),
}

impl<T> From<SendError<T>> for KernelError {
#[inline]
fn from(_: SendError<T>) -> Self {
KernelError::ChannelClose
}
}

impl From<io::Error> for ConnectionError {
#[inline]
fn from(err: io::Error) -> Self {
ConnectionError::IO(err)
}
}
#[derive(Error, Debug)]
#[non_exhaustive]
pub enum ConnectionError {
#[error(transparent)]
IO(#[from] io::Error),

impl From<io::Error> for KernelError {
#[inline]
fn from(err: io::Error) -> Self {
KernelError::Io(err)
}
}
#[error("disconnected")]
Disconnected,

impl From<RecvError> for KernelError {
#[inline]
fn from(err: RecvError) -> Self {
KernelError::Recv(err)
}
}
#[error("write failed")]
WriteFailed,

impl From<Box<bincode::ErrorKind>> for KernelError {
#[inline]
fn from(err: Box<bincode::ErrorKind>) -> Self {
KernelError::SerdeBinCode(err)
}
}
#[error("wrong instruction")]
WrongInstruction,

impl From<sled::Error> for KernelError {
#[inline]
fn from(err: sled::Error) -> Self {
KernelError::SledErr(err)
}
}
#[error("encode error")]
EncodeErr,

impl From<KernelError> for ConnectionError {
#[inline]
fn from(err: KernelError) -> Self {
ConnectionError::StoreErr(err)
}
}
#[error("decode error")]
DecodeErr,

impl From<tonic::Status> for ConnectionError {
#[inline]
fn from(status: tonic::Status) -> Self {
ConnectionError::TonicFailureStatus(status)
}
}
#[error("server flush error")]
FlushError,

impl From<tonic::transport::Error> for ConnectionError {
#[inline]
fn from(err: tonic::transport::Error) -> Self {
ConnectionError::TonicTransportErr(err)
}
}
#[error("Failed to connect to server, {0}")]
TonicTransportErr(#[from] tonic::transport::Error),

impl From<std::net::AddrParseError> for ConnectionError {
#[inline]
fn from(err: std::net::AddrParseError) -> Self {
ConnectionError::AddrParseError(err)
}
}
#[error("Failed to call server, {0}")]
TonicFailureStatus(#[from] tonic::Status),

impl From<CacheError> for KernelError {
#[inline]
fn from(value: CacheError) -> Self {
match value {
CacheError::StoreErr(kv_error) => kv_error,
CacheError::CacheSizeOverFlow => KernelError::CacheSizeOverFlow,
CacheError::ShardingNotAlign => KernelError::CacheShardingNotAlign,
}
}
}
#[error("Failed to parse addr, {0}")]
AddrParseError(#[from] std::net::AddrParseError),

impl From<KernelError> for CacheError {
#[inline]
fn from(value: KernelError) -> Self {
CacheError::StoreErr(value)
}
#[error(transparent)]
KernelError(#[from] KernelError),
}
12 changes: 6 additions & 6 deletions src/kernel/io/buf.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::kernel::io::{FileExtension, IoReader, IoType, IoWriter};
use crate::kernel::Result;
use crate::kernel::KernelResult;
use std::fs::{File, OpenOptions};
use std::io;
use std::io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write};
Expand All @@ -22,7 +22,7 @@ impl BufIoReader {
dir_path: Arc<PathBuf>,
gen: i64,
extension: Arc<FileExtension>,
) -> Result<Self> {
) -> KernelResult<Self> {
let path = extension.path_with_gen(&dir_path, gen);

let reader = BufReaderWithPos::new(File::open(path)?)?;
Expand All @@ -46,7 +46,7 @@ impl BufIoWriter {
dir_path: Arc<PathBuf>,
gen: i64,
extension: Arc<FileExtension>,
) -> Result<Self> {
) -> KernelResult<Self> {
// 通过路径构造写入器
let file = OpenOptions::new()
.create(true)
Expand Down Expand Up @@ -103,7 +103,7 @@ impl Seek for BufIoWriter {
}

impl IoWriter for BufIoWriter {
fn current_pos(&mut self) -> Result<u64> {
fn current_pos(&mut self) -> KernelResult<u64> {
Ok(self.writer.pos)
}
}
Expand All @@ -115,7 +115,7 @@ pub(crate) struct BufReaderWithPos<R: Read + Seek> {
}

impl<R: Read + Seek> BufReaderWithPos<R> {
fn new(mut inner: R) -> Result<Self> {
fn new(mut inner: R) -> KernelResult<Self> {
let pos = inner.stream_position()?;
Ok(BufReaderWithPos {
reader: BufReader::new(inner),
Expand Down Expand Up @@ -146,7 +146,7 @@ pub(crate) struct BufWriterWithPos<W: Write + Seek> {
}

impl<W: Write + Seek> BufWriterWithPos<W> {
fn new(mut inner: W) -> Result<Self> {
fn new(mut inner: W) -> KernelResult<Self> {
let pos = inner.stream_position()?;
Ok(BufWriterWithPos {
writer: BufWriter::new(inner),
Expand Down
8 changes: 4 additions & 4 deletions src/kernel/io/direct.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::kernel::io::{FileExtension, IoReader, IoType, IoWriter};
use crate::kernel::Result;
use crate::kernel::KernelResult;
use std::fs::{File, OpenOptions};
use std::io::{Read, Seek, SeekFrom, Write};
use std::path::PathBuf;
Expand All @@ -23,7 +23,7 @@ impl DirectIoReader {
dir_path: Arc<PathBuf>,
gen: i64,
extension: Arc<FileExtension>,
) -> Result<Self> {
) -> KernelResult<Self> {
let path = extension.path_with_gen(&dir_path, gen);
let fs = File::open(path)?;

Expand All @@ -41,7 +41,7 @@ impl DirectIoWriter {
dir_path: Arc<PathBuf>,
gen: i64,
extension: Arc<FileExtension>,
) -> Result<Self> {
) -> KernelResult<Self> {
let path = extension.path_with_gen(&dir_path, gen);
let fs = OpenOptions::new()
.create(true)
Expand Down Expand Up @@ -96,7 +96,7 @@ impl Seek for DirectIoWriter {
}

impl IoWriter for DirectIoWriter {
fn current_pos(&mut self) -> Result<u64> {
fn current_pos(&mut self) -> KernelResult<u64> {
Ok(self.fs.stream_position()?)
}
}
Loading

0 comments on commit 9399f64

Please sign in to comment.