Skip to content

Commit

Permalink
JOURNAL: Summarize 'evaluation' memory rss and vms use
Browse files Browse the repository at this point in the history
  • Loading branch information
HenrikBengtsson committed Jan 30, 2024
1 parent c4e34d3 commit 43ce12c
Show file tree
Hide file tree
Showing 6 changed files with 117 additions and 78 deletions.
2 changes: 1 addition & 1 deletion R/Future-class.R
Original file line number Diff line number Diff line change
Expand Up @@ -839,7 +839,7 @@ getExpression.Future <- local({
enter <- bquote_apply(tmpl_enter_rng)
}

expr <- makeExpression(expr = expr, local = local, stdout = stdout, conditionClasses = conditionClasses, split = split, enter = enter, exit = exit, ..., version = version)
expr <- makeExpression(expr = expr, local = local, stdout = stdout, conditionClasses = conditionClasses, journal.memory = inherits(future$.journal, "FutureJournal"), split = split, enter = enter, exit = exit, ..., version = version)
if (getOption("future.debug", FALSE)) mprint(expr)

## mdebug("getExpression() ... DONE")
Expand Down
30 changes: 18 additions & 12 deletions R/FutureResult-class.R
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
FutureResult <- local({
r_info <- NULL

function(value = NULL, visible = TRUE, stdout = NULL, conditions = NULL, rng = FALSE, ..., started = .POSIXct(NA_real_), finished = Sys.time(), version = "1.8") {
function(value = NULL, visible = TRUE, stdout = NULL, conditions = NULL, rng = FALSE, ..., started = .POSIXct(NA_real_), finished = Sys.time(), memory_started = .memory(FALSE), memory_finished = .memory(), version = "1.9") {
args <- list(...)
if (length(args) > 0) {
names <- names(args)
Expand Down Expand Up @@ -72,17 +72,19 @@ FutureResult <- local({
}

structure(list(
value = value,
visible = visible,
stdout = stdout,
conditions = conditions,
rng = rng,
...,
started = started,
finished = finished,
session_uuid = session_uuid(),
r_info = r_info,
version = version
value = value,
visible = visible,
stdout = stdout,
conditions = conditions,
rng = rng,
...,
started = started,
finished = finished,
memory_started = memory_started,
memory_finished = memory_finished,
session_uuid = session_uuid(),
r_info = r_info,
version = version
), class = "FutureResult")
}
})
Expand Down Expand Up @@ -122,6 +124,10 @@ print.FutureResult <- function(x, ...) {
t0 <- x[["started"]]
t1 <- x[["finished"]]
s <- c(s, sprintf("duration: %s (started %s)", format(t1-t0), t0))

m0 <- x[["memory_started"]]
m1 <- x[["memory_finished"]]
s <- c(s, sprintf("memory change: %s (from %s)", format(m1 - m0), m0))
s <- c(s, sprintf("version: %s", x[["version"]]))
cat(s, sep = "\n")
}
15 changes: 14 additions & 1 deletion R/expressions.R
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,15 @@ makeExpression <- local({
tmpl_enter_optenvar <- bquote_compile({
## Start time for future evaluation
...future.startTime <- base::Sys.time()
if (.(journal.memory)) {
## ...future.memory <- future:::.memory()
...future.memory <- .(.memory)
} else {
...future.memory <- function(...) {
structure(c(rss = NA_real_, vms = NA_real_), class = "object_size")
}
}
...future.startMemory <- ...future.memory(NA)

## Required packages are loaded and attached here
.(enter)
Expand Down Expand Up @@ -237,6 +246,8 @@ makeExpression <- local({
rng = !identical(base::globalenv()$.Random.seed, ...future.rng),
globalenv = if (.(globalenv)) list(added = base::setdiff(base::names(base::.GlobalEnv), ...future.globalenv.names)) else NULL,
started = ...future.startTime,
memory_started = ...future.startMemory,
memory_finished = ...future.memory(),
version = "1.8"
)
}, condition = base::local({
Expand Down Expand Up @@ -329,6 +340,8 @@ makeExpression <- local({
rng = !identical(base::globalenv()$.Random.seed, ...future.rng),
started = ...future.startTime,
finished = Sys.time(),
memory_started = ...future.startMemory,
memory_finished = ...future.memory(),
session_uuid = NA_character_,
version = "1.8"
), class = "FutureResult")
Expand All @@ -354,7 +367,7 @@ makeExpression <- local({
})


function(expr, local = TRUE, immediateConditions = FALSE, stdout = TRUE, conditionClasses = NULL, split = FALSE, globals.onMissing = getOption("future.globals.onMissing", NULL), globalenv = (getOption("future.globalenv.onMisuse", "ignore") != "ignore"), enter = NULL, exit = NULL, version = "1.8") {
function(expr, local = TRUE, immediateConditions = FALSE, stdout = TRUE, conditionClasses = NULL, split = FALSE, globals.onMissing = getOption("future.globals.onMissing", NULL), journal.memory = FALSE, globalenv = (getOption("future.globalenv.onMisuse", "ignore") != "ignore"), enter = NULL, exit = NULL, version = "1.8") {
conditionClassesExclude <- attr(conditionClasses, "exclude", exact = TRUE)
muffleInclude <- attr(conditionClasses, "muffleInclude", exact = TRUE)
if (is.null(muffleInclude)) muffleInclude <- "^muffle"
Expand Down
141 changes: 79 additions & 62 deletions R/journal.R
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
#' 5. `at` (difftime) - the time when the event started
#' relative to first event
#' 6. `duration` (difftime) - the duration of the event
#' 7. `memory_rss_start` (numeric) - the memory consumption at the beginning
#' 7. `memory_start` (numeric) - the memory consumption at the beginning
#' of the event
#' 8. `memory_rss_stop` (numeric) - the memory consumption at the end of
#' 8. `memory_stop` (numeric) - the memory consumption at the end of
#' the event
#' 9. `future_label` (character string) - the label of the future
#' 10. `future_uuid` (factor) - the UUID of the future
Expand Down Expand Up @@ -87,12 +87,12 @@ journal.Future <- function(x, ...) {
if (!is.element("evaluate", data$event) && !is.null(x$result)) {
stop_if_not(is.character(session_uuid))
x <- appendToFutureJournal(x,
event = "evaluate",
category = "evaluation",
start = x$result$started,
stop = x$result$finished,
memory_rss_start = structure(NA_real_, class = "object_size"),
memory_rss_stop = structure(NA_real_, class = "object_size")
event = "evaluate",
category = "evaluation",
start = x$result$started,
stop = x$result$finished,
memory_start = x$result$memory_started,
memory_stop = x$result$memory_finished
)
data <- x$.journal
stop_if_not(length(x$result$session_uuid) == 1L, is.character(x$result$session_uuid))
Expand Down Expand Up @@ -266,47 +266,53 @@ summary.FutureJournal <- function(object, ...) {


## -------------------------------------------------------
## 3. Summarize memory use
## 3. Summarize memory use for evaluation
## -------------------------------------------------------
## (a) memory use when the first event starts
t_begin <- subset(dt_top, event == "create")[["memory_rss_start"]]
## (b) memory use when 'gather' finishes
t_end <- subset(dt_top, event == "gather")[["memory_rss_stop"]]
## (c) memory change (per future)
t_delta <- t_end - t_begin
## (d) total memory change
t_delta <- t_delta[!is.na(t_delta)]

if (length(t_delta) > 0) {
t_total <- sum(t_delta, na.rm = FALSE)
} else {
t_total <- NA_real_
## (a) Per future
eff <- list()
for (kk in seq_along(uuids)) {
uuid <- uuids[[kk]]
dt_uuid <- subset(dt_top, future_uuid == uuid)
dt <- subset(dt_uuid, category == "evaluation")
res_kk <- list()
for (type in c("rss", "vms")) {
t_delta <- dt[[paste0("memory_stop_", type)]] - dt[[paste0("memory_start_", type)]]
res_kk[[paste0("memory_", type)]] <- t_delta
}
res_kk <- as.data.frame(res_kk)
eff[[uuid]] <- res_kk
}
eff <- Reduce(rbind, eff)

## (e) build table
t <- NULL
## (b) Summary
res <- NULL
if (length(uuids) > 1L) {
if (length(t_delta) > 0) {
t <- c(t, min = min(t_delta, na.rm = FALSE))
t <- c(t, mean = mean(t_delta, na.rm = FALSE))
t <- c(t, median = median(t_delta, na.rm = FALSE))
t <- c(t, max = max(t_delta, na.rm = FALSE))
} else {
t <- c(t, min = NA_real_, mean = NA_real_, median = NA_real_, max = NA_real_)
}
t <- lapply(c("min", "mean", "median", "max"), FUN = function(fcn_name) {
fcn <- get(fcn_name, mode = "function")
t <- as.data.frame(lapply(eff, FUN = fcn))
rownames(t) <- fcn_name
t
})
t <- Reduce(rbind, t)
res <- t
}

t <- c(t, total = t_total)
t <- structure(t, class = "object_size")
mem_stats <- data.frame(memory_rss_change = t)
stats[["memory"]] <- mem_stats
## (c) Total
t <- as.data.frame(lapply(eff, FUN = sum))
rownames(t) <- "total"
res <- rbind(res, t)
colnames(res) <- paste0("evaluate_", colnames(res))

## (d) Combine
stats <- cbind(stats, res)


## -------------------------------------------------------
## 4. Wrap up
## -------------------------------------------------------
stats[["summary"]] <- rownames(stats)
rownames(stats) <- NULL
stats <- stats[, c("summary", "evaluate", "evaluate_ratio", "overhead", "overhead_ratio", "duration", "walltime", "memory")]
stats <- stats[, c("summary", "evaluate", "evaluate_ratio", "overhead", "overhead_ratio", "duration", "walltime", "evaluate_memory_rss", "evaluate_memory_vms")]

attr(stats, "nbr_of_futures") <- length(uuids)
class(stats) <- c("FutureJournalSummary", class(stats))
Expand All @@ -321,27 +327,36 @@ print.FutureJournalSummary <- function(x, ...) {
}


memory_rss <- local({
.memory <- local({
na <- c(rss = NA_real_, vms = NA_real_)
ps_handle <- NULL

function() {
if (getOption("future.journal.memory", FALSE)) {
if (is.null(ps_handle)) {
function(probe = TRUE) {
if (is.na(probe)) {
ps_handle <<- NULL
} else {
if (!probe || identical(ps_handle, NA)) return(na)
}

if (is.null(ps_handle)) {
value <- Sys.getenv("R_FUTURE_JOURNAL_MEMORY", "TRUE")
value <- as.logical(value)
if (isTRUE(value)) {
if (!requireNamespace("ps", quietly = TRUE)) {
stop(FutureError("Package 'ps' is not installed")
warning(FutureWarning("Package 'ps' is required in order to profile the memory"))
ps_handle <<- NA
return(na)
}
ps_handle <<- ps::ps_handle()
}
value <- ps::ps_memory_info(ps_handle)[["rss"]]
} else {
value <- NA_real_
ps_handle <<- ps::ps_handle()
}
structure(value, class = "object_size")

ps::ps_memory_info(ps_handle)[c("rss", "vms")]
}
})


makeFutureJournal <- function(x, event = "create", category = "other", parent = NA_character_, start = stop, stop = Sys.time(), memory_rss_start = memory_rss(), memory_rss_stop = structure(NA_real_, class = "object_size")) {
makeFutureJournal <- function(x, event = "create", category = "other", parent = NA_character_, start = stop, stop = Sys.time(), memory_start = .memory(), memory_stop = .memory(FALSE)) {
stop_if_not(
inherits(x, "Future"),
is.null(x$.journal),
Expand All @@ -350,17 +365,18 @@ makeFutureJournal <- function(x, event = "create", category = "other", parent =
length(parent) == 1L, is.character(parent),
length(start) == 1L, inherits(start, "POSIXct"),
length(stop) == 1L, inherits(stop, "POSIXct"),
length(memory_rss_start) == 1L, is.numeric(memory_rss_start),
length(memory_rss_stop) == 1L, is.numeric(memory_rss_stop)
length(memory_start) == 2L, is.numeric(memory_start),
length(memory_stop) == 2L, is.numeric(memory_stop)
)

data <- data.frame(event = event, category = category, parent = parent, start = start, stop = stop, memory_rss_start = memory_rss_start, memory_rss_stop = memory_rss_stop)
data <- data.frame(event = event, category = category, parent = parent, start = start, stop = stop, memory_start = as.list(memory_start), memory_stop = as.list(memory_stop))
colnames(data) <- gsub(".", "_", colnames(data), fixed = TRUE)
class(data) <- c("FutureJournal", class(data))
x$.journal <- data
invisible(x)
}

updateFutureJournal <- function(x, event, start = NULL, stop = Sys.time(), memory_rss_start = NULL, memory_rss_stop = NULL) {
updateFutureJournal <- function(x, event, start = NULL, stop = Sys.time(), memory_start = NULL, memory_stop = .memory()) {
## Nothing to do?
if (!inherits(x$.journal, "FutureJournal")) return(x)

Expand All @@ -369,8 +385,8 @@ updateFutureJournal <- function(x, event, start = NULL, stop = Sys.time(), memor
length(event) == 1L, is.character(event), !is.na(event),
is.null(start) || (length(start) == 1L && inherits(start, "POSIXct")),
is.null(stop) || (length(stop) == 1L && inherits(stop, "POSIXct")),
is.null(memory_rss_start) || length(memory_rss_start) == 1L, is.numeric(memory_rss_start),
is.null(memory_rss_stop) || length(memory_rss_stop) == 1L, is.numeric(memory_rss_stop)
is.null(memory_start) || length(memory_start) == 2L, is.numeric(memory_start),
is.null(memory_stop) || length(memory_stop) == 2L, is.numeric(memory_stop)
)

data <- x$.journal
Expand All @@ -382,16 +398,16 @@ updateFutureJournal <- function(x, event, start = NULL, stop = Sys.time(), memor
entry <- data[row, ]
if (!is.null(start)) entry$start <- start
if (!is.null(stop)) entry$stop <- stop
if (!is.null(memory_rss_start)) entry$memory_rss_start <- memory_rss_start
if (!is.null(memory_rss_stop)) entry$memory_rss_stop <- memory_rss_stop
if (!is.null(memory_start)) entry$memory_start <- as.list(memory_start)
if (!is.null(memory_stop)) entry$memory_stop <- as.list(memory_stop)
data[row, ] <- entry
stop_if_not(inherits(data, "FutureJournal"))
x$.journal <- data
invisible(x)
}


appendToFutureJournal <- function(x, event, category = "other", parent = NA_character_, start = Sys.time(), stop = as.POSIXct(NA_real_), memory_rss_start = memory_rss(), memory_rss_stop = structure(NA_real_, class = "object_size"), skip = TRUE) {
appendToFutureJournal <- function(x, event, category = "other", parent = NA_character_, start = Sys.time(), stop = as.POSIXct(NA_real_), memory_start = .memory(), memory_stop = .memory(FALSE), skip = TRUE) {
## Nothing to do?
if (!inherits(x$.journal, "FutureJournal")) return(x)

Expand All @@ -404,15 +420,16 @@ appendToFutureJournal <- function(x, event, category = "other", parent = NA_char
length(parent) == 1L, is.character(parent),
length(start) == 1L, inherits(start, "POSIXct"),
length(stop) == 1L, inherits(stop, "POSIXct"),
length(memory_rss_start) == 1L, is.numeric(memory_rss_start),
length(memory_rss_stop) == 1L, is.numeric(memory_rss_stop)
length(memory_start) == 2L, is.numeric(memory_start),
length(memory_stop) == 2L, is.numeric(memory_stop)
)

if (missing(memory_rss_stop) && !missing(stop)) {
memory_rss_stop <- memory_rss()
if (missing(memory_stop) && !missing(stop)) {
memory_stop <- .memory()
}

data <- data.frame(event = event, category = category, parent = parent, start = start, stop = stop, memory_rss_start = memory_rss_start, memory_rss_stop = memory_rss_stop)
data <- data.frame(event = event, category = category, parent = parent, start = start, stop = stop, memory_start = as.list(memory_start), memory_stop = as.list(memory_stop))
colnames(data) <- gsub(".", "_", colnames(data), fixed = TRUE)
x$.journal <- rbind(x$.journal, data)
invisible(x)
}
Expand Down
4 changes: 3 additions & 1 deletion man/FutureResult.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion tests/capture_journals.R
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ slow_fcn <- function(x) {
Sys.sleep(0.5 + 1/x)
}

plan(multisession, workers = 2)
plan(multisession, workers = 2L)

js <- capture_journals({
fs <- lapply(3:1, FUN = function(x) future(slow_fcn(x)))
vs <- value(fs)
Expand Down

0 comments on commit 43ce12c

Please sign in to comment.