Skip to content

Commit

Permalink
remove some unwraps
Browse files Browse the repository at this point in the history
Signed-off-by: Zhenchi <[email protected]>
  • Loading branch information
zhongzc committed Dec 29, 2023
1 parent 2f9319a commit 50d8a1b
Show file tree
Hide file tree
Showing 6 changed files with 162 additions and 60 deletions.
73 changes: 68 additions & 5 deletions src/mito2/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -423,14 +423,69 @@ pub enum Error {
#[snafu(display("Failed to build index applier"))]
BuildIndexApplier {
#[snafu(source)]
error: index::inverted_index::error::Error,
source: index::inverted_index::error::Error,
location: Location,
},

#[snafu(display("Failed to push index value"))]
PushIndexValue {
#[snafu(source)]
error: index::inverted_index::error::Error,
source: index::inverted_index::error::Error,
location: Location,
},

#[snafu(display("Failed to apply index"))]
ApplyIndex {
#[snafu(source)]
source: index::inverted_index::error::Error,
location: Location,
},

#[snafu(display("Failed to write index completely"))]
IndexFinish {
#[snafu(source)]
source: index::inverted_index::error::Error,
location: Location,
},

#[snafu(display("Failed to read puffin metadata"))]
PuffinReadMetadata {
#[snafu(source)]
source: puffin::error::Error,
location: Location,
},

#[snafu(display("Failed to read puffin blob"))]
PuffinReadBlob {
#[snafu(source)]
source: puffin::error::Error,
location: Location,
},

#[snafu(display("Blob type not found, blob_type: {blob_type}"))]
PuffinBlobTypeNotFound {
blob_type: String,
location: Location,
},

#[snafu(display("Failed to write puffin completely"))]
PuffinFinish {
#[snafu(source)]
source: puffin::error::Error,
location: Location,
},

#[snafu(display("Failed to add blob to puffin file"))]
PuffinAddBlob {
#[snafu(source)]
source: puffin::error::Error,
location: Location,
},

#[snafu(display("Failed to convert value"))]
ConvertValue {
#[snafu(source)]
source: datatypes::error::Error,
location: Location,
},

Expand Down Expand Up @@ -477,6 +532,7 @@ impl ErrorExt for Error {
| RegionCorrupted { .. }
| CreateDefault { .. }
| InvalidParquet { .. }
| PuffinBlobTypeNotFound { .. }
| UnexpectedReplay { .. } => StatusCode::Unexpected,
RegionNotFound { .. } => StatusCode::RegionNotFound,
ObjectStoreNotFound { .. }
Expand All @@ -486,15 +542,13 @@ impl ErrorExt for Error {
| FillDefault { .. }
| ConvertColumnDataType { .. }
| ColumnNotFound { .. }
| BuildIndexApplier { .. }
| InvalidMetadata { .. } => StatusCode::InvalidArguments,
RegionMetadataNotFound { .. }
| Join { .. }
| WorkerStopped { .. }
| Recv { .. }
| EncodeWal { .. }
| DecodeWal { .. }
| PushIndexValue { .. } => StatusCode::Internal,
| DecodeWal { .. } => StatusCode::Internal,
WriteBuffer { source, .. } => source.status_code(),
WriteGroup { source, .. } => source.status_code(),
FieldTypeMismatch { source, .. } => source.status_code(),
Expand Down Expand Up @@ -524,6 +578,15 @@ impl ErrorExt for Error {
JsonOptions { .. } => StatusCode::InvalidArguments,
EmptyRegionDir { .. } | EmptyManifestDir { .. } => StatusCode::RegionNotFound,
ArrowReader { .. } => StatusCode::StorageUnavailable,
ConvertValue { source, .. } => source.status_code(),
BuildIndexApplier { source, .. }
| PushIndexValue { source, .. }
| ApplyIndex { source, .. }
| IndexFinish { source, .. } => source.status_code(),
PuffinReadMetadata { source, .. }
| PuffinReadBlob { source, .. }
| PuffinFinish { source, .. }
| PuffinAddBlob { source, .. } => source.status_code(),
}
}

Expand Down
21 changes: 16 additions & 5 deletions src/mito2/src/sst/index/applier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,12 @@ use index::inverted_index::search::index_apply::{
IndexApplier, IndexNotFoundStrategy, SearchContext,
};
use puffin::file_format::reader::{PuffinAsyncReader, PuffinFileReader};
use snafu::{OptionExt, ResultExt};

use crate::error::Result;
use crate::error::{
ApplyIndexSnafu, PuffinBlobTypeNotFoundSnafu, PuffinReadBlobSnafu, PuffinReadMetadataSnafu,
Result,
};
use crate::metrics::{
INDEX_APPLY_COST_TIME, INDEX_APPLY_MEMORY_USAGE, INDEX_PUFFIN_READ_BYTES_TOTAL,
};
Expand Down Expand Up @@ -66,14 +70,21 @@ impl SstIndexApplier {
.await?;
let mut puffin_reader = PuffinFileReader::new(file_reader);

let file_meta = puffin_reader.metadata().await.unwrap();
let file_meta = puffin_reader
.metadata()
.await
.context(PuffinReadMetadataSnafu)?;
let blob_meta = file_meta
.blobs
.iter()
.find(|blob| blob.blob_type == INDEX_BLOB_TYPE)
.unwrap();
.context(PuffinBlobTypeNotFoundSnafu {
blob_type: INDEX_BLOB_TYPE,
})?;

let blob_reader = puffin_reader.blob_reader(blob_meta).unwrap();
let blob_reader = puffin_reader
.blob_reader(blob_meta)
.context(PuffinReadBlobSnafu)?;
let mut index_reader = InvertedIndexBlobReader::new(blob_reader);

let context = SearchContext {
Expand All @@ -83,7 +94,7 @@ impl SstIndexApplier {
.index_applier
.apply(context, &mut index_reader)
.await
.unwrap();
.context(ApplyIndexSnafu)?;

Ok(res)
}
Expand Down
4 changes: 2 additions & 2 deletions src/mito2/src/sst/index/applier/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use object_store::ObjectStore;
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadata;

use crate::error::{BuildIndexApplierSnafu, ColumnNotFoundSnafu, Result};
use crate::error::{BuildIndexApplierSnafu, ColumnNotFoundSnafu, ConvertValueSnafu, Result};
use crate::row_converter::SortField;
use crate::sst::index::applier::SstIndexApplier;
use crate::sst::index::codec::IndexValueCodec;
Expand Down Expand Up @@ -139,7 +139,7 @@ impl<'a> SstIndexApplierBuilder<'a> {
}

fn encode_lit(lit: &ScalarValue, data_type: ConcreteDataType) -> Result<Vec<u8>> {
let value = Value::try_from(lit.clone()).unwrap();
let value = Value::try_from(lit.clone()).context(ConvertValueSnafu)?;
let mut bytes = vec![];
let field = SortField::new(data_type);
IndexValueCodec::encode_value(value.as_value_ref(), &field, &mut bytes)?;
Expand Down
33 changes: 19 additions & 14 deletions src/mito2/src/sst/index/creator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ use store_api::metadata::RegionMetadataRef;
use tokio::io::duplex;
use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};

use crate::error::{PushIndexValueSnafu, Result};
use crate::error::{
IndexFinishSnafu, PuffinAddBlobSnafu, PuffinFinishSnafu, PushIndexValueSnafu, Result,
};
use crate::metrics::INDEX_PUFFIN_WRITE_BYTES_TOTAL;
use crate::read::Batch;
use crate::sst::file::FileId;
Expand Down Expand Up @@ -104,9 +106,10 @@ impl SstIndexCreator {
if let Err(err) = self.do_update(batch).await {
// clean up garbage if failed to update
if let Err(err) = self.do_cleanup().await {
let region_dir = &self.region_dir;
let sst_file_id = &self.sst_file_id;
warn!(err; "Failed to clean up index creator, region_dir: {region_dir}, sst_file_id: {sst_file_id}");
warn!(
err; "Failed to clean up index creator, region_dir: {}, sst_file_id: {}",
self.region_dir, self.sst_file_id,
);
}
return Err(err);
}
Expand All @@ -121,13 +124,13 @@ impl SstIndexCreator {
}

let finish_res = self.do_finish().await;
// clean up garbage no matter finish success or not
let cleanup_res = self.do_cleanup().await;

if let Err(err) = cleanup_res {
let region_dir = &self.region_dir;
let sst_file_id = &self.sst_file_id;
warn!(err; "Failed to clean up index creator, region_dir: {region_dir}, sst_file_id: {sst_file_id}");
// clean up garbage no matter finish success or not
if let Err(err) = self.do_cleanup().await {
warn!(
err; "Failed to clean up index creator, region_dir: {}, sst_file_id: {}",
self.region_dir, self.sst_file_id,
);
}

finish_res.map(|_| (self.stats.row_count(), self.stats.byte_count()))
Expand All @@ -144,6 +147,7 @@ impl SstIndexCreator {
IndexValueCodec::encode_value(value.as_value_ref(), field, &mut self.value_buf)?;
}

// null value -> None
let v = value.is_some().then_some(self.value_buf.as_slice());
self.index_creator
.push_with_name_n(column_name, v, n)
Expand Down Expand Up @@ -173,21 +177,22 @@ impl SstIndexCreator {
};

let mut index_writer = InvertedIndexBlobWriter::new(tx.compat_write());
let (source, sink) = futures::join!(
let (index_finish, puffin_add_blob) = futures::join!(
self.index_creator.finish(&mut index_writer),
puffin_writer.add_blob(blob)
);

source.unwrap();
sink.unwrap();
index_finish.context(IndexFinishSnafu)?;
puffin_add_blob.context(PuffinAddBlobSnafu)?;

let byte_count = puffin_writer.finish().await.unwrap();
let byte_count = puffin_writer.finish().await.context(PuffinFinishSnafu)?;
guard.inc_byte_count(byte_count);
Ok(())
}

async fn do_cleanup(&mut self) -> Result<()> {
let _guard = self.stats.record_cleanup();

self.temp_file_provider.cleanup().await
}
}
44 changes: 22 additions & 22 deletions src/mito2/src/sst/index/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,20 @@ impl InstrumentedStore {
Self { object_store }
}

pub async fn reader(
pub async fn reader<'a>(
&self,
path: &str,
recoder: &'static IntCounter,
) -> Result<InstrumentedAsyncRead<object_store::Reader>> {
recoder: &'a IntCounter,
) -> Result<InstrumentedAsyncRead<'a, object_store::Reader>> {
let reader = self.object_store.reader(path).await.context(OpenDalSnafu)?;
Ok(InstrumentedAsyncRead::new(reader, recoder))
}

pub async fn writer(
pub async fn writer<'a>(
&self,
path: &str,
recoder: &'static IntCounter,
) -> Result<InstrumentedAsyncWrite<object_store::Writer>> {
recoder: &'a IntCounter,
) -> Result<InstrumentedAsyncWrite<'a, object_store::Writer>> {
let writer = self.object_store.writer(path).await.context(OpenDalSnafu)?;
Ok(InstrumentedAsyncWrite::new(writer, recoder))
}
Expand All @@ -66,22 +66,22 @@ impl InstrumentedStore {
}

#[pin_project]
pub(crate) struct InstrumentedAsyncRead<R> {
pub(crate) struct InstrumentedAsyncRead<'a, R> {
#[pin]
inner: R,
recorder: BytesRecorder,
recorder: BytesRecorder<'a>,
}

impl<R> InstrumentedAsyncRead<R> {
fn new(inner: R, recorder: &'static IntCounter) -> Self {
impl<'a, R> InstrumentedAsyncRead<'a, R> {
fn new(inner: R, recorder: &'a IntCounter) -> Self {
Self {
inner,
recorder: BytesRecorder::new(recorder),
}
}
}

impl<R: AsyncRead + Unpin + Send> AsyncRead for InstrumentedAsyncRead<R> {
impl<'a, R: AsyncRead + Unpin + Send> AsyncRead for InstrumentedAsyncRead<'a, R> {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
Expand All @@ -95,7 +95,7 @@ impl<R: AsyncRead + Unpin + Send> AsyncRead for InstrumentedAsyncRead<R> {
}
}

impl<R: AsyncSeek + Unpin + Send> AsyncSeek for InstrumentedAsyncRead<R> {
impl<'a, R: AsyncSeek + Unpin + Send> AsyncSeek for InstrumentedAsyncRead<'a, R> {
fn poll_seek(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
Expand All @@ -106,22 +106,22 @@ impl<R: AsyncSeek + Unpin + Send> AsyncSeek for InstrumentedAsyncRead<R> {
}

#[pin_project]
pub(crate) struct InstrumentedAsyncWrite<W> {
pub(crate) struct InstrumentedAsyncWrite<'a, W> {
#[pin]
inner: W,
recorder: BytesRecorder,
recorder: BytesRecorder<'a>,
}

impl<W> InstrumentedAsyncWrite<W> {
fn new(inner: W, recorder: &'static IntCounter) -> Self {
impl<'a, W> InstrumentedAsyncWrite<'a, W> {
fn new(inner: W, recorder: &'a IntCounter) -> Self {
Self {
inner,
recorder: BytesRecorder::new(recorder),
}
}
}

impl<W: AsyncWrite + Unpin + Send> AsyncWrite for InstrumentedAsyncWrite<W> {
impl<'a, W: AsyncWrite + Unpin + Send> AsyncWrite for InstrumentedAsyncWrite<'a, W> {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
Expand All @@ -143,13 +143,13 @@ impl<W: AsyncWrite + Unpin + Send> AsyncWrite for InstrumentedAsyncWrite<W> {
}
}

struct BytesRecorder {
struct BytesRecorder<'a> {
bytes: usize,
recorder: &'static IntCounter,
recorder: &'a IntCounter,
}

impl BytesRecorder {
fn new(recorder: &'static IntCounter) -> Self {
impl<'a> BytesRecorder<'a> {
fn new(recorder: &'a IntCounter) -> Self {
Self { bytes: 0, recorder }
}

Expand All @@ -158,7 +158,7 @@ impl BytesRecorder {
}
}

impl Drop for BytesRecorder {
impl<'a> Drop for BytesRecorder<'a> {
fn drop(&mut self) {
if self.bytes > 0 {
self.recorder.inc_by(self.bytes as _);
Expand Down
Loading

0 comments on commit 50d8a1b

Please sign in to comment.