Skip to content

Commit

Permalink
feat: Ignore file not found error temporarily while compacting files (#…
Browse files Browse the repository at this point in the history
…2745)

* feat: support ignoring file not found error

* feat: ignore not found during compaction
  • Loading branch information
evenyag authored Nov 15, 2023
1 parent a9e5b90 commit f92b55c
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 3 deletions.
2 changes: 2 additions & 0 deletions src/mito2/src/compaction/output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ async fn build_sst_reader(
) -> error::Result<BoxedBatchReader> {
SeqScan::new(sst_layer, ProjectionMapper::all(&schema)?)
.with_files(inputs.to_vec())
// We ignore file not found error during compaction.
.with_ignore_file_not_found(true)
.build_reader()
.await
}
9 changes: 9 additions & 0 deletions src/mito2/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use common_macro::stack_trace_debug;
use common_runtime::JoinError;
use datatypes::arrow::error::ArrowError;
use datatypes::prelude::ConcreteDataType;
use object_store::ErrorKind;
use prost::{DecodeError, EncodeError};
use snafu::{Location, Snafu};
use store_api::manifest::ManifestVersion;
Expand Down Expand Up @@ -411,6 +412,14 @@ impl Error {
pub(crate) fn is_fill_default(&self) -> bool {
matches!(self, Error::FillDefault { .. })
}

/// Returns true if the file is not found on the object store.
pub(crate) fn is_object_not_found(&self) -> bool {
match self {
Error::OpenDal { error, .. } => error.kind() == ErrorKind::NotFound,
_ => false,
}
}
}

impl ErrorExt for Error {
Expand Down
28 changes: 25 additions & 3 deletions src/mito2/src/read/seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use async_stream::try_stream;
use common_error::ext::BoxedError;
use common_recordbatch::error::ExternalSnafu;
use common_recordbatch::{RecordBatch, RecordBatchStreamAdaptor, SendableRecordBatchStream};
use common_telemetry::debug;
use common_telemetry::{debug, error};
use common_time::range::TimestampRange;
use snafu::ResultExt;
use table::predicate::Predicate;
Expand Down Expand Up @@ -55,6 +55,8 @@ pub struct SeqScan {
files: Vec<FileHandle>,
/// Cache.
cache_manager: Option<CacheManagerRef>,
/// Ignores file not found error.
ignore_file_not_found: bool,
}

impl SeqScan {
Expand All @@ -69,6 +71,7 @@ impl SeqScan {
memtables: Vec::new(),
files: Vec::new(),
cache_manager: None,
ignore_file_not_found: false,
}
}

Expand Down Expand Up @@ -101,11 +104,19 @@ impl SeqScan {
}

/// Replaces the cache manager of this scan with `cache` and returns the
/// updated scan.
#[must_use]
pub(crate) fn with_cache(self, cache: Option<CacheManagerRef>) -> Self {
    let mut scan = self;
    scan.cache_manager = cache;
    scan
}

/// Sets whether the scan tolerates missing files.
///
/// When `ignore` is true, a file whose reader fails to build with an
/// object-not-found error is logged and skipped instead of failing the scan.
#[must_use]
pub(crate) fn with_ignore_file_not_found(self, ignore: bool) -> Self {
    let mut scan = self;
    scan.ignore_file_not_found = ignore;
    scan
}

/// Builds a stream for the query.
pub async fn build_stream(&self) -> Result<SendableRecordBatchStream> {
let start = Instant::now();
Expand Down Expand Up @@ -147,15 +158,26 @@ impl SeqScan {
builder.push_batch_iter(iter);
}
for file in &self.files {
let reader = self
let maybe_reader = self
.access_layer
.read_sst(file.clone())
.predicate(self.predicate.clone())
.time_range(self.time_range)
.projection(Some(self.mapper.column_ids().to_vec()))
.cache(self.cache_manager.clone())
.build()
.await?;
.await;
let reader = match maybe_reader {
Ok(reader) => reader,
Err(e) => {
if e.is_object_not_found() && self.ignore_file_not_found {
error!(e; "File to scan does not exist, region_id: {}, file: {}", file.region_id(), file.file_id());
continue;
} else {
return Err(e);
}
}
};
if compat::has_same_columns(self.mapper.metadata(), reader.metadata()) {
builder.push_batch_reader(Box::new(reader));
} else {
Expand Down

0 comments on commit f92b55c

Please sign in to comment.