Skip to content

Commit

Permalink
feat(mito): Flush framework for mito2 (GreptimeTeam#2262)
Browse files Browse the repository at this point in the history
* feat: write buffer manager

* feat: skeleton

* feat: add flush logic to write path

* feat: add methods to memtable trait

* feat: freeze memtable

* feat: define flush task

* feat: schedule_flush wip

* feat: adding pending requests/tasks

* feat: separate ddl request and background request

* feat: Remove RegionTask and RequestBody

* feat: handle flush related requests

* feat: make tests pass

* style: fix clippy

* docs: update comment

* refactor: rename background requests

* feat: replace Option<RegionWriteCtx> with an enum MaybeStalling
  • Loading branch information
evenyag authored and paomian committed Oct 19, 2023
1 parent 03e594e commit 4f0c38a
Show file tree
Hide file tree
Showing 14 changed files with 731 additions and 126 deletions.
23 changes: 16 additions & 7 deletions src/mito2/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ use store_api::storage::{RegionId, ScanRequest};

use crate::config::MitoConfig;
use crate::error::{RecvSnafu, RegionNotFoundSnafu, Result};
use crate::flush::WriteBufferManagerImpl;
use crate::read::scan_region::{ScanRegion, Scanner};
use crate::request::{RegionTask, RequestBody};
use crate::request::WorkerRequest;
use crate::worker::WorkerGroup;

/// Region engine implementation for timeseries data.
Expand Down Expand Up @@ -105,8 +106,15 @@ impl EngineInner {
log_store: Arc<S>,
object_store: ObjectStore,
) -> EngineInner {
let write_buffer_manager = Arc::new(WriteBufferManagerImpl {});

EngineInner {
workers: WorkerGroup::start(config, log_store, object_store.clone()),
workers: WorkerGroup::start(
config,
log_store,
object_store.clone(),
write_buffer_manager,
),
object_store,
}
}
Expand All @@ -116,6 +124,9 @@ impl EngineInner {
self.workers.stop().await
}

/// Get metadata of a region.
///
/// Returns error if the region doesn't exist.
fn get_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef> {
// Reading a region doesn't need to go through the region worker thread.
let region = self
Expand All @@ -125,12 +136,10 @@ impl EngineInner {
Ok(region.metadata())
}

/// Handles [RequestBody] and return its executed result.
/// Handles a [RegionRequest] and returns its execution result.
async fn handle_request(&self, region_id: RegionId, request: RegionRequest) -> Result<Output> {
// We validate and then convert the `request` into an inner `RequestBody` for ease of handling.
let body = RequestBody::try_from_region_request(region_id, request)?;
let (request, receiver) = RegionTask::from_request(region_id, body);
self.workers.submit_to_worker(request).await?;
let (request, receiver) = WorkerRequest::try_from_region_request(region_id, request)?;
self.workers.submit_to_worker(region_id, request).await?;

receiver.await.context(RecvSnafu)?
}
Expand Down
2 changes: 1 addition & 1 deletion src/mito2/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ pub enum Error {
},
}

pub type Result<T> = std::result::Result<T, Error>;
pub type Result<T, E = Error> = std::result::Result<T, E>;

impl Error {
/// Returns true if we need to fill default value for a region.
Expand Down
255 changes: 255 additions & 0 deletions src/mito2/src/flush.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,255 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Flush related utilities and structs.
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;

use store_api::storage::RegionId;
use tokio::sync::oneshot::Sender;

use crate::error::Result;
use crate::region::MitoRegionRef;
use crate::request::{SenderDdlRequest, SenderWriteRequest};

/// Default value of [FlushScheduler]'s `job_limit`, the max number of background flush jobs.
const FLUSH_JOB_LIMIT: usize = 4;

/// Global write buffer (memtable) manager.
///
/// Tracks write buffer (memtable) usage and decides whether the engine needs to flush.
/// Implementations must be `Send + Sync` because the manager is shared behind an `Arc`
/// (see [WriteBufferManagerRef]).
pub trait WriteBufferManager: Send + Sync + std::fmt::Debug {
/// Returns whether the engine should trigger a flush.
fn should_flush_engine(&self) -> bool;

/// Returns whether the mutable memtable of this region needs to flush.
///
/// `stats` carries the memtable statistics of the region being checked.
fn should_flush_region(&self, stats: RegionMemtableStats) -> bool;

/// Reserves `mem` bytes.
fn reserve_mem(&self, mem: usize);

/// Tells the manager we are freeing `mem` bytes.
///
/// We are in the process of freeing `mem` bytes, so it is not considered
/// when checking the soft limit.
fn schedule_free_mem(&self, mem: usize);

/// We have freed `mem` bytes.
fn free_mem(&self, mem: usize);

/// Returns the total memory used by memtables.
fn memory_usage(&self) -> usize;
}

/// Shared, reference-counted handle to a [WriteBufferManager] trait object.
pub type WriteBufferManagerRef = Arc<dyn WriteBufferManager>;

/// Statistics of a region's memtable.
///
/// Passed by value to [WriteBufferManager::should_flush_region] so the manager can
/// decide whether the region needs to flush.
#[derive(Debug)]
pub struct RegionMemtableStats {
/// Size of the mutable memtable.
pub bytes_mutable: usize,
/// Write buffer size of the region.
pub write_buffer_size: usize,
}

// TODO(yingwen): Implements the manager.
/// Placeholder [WriteBufferManager] implementation.
///
/// It holds no state yet; its trait implementation in this file never asks for a
/// flush and always reports zero memory usage.
#[derive(Debug)]
pub struct WriteBufferManagerImpl {}

// Stub implementation: every method is a no-op or returns a constant until the
// real manager is implemented.
impl WriteBufferManager for WriteBufferManagerImpl {
/// Always returns `false`: the stub never asks the engine to flush.
fn should_flush_engine(&self) -> bool {
false
}

/// Always returns `false`, regardless of `_stats`.
fn should_flush_region(&self, _stats: RegionMemtableStats) -> bool {
false
}

/// No-op: the stub does not track reserved memory.
fn reserve_mem(&self, _mem: usize) {}

/// No-op: the stub does not track memory scheduled to be freed.
fn schedule_free_mem(&self, _mem: usize) {}

/// No-op: the stub does not track freed memory.
fn free_mem(&self, _mem: usize) {}

/// Always returns `0` because no usage is tracked.
fn memory_usage(&self) -> usize {
0
}
}

/// Reason of a flush task.
///
/// This is a plain field-less enum, so it derives the cheap standard traits:
/// `Debug` for logging and diagnostics, `Clone`/`Copy` so a reason can be passed
/// around by value, and `PartialEq`/`Eq` so reasons can be compared.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FlushReason {
    /// Other reasons.
    Others,
    /// Memtable is full.
    MemtableFull,
    /// Engine reaches flush threshold.
    EngineFull,
    // TODO(yingwen): Alter, manually.
}

/// Task to flush a region.
///
/// Handed to [FlushScheduler::schedule_flush], which either completes it right away
/// (nothing to flush) or queues it.
pub(crate) struct RegionFlushTask {
/// Region to flush.
pub(crate) region_id: RegionId,
/// Reason to flush.
pub(crate) reason: FlushReason,
/// Flush result sender.
///
/// `None` means no result is sent when the task finishes.
pub(crate) sender: Option<Sender<Result<()>>>,
}

impl RegionFlushTask {
    /// Consumes the task and notifies the sender, if any, that the job succeeded.
    fn on_success(self) {
        let Some(sender) = self.sender else {
            // No sender attached to this task, so there is nothing to send.
            return;
        };

        // The outcome of `send` is deliberately ignored.
        let _ = sender.send(Ok(()));
    }
}

/// Manages background flushes of a worker.
pub(crate) struct FlushScheduler {
/// Pending flush tasks.
queue: VecDeque<RegionFlushTask>,
/// Flush status of regions known to the scheduler, keyed by region id.
///
/// An entry is inserted by `schedule_flush` when a region has something to flush.
region_status: HashMap<RegionId, FlushStatus>,
/// Number of running flush jobs.
num_flush_running: usize,
/// Max number of background flush jobs.
job_limit: usize,
}

impl Default for FlushScheduler {
    /// Builds an idle scheduler: nothing queued, no region tracked, no job running,
    /// and the job limit set to `FLUSH_JOB_LIMIT`.
    fn default() -> Self {
        Self {
            job_limit: FLUSH_JOB_LIMIT,
            num_flush_running: 0,
            region_status: HashMap::default(),
            queue: VecDeque::default(),
        }
    }
}

impl FlushScheduler {
    /// Returns true if the region is stalling.
    ///
    /// A region without a tracked flush status is never stalling.
    pub(crate) fn is_stalling(&self, region_id: RegionId) -> bool {
        matches!(self.region_status.get(&region_id), Some(status) if status.stalling)
    }

    /// Schedules a flush `task` for specific `region`.
    ///
    /// - If the region has neither a non-empty mutable memtable nor an immutable
    ///   memtable, the task is completed successfully right away.
    /// - If a flush of the region is already running, the region is marked as
    ///   stalling and the task is queued.
    /// - If other tasks are already queued or the job limit is reached, the task is
    ///   queued behind them.
    ///
    /// # Panics
    ///
    /// Submitting a job to the job scheduler is not implemented yet, so this method
    /// hits `todo!()` when the task could be started immediately.
    pub(crate) fn schedule_flush(&mut self, region: &MitoRegionRef, task: RegionFlushTask) {
        debug_assert_eq!(region.region_id, task.region_id);

        let version = region.version_control.current().version;
        if version.memtables.mutable.is_empty() && version.memtables.immutable.is_none() {
            debug_assert!(!self.region_status.contains_key(&region.region_id));
            // The region has nothing to flush.
            task.on_success();
            return;
        }

        // Add this region to status map.
        let flush_status = self
            .region_status
            .entry(region.region_id)
            .or_insert_with(|| FlushStatus::new(region.clone()));
        // Checks whether we can flush the region now.
        if flush_status.flushing_task.is_some() {
            // There is already a flush job running.
            flush_status.stalling = true;
            self.queue.push_back(task);
            return;
        }

        // Checks flush job limit.
        debug_assert!(self.num_flush_running <= self.job_limit);
        if !self.queue.is_empty() || self.num_flush_running >= self.job_limit {
            // Either the job limit is reached or earlier tasks are still waiting.
            // We must not assert `num_flush_running == job_limit` here: the branch
            // above queues tasks for a region that is already flushing without the
            // limit being reached, so the queue can be non-empty below the limit.
            self.queue.push_back(task);
            return;
        }

        // TODO(yingwen): Submit the flush job to job scheduler.

        todo!()
    }

    /// Add write `request` to pending queue.
    ///
    /// Returns error if region is not stalling. The rejected request is handed back
    /// to the caller inside the `Err`.
    pub(crate) fn add_write_request_to_pending(
        &mut self,
        request: SenderWriteRequest,
    ) -> Result<(), SenderWriteRequest> {
        match self.region_status.get_mut(&request.request.region_id) {
            Some(status) if status.stalling => {
                status.pending_writes.push(request);
                Ok(())
            }
            // Unknown region or region not stalling: give the request back.
            _ => Err(request),
        }
    }

    /// Add ddl request to pending queue.
    ///
    /// Returns error if region is not stalling. The rejected request is handed back
    /// to the caller inside the `Err`.
    pub(crate) fn add_ddl_request_to_pending(
        &mut self,
        request: SenderDdlRequest,
    ) -> Result<(), SenderDdlRequest> {
        match self.region_status.get_mut(&request.region_id) {
            Some(status) if status.stalling => {
                status.pending_ddls.push(request);
                Ok(())
            }
            // Unknown region or region not stalling: give the request back.
            _ => Err(request),
        }
    }
}

/// Flush status of a region scheduled by the [FlushScheduler].
///
/// Tracks running and pending flush tasks and all pending requests of a region.
struct FlushStatus {
/// Current region.
region: MitoRegionRef,
/// Current running flush task.
flushing_task: Option<RegionFlushTask>,
/// The number of flush requests waiting in queue.
// NOTE(review): the scheduler code in this file only initializes this to 0 and
// never updates it when queueing a task; confirm the intended bookkeeping.
num_queueing: usize,
/// The region is stalling.
///
/// Pending write and ddl requests are only accepted while this is true.
stalling: bool,
/// Pending write requests.
pending_writes: Vec<SenderWriteRequest>,
/// Pending ddl requests.
pending_ddls: Vec<SenderDdlRequest>,
}

impl FlushStatus {
fn new(region: MitoRegionRef) -> FlushStatus {
FlushStatus {
region,
flushing_task: None,
num_queueing: 0,
stalling: false,
pending_writes: Vec::new(),
pending_ddls: Vec::new(),
}
}
}
2 changes: 2 additions & 0 deletions src/mito2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ pub mod config;
pub mod engine;
pub mod error;
#[allow(dead_code)]
mod flush;
#[allow(dead_code)]
#[allow(unused_variables)]
pub mod manifest;
#[allow(dead_code)]
Expand Down
13 changes: 13 additions & 0 deletions src/mito2/src/memtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,14 @@ pub trait Memtable: Send + Sync + fmt::Debug {
/// Write key values into the memtable.
fn write(&self, kvs: &KeyValues) -> Result<()>;

/// Scans the memtable for `req`.
fn iter(&self, req: ScanRequest) -> BoxedBatchIterator;

/// Returns true if the memtable is empty.
fn is_empty(&self) -> bool;

/// Mark the memtable as immutable.
fn mark_immutable(&self);
}

pub type MemtableRef = Arc<dyn Memtable>;
Expand Down Expand Up @@ -85,6 +92,12 @@ impl Memtable for EmptyMemtable {
fn iter(&self, _req: ScanRequest) -> BoxedBatchIterator {
Box::new(std::iter::empty())
}

// `EmptyMemtable` always reports itself as empty.
fn is_empty(&self) -> bool {
true
}

// No-op for `EmptyMemtable`.
fn mark_immutable(&self) {}
}

/// Default memtable builder.
Expand Down
8 changes: 8 additions & 0 deletions src/mito2/src/memtable/time_series.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,14 @@ impl Memtable for TimeSeriesMemtable {

Box::new(self.series_set.iter_series(projection))
}

fn is_empty(&self) -> bool {
    // Take the read lock on the series map; `unwrap` panics if acquiring it fails.
    let series = self.series_set.series.read().unwrap();
    series.is_empty()
}

// Currently a no-op; the intended work is described by the TODO below.
fn mark_immutable(&self) {
// TODO(yingwen): AllocTracker.done_allocating()
}
}

type SeriesRwLockMap = RwLock<BTreeMap<Vec<u8>, Arc<RwLock<Series>>>>;
Expand Down
Loading

0 comments on commit 4f0c38a

Please sign in to comment.