Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(mito): Integrate access layer and file purger to region #2296

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
50 changes: 41 additions & 9 deletions src/mito2/src/access_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,44 +16,76 @@ use std::sync::Arc;

use object_store::{util, ObjectStore};
use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;

use crate::error::{DeleteSstSnafu, Result};
use crate::sst::file::FileId;
use crate::read::Source;
use crate::sst::file::{FileHandle, FileId};
use crate::sst::parquet::reader::ParquetReaderBuilder;
use crate::sst::parquet::writer::ParquetWriter;

pub type AccessLayerRef = Arc<AccessLayer>;

/// Sst access layer.
/// A layer to access SST files under the same directory.
pub struct AccessLayer {
sst_dir: String,
region_dir: String,
object_store: ObjectStore,
}

impl std::fmt::Debug for AccessLayer {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("AccessLayer")
.field("sst_dir", &self.sst_dir)
.field("region_dir", &self.region_dir)
.finish()
}
}

impl AccessLayer {
pub fn new(sst_dir: &str, object_store: ObjectStore) -> AccessLayer {
/// Returns a new [AccessLayer] for specific `region_dir`.
pub fn new(region_dir: impl Into<String>, object_store: ObjectStore) -> AccessLayer {
AccessLayer {
sst_dir: sst_dir.to_string(),
region_dir: region_dir.into(),
object_store,
}
}

fn sst_file_path(&self, file_name: &str) -> String {
util::join_path(&self.sst_dir, file_name)
/// Returns the directory of the region.
pub fn region_dir(&self) -> &str {
&self.region_dir
}

/// Returns the object store of the layer.
pub fn object_store(&self) -> &ObjectStore {
&self.object_store
}

/// Deletes a SST file with given file id.
pub async fn delete_sst(&self, file_id: FileId) -> Result<()> {
pub(crate) async fn delete_sst(&self, file_id: FileId) -> Result<()> {
let path = self.sst_file_path(&file_id.as_parquet());
self.object_store
.delete(&path)
.await
.context(DeleteSstSnafu { file_id })
}

/// Returns a reader builder for specific `file`.
pub(crate) fn read_sst(&self, file: FileHandle) -> ParquetReaderBuilder {
ParquetReaderBuilder::new(self.region_dir.clone(), file, self.object_store.clone())
}

/// Returns a new parquet writer to write the SST for specific `file_id`.
pub(crate) fn write_sst(
&self,
file_id: FileId,
metadata: RegionMetadataRef,
source: Source,
) -> ParquetWriter {
let path = self.sst_file_path(&file_id.as_parquet());
ParquetWriter::new(path, metadata, source, self.object_store.clone())
}

/// Returns the `file_path` for the `file_name` in the object store.
fn sst_file_path(&self, file_name: &str) -> String {
util::join_path(&self.region_dir, file_name)
}
}
12 changes: 12 additions & 0 deletions src/mito2/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ use common_telemetry::warn;

/// Default region worker num.
const DEFAULT_NUM_WORKERS: usize = 1;
/// Default max running background job.
const DEFAULT_MAX_BG_JOB: usize = 4;
/// Default region write buffer size.
pub(crate) const DEFAULT_WRITE_BUFFER_SIZE: ReadableSize = ReadableSize::mb(32);

Expand All @@ -40,6 +42,10 @@ pub struct MitoConfig {
pub manifest_checkpoint_distance: u64,
/// Manifest compression type (default uncompressed).
pub manifest_compress_type: CompressionType,

// Background job configs:
/// Max number of running background jobs.
pub max_background_jobs: usize,
}

impl Default for MitoConfig {
Expand All @@ -50,6 +56,7 @@ impl Default for MitoConfig {
worker_request_batch_size: 64,
manifest_checkpoint_distance: 10,
manifest_compress_type: CompressionType::Uncompressed,
max_background_jobs: DEFAULT_MAX_BG_JOB,
}
}
}
Expand All @@ -75,5 +82,10 @@ impl MitoConfig {
warn!("Sanitize channel size 0 to 1");
self.worker_channel_size = 1;
}

if self.max_background_jobs == 0 {
warn!("Sanitize max background jobs 0 to {}", DEFAULT_MAX_BG_JOB);
self.max_background_jobs = DEFAULT_MAX_BG_JOB;
}
}
}
17 changes: 2 additions & 15 deletions src/mito2/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ use store_api::storage::{RegionId, ScanRequest};

use crate::config::MitoConfig;
use crate::error::{RecvSnafu, RegionNotFoundSnafu, Result};
use crate::flush::WriteBufferManagerImpl;
use crate::read::scan_region::{ScanRegion, Scanner};
use crate::request::WorkerRequest;
use crate::worker::WorkerGroup;
Expand Down Expand Up @@ -106,15 +105,8 @@ impl EngineInner {
log_store: Arc<S>,
object_store: ObjectStore,
) -> EngineInner {
let write_buffer_manager = Arc::new(WriteBufferManagerImpl {});

EngineInner {
workers: WorkerGroup::start(
config,
log_store,
object_store.clone(),
write_buffer_manager,
),
workers: WorkerGroup::start(config, log_store, object_store.clone()),
object_store,
}
}
Expand Down Expand Up @@ -152,12 +144,7 @@ impl EngineInner {
.get_region(region_id)
.context(RegionNotFoundSnafu { region_id })?;
let version = region.version();
let scan_region = ScanRegion::new(
version,
region.region_dir.clone(),
self.object_store.clone(),
request,
);
let scan_region = ScanRegion::new(version, region.access_layer.clone(), request);

scan_region.scanner()
}
Expand Down
37 changes: 9 additions & 28 deletions src/mito2/src/flush.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

//! Flush related utilities and structs.
use std::collections::{HashMap, VecDeque};
use std::collections::HashMap;
use std::sync::Arc;

use store_api::storage::RegionId;
Expand All @@ -23,8 +23,7 @@ use tokio::sync::oneshot::Sender;
use crate::error::Result;
use crate::region::MitoRegionRef;
use crate::request::{SenderDdlRequest, SenderWriteRequest};

const FLUSH_JOB_LIMIT: usize = 4;
use crate::schedule::scheduler::SchedulerRef;

/// Global write buffer (memtable) manager.
///
Expand Down Expand Up @@ -119,27 +118,21 @@ impl RegionFlushTask {

/// Manages background flushes of a worker.
pub(crate) struct FlushScheduler {
/// Pending flush tasks.
queue: VecDeque<RegionFlushTask>,
/// Tracks regions need to flush.
region_status: HashMap<RegionId, FlushStatus>,
/// Number of running flush jobs.
num_flush_running: usize,
/// Max number of background flush jobs.
job_limit: usize,
/// Background job scheduler.
scheduler: SchedulerRef,
}

impl Default for FlushScheduler {
fn default() -> Self {
impl FlushScheduler {
/// Creates a new flush scheduler.
pub(crate) fn new(scheduler: SchedulerRef) -> FlushScheduler {
FlushScheduler {
queue: VecDeque::new(),
region_status: HashMap::new(),
num_flush_running: 0,
job_limit: FLUSH_JOB_LIMIT,
scheduler,
}
}
}

impl FlushScheduler {
/// Returns true if the region is stalling.
pub(crate) fn is_stalling(&self, region_id: RegionId) -> bool {
if let Some(status) = self.region_status.get(&region_id) {
Expand Down Expand Up @@ -170,21 +163,9 @@ impl FlushScheduler {
if flush_status.flushing_task.is_some() {
// There is already a flush job running.
flush_status.stalling = true;
self.queue.push_back(task);
return;
}

// Checks flush job limit.
debug_assert!(self.num_flush_running <= self.job_limit);
if !self.queue.is_empty() || self.num_flush_running >= self.job_limit {
debug_assert!(self.num_flush_running == self.job_limit);
// We reach job limit.
self.queue.push_back(task);
return;
}

// TODO(yingwen): Submit the flush job to job scheduler.

todo!()
}

Expand Down
2 changes: 1 addition & 1 deletion src/mito2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub mod request;
#[allow(dead_code)]
mod row_converter;
#[allow(dead_code)]
mod schedule;
pub(crate) mod schedule;
#[allow(dead_code)]
pub mod sst;
pub mod wal;
Expand Down
16 changes: 6 additions & 10 deletions src/mito2/src/read/scan_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::debug;
use common_time::range::TimestampRange;
use object_store::ObjectStore;
use snafu::ResultExt;
use store_api::storage::ScanRequest;
use table::predicate::{Predicate, TimeRangePredicateBuilder};

use crate::access_layer::AccessLayerRef;
use crate::error::{BuildPredicateSnafu, Result};
use crate::read::projection::ProjectionMapper;
use crate::read::seq_scan::SeqScan;
Expand Down Expand Up @@ -85,10 +85,8 @@ impl Scanner {
pub(crate) struct ScanRegion {
/// Version of the region at scan.
version: VersionRef,
/// Directory of SST files.
file_dir: String,
/// Object store that stores SST files.
object_store: ObjectStore,
/// Access layer of the region.
access_layer: AccessLayerRef,
/// Scan request.
request: ScanRequest,
}
Expand All @@ -97,14 +95,12 @@ impl ScanRegion {
/// Creates a [ScanRegion].
pub(crate) fn new(
version: VersionRef,
file_dir: String,
object_store: ObjectStore,
access_layer: AccessLayerRef,
request: ScanRequest,
) -> ScanRegion {
ScanRegion {
version,
file_dir,
object_store,
access_layer,
request,
}
}
Expand Down Expand Up @@ -152,7 +148,7 @@ impl ScanRegion {
None => ProjectionMapper::all(&self.version.metadata)?,
};

let seq_scan = SeqScan::new(self.file_dir, self.object_store, mapper, self.request)
let seq_scan = SeqScan::new(self.access_layer.clone(), mapper, self.request)
.with_time_range(Some(time_range))
.with_predicate(Some(predicate))
.with_memtables(memtables)
Expand Down
33 changes: 13 additions & 20 deletions src/mito2/src/read/seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,27 +21,24 @@ use common_error::ext::BoxedError;
use common_recordbatch::error::ExternalSnafu;
use common_recordbatch::{RecordBatchStreamAdaptor, SendableRecordBatchStream};
use common_time::range::TimestampRange;
use object_store::ObjectStore;
use snafu::ResultExt;
use store_api::storage::ScanRequest;
use table::predicate::Predicate;

use crate::access_layer::AccessLayerRef;
use crate::error::Result;
use crate::memtable::MemtableRef;
use crate::read::merge::MergeReaderBuilder;
use crate::read::projection::ProjectionMapper;
use crate::read::BatchReader;
use crate::sst::file::FileHandle;
use crate::sst::parquet::reader::ParquetReaderBuilder;

/// Scans a region and returns rows in a sorted sequence.
///
/// The output order is always `order by primary key, time index`.
pub struct SeqScan {
/// Directory of SST files.
file_dir: String,
/// Object store that stores SST files.
object_store: ObjectStore,
/// Region SST access layer.
access_layer: AccessLayerRef,
/// Maps projected Batches to RecordBatches.
mapper: Arc<ProjectionMapper>,
/// Original scan request to scan memtable.
Expand All @@ -62,14 +59,12 @@ impl SeqScan {
/// Creates a new [SeqScan].
#[must_use]
pub(crate) fn new(
file_dir: String,
object_store: ObjectStore,
access_layer: AccessLayerRef,
mapper: ProjectionMapper,
request: ScanRequest,
) -> SeqScan {
SeqScan {
file_dir,
object_store,
access_layer,
mapper: Arc::new(mapper),
time_range: None,
predicate: None,
Expand Down Expand Up @@ -116,16 +111,14 @@ impl SeqScan {
builder.push_batch_iter(iter);
}
for file in &self.files {
let reader = ParquetReaderBuilder::new(
self.file_dir.clone(),
file.clone(),
self.object_store.clone(),
)
.predicate(self.predicate.clone())
.time_range(self.time_range)
.projection(Some(self.mapper.column_ids().to_vec()))
.build()
.await?;
let reader = self
.access_layer
.read_sst(file.clone())
.predicate(self.predicate.clone())
.time_range(self.time_range)
.projection(Some(self.mapper.column_ids().to_vec()))
.build()
.await?;
builder.push_batch_reader(Box::new(reader));
}
let mut reader = builder.build().await?;
Expand Down
Loading