feat(mito2): add file purger and cooperate with scheduler to purge sst files (GreptimeTeam#2251)

* feat: add file purger and use scheduler

Signed-off-by: ZhuZiyi <[email protected]>

* chore: code format

Signed-off-by: ZhuZiyi <[email protected]>

* chore: code format

Signed-off-by: ZhuZiyi <[email protected]>

* feat: print some information about handling error message

Signed-off-by: ZhuZiyi <[email protected]>

* fix: resolve conversation

Signed-off-by: ZhuZiyi <[email protected]>

* chore: code format

Signed-off-by: ZhuZiyi <[email protected]>

* chore: resolve conversation

Signed-off-by: ZhuZiyi <[email protected]>

* fix: resolve conflicting files

Signed-off-by: ZhuZiyi <[email protected]>

* chore: code format

Signed-off-by: ZhuZiyi <[email protected]>

* chore: code format

Signed-off-by: ZhuZiyi <[email protected]>

---------

Signed-off-by: ZhuZiyi <[email protected]>
Nateiru authored and paomian committed Oct 19, 2023
1 parent 4f0c38a commit 596d63e
Showing 6 changed files with 191 additions and 13 deletions.
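Before the per-file diffs, a brief sketch of how the new pieces cooperate, mirroring the test added in src/mito2/src/sst/file_purger.rs: the purger owns a LocalScheduler and an AccessLayer, and FileHandles created with it delete their SST asynchronously once marked deleted and dropped. Written as if inside the mito2 crate; the directory name and concurrency are illustrative, not part of the commit.

// Illustrative wiring only; all types come from the diffs below.
use std::sync::Arc;

use object_store::ObjectStore;

use crate::access_layer::AccessLayer;
use crate::schedule::scheduler::{LocalScheduler, Scheduler};
use crate::sst::file::{FileHandle, FileMeta};
use crate::sst::file_purger::{FilePurgerRef, LocalFilePurger};

async fn purge_on_drop(object_store: ObjectStore, meta: FileMeta) {
    let scheduler = Arc::new(LocalScheduler::new(3));
    let layer = Arc::new(AccessLayer::new("table1", object_store));
    let purger: FilePurgerRef = Arc::new(LocalFilePurger::new(scheduler.clone(), layer));

    // A handle built with this purger schedules deletion of its SST file once it is
    // marked deleted and the last reference is dropped.
    let handle = FileHandle::new(meta, purger);
    handle.mark_deleted();
    drop(handle);

    // `stop(true)` waits for queued purge jobs before shutting the workers down.
    scheduler.stop(true).await.unwrap();
}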
59 changes: 59 additions & 0 deletions src/mito2/src/access_layer.rs
@@ -0,0 +1,59 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use object_store::{util, ObjectStore};
use snafu::ResultExt;

use crate::error::{DeleteSstSnafu, Result};
use crate::sst::file::FileId;

pub type AccessLayerRef = Arc<AccessLayer>;

/// Sst access layer.
pub struct AccessLayer {
    sst_dir: String,
    object_store: ObjectStore,
}

impl std::fmt::Debug for AccessLayer {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("AccessLayer")
            .field("sst_dir", &self.sst_dir)
            .finish()
    }
}

impl AccessLayer {
    pub fn new(sst_dir: &str, object_store: ObjectStore) -> AccessLayer {
        AccessLayer {
            sst_dir: sst_dir.to_string(),
            object_store,
        }
    }

    fn sst_file_path(&self, file_name: &str) -> String {
        util::join_path(&self.sst_dir, file_name)
    }

    /// Deletes a SST file with given file id.
    pub async fn delete_sst(&self, file_id: FileId) -> Result<()> {
        let path = self.sst_file_path(&file_id.as_parquet());
        self.object_store
            .delete(&path)
            .await
            .context(DeleteSstSnafu { file_id })
    }
}
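As a quick illustration of the layer above, a hedged sketch that deletes one SST through an AccessLayer backed by a local filesystem ObjectStore; the root path and directory name are made up for the example, and the calls mirror those used in the purge test further down.

use object_store::services::Fs;
use object_store::ObjectStore;

use crate::access_layer::AccessLayer;
use crate::error::Result;
use crate::sst::file::FileId;

async fn delete_one_sst() -> Result<()> {
    // Filesystem-backed store; the root is illustrative.
    let mut builder = Fs::default();
    builder.root("/tmp/access-layer-example");
    let object_store = ObjectStore::new(builder).unwrap().finish();

    let layer = AccessLayer::new("table1", object_store);
    let file_id = FileId::random();
    // Removes `table1/<file_id>.parquet` from the backing store.
    layer.delete_sst(file_id).await
}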
9 changes: 9 additions & 0 deletions src/mito2/src/error.rs
@@ -26,6 +26,7 @@ use snafu::{Location, Snafu};
use store_api::manifest::ManifestVersion;
use store_api::storage::RegionId;

use crate::sst::file::FileId;
use crate::worker::WorkerId;

#[derive(Debug, Snafu)]
@@ -387,6 +388,13 @@ pub enum Error {
        source: table::error::Error,
        location: Location,
    },

    #[snafu(display("Failed to delete SST file, file id: {}, source: {}", file_id, source))]
    DeleteSst {
        file_id: FileId,
        source: object_store::Error,
        location: Location,
    },
}

pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -448,6 +456,7 @@ impl ErrorExt for Error {
            InvalidSchedulerState { .. } => StatusCode::InvalidArguments,
            StopScheduler { .. } => StatusCode::Internal,
            BuildPredicate { source, .. } => source.status_code(),
            DeleteSst { .. } => StatusCode::StorageUnavailable,
        }
    }

2 changes: 2 additions & 0 deletions src/mito2/src/lib.rs
@@ -20,6 +20,8 @@
pub mod test_util;

// TODO(yingwen): Remove all `allow(dead_code)` after finish refactoring mito.
#[allow(dead_code)]
mod access_layer;
pub mod config;
#[allow(dead_code)]
pub mod engine;
14 changes: 7 additions & 7 deletions src/mito2/src/schedule/scheduler.rs
@@ -38,7 +38,7 @@ const STATE_AWAIT_TERMINATION: u8 = 2;
#[async_trait::async_trait]
pub trait Scheduler {
    /// Schedules a Job
    fn schedule(&self, req: Job) -> Result<()>;
    fn schedule(&self, job: Job) -> Result<()>;

    /// Stops scheduler. If `await_termination` is set to true, the scheduler will wait until all tasks are processed.
    async fn stop(&self, await_termination: bool) -> Result<()>;
@@ -77,17 +77,17 @@ impl LocalScheduler {
                        break;
                    }
                    req_opt = receiver.recv_async() =>{
                        if let Ok(req) = req_opt {
                            req.await;
                        if let Ok(job) = req_opt {
                            job.await;
                        }
                    }
                }
            }
            // When task scheduler is cancelled, we will wait all task finished
            if state_clone.load(Ordering::Relaxed) == STATE_AWAIT_TERMINATION {
                // recv_async waits until all sender's been dropped.
                while let Ok(req) = receiver.recv_async().await {
                    req.await;
                while let Ok(job) = receiver.recv_async().await {
                    job.await;
                }
                state_clone.store(STATE_STOP, Ordering::Relaxed);
            }
@@ -111,7 +111,7 @@

#[async_trait::async_trait]
impl Scheduler for LocalScheduler {
    fn schedule(&self, req: Job) -> Result<()> {
    fn schedule(&self, job: Job) -> Result<()> {
        ensure!(
            self.state.load(Ordering::Relaxed) == STATE_RUNNING,
            InvalidSchedulerStateSnafu
@@ -121,7 +121,7 @@ impl Scheduler for LocalScheduler {
            .unwrap()
            .as_ref()
            .context(InvalidSchedulerStateSnafu)?
            .send(req)
            .send(job)
            .map_err(|_| InvalidFlumeSenderSnafu {}.build())
    }

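For context on the scheduler side, a small sketch of submitting a job: judging from the purger code further down, a Job is a pinned, boxed future, so that is what is assumed here. The concurrency value is illustrative.

use std::sync::Arc;

use common_telemetry::info;

use crate::schedule::scheduler::{LocalScheduler, Scheduler};

async fn schedule_one_job() {
    let scheduler = Arc::new(LocalScheduler::new(2));
    scheduler
        .schedule(Box::pin(async move {
            // Any fire-and-forget async work can run here.
            info!("job executed");
        }))
        .unwrap();
    // Wait for queued jobs, then shut the workers down.
    scheduler.stop(true).await.unwrap();
}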
22 changes: 22 additions & 0 deletions src/mito2/src/sst/file.rs
@@ -113,6 +113,11 @@ impl fmt::Debug for FileHandle {
}

impl FileHandle {
    pub fn new(meta: FileMeta, file_purger: FilePurgerRef) -> FileHandle {
        FileHandle {
            inner: Arc::new(FileHandleInner::new(meta, file_purger)),
        }
    }
    /// Returns the file id.
    pub fn file_id(&self) -> FileId {
        self.inner.meta.file_id
@@ -127,6 +132,12 @@
    pub fn time_range(&self) -> FileTimeRange {
        self.inner.meta.time_range
    }

    /// Mark the file as deleted and will delete it on drop asynchronously
    #[inline]
    pub fn mark_deleted(&self) {
        self.inner.deleted.store(true, Ordering::Relaxed);
    }
}

/// Inner data of [FileHandle].
@@ -150,6 +161,17 @@ impl Drop for FileHandleInner {
    }
}

impl FileHandleInner {
    fn new(meta: FileMeta, file_purger: FilePurgerRef) -> FileHandleInner {
        FileHandleInner {
            meta,
            compacting: AtomicBool::new(false),
            deleted: AtomicBool::new(false),
            file_purger,
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
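The Drop body for FileHandleInner is collapsed in this hunk, but the doc comment on mark_deleted and the purge test in the next file imply the caller-side flow sketched here; delete_later is a hypothetical helper, not part of the commit.

use crate::sst::file::{FileHandle, FileMeta};
use crate::sst::file_purger::FilePurgerRef;

// Hypothetical helper: hand the file to the purger once nobody holds the handle.
fn delete_later(meta: FileMeta, file_purger: FilePurgerRef) {
    let handle = FileHandle::new(meta, file_purger);
    handle.mark_deleted();
    // Dropping the last handle is expected to make FileHandleInner's Drop send a
    // PurgeRequest, so the SST file is removed asynchronously.
    drop(handle);
}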
98 changes: 92 additions & 6 deletions src/mito2/src/sst/file_purger.rs
@@ -14,8 +14,11 @@

use std::sync::Arc;

use common_telemetry::{error, info};
use store_api::storage::RegionId;

use crate::access_layer::AccessLayerRef;
use crate::schedule::scheduler::{LocalScheduler, Scheduler};
use crate::sst::file::FileId;

/// Request to remove a file.
@@ -35,11 +38,94 @@ pub trait FilePurger: Send + Sync {

pub type FilePurgerRef = Arc<dyn FilePurger>;

// TODO(yingwen): Remove this once we implement the real purger.
/// A purger that does nothing.
#[derive(Debug)]
struct NoopPurger {}

impl FilePurger for NoopPurger {
    fn send_request(&self, _request: PurgeRequest) {}
}

pub struct LocalFilePurger {
    scheduler: Arc<LocalScheduler>,

    sst_layer: AccessLayerRef,
}

impl LocalFilePurger {
    pub fn new(scheduler: Arc<LocalScheduler>, sst_layer: AccessLayerRef) -> Self {
        Self {
            scheduler,
            sst_layer,
        }
    }
}

impl FilePurger for LocalFilePurger {
    fn send_request(&self, request: PurgeRequest) {
        let file_id = request.file_id;
        let region_id = request.region_id;
        let sst_layer = self.sst_layer.clone();

        if let Err(e) = self.scheduler.schedule(Box::pin(async move {
            if let Err(e) = sst_layer.delete_sst(file_id).await {
                error!(e; "Failed to delete SST file, file: {}, region: {}",
                    file_id.as_parquet(), region_id);
            } else {
                info!(
                    "Successfully deleted SST file: {}, region: {}",
                    file_id.as_parquet(),
                    region_id
                );
            }
        })) {
            error!(e; "Failed to schedule the file purge request");
        }
    }
}

#[cfg(test)]
mod tests {
    use common_test_util::temp_dir::create_temp_dir;
    use object_store::services::Fs;
    use object_store::{util, ObjectStore};

    use super::*;
    use crate::access_layer::AccessLayer;
    use crate::schedule::scheduler::LocalScheduler;
    use crate::sst::file::{FileHandle, FileId, FileMeta, FileTimeRange};

    #[tokio::test]
    async fn test_file_purge() {
        common_telemetry::init_default_ut_logging();

        let dir = create_temp_dir("file-purge");
        let mut builder = Fs::default();
        builder.root(dir.path().to_str().unwrap());
        let object_store = ObjectStore::new(builder).unwrap().finish();
        let sst_file_id = FileId::random();
        let sst_dir = "table1";
        let path = util::join_path(sst_dir, &sst_file_id.as_parquet());

        object_store.write(&path, vec![0; 4096]).await.unwrap();

        let scheduler = Arc::new(LocalScheduler::new(3));
        let layer = Arc::new(AccessLayer::new(sst_dir, object_store.clone()));

        let file_purger = Arc::new(LocalFilePurger::new(scheduler.clone(), layer));

        {
            let handle = FileHandle::new(
                FileMeta {
                    region_id: 0.into(),
                    file_id: sst_file_id,
                    time_range: FileTimeRange::default(),
                    level: 0,
                    file_size: 4096,
                },
                file_purger,
            );
            // mark file as deleted and drop the handle, we expect the file is deleted.
            handle.mark_deleted();
        }

        scheduler.stop(true).await.unwrap();

        assert!(!object_store
            .is_exist(&format!("{}/{}", sst_dir, sst_file_id.as_parquet()))
            .await
            .unwrap());
    }
}

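Normally the purger is driven by FileHandle's Drop, but it can also be invoked directly. This sketch assumes PurgeRequest exposes exactly the region_id and file_id fields read by send_request above; the helper name is hypothetical.

use store_api::storage::RegionId;

use crate::sst::file::FileId;
use crate::sst::file_purger::{FilePurger, LocalFilePurger, PurgeRequest};

// Ask the purger to schedule deletion of one SST file right away.
fn purge_now(purger: &LocalFilePurger, region_id: RegionId, file_id: FileId) {
    purger.send_request(PurgeRequest { region_id, file_id });
}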