diff --git a/src/mito2/src/access_layer.rs b/src/mito2/src/access_layer.rs new file mode 100644 index 000000000000..660cfd61e405 --- /dev/null +++ b/src/mito2/src/access_layer.rs @@ -0,0 +1,59 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use object_store::{util, ObjectStore}; +use snafu::ResultExt; + +use crate::error::{DeleteSstSnafu, Result}; +use crate::sst::file::FileId; + +pub type AccessLayerRef = Arc; + +/// Sst access layer. +pub struct AccessLayer { + sst_dir: String, + object_store: ObjectStore, +} + +impl std::fmt::Debug for AccessLayer { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("AccessLayer") + .field("sst_dir", &self.sst_dir) + .finish() + } +} + +impl AccessLayer { + pub fn new(sst_dir: &str, object_store: ObjectStore) -> AccessLayer { + AccessLayer { + sst_dir: sst_dir.to_string(), + object_store, + } + } + + fn sst_file_path(&self, file_name: &str) -> String { + util::join_path(&self.sst_dir, file_name) + } + + /// Deletes a SST file with given file id. + pub async fn delete_sst(&self, file_id: FileId) -> Result<()> { + let path = self.sst_file_path(&file_id.as_parquet()); + self.object_store + .delete(&path) + .await + .context(DeleteSstSnafu { file_id }) + } +} diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index 7de2bd22bdbf..2b52e20ce225 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -26,6 +26,7 @@ use snafu::{Location, Snafu}; use store_api::manifest::ManifestVersion; use store_api::storage::RegionId; +use crate::sst::file::FileId; use crate::worker::WorkerId; #[derive(Debug, Snafu)] @@ -387,6 +388,13 @@ pub enum Error { source: table::error::Error, location: Location, }, + + #[snafu(display("Failed to delete SST file, file id: {}, source: {}", file_id, source))] + DeleteSst { + file_id: FileId, + source: object_store::Error, + location: Location, + }, } pub type Result = std::result::Result; @@ -448,6 +456,7 @@ impl ErrorExt for Error { InvalidSchedulerState { .. } => StatusCode::InvalidArguments, StopScheduler { .. } => StatusCode::Internal, BuildPredicate { source, .. } => source.status_code(), + DeleteSst { .. } => StatusCode::StorageUnavailable, } } diff --git a/src/mito2/src/lib.rs b/src/mito2/src/lib.rs index 7e82e56e1329..87322aa24887 100644 --- a/src/mito2/src/lib.rs +++ b/src/mito2/src/lib.rs @@ -20,6 +20,8 @@ pub mod test_util; // TODO(yingwen): Remove all `allow(dead_code)` after finish refactoring mito. +#[allow(dead_code)] +mod access_layer; pub mod config; #[allow(dead_code)] pub mod engine; diff --git a/src/mito2/src/schedule/scheduler.rs b/src/mito2/src/schedule/scheduler.rs index 832ed9a9c9d6..8ee896b5a8c0 100644 --- a/src/mito2/src/schedule/scheduler.rs +++ b/src/mito2/src/schedule/scheduler.rs @@ -38,7 +38,7 @@ const STATE_AWAIT_TERMINATION: u8 = 2; #[async_trait::async_trait] pub trait Scheduler { /// Schedules a Job - fn schedule(&self, req: Job) -> Result<()>; + fn schedule(&self, job: Job) -> Result<()>; /// Stops scheduler. If `await_termination` is set to true, the scheduler will wait until all tasks are processed. async fn stop(&self, await_termination: bool) -> Result<()>; @@ -77,8 +77,8 @@ impl LocalScheduler { break; } req_opt = receiver.recv_async() =>{ - if let Ok(req) = req_opt { - req.await; + if let Ok(job) = req_opt { + job.await; } } } @@ -86,8 +86,8 @@ impl LocalScheduler { // When task scheduler is cancelled, we will wait all task finished if state_clone.load(Ordering::Relaxed) == STATE_AWAIT_TERMINATION { // recv_async waits until all sender's been dropped. - while let Ok(req) = receiver.recv_async().await { - req.await; + while let Ok(job) = receiver.recv_async().await { + job.await; } state_clone.store(STATE_STOP, Ordering::Relaxed); } @@ -111,7 +111,7 @@ impl LocalScheduler { #[async_trait::async_trait] impl Scheduler for LocalScheduler { - fn schedule(&self, req: Job) -> Result<()> { + fn schedule(&self, job: Job) -> Result<()> { ensure!( self.state.load(Ordering::Relaxed) == STATE_RUNNING, InvalidSchedulerStateSnafu @@ -121,7 +121,7 @@ impl Scheduler for LocalScheduler { .unwrap() .as_ref() .context(InvalidSchedulerStateSnafu)? - .send(req) + .send(job) .map_err(|_| InvalidFlumeSenderSnafu {}.build()) } diff --git a/src/mito2/src/sst/file.rs b/src/mito2/src/sst/file.rs index e659812547ef..0fbd97874844 100644 --- a/src/mito2/src/sst/file.rs +++ b/src/mito2/src/sst/file.rs @@ -113,6 +113,11 @@ impl fmt::Debug for FileHandle { } impl FileHandle { + pub fn new(meta: FileMeta, file_purger: FilePurgerRef) -> FileHandle { + FileHandle { + inner: Arc::new(FileHandleInner::new(meta, file_purger)), + } + } /// Returns the file id. pub fn file_id(&self) -> FileId { self.inner.meta.file_id @@ -127,6 +132,12 @@ impl FileHandle { pub fn time_range(&self) -> FileTimeRange { self.inner.meta.time_range } + + /// Mark the file as deleted and will delete it on drop asynchronously + #[inline] + pub fn mark_deleted(&self) { + self.inner.deleted.store(true, Ordering::Relaxed); + } } /// Inner data of [FileHandle]. @@ -150,6 +161,17 @@ impl Drop for FileHandleInner { } } +impl FileHandleInner { + fn new(meta: FileMeta, file_purger: FilePurgerRef) -> FileHandleInner { + FileHandleInner { + meta, + compacting: AtomicBool::new(false), + deleted: AtomicBool::new(false), + file_purger, + } + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/mito2/src/sst/file_purger.rs b/src/mito2/src/sst/file_purger.rs index 010fc3ffc255..9b289f80fd5f 100644 --- a/src/mito2/src/sst/file_purger.rs +++ b/src/mito2/src/sst/file_purger.rs @@ -14,8 +14,11 @@ use std::sync::Arc; +use common_telemetry::{error, info}; use store_api::storage::RegionId; +use crate::access_layer::AccessLayerRef; +use crate::schedule::scheduler::{LocalScheduler, Scheduler}; use crate::sst::file::FileId; /// Request to remove a file. @@ -35,11 +38,94 @@ pub trait FilePurger: Send + Sync { pub type FilePurgerRef = Arc; -// TODO(yingwen): Remove this once we implement the real purger. -/// A purger that does nothing. -#[derive(Debug)] -struct NoopPurger {} +pub struct LocalFilePurger { + scheduler: Arc, + + sst_layer: AccessLayerRef, +} + +impl LocalFilePurger { + pub fn new(scheduler: Arc, sst_layer: AccessLayerRef) -> Self { + Self { + scheduler, + sst_layer, + } + } +} + +impl FilePurger for LocalFilePurger { + fn send_request(&self, request: PurgeRequest) { + let file_id = request.file_id; + let region_id = request.region_id; + let sst_layer = self.sst_layer.clone(); + + if let Err(e) = self.scheduler.schedule(Box::pin(async move { + if let Err(e) = sst_layer.delete_sst(file_id).await { + error!(e; "Failed to delete SST file, file: {}, region: {}", + file_id.as_parquet(), region_id); + } else { + info!( + "Successfully deleted SST file: {}, region: {}", + file_id.as_parquet(), + region_id + ); + } + })) { + error!(e; "Failed to schedule the file purge request"); + } + } +} + +#[cfg(test)] +mod tests { + use common_test_util::temp_dir::create_temp_dir; + use object_store::services::Fs; + use object_store::{util, ObjectStore}; + + use super::*; + use crate::access_layer::AccessLayer; + use crate::schedule::scheduler::LocalScheduler; + use crate::sst::file::{FileHandle, FileId, FileMeta, FileTimeRange}; + + #[tokio::test] + async fn test_file_purge() { + common_telemetry::init_default_ut_logging(); + + let dir = create_temp_dir("file-purge"); + let mut builder = Fs::default(); + builder.root(dir.path().to_str().unwrap()); + let object_store = ObjectStore::new(builder).unwrap().finish(); + let sst_file_id = FileId::random(); + let sst_dir = "table1"; + let path = util::join_path(sst_dir, &sst_file_id.as_parquet()); + + object_store.write(&path, vec![0; 4096]).await.unwrap(); + + let scheduler = Arc::new(LocalScheduler::new(3)); + let layer = Arc::new(AccessLayer::new(sst_dir, object_store.clone())); + + let file_purger = Arc::new(LocalFilePurger::new(scheduler.clone(), layer)); + + { + let handle = FileHandle::new( + FileMeta { + region_id: 0.into(), + file_id: sst_file_id, + time_range: FileTimeRange::default(), + level: 0, + file_size: 4096, + }, + file_purger, + ); + // mark file as deleted and drop the handle, we expect the file is deleted. + handle.mark_deleted(); + } + + scheduler.stop(true).await.unwrap(); -impl FilePurger for NoopPurger { - fn send_request(&self, _request: PurgeRequest) {} + assert!(!object_store + .is_exist(&format!("{}/{}", sst_dir, sst_file_id.as_parquet())) + .await + .unwrap()); + } }