Skip to content

Commit

Permalink
Periodically report memory usage (#1503)
Browse files Browse the repository at this point in the history
  • Loading branch information
andyleiserson authored Dec 21, 2024
1 parent dc91dcd commit 3413c3c
Show file tree
Hide file tree
Showing 7 changed files with 123 additions and 17 deletions.
15 changes: 7 additions & 8 deletions ipa-core/benches/oneshot/ipa.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,6 @@ use ipa_step::StepNarrow;
use rand::{random, rngs::StdRng, SeedableRng};
use tokio::runtime::Builder;

#[cfg(jemalloc)]
#[global_allocator]
static ALLOC: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;

#[cfg(feature = "dhat-heap")]
#[global_allocator]
static ALLOC: dhat::Alloc = dhat::Alloc;

/// A benchmark for the full IPA protocol.
#[derive(Parser)]
#[command(about, long_about = None)]
Expand Down Expand Up @@ -165,6 +157,13 @@ async fn run(args: Args) -> Result<(), Error> {
}

fn main() -> Result<(), Error> {
#[cfg(jemalloc)]
ipa_core::use_jemalloc!();

#[cfg(feature = "dhat-heap")]
#[global_allocator]
static ALLOC: dhat::Alloc = dhat::Alloc;

#[cfg(feature = "dhat-heap")]
let _profiler = dhat::Profiler::new_heap();

Expand Down
15 changes: 7 additions & 8 deletions ipa-core/src/bin/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,6 @@ use ipa_core::{
use tokio::runtime::Runtime;
use tracing::{error, info};

#[cfg(jemalloc)]
#[global_allocator]
static ALLOC: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;

#[cfg(feature = "dhat-heap")]
#[global_allocator]
static ALLOC: dhat::Alloc = dhat::Alloc;

#[derive(Debug, Parser)]
#[clap(
name = "helper",
Expand Down Expand Up @@ -369,6 +361,13 @@ fn new_query_runtime(logging_handle: &LoggingHandle) -> Runtime {
/// runtimes to use in MPC queries and HTTP.
#[tokio::main(flavor = "current_thread")]
pub async fn main() {
#[cfg(jemalloc)]
ipa_core::use_jemalloc!();

#[cfg(feature = "dhat-heap")]
#[global_allocator]
static ALLOC: dhat::Alloc = dhat::Alloc;

let args = Args::parse();
let handle = args.logging.setup_logging();

Expand Down
16 changes: 16 additions & 0 deletions ipa-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ mod seq_join;
mod serde;
pub mod sharding;
pub mod utils;

pub use app::{AppConfig, HelperApp, Setup as AppSetup};
pub use utils::NonZeroU32PowerOfTwo;

Expand Down Expand Up @@ -348,6 +349,21 @@ pub(crate) mod test_executor {

pub const CRATE_NAME: &str = env!("CARGO_CRATE_NAME");

/// This macro should be called in a binary that uses `ipa_core`, if that binary wishes
/// to use jemalloc.
///
/// Besides declaring the `#[global_allocator]`, the macro also activates some memory
/// reporting.
#[macro_export]
macro_rules! use_jemalloc {
() => {
#[global_allocator]
static ALLOC: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;

$crate::telemetry::memory::jemalloc::activate();
};
}

#[macro_export]
macro_rules! const_assert {
($x:expr $(,)?) => {
Expand Down
7 changes: 7 additions & 0 deletions ipa-core/src/seq_join/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ use std::{
use futures::{stream::Fuse, Future, Stream, StreamExt};
use pin_project::pin_project;

use crate::telemetry::memory::periodic_memory_report;

enum ActiveItem<F: IntoFuture> {
Pending(Pin<Box<F::IntoFuture>>),
Resolved(F::Output),
Expand Down Expand Up @@ -56,6 +58,7 @@ where
#[pin]
source: Fuse<S>,
active: VecDeque<ActiveItem<F>>,
spawned: usize,
_marker: PhantomData<fn(&'unused ()) -> &'unused ()>,
}

Expand All @@ -68,6 +71,7 @@ where
Self {
source: source.fuse(),
active: VecDeque::with_capacity(active.get()),
spawned: 0,
_marker: PhantomData,
}
}
Expand All @@ -88,6 +92,8 @@ where
if let Poll::Ready(Some(f)) = this.source.as_mut().poll_next(cx) {
this.active
.push_back(ActiveItem::Pending(Box::pin(f.into_future())));
periodic_memory_report(*this.spawned);
*this.spawned += 1;
} else {
break;
}
Expand All @@ -104,6 +110,7 @@ where
Poll::Pending
}
} else if this.source.is_done() {
periodic_memory_report(*this.spawned);
Poll::Ready(None)
} else {
Poll::Pending
Expand Down
10 changes: 9 additions & 1 deletion ipa-core/src/seq_join/multi_thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ use futures::{stream::Fuse, Stream, StreamExt};
use pin_project::pin_project;
use tracing::{Instrument, Span};

use crate::telemetry::memory::periodic_memory_report;

#[cfg(feature = "shuttle")]
mod shuttle_spawner {
use std::future::Future;
Expand Down Expand Up @@ -62,6 +64,7 @@ where
#[pin]
source: Fuse<S>,
capacity: usize,
spawned: usize,
}

impl<S, F> SequentialFutures<'_, S, F>
Expand All @@ -75,6 +78,7 @@ where
spawner: unsafe { create_spawner() },
source: source.fuse(),
capacity: active.get(),
spawned: 0,
}
}
}
Expand Down Expand Up @@ -103,11 +107,14 @@ where
// a dependency between futures, pending one will never complete.
// Cancellable futures will be cancelled when spawner is dropped which is
// the behavior we want.
let task_index = this.spawner.len();
let task_index = *this.spawned;
this.spawner
.spawn_cancellable(f.into_future().instrument(Span::current()), move || {
panic!("SequentialFutures: spawned task {task_index} cancelled")
});

periodic_memory_report(*this.spawned);
*this.spawned += 1;
} else {
break;
}
Expand All @@ -127,6 +134,7 @@ where
None => None,
})
} else if this.source.is_done() {
periodic_memory_report(*this.spawned);
Poll::Ready(None)
} else {
Poll::Pending
Expand Down
76 changes: 76 additions & 0 deletions ipa-core/src/telemetry/memory.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
pub fn periodic_memory_report(count: usize) {
#[cfg(not(jemalloc))]
let _ = count;

#[cfg(jemalloc)]
jemalloc::periodic_memory_report(count);
}

#[cfg(jemalloc)]
pub mod jemalloc {
use std::sync::RwLock;

use tikv_jemalloc_ctl::{epoch_mib, stats::allocated_mib};

const MB: usize = 2 << 20;

// In an unfortunate acronym collision, `mib` in the names of the jemalloc
// statistics stands for "Management Information Base", not "mebibytes".
// The reporting unit is bytes.

struct JemallocControls {
epoch: epoch_mib,
allocated: allocated_mib,
}

static CONTROLS: RwLock<Option<JemallocControls>> = RwLock::new(None);

/// Activates periodic memory usage reporting during `seq_join`.
///
/// # Panics
/// If `RwLock` is poisoned.
pub fn activate() {
let mut controls = CONTROLS.write().unwrap();

let epoch = tikv_jemalloc_ctl::epoch::mib().unwrap();
let allocated = tikv_jemalloc_ctl::stats::allocated::mib().unwrap();

*controls = Some(JemallocControls { epoch, allocated });
}

fn report_memory_usage(controls: &JemallocControls, count: usize) {
// Some of the information jemalloc uses when reporting statistics is cached, and
// refreshed only upon advancing the epoch.
controls.epoch.advance().unwrap();
let allocated = controls.allocated.read().unwrap() / MB;
tracing::debug!("i={count}: {allocated} MiB allocated");
}

fn should_print_report(count: usize) -> bool {
if count == 0 {
return true;
}

let bits = count.ilog2();
let report_interval_log2 = std::cmp::max(bits.saturating_sub(1), 8);
let report_interval_mask = (1 << report_interval_log2) - 1;
(count & report_interval_mask) == 0
}

/// Print a memory report periodically, based on the value of `count`.
///
/// As `count` increases, so does the report interval. This results in
/// a tolerable amount of log messages for loops with many iterations,
/// while still providing some reporting for shorter loops.
///
/// # Panics
/// If `RwLock` is poisoned.
pub fn periodic_memory_report(count: usize) {
let controls_opt = CONTROLS.read().unwrap();
if let Some(controls) = controls_opt.as_ref() {
if should_print_report(count) {
report_memory_usage(controls, count);
}
}
}
}
1 change: 1 addition & 0 deletions ipa-core/src/telemetry/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod memory;
pub mod stats;
mod step_stats;

Expand Down

0 comments on commit 3413c3c

Please sign in to comment.