From f92b55c745701d7b52444cb1f90f222d3246be3f Mon Sep 17 00:00:00 2001 From: Yingwen Date: Wed, 15 Nov 2023 11:48:51 +0800 Subject: [PATCH] feat: Ignore file not found error temporarily while compacting files (#2745) * feat: support ignoring file not found error * feat: ignore not found during compaction --- src/mito2/src/compaction/output.rs | 2 ++ src/mito2/src/error.rs | 9 +++++++++ src/mito2/src/read/seq_scan.rs | 28 +++++++++++++++++++++++++--- 3 files changed, 36 insertions(+), 3 deletions(-) diff --git a/src/mito2/src/compaction/output.rs b/src/mito2/src/compaction/output.rs index bedbcb741729..6111e95c40e7 100644 --- a/src/mito2/src/compaction/output.rs +++ b/src/mito2/src/compaction/output.rs @@ -79,6 +79,8 @@ async fn build_sst_reader( ) -> error::Result { SeqScan::new(sst_layer, ProjectionMapper::all(&schema)?) .with_files(inputs.to_vec()) + // We ignore file not found error during compaction. + .with_ignore_file_not_found(true) .build_reader() .await } diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index 5d76901a36c6..8ce6e8719efe 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -22,6 +22,7 @@ use common_macro::stack_trace_debug; use common_runtime::JoinError; use datatypes::arrow::error::ArrowError; use datatypes::prelude::ConcreteDataType; +use object_store::ErrorKind; use prost::{DecodeError, EncodeError}; use snafu::{Location, Snafu}; use store_api::manifest::ManifestVersion; @@ -411,6 +412,14 @@ impl Error { pub(crate) fn is_fill_default(&self) -> bool { matches!(self, Error::FillDefault { .. }) } + + /// Returns true if the file is not found on the object store. + pub(crate) fn is_object_not_found(&self) -> bool { + match self { + Error::OpenDal { error, .. } => error.kind() == ErrorKind::NotFound, + _ => false, + } + } } impl ErrorExt for Error { diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index 9e797e23cc0c..b0a86072c1ac 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -21,7 +21,7 @@ use async_stream::try_stream; use common_error::ext::BoxedError; use common_recordbatch::error::ExternalSnafu; use common_recordbatch::{RecordBatch, RecordBatchStreamAdaptor, SendableRecordBatchStream}; -use common_telemetry::debug; +use common_telemetry::{debug, error}; use common_time::range::TimestampRange; use snafu::ResultExt; use table::predicate::Predicate; @@ -55,6 +55,8 @@ pub struct SeqScan { files: Vec, /// Cache. cache_manager: Option, + /// Ignores file not found error. + ignore_file_not_found: bool, } impl SeqScan { @@ -69,6 +71,7 @@ impl SeqScan { memtables: Vec::new(), files: Vec::new(), cache_manager: None, + ignore_file_not_found: false, } } @@ -101,11 +104,19 @@ impl SeqScan { } /// Sets cache for this query. + #[must_use] pub(crate) fn with_cache(mut self, cache: Option) -> Self { self.cache_manager = cache; self } + /// Ignores file not found error. + #[must_use] + pub(crate) fn with_ignore_file_not_found(mut self, ignore: bool) -> Self { + self.ignore_file_not_found = ignore; + self + } + /// Builds a stream for the query. pub async fn build_stream(&self) -> Result { let start = Instant::now(); @@ -147,7 +158,7 @@ impl SeqScan { builder.push_batch_iter(iter); } for file in &self.files { - let reader = self + let maybe_reader = self .access_layer .read_sst(file.clone()) .predicate(self.predicate.clone()) @@ -155,7 +166,18 @@ impl SeqScan { .projection(Some(self.mapper.column_ids().to_vec())) .cache(self.cache_manager.clone()) .build() - .await?; + .await; + let reader = match maybe_reader { + Ok(reader) => reader, + Err(e) => { + if e.is_object_not_found() && self.ignore_file_not_found { + error!(e; "File to scan does not exist, region_id: {}, file: {}", file.region_id(), file.file_id()); + continue; + } else { + return Err(e); + } + } + }; if compat::has_same_columns(self.mapper.metadata(), reader.metadata()) { builder.push_batch_reader(Box::new(reader)); } else {