Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: avoid acquiring lock during reading stats #4070

Merged
merged 3 commits into from
May 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/datanode/src/region_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ impl RegionServer {

pub async fn region_disk_usage(&self, region_id: RegionId) -> Option<i64> {
match self.inner.region_map.get(&region_id) {
Some(e) => e.region_disk_usage(region_id).await,
Some(e) => e.region_disk_usage(region_id),
None => None,
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/datanode/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ impl RegionEngine for MockRegionEngine {
unimplemented!()
}

async fn region_disk_usage(&self, _region_id: RegionId) -> Option<i64> {
fn region_disk_usage(&self, _region_id: RegionId) -> Option<i64> {
unimplemented!()
}

Expand Down
2 changes: 1 addition & 1 deletion src/file-engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ impl RegionEngine for FileRegionEngine {
self.inner.stop().await.map_err(BoxedError::new)
}

async fn region_disk_usage(&self, _: RegionId) -> Option<i64> {
fn region_disk_usage(&self, _: RegionId) -> Option<i64> {
None
}

Expand Down
16 changes: 4 additions & 12 deletions src/metric-engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,9 @@ impl RegionEngine for MetricEngine {
/// Retrieves region's disk usage.
///
/// Note: Returns `None` if it's a logical region.
async fn region_disk_usage(&self, region_id: RegionId) -> Option<i64> {
fn region_disk_usage(&self, region_id: RegionId) -> Option<i64> {
if self.inner.is_physical_region(region_id) {
self.inner.mito.region_disk_usage(region_id).await
self.inner.mito.region_disk_usage(region_id)
} else {
None
}
Expand Down Expand Up @@ -383,15 +383,7 @@ mod test {
let logical_region_id = env.default_logical_region_id();
let physical_region_id = env.default_physical_region_id();

assert!(env
.metric()
.region_disk_usage(logical_region_id)
.await
.is_none());
assert!(env
.metric()
.region_disk_usage(physical_region_id)
.await
.is_some());
assert!(env.metric().region_disk_usage(logical_region_id).is_none());
assert!(env.metric().region_disk_usage(physical_region_id).is_some());
}
}
7 changes: 3 additions & 4 deletions src/mito2/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,14 +110,14 @@ impl MitoEngine {
}

/// Returns the region disk/memory usage information.
pub async fn get_region_usage(&self, region_id: RegionId) -> Result<RegionUsage> {
pub fn get_region_usage(&self, region_id: RegionId) -> Result<RegionUsage> {
let region = self
.inner
.workers
.get_region(region_id)
.context(RegionNotFoundSnafu { region_id })?;

Ok(region.region_usage().await)
Ok(region.region_usage())
}

/// Handle substrait query and return a stream of record batches
Expand Down Expand Up @@ -368,10 +368,9 @@ impl RegionEngine for MitoEngine {
self.inner.stop().await.map_err(BoxedError::new)
}

async fn region_disk_usage(&self, region_id: RegionId) -> Option<i64> {
fn region_disk_usage(&self, region_id: RegionId) -> Option<i64> {
let size = self
.get_region_usage(region_id)
.await
.map(|usage| usage.disk_usage())
.ok()?;
size.try_into().ok()
Expand Down
8 changes: 4 additions & 4 deletions src/mito2/src/engine/basic_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ async fn test_region_usage() {
.unwrap();
// region is empty now, check manifest size
let region = engine.get_region(region_id).unwrap();
let region_stat = region.region_usage().await;
let region_stat = region.region_usage();
assert_eq!(region_stat.manifest_usage, 686);

// put some rows
Expand All @@ -535,7 +535,7 @@ async fn test_region_usage() {

put_rows(&engine, region_id, rows).await;

let region_stat = region.region_usage().await;
let region_stat = region.region_usage();
assert!(region_stat.wal_usage > 0);

// delete some rows
Expand All @@ -545,13 +545,13 @@ async fn test_region_usage() {
};
delete_rows(&engine, region_id, rows).await;

let region_stat = region.region_usage().await;
let region_stat = region.region_usage();
assert!(region_stat.wal_usage > 0);

// flush region
flush_region(&engine, region_id, None).await;

let region_stat = region.region_usage().await;
let region_stat = region.region_usage();
assert_eq!(region_stat.sst_usage, 3010);

// region total usage
Expand Down
14 changes: 12 additions & 2 deletions src/mito2/src/manifest/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::atomic::AtomicU64;
use std::sync::Arc;

use common_datasource::compression::CompressionType;
Expand Down Expand Up @@ -121,12 +122,17 @@ pub struct RegionManifestManager {

impl RegionManifestManager {
/// Constructs a region's manifest and persist it.
pub async fn new(metadata: RegionMetadataRef, options: RegionManifestOptions) -> Result<Self> {
pub async fn new(
metadata: RegionMetadataRef,
options: RegionManifestOptions,
total_manifest_size: Arc<AtomicU64>,
) -> Result<Self> {
// construct storage
let mut store = ManifestObjectStore::new(
&options.manifest_dir,
options.object_store.clone(),
options.compress_type,
total_manifest_size,
);

info!(
Expand Down Expand Up @@ -168,7 +174,10 @@ impl RegionManifestManager {
/// Opens an existing manifest.
///
/// Returns `Ok(None)` if no such manifest.
pub async fn open(options: RegionManifestOptions) -> Result<Option<Self>> {
pub async fn open(
options: RegionManifestOptions,
total_manifest_size: Arc<AtomicU64>,
) -> Result<Option<Self>> {
let _t = MANIFEST_OP_ELAPSED
.with_label_values(&["open"])
.start_timer();
Expand All @@ -178,6 +187,7 @@ impl RegionManifestManager {
&options.manifest_dir,
options.object_store.clone(),
options.compress_type,
total_manifest_size,
);

// recover from storage
Expand Down
39 changes: 34 additions & 5 deletions src/mito2/src/manifest/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
use std::collections::HashMap;
use std::iter::Iterator;
use std::str::FromStr;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

use common_datasource::compression::CompressionType;
use common_telemetry::debug;
Expand Down Expand Up @@ -133,15 +135,22 @@ pub struct ManifestObjectStore {
path: String,
/// Stores the size of each manifest file.
manifest_size_map: HashMap<FileKey, u64>,
total_manifest_size: Arc<AtomicU64>,
}

impl ManifestObjectStore {
pub fn new(path: &str, object_store: ObjectStore, compress_type: CompressionType) -> Self {
pub fn new(
path: &str,
object_store: ObjectStore,
compress_type: CompressionType,
total_manifest_size: Arc<AtomicU64>,
) -> Self {
Self {
object_store,
compress_type,
path: util::normalize_dir(path),
manifest_size_map: HashMap::new(),
total_manifest_size,
}
}

Expand Down Expand Up @@ -338,10 +347,9 @@ impl ManifestObjectStore {
// delete manifest sizes
for (_, is_checkpoint, version) in &del_entries {
if *is_checkpoint {
self.manifest_size_map
.remove(&FileKey::Checkpoint(*version));
self.unset_file_size(&FileKey::Checkpoint(*version));
} else {
self.manifest_size_map.remove(&FileKey::Delta(*version));
self.unset_file_size(&FileKey::Delta(*version));
}
}

Expand Down Expand Up @@ -564,12 +572,28 @@ impl ManifestObjectStore {
/// Set the size of the delta file by delta version.
pub(crate) fn set_delta_file_size(&mut self, version: ManifestVersion, size: u64) {
self.manifest_size_map.insert(FileKey::Delta(version), size);
self.inc_total_manifest_size(size);
}

/// Set the size of the checkpoint file by checkpoint version.
pub(crate) fn set_checkpoint_file_size(&mut self, version: ManifestVersion, size: u64) {
self.manifest_size_map
.insert(FileKey::Checkpoint(version), size);
self.inc_total_manifest_size(size);
}

fn unset_file_size(&mut self, key: &FileKey) {
if let Some(val) = self.manifest_size_map.remove(key) {
self.dec_total_manifest_size(val);
}
}

fn inc_total_manifest_size(&self, val: u64) {
self.total_manifest_size.fetch_add(val, Ordering::Relaxed);
}

fn dec_total_manifest_size(&self, val: u64) {
self.total_manifest_size.fetch_sub(val, Ordering::Relaxed);
}
}

Expand Down Expand Up @@ -610,7 +634,12 @@ mod tests {
let mut builder = Fs::default();
let _ = builder.root(&tmp_dir.path().to_string_lossy());
let object_store = ObjectStore::new(builder).unwrap().finish();
ManifestObjectStore::new("/", object_store, CompressionType::Uncompressed)
ManifestObjectStore::new(
"/",
object_store,
CompressionType::Uncompressed,
Default::default(),
)
}

fn new_checkpoint_metadata_with_version(version: ManifestVersion) -> CheckpointMetadata {
Expand Down
26 changes: 17 additions & 9 deletions src/mito2/src/region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub(crate) mod version;

use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
use std::sync::{Arc, RwLock};

use common_telemetry::{error, info, warn};
Expand Down Expand Up @@ -106,6 +106,8 @@ pub(crate) struct MitoRegion {
time_provider: TimeProviderRef,
/// Memtable builder for the region.
pub(crate) memtable_builder: MemtableBuilderRef,
/// manifest stats
stats: ManifestStats,
}

pub(crate) type MitoRegionRef = Arc<MitoRegion>;
Expand Down Expand Up @@ -233,7 +235,7 @@ impl MitoRegion {
}

/// Returns the region usage in bytes.
pub(crate) async fn region_usage(&self) -> RegionUsage {
pub(crate) fn region_usage(&self) -> RegionUsage {
let region_id = self.region_id;

let version = self.version();
Expand All @@ -243,13 +245,7 @@ impl MitoRegion {
let sst_usage = version.ssts.sst_usage();

let wal_usage = self.estimated_wal_usage(memtable_usage);

let manifest_usage = self
.manifest_ctx
.manifest_manager
.read()
.await
.manifest_usage();
let manifest_usage = self.stats.total_manifest_size();

RegionUsage {
region_id,
Expand Down Expand Up @@ -526,6 +522,18 @@ impl OpeningRegions {

pub(crate) type OpeningRegionsRef = Arc<OpeningRegions>;

/// Manifest stats.
#[derive(Default, Debug, Clone)]
pub(crate) struct ManifestStats {
total_manifest_size: Arc<AtomicU64>,
killme2008 marked this conversation as resolved.
Show resolved Hide resolved
}

impl ManifestStats {
fn total_manifest_size(&self) -> u64 {
self.total_manifest_size.load(Ordering::Relaxed)
}
}

#[cfg(test)]
mod tests {
use crossbeam_utils::atomic::AtomicCell;
Expand Down
20 changes: 16 additions & 4 deletions src/mito2/src/region/opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use crate::memtable::time_partition::TimePartitions;
use crate::memtable::MemtableBuilderProvider;
use crate::region::options::RegionOptions;
use crate::region::version::{VersionBuilder, VersionControl, VersionControlRef};
use crate::region::{ManifestContext, MitoRegion, RegionState};
use crate::region::{ManifestContext, ManifestStats, MitoRegion, RegionState};
use crate::region_write_ctx::RegionWriteCtx;
use crate::request::OptionOutputTx;
use crate::schedule::scheduler::SchedulerRef;
Expand All @@ -63,6 +63,7 @@ pub(crate) struct RegionOpener {
skip_wal_replay: bool,
intermediate_manager: IntermediateManager,
time_provider: Option<TimeProviderRef>,
stats: ManifestStats,
}

impl RegionOpener {
Expand All @@ -87,6 +88,7 @@ impl RegionOpener {
skip_wal_replay: false,
intermediate_manager,
time_provider: None,
stats: Default::default(),
}
}

Expand Down Expand Up @@ -169,8 +171,12 @@ impl RegionOpener {
// Create a manifest manager for this region and writes regions to the manifest file.
let region_manifest_options = self.manifest_options(config, &options)?;
let metadata = Arc::new(self.metadata.unwrap());
let manifest_manager =
RegionManifestManager::new(metadata.clone(), region_manifest_options).await?;
let manifest_manager = RegionManifestManager::new(
metadata.clone(),
region_manifest_options,
self.stats.total_manifest_size.clone(),
)
.await?;

let memtable_builder = self
.memtable_builder_provider
Expand Down Expand Up @@ -217,6 +223,7 @@ impl RegionOpener {
last_flush_millis: AtomicI64::new(time_provider.current_time_millis()),
time_provider,
memtable_builder,
stats: self.stats,
})
}

Expand Down Expand Up @@ -267,7 +274,11 @@ impl RegionOpener {
let region_options = self.options.as_ref().unwrap().clone();

let region_manifest_options = self.manifest_options(config, &region_options)?;
let Some(manifest_manager) = RegionManifestManager::open(region_manifest_options).await?
let Some(manifest_manager) = RegionManifestManager::open(
region_manifest_options,
self.stats.total_manifest_size.clone(),
)
.await?
else {
return Ok(None);
};
Expand Down Expand Up @@ -350,6 +361,7 @@ impl RegionOpener {
last_flush_millis: AtomicI64::new(time_provider.current_time_millis()),
time_provider,
memtable_builder,
stats: self.stats.clone(),
};
Ok(Some(region))
}
Expand Down
4 changes: 2 additions & 2 deletions src/mito2/src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,11 +356,11 @@ impl TestEnv {
};

if let Some(metadata) = initial_metadata {
RegionManifestManager::new(metadata, manifest_opts)
RegionManifestManager::new(metadata, manifest_opts, Default::default())
.await
.map(Some)
} else {
RegionManifestManager::open(manifest_opts).await
RegionManifestManager::open(manifest_opts, Default::default()).await
}
}

Expand Down
Loading
Loading