Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: region disk usage statistic #2665

Merged
merged 9 commits into from
Oct 31, 2023
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 13 additions & 5 deletions src/datanode/src/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,15 +280,23 @@ impl HeartbeatTask {

async fn load_region_stats(region_server: &RegionServer) -> Vec<RegionStat> {
let regions = region_server.opened_regions();
regions
.into_iter()
.map(|(region_id, engine)| RegionStat {

let mut region_stats = Vec::new();
for (region_id, engine) in regions {
let approximate_bytes = region_server
.region_disk_usage(region_id)
.await
.unwrap_or(0);
let region_stat = RegionStat {
region_id: region_id.as_u64(),
engine,
approximate_bytes,
// TODO(ruihang): scratch more info
..Default::default()
})
.collect::<Vec<_>>()
};
region_stats.push(region_stat);
}
region_stats
}

pub async fn close(&self) -> Result<()> {
Expand Down
7 changes: 7 additions & 0 deletions src/datanode/src/region_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,13 @@ impl RegionServer {
self.inner.runtime.clone()
}

/// Returns the disk usage reported by the engine that owns `region_id`.
///
/// The region is looked up in the region map and the request is delegated
/// to that engine's `region_disk_usage`. Yields `None` when the region is
/// not present in the map, or when the engine itself reports `None`.
// NOTE(review): the value returned by `region_map.get` stays alive across the
// `.await` below; if it is a guard of a concurrent map, consider cloning the
// engine out first — confirm against the map type.
pub async fn region_disk_usage(&self, region_id: RegionId) -> Option<i64> {
    let engine = self.inner.region_map.get(&region_id)?;
    engine.region_disk_usage(region_id).await
}

/// Stop the region server.
pub async fn stop(&self) -> Result<()> {
self.inner.stop().await
Expand Down
4 changes: 4 additions & 0 deletions src/datanode/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,10 @@ impl RegionEngine for MockRegionEngine {
unimplemented!()
}

// Disk usage is not implemented for the mock engine: the region id is ignored
// and any call panics through `unimplemented!()`.
async fn region_disk_usage(&self, _region_id: RegionId) -> Option<i64> {
unimplemented!()
}

async fn stop(&self) -> Result<(), BoxedError> {
Ok(())
}
Expand Down
4 changes: 4 additions & 0 deletions src/file-engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ impl RegionEngine for FileRegionEngine {
self.inner.stop().await.map_err(BoxedError::new)
}

/// Retrieves the region's disk usage.
///
/// Disk usage is not tracked by the file engine yet, so this always returns
/// `None` ("unknown") regardless of the region id.
///
/// This deliberately does not panic: usage may be polled for every opened
/// region, and a panic here would take the polling task down as soon as a
/// single file-engine region is open. Callers can treat `None` as "no data".
async fn region_disk_usage(&self, _: RegionId) -> Option<i64> {
    None
}

fn set_writable(&self, region_id: RegionId, writable: bool) -> Result<(), BoxedError> {
self.inner
.set_writable(region_id, writable)
Expand Down
21 changes: 21 additions & 0 deletions src/mito2/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ use crate::config::MitoConfig;
use crate::error::{RecvSnafu, RegionNotFoundSnafu, Result};
use crate::metrics::{HANDLE_REQUEST_ELAPSED, TYPE_LABEL};
use crate::read::scan_region::{ScanRegion, Scanner};
use crate::region::RegionUsage;
use crate::request::WorkerRequest;
use crate::worker::WorkerGroup;

Expand Down Expand Up @@ -86,6 +87,17 @@ impl MitoEngine {
self.inner.workers.is_region_exists(region_id)
}

/// Returns the usage statistics of the region identified by `region_id`.
///
/// # Errors
///
/// Fails with the `RegionNotFoundSnafu` context (carrying `region_id`) when
/// the worker group has no such region.
pub async fn get_region_usage(&self, region_id: RegionId) -> Result<RegionUsage> {
    let workers = &self.inner.workers;
    let region = workers
        .get_region(region_id)
        .context(RegionNotFoundSnafu { region_id })?;

    let usage = region.region_usage().await;
    Ok(usage)
}

/// Returns a scanner to scan for `request`.
fn scanner(&self, region_id: RegionId, request: ScanRequest) -> Result<Scanner> {
self.inner.handle_query(region_id, request)
Expand Down Expand Up @@ -221,6 +233,15 @@ impl RegionEngine for MitoEngine {
self.inner.stop().await.map_err(BoxedError::new)
}

/// Retrieves the region's total disk usage as an `i64`.
///
/// Returns `None` when the usage cannot be obtained (the underlying error
/// is discarded) or when the total does not fit into an `i64`.
async fn region_disk_usage(&self, region_id: RegionId) -> Option<i64> {
    let usage = self.get_region_usage(region_id).await.ok()?;
    let total = usage.disk_usage();
    total.try_into().ok()
}

fn set_writable(&self, region_id: RegionId, writable: bool) -> Result<(), BoxedError> {
self.inner
.set_writable(region_id, writable)
Expand Down
46 changes: 23 additions & 23 deletions src/mito2/src/engine/basic_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use super::*;
use crate::region::version::VersionControlData;
use crate::test_util::{
build_delete_rows_for_key, build_rows, build_rows_for_key, delete_rows, delete_rows_schema,
put_rows, rows_schema, CreateRequestBuilder, TestEnv,
flush_region, put_rows, rows_schema, CreateRequestBuilder, TestEnv,
};

#[tokio::test]
Expand Down Expand Up @@ -459,8 +459,8 @@ async fn test_absent_and_invalid_columns() {
}

#[tokio::test]
async fn test_estimated_wal_size() {
let mut env = TestEnv::with_prefix("estimate-region-wal-size");
async fn test_region_usage() {
let mut env = TestEnv::with_prefix("region_usage");
let engine = env.create_engine(MitoConfig::default()).await;

let region_id = RegionId::new(1, 1);
Expand All @@ -472,39 +472,39 @@ async fn test_estimated_wal_size() {
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
// region is empty now, check manifest size
let region = engine.get_region(region_id).unwrap();
let region_stat = region.region_usage().await;
assert_eq!(region_stat.manifest_usage, 686);

// put some rows
let rows = Rows {
schema: column_schemas.clone(),
rows: build_rows_for_key("a", 0, 10, 0),
};
put_rows(&engine, region_id, rows).await;

// check wal size
let region = engine.get_region(region_id).unwrap();
assert_eq!(region.estimated_wal_size(), 124);

let rows = Rows {
schema: column_schemas.clone(),
rows: build_rows_for_key("b", 0, 10, 0),
};
put_rows(&engine, region_id, rows).await;

// check wal size
assert_eq!(region.estimated_wal_size(), 249);
let region_stat = region.region_usage().await;
assert!(region_stat.wal_usage > 0);

// Delete (a, 0), (a, 1), (a, 2)
// delete some rows
let rows = Rows {
schema: delete_schema.clone(),
rows: build_delete_rows_for_key("a", 0, 3),
};
delete_rows(&engine, region_id, rows).await;
// Delete (b, 0), (b, 1)
let rows = Rows {
schema: delete_schema,
rows: build_delete_rows_for_key("b", 0, 2),
};
delete_rows(&engine, region_id, rows).await;

// check wal size
assert_eq!(region.estimated_wal_size(), 292);
let region_stat = region.region_usage().await;
assert!(region_stat.wal_usage > 0);

// flush region
flush_region(&engine, region_id, None).await;

let region_stat = region.region_usage().await;
assert!(region_stat.wal_usage == 0);
assert_eq!(region_stat.sst_usage, 2827);

// region total usage
assert_eq!(region_stat.disk_usage(), 3833);
}
8 changes: 4 additions & 4 deletions src/mito2/src/manifest/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ impl RegionManifestManager {
}

/// Returns total manifest size.
pub async fn manifest_size(&self) -> u64 {
pub async fn manifest_usage(&self) -> u64 {
let inner = self.inner.read().await;
inner.total_manifest_size()
}
Expand Down Expand Up @@ -617,7 +617,7 @@ mod test {
manager.validate_manifest(&new_metadata, 1).await;

// get manifest size
let manifest_size = manager.manifest_size().await;
let manifest_size = manager.manifest_usage().await;
assert_eq!(manifest_size, manifest_dir_usage(&manifest_dir).await);

// update 10 times nop_action to trigger checkpoint
Expand All @@ -637,7 +637,7 @@ mod test {
}

// check manifest size again
let manifest_size = manager.manifest_size().await;
let manifest_size = manager.manifest_usage().await;
assert_eq!(manifest_size, manifest_dir_usage(&manifest_dir).await);

// Reopen the manager,
Expand All @@ -651,7 +651,7 @@ mod test {
manager.validate_manifest(&new_metadata, 11).await;

// get manifest size again
let manifest_size = manager.manifest_size().await;
let manifest_size = manager.manifest_usage().await;
assert_eq!(manifest_size, 1312);
}
}
45 changes: 39 additions & 6 deletions src/mito2/src/region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,21 @@ use crate::sst::file_purger::FilePurgerRef;
/// This is the approximate factor to estimate the size of wal.
const ESTIMATED_WAL_FACTOR: f32 = 0.42825;

/// Usage statistics of a region: the region id together with its WAL, SST
/// and manifest usage.
///
/// Memtable usage is not stored here.
#[derive(Debug)]
pub struct RegionUsage {
/// Id of the region these figures belong to.
pub region_id: RegionId,
/// Usage attributed to the WAL.
pub wal_usage: u64,
/// Usage attributed to SST files.
pub sst_usage: u64,
/// Usage attributed to the manifest.
pub manifest_usage: u64,
}

impl RegionUsage {
    /// Returns the total disk usage: the sum of the WAL, SST and manifest usage.
    pub fn disk_usage(&self) -> u64 {
        let parts = [self.wal_usage, self.sst_usage, self.manifest_usage];
        parts.iter().sum()
    }
}

/// Metadata and runtime status of a region.
///
/// Writing and reading a region follow a single-writer-multi-reader rule:
Expand Down Expand Up @@ -113,14 +128,32 @@ impl MitoRegion {
self.writable.store(writable, Ordering::Relaxed);
}

/// Collects the usage statistics of this region, in bytes.
///
/// SST usage is read from the current version, WAL usage is estimated from
/// the combined mutable and immutable memtable usage, and manifest usage is
/// obtained from the manifest manager.
pub(crate) async fn region_usage(&self) -> RegionUsage {
    let version = self.version();

    // Memtable usage is not reported itself; it only feeds the WAL estimate.
    let memtable_bytes = {
        let memtables = &version.memtables;
        (memtables.mutable_usage() + memtables.immutables_usage()) as u64
    };

    RegionUsage {
        region_id: self.region_id,
        sst_usage: version.ssts.sst_usage(),
        wal_usage: self.estimated_wal_usage(memtable_bytes),
        manifest_usage: self.manifest_manager.manifest_usage().await,
    }
}

/// Estimated WAL size in bytes.
/// Use the memtables size to estimate the size of wal.
// TODO(Quenkar): after impl region size, remove #[allow(dead_code)]
#[allow(dead_code)]
pub(crate) fn estimated_wal_size(&self) -> usize {
let memtables = &self.version().memtables;
let memtable_size = memtables.mutable_usage() + memtables.immutables_usage();
((memtable_size as f32) * ESTIMATED_WAL_FACTOR) as usize
fn estimated_wal_usage(&self, memtable_usage: u64) -> u64 {
((memtable_usage as f32) * ESTIMATED_WAL_FACTOR) as u64
}
}

Expand Down
14 changes: 14 additions & 0 deletions src/mito2/src/sst/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,20 @@ impl SstVersion {
}
}
}

/// Returns the space occupied by the SST files of the current version,
/// i.e. the sum of `file_size` over every file in every level.
pub(crate) fn sst_usage(&self) -> u64 {
    self.levels
        .iter()
        .flat_map(|level| level.files.values())
        .map(|handle| handle.meta().file_size)
        .sum()
}
}

// We only have a fixed number of levels, so we use an array to hold elements. This implementation
Expand Down
3 changes: 3 additions & 0 deletions src/store-api/src/region_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ pub trait RegionEngine: Send + Sync {
/// Retrieves region's metadata.
async fn get_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef, BoxedError>;

/// Retrieves region's disk usage.
async fn region_disk_usage(&self, region_id: RegionId) -> Option<i64>;

/// Stops the engine
async fn stop(&self) -> Result<(), BoxedError>;

Expand Down
Loading