diff --git a/.github/workflows/R-CMD-check.yaml b/.github/workflows/R-CMD-check.yaml index 9835a963..daf5ae82 100644 --- a/.github/workflows/R-CMD-check.yaml +++ b/.github/workflows/R-CMD-check.yaml @@ -22,20 +22,20 @@ jobs: - {os: macOS-latest, r: 'devel' } - {os: macOS-latest, r: 'release' } - {os: macOS-latest, r: 'oldrel' } - - {os: ubuntu-20.04, r: 'devel' } - - {os: ubuntu-20.04, r: 'release' } - - {os: ubuntu-20.04, r: 'oldrel' } - - {os: ubuntu-20.04, r: 'oldrel-1' } - - {os: ubuntu-20.04, r: 'oldrel-2' } - - {os: ubuntu-20.04, r: '3.4' } - - {os: ubuntu-20.04, r: 'release' , language: ko, label: ko } - - {os: ubuntu-20.04, r: 'release' , language: zh_CN, label: zh_CN } - - {os: ubuntu-20.04, r: 'release' , language: zh_TW, label: zh_TW } - - {os: ubuntu-20.04, r: 'release' , globals_keepWhere: true, label: 'keepWhere' } - - {os: ubuntu-20.04, r: 'release' , globals_keepWhere: false, label: '!keepWhere' } - - {os: ubuntu-20.04, r: 'release' , plan: multicore, fork_multithreading_enable: false, label: 'multicore, no-multithreading-in-forks' } - - {os: ubuntu-20.04, r: 'release' , plan: multisession, fork_multithreading_enable: false, label: 'multisession, no-multithreading-in-forks' } - - {os: ubuntu-20.04, r: 'release' , psock_relay_immediate: false, label: 'no-immediate-relay-in-psock' } + - {os: ubuntu-latest, r: 'devel' } + - {os: ubuntu-latest, r: 'release' } + - {os: ubuntu-latest, r: 'oldrel' } + - {os: ubuntu-latest, r: 'oldrel-1' } + - {os: ubuntu-latest, r: 'oldrel-2' } + - {os: ubuntu-latest, r: '3.5' } + - {os: ubuntu-latest, r: 'release' , language: ko, label: ko } + - {os: ubuntu-latest, r: 'release' , language: zh_CN, label: zh_CN } + - {os: ubuntu-latest, r: 'release' , language: zh_TW, label: zh_TW } + - {os: ubuntu-latest, r: 'release' , globals_keepWhere: true, label: 'keepWhere' } + - {os: ubuntu-latest, r: 'release' , globals_keepWhere: false, label: '!keepWhere' } + - {os: ubuntu-latest, r: 'release' , plan: multicore, 
fork_multithreading_enable: false, label: 'multicore, no-multithreading-in-forks' } + - {os: ubuntu-latest, r: 'release' , plan: multisession, fork_multithreading_enable: false, label: 'multisession, no-multithreading-in-forks' } + - {os: ubuntu-latest, r: 'release' , psock_relay_immediate: false, label: 'no-immediate-relay-in-psock' } env: GITHUB_PAT: ${{ secrets.GITHUB_TOKEN }} @@ -50,12 +50,14 @@ jobs: _R_CHECK_MATRIX_DATA_: true _R_CHECK_SUGGESTS_ONLY_: true _R_CHECK_THINGS_IN_TEMP_DIR_: true + _R_CHECK_TESTS_NLINES_: 300 + RCMDCHECK_ERROR_ON: note ## Specific to futures R_FUTURE_RNG_ONMISUSE: error R_FUTURE_GLOBALS_KEEPWHERE: ${{ matrix.config.globals_keepWhere }} steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - uses: r-lib/actions/setup-pandoc@v2 @@ -90,7 +92,7 @@ jobs: tryCatch(log("a"), error = conditionMessage) shell: Rscript {0} - - name: Check (!Windows) + - name: Check if: runner.os != 'Windows' env: ## FIXME: Eventually update to 'R_FUTURE_GLOBALS_ONREFERENCE=error' @@ -100,9 +102,10 @@ jobs: R_FUTURE_FORK_MULTITHREADING_ENABLE: ${{ matrix.config.fork_multithreading_enable }} R_FUTURE_PSOCK_RELAY_IMMEDIATE: ${{ matrix.config.psock_relay_immediate }} run: | + if (nzchar(Sys.getenv("R_FUTURE_PLAN")) || getRversion() < "3.5.0") Sys.setenv(RCMDCHECK_ERROR_ON = "error") rcmdcheck::rcmdcheck( - args = c("--no-manual", "--as-cran"), - error_on = if (nzchar(Sys.getenv("R_FUTURE_PLAN"))) "error" else "note", + build_args = if (getRversion() < "3.5.0") "--no-build-vignettes", + args = c("--no-manual", "--as-cran", if (getRversion() < "3.5.0") c("--no-vignettes", "--no-build-vignettes", "--ignore-vignettes")), check_dir = "check" ) shell: Rscript {0} @@ -112,7 +115,6 @@ jobs: run: | rcmdcheck::rcmdcheck( args = c("--no-manual", "--as-cran", if (.Platform$OS.type == "windows" && getRversion() >= "4.2.0") "--no-multiarch"), - error_on = "note", check_dir = "check" ) shell: Rscript {0} diff --git a/.github/workflows/covr.yaml 
b/.github/workflows/covr.yaml index a925743b..84489f69 100644 --- a/.github/workflows/covr.yaml +++ b/.github/workflows/covr.yaml @@ -8,7 +8,7 @@ jobs: timeout-minutes: 45 - runs-on: ubuntu-20.04 + runs-on: ubuntu-22.04 name: covr @@ -17,7 +17,7 @@ jobs: env: GITHUB_PAT: ${{ secrets.GITHUB_TOKEN }} - RSPM: https://packagemanager.rstudio.com/cran/__linux__/focal/latest + RSPM: https://packagemanager.rstudio.com/cran/__linux__/jammy/latest R_REMOTES_NO_ERRORS_FROM_WARNINGS: true ## R CMD check _R_CHECK_LENGTH_1_CONDITION_: true @@ -26,7 +26,7 @@ jobs: _R_CHECK_CRAN_INCOMING_: false steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - uses: r-lib/actions/setup-pandoc@v2 diff --git a/.github/workflows/future_tests.yaml b/.github/workflows/future_tests.yaml index 15179569..788d520e 100644 --- a/.github/workflows/future_tests.yaml +++ b/.github/workflows/future_tests.yaml @@ -8,7 +8,7 @@ jobs: timeout-minutes: 30 - runs-on: ubuntu-20.04 + runs-on: ubuntu-22.04 name: future.plan=${{ matrix.future.plan }} @@ -21,14 +21,14 @@ jobs: - { plan: 'multisession' } - { plan: 'sequential' } - { plan: 'future.batchtools::batchtools_local' } + - { plan: 'future.batchtools::batchtools_bash' } - { plan: 'future.callr::callr' } env: GITHUB_PAT: ${{ secrets.GITHUB_TOKEN }} - RSPM: https://packagemanager.rstudio.com/cran/__linux__/focal/latest + RSPM: https://packagemanager.rstudio.com/cran/__linux__/jammy/latest R_REMOTES_NO_ERRORS_FROM_WARNINGS: true ## R CMD check - _R_CHECK_LENGTH_1_CONDITION_: true _R_CHECK_LENGTH_1_LOGIC2_: true _R_CHECK_MATRIX_DATA_: true _R_CHECK_CRAN_INCOMING_: false @@ -36,7 +36,7 @@ jobs: R_FUTURE_RNG_ONMISUSE: error steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - uses: r-lib/actions/setup-pandoc@v2 diff --git a/.github/workflows/revdepcheck-top.yaml b/.github/workflows/revdepcheck-top.yaml index 96b7c560..d3a9d55d 100644 --- a/.github/workflows/revdepcheck-top.yaml +++ b/.github/workflows/revdepcheck-top.yaml @@ -8,7 +8,7 
@@ jobs: timeout-minutes: 30 - runs-on: ubuntu-20.04 + runs-on: ubuntu-22.04 name: ${{ matrix.config.pkg }} (${{ matrix.config.r }}) ${{ matrix.config.label }} @@ -29,7 +29,7 @@ jobs: env: GITHUB_PAT: ${{ secrets.GITHUB_TOKEN }} - RSPM: https://packagemanager.rstudio.com/cran/__linux__/focal/latest + RSPM: https://packagemanager.rstudio.com/cran/__linux__/jammy/latest R_REMOTES_NO_ERRORS_FROM_WARNINGS: true ## R CMD check _R_CHECK_LENGTH_1_CONDITION_: true @@ -44,7 +44,7 @@ jobs: NOT_CRAN: false steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - uses: r-lib/actions/setup-pandoc@v2 diff --git a/DESCRIPTION b/DESCRIPTION index 7eb60502..817f8d70 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -1,5 +1,5 @@ Package: future -Version: 1.31.0-9005 +Version: 1.33.1 Title: Unified Parallel and Distributed Processing in R for Everyone Imports: digest, @@ -14,8 +14,10 @@ Suggests: R.rsp, markdown VignetteBuilder: R.rsp -Authors@R: c(person("Henrik", "Bengtsson", role=c("aut", "cre", "cph"), - email = "henrikb@braju.com")) +Authors@R: c(person("Henrik", "Bengtsson", + role = c("aut", "cre", "cph"), + email = "henrikb@braju.com", + comment = c(ORCID = "0000-0002-7579-5165"))) Description: The purpose of this package is to provide a lightweight and unified Future API for sequential and parallel processing of R expression via futures. 
The simplest way to evaluate an expression diff --git a/NAMESPACE b/NAMESPACE index 9ca1883d..e23fbb79 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -1,5 +1,6 @@ # Generated by roxygen2: do not edit by hand +S3method("$<-",Future) S3method("[",FutureGlobals) S3method("[",sessionDetails) S3method(as.FutureGlobals,FutureGlobals) @@ -16,6 +17,10 @@ S3method(getExpression,Future) S3method(getExpression,MulticoreFuture) S3method(getExpression,MultisessionFuture) S3method(getExpression,UniprocessFuture) +S3method(journal,Future) +S3method(journal,FutureJournal) +S3method(journal,FutureJournalCondition) +S3method(journal,list) S3method(mandelbrot,matrix) S3method(mandelbrot,numeric) S3method(nbrOfFreeWorkers,"NULL") @@ -33,6 +38,8 @@ S3method(nbrOfWorkers,uniprocess) S3method(plot,Mandelbrot) S3method(print,Future) S3method(print,FutureCondition) +S3method(print,FutureJournal) +S3method(print,FutureJournalSummary) S3method(print,FutureResult) S3method(print,FutureStrategy) S3method(print,FutureStrategyList) @@ -64,6 +71,7 @@ S3method(run,ConstantFuture) S3method(run,Future) S3method(run,MulticoreFuture) S3method(run,UniprocessFuture) +S3method(summary,FutureJournal) S3method(tweak,"function") S3method(tweak,character) S3method(tweak,future) @@ -89,9 +97,13 @@ export(Future) export(FutureCondition) export(FutureError) export(FutureGlobals) +export(FutureJournalCondition) export(FutureMessage) export(FutureResult) export(FutureWarning) +export(GlobalEnvFutureCondition) +export(GlobalEnvFutureError) +export(GlobalEnvFutureWarning) export(MulticoreFuture) export(MultiprocessFuture) export(MultisessionFuture) @@ -122,7 +134,6 @@ export(makeNodePSOCK) export(mandelbrot) export(mandelbrot_tiles) export(multicore) -export(multiprocess) export(multisession) export(nbrOfFreeWorkers) export(nbrOfWorkers) @@ -163,6 +174,7 @@ importFrom(parallelly,availableCores) importFrom(parallelly,availableWorkers) importFrom(parallelly,connectionId) importFrom(parallelly,isConnectionValid) 
+importFrom(parallelly,isNodeAlive) importFrom(parallelly,makeClusterMPI) importFrom(parallelly,makeClusterPSOCK) importFrom(parallelly,makeNodePSOCK) diff --git a/NEWS.md b/NEWS.md index bbaf510a..16e1e2c9 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,19 +1,81 @@ # Version (development version) + * ... + + +# Version 1.33.1 [2023-12-21] + +## Bug Fixes + + * `getExpression()` on a 'cluster' future could under some + circumstances call `local()` on the global search path rather than + `base::local()` as intended. For example, if a package that + exports its own `local()` function was attached, then that would be + called instead, often leading to a hard-to-troubleshoot error. + + +# Version 1.33.0 [2023-07-01] + +## New Features + + * When a 'cluster' future fails to communicate with the parallel + worker, it does a post-mortem analysis to figure out why, including + inspecting whether the worker process is still alive or not. In + previous versions, this only worked for workers running on the + current machine. Starting with this version, it also attempts to + check this for remote workers. + +## Bug Fixes + + * If a 'multicore' future failed, because the parallel process + crashed, the corresponding parallel-worker slot was never released. + Now it is removed if it can confirm that the forked worker process + is no longer alive. + ## Deprecated and Defunct - * Deprecated `plan(multiprocess, ...)` is now defunct when running in - interactive mode. The next step is to make it defunct also when - running in batch mode. - + * The 'multiprocess' strategy has now been fully removed. Please use + 'multisession' (recommended) or 'multicore' instead. + + +# Version 1.32.0 [2023-03-06] + +## New Features + + * Add prototype of an internal event-logging framework for the + purpose of profiling futures and their backends. 
+ + * Add option `future.globalenv.onMisuse` for optionally asserting + that a future expression does not result in variables being added + to the global environment. + + * Add option `future.onFutureCondition.keepFuture` for controlling + whether `FutureCondition` objects should keep a copy of the + `Future` object or not. The default is to keep a copy, but if the + future carries large global objects, then the `FutureCondition` + will also be large, which can result in memory issues and + slowdowns. + +## Miscellaneous + + * Fix a **future.tests** check that occurred only on MS Windows. + +## Deprecated and Defunct + + * The 'multiprocess' strategy, which has been deprecated since future + 1.20.0 [2020-10-30], is now defunct. Please use 'multisession' + (recommended) or 'multicore' instead. + + * Add optional assertion of the internal Future `state` field. + # Version 1.31.0 [2023-01-31] -## Signficant Changes +## Significant Changes * Remove function `remote()`. Note that `plan(remote, ...)` has been deprecated since **future** 1.24.0 [2022-02-19] and defunct since - **future** 1.3.0 (2022-12-15). + **future** 1.30.0 (2022-12-15). ## Documentation diff --git a/R/ClusterFuture-class.R b/R/ClusterFuture-class.R index 02a0f042..53923ce2 100644 --- a/R/ClusterFuture-class.R +++ b/R/ClusterFuture-class.R @@ -42,7 +42,7 @@ ClusterFuture <- function(expr = NULL, substitute = TRUE, envir = parent.frame() future <- do.call(MultiprocessFuture, args = c(list(expr = quote(expr), substitute = FALSE, envir = envir, persistent = persistent, node = NA_integer_), args[future_args]), quote = FALSE) future <- do.call(as_ClusterFuture, args = c(list(future, workers = workers), args[!future_args]), quote = TRUE) - + future } @@ -109,8 +109,8 @@ run.ClusterFuture <- function(future, ...) 
{ ## FutureRegistry to use reg <- sprintf("workers-%s", attr(workers, "name", exact = TRUE)) - ## Next available cluster node + t_start <- Sys.time() node_idx <- requestNode(await = function() { FutureRegistry(reg, action = "collect-first", earlySignal = TRUE) }, workers = workers) @@ -118,6 +118,16 @@ run.ClusterFuture <- function(future, ...) { ## Cluster node to use cl <- workers[node_idx] + + if (inherits(future$.journal, "FutureJournal")) { + appendToFutureJournal(future, + event = "getWorker", + category = "overhead", + parent = "launch", + start = t_start, + stop = Sys.time() + ) + } ## (i) Reset global environment of cluster node such that @@ -125,7 +135,17 @@ run.ClusterFuture <- function(future, ...) { ## may happen even if the future is evaluated inside a ## local, e.g. local({ a <<- 1 }). if (!persistent) { + t_start <- Sys.time() cluster_call(cl, fun = grmall, future = future, when = "call grmall() on") + if (inherits(future$.journal, "FutureJournal")) { + appendToFutureJournal(future, + event = "eraseWorker", + category = "overhead", + parent = "launch", + start = t_start, + stop = Sys.time() + ) + } } @@ -133,6 +153,7 @@ run.ClusterFuture <- function(future, ...) { ## NOTE: Already take care of by getExpression() of the Future class. ## However, if we need to get an early error about missing packages, ## we can get the error here before launching the future. + t_start <- Sys.time() packages <- packages(future) if (future$earlySignal && length(packages) > 0) { if (debug) mdebugf("Attaching %d packages (%s) on cluster node #%d ...", @@ -144,10 +165,20 @@ run.ClusterFuture <- function(future, ...) 
{ length(packages), hpaste(sQuote(packages)), node_idx) } + if (inherits(future$.journal, "FutureJournal")) { + appendToFutureJournal(future, + event = "attachPackages", + category = "overhead", + parent = "launch", + start = t_start, + stop = Sys.time() + ) + } ## (iii) Export globals globals <- globals(future) if (length(globals) > 0) { + t_start <- Sys.time() if (debug) { total_size <- asIEC(objectSize(globals)) mdebugf("Exporting %d global objects (%s) to cluster node #%d ...", length(globals), total_size, node_idx) @@ -170,6 +201,16 @@ run.ClusterFuture <- function(future, ...) { value <- NULL } if (debug) mdebugf("Exporting %d global objects (%s) to cluster node #%d ... DONE", length(globals), total_size, node_idx) + + if (inherits(future$.journal, "FutureJournal")) { + appendToFutureJournal(future, + event = "exportGlobals", + category = "overhead", + parent = "launch", + start = t_start, + stop = Sys.time() + ) + } } ## Not needed anymore globals <- NULL @@ -238,9 +279,9 @@ resolved.ClusterFuture <- function(x, run = TRUE, timeout = NULL, ...) { isValid <- isConnectionValid(con) if (!isValid) { - label <- x$label - if (is.null(label)) label <- "" - stop(FutureError(sprintf("Cannot resolve %s (%s), because the connection to the worker is corrupt: %s", class(x)[1], label, attr(isValid, "reason", exact = TRUE)), future = future)) + ex <- simpleError("Connection to the worker is corrupt") + msg <- post_mortem_cluster_failure(ex, when = "checking resolved from", node = node, future = future) + stop(FutureError(msg, future = future)) } if (is.null(timeout)) { @@ -348,12 +389,14 @@ receiveMessageFromWorker <- function(future, ...) 
{ if (debug) mdebugf("- Validating connection of %s", class(future)[1]) isValid <- isConnectionValid(con) if (!isValid) { - label <- future$label - if (is.null(label)) label <- "" - stop(FutureError(sprintf("Cannot receive results for %s (%s), because the connection to the worker is corrupt: %s", class(future)[1], label, attr(isValid, "reason", exact = TRUE)), future = future)) + ex <- simpleError("Connection to the worker is corrupt") + msg <- post_mortem_cluster_failure(ex, when = "receiving message from", node = node, future = future) + stop(FutureError(msg, future = future)) } } + t_start <- Sys.time() + ## If not, wait for process to finish, and ## then collect and record the value msg <- NULL @@ -364,7 +407,7 @@ receiveMessageFromWorker <- function(future, ...) { if (inherits(ack, "error")) { if (debug) mdebugf("- parallel:::recvResult() produced an error: %s", conditionMessage(ack)) - msg <- post_mortem_cluster_failure(ack, when = "receive results from", node = node, future = future) + msg <- post_mortem_cluster_failure(ack, when = "receive message results from", node = node, future = future) ex <- FutureError(msg, call = ack$call, future = future) future$result <- ex stop(ex) @@ -400,6 +443,16 @@ receiveMessageFromWorker <- function(future, ...) { if (inherits(msg, "FutureResult")) { result <- msg + if (inherits(future$.journal, "FutureJournal")) { + appendToFutureJournal(future, + event = "receiveResult", + category = "overhead", + parent = "gather", + start = t_start, + stop = Sys.time() + ) + } + ## Add back already signaled and muffled conditions so that also ## they will be resignaled each time value() is called. 
signaled <- future$.signaledConditions @@ -554,7 +607,7 @@ requestNode <- function(await, workers, timeout = getOption("future.wait.timeout #' @export getExpression.ClusterFuture <- local({ tmpl_expr_conditions <- bquote_compile({ - ...future.makeSendCondition <- local({ + ...future.makeSendCondition <- base::local({ sendCondition <- NULL function(frame = 1L) { @@ -667,6 +720,7 @@ cluster_call <- function(cl, ..., when = "call function on", future) { } +#' @importFrom parallelly isNodeAlive post_mortem_cluster_failure <- function(ex, when, node, future) { stop_if_not(inherits(ex, "error")) stop_if_not(length(when) == 1L, is.character(when)) @@ -710,9 +764,9 @@ post_mortem_cluster_failure <- function(ex, when, node, future) { ## (4) POST-MORTEM ANALYSIS: postmortem <- list() - ## (a) Did a localhost worker process terminate? - if (!is.null(host)) { - if (localhost && is.numeric(pid)) { + ## (a) Did the worker process terminate? + if (!is.null(host) && is.numeric(pid)) { + if (localhost) { pid_exists <- import_parallelly("pid_exists") alive <- pid_exists(pid) if (is.na(alive)) { @@ -722,8 +776,18 @@ post_mortem_cluster_failure <- function(ex, when, node, future) { } else { msg2 <- "No process exists with this PID, i.e. the localhost worker is no longer alive" } - postmortem$alive <- msg2 + } else { + ## Checking remote workers on hosts requires parallelly (>= 1.36.0) + alive <- isNodeAlive(node, timeout = getOption("future.alive.timeout", 30.0)) + if (is.na(alive)) { + msg2 <- "Failed to determined whether the process with this PID exists or not on the remote host, i.e. cannot infer whether remote worker is alive or not" + } else if (alive) { + msg2 <- "A process with this PID exists on the remote host, which suggests that the remote worker is still alive" + } else { + msg2 <- "No process exists with this PID on the remote host, i.e. the remote worker is no longer alive" + } } + postmortem$alive <- msg2 } ## (b) Did the worker use a connection that changed? 
diff --git a/R/Future-class.R b/R/Future-class.R index 210f7086..479eb5ce 100644 --- a/R/Future-class.R +++ b/R/Future-class.R @@ -102,7 +102,8 @@ #' @name Future-class Future <- function(expr = NULL, envir = parent.frame(), substitute = TRUE, stdout = TRUE, conditions = "condition", globals = list(), packages = NULL, seed = FALSE, lazy = FALSE, gc = FALSE, earlySignal = FALSE, label = NULL, ...) { if (substitute) expr <- substitute(expr) - + t_start <- Sys.time() + if (is.null(seed)) { } else if (isFALSE(seed)) { } else if (is_lecyer_cmrg_seed(seed)) { @@ -163,29 +164,9 @@ Future <- function(expr = NULL, envir = parent.frame(), substitute = TRUE, stdou .Defunct(msg = "Future field 'value' is defunct and must not be set", package = .packageName) } - ## 'local' is now defunct + ## 'local' is defunct if ("local" %in% args_names) { - dfcn <- .Defunct - msg <- "Argument 'local' is defunct as of future 1.31.0 (2023-01-31)" - - ## SPECIAL CASE: Temporarily allow the 'civis' package to keep using - ## 'local' for a tad longer, although it has zero effect since a - ## long time (https://github.com/civisanalytics/civis-r/issues/244) - ## Only allow for this is local = TRUE and interactive mode (to - ## prevent it from breaking 'R CMD check') - ## /HB 2023-02-09 - if (isTRUE(args$local) && - Sys.getenv("R_FUTURE_CHECK_IGNORE_CIVIS", "true") == "true") { - for (call in sys.calls()) { - if ("CivisFuture" %in% as.character(call[[1]])) { - msg <- sprintf("%s. In this case it was because civis::CivisFuture() was used. Please contact the maintainers of the 'civis' package about this problem.", msg) - if (!interactive()) dfcn <- .Deprecated - break - } - } - } - - dfcn(msg = msg, package = .packageName) + .Defunct(msg = "Argument 'local' is defunct as of future 1.31.0 (2023-01-31)", package = .packageName) } core <- new.env(parent = emptyenv()) @@ -478,12 +459,55 @@ run.Future <- function(future, ...) { future } -run <- function(...) 
UseMethod("run") +#' @export +#' @keywords internal +run <- function(future, ...) { + ## Automatically update journal entries for Future object + if (inherits(future, "Future") && + inherits(future$.journal, "FutureJournal")) { + start <- Sys.time() + on.exit({ + appendToFutureJournal(future, + event = "launch", + category = "overhead", + start = start, + stop = Sys.time() + ) + }) + } + UseMethod("run") +} #' @export #' @keywords internal -result <- function(...) UseMethod("result") +result <- function(future, ...) { + ## Automatically update journal entries for Future object + if (inherits(future, "Future") && + inherits(future$.journal, "FutureJournal")) { + start <- Sys.time() + on.exit({ + appendToFutureJournal(future, + event = "gather", + category = "overhead", + start = start, + stop = Sys.time() + ) + + ## Signal FutureJournalCondition? + if (!isTRUE(future$.journal_signalled)) { + journal <- journal(future) + label <- future$label + if (is.null(label)) label <- "" + msg <- sprintf("A future ('%s') of class %s was resolved", label, class(future)[1]) + cond <- FutureJournalCondition(message = msg, journal = journal) + signalCondition(cond) + future$.journal_signalled <- TRUE + } + }) + } + UseMethod("result") +} #' Get the results of a resolved future #' @@ -820,6 +844,7 @@ getExpression.Future <- local({ } ## getExpression() }) + globals <- function(future, ...) UseMethod("globals") globals.Future <- function(future, ...) { @@ -831,3 +856,24 @@ packages <- function(future, ...) UseMethod("packages") packages.Future <- function(future, ...) 
{ future[["packages"]] } + + +#' @export +`$<-.Future` <- function(x, name, value) { + if (name == "state") { + if (!is.element(value, c("created", "running", "finished", "failed", "interrupted"))) { + action <- getOption("future.state.onInvalid", "warning") + + if (action != "ignore") { + msg <- sprintf("Trying to assign an invalid value to the internal '%s' field of a %s object: %s", name, class(x)[1], value) + if (action == "error") { + stop(FutureError(msg, call = sys.call(), future = x)) + } else { + warning(FutureWarning(msg, call = sys.call(), future = x)) + } + } + } + } + + NextMethod() +} diff --git a/R/FutureCondition-class.R b/R/FutureCondition-class.R index fbe9557f..23f5dc40 100644 --- a/R/FutureCondition-class.R +++ b/R/FutureCondition-class.R @@ -45,6 +45,10 @@ FutureCondition <- function(message, call = NULL, uuid = future$uuid, future = N stop_if_not(is.character(uuid), length(uuid) == 1L, !is.na(uuid)) } if (!is.null(future)) stop_if_not(inherits(future, "Future")) + + if (!getOption("future.onFutureCondition.keepFuture", TRUE)) { + future <- NULL + } ## Create a condition object class <- c("FutureCondition", class) @@ -175,3 +179,38 @@ UnexpectedFutureResultError <- function(future, hint = NULL) { class(cond) <- class[!duplicated(class, fromLast = TRUE)] cond } + + + +#' @rdname FutureCondition +#' @export +GlobalEnvFutureCondition <- function(message = NULL, call = NULL, globalenv = globalenv, uuid = future$uuid, future = NULL) { + if (is.null(message)) { + label <- future$label + if (is.null(label)) label <- "" + message <- sprintf("Future (%s) added variables to the global environment. 
A future expression should never assign variables to the global environment - neither by assign() nor by <<-: [n=%d] %s", label, length(globalenv$added), paste(sQuote(globalenv$added), collapse = ", ")) + } + cond <- FutureCondition(message = message, call = call, uuid = uuid, future = future) + cond$globalenv <- globalenv + class <- c("GlobalEnvFutureCondition", class(cond)) + class(cond) <- class[!duplicated(class, fromLast = TRUE)] + cond +} + +#' @rdname FutureCondition +#' @export +GlobalEnvFutureWarning <- function(...) { + cond <- GlobalEnvFutureCondition(...) + class <- c("GlobalEnvFutureWarning", "FutureWarning", "warning", class(cond)) + class(cond) <- class[!duplicated(class, fromLast = TRUE)] + cond +} + +#' @rdname FutureCondition +#' @export +GlobalEnvFutureError <- function(...) { + cond <- GlobalEnvFutureCondition(...) + class <- c("GlobalEnvFutureError", "FutureError", "error", class(cond)) + class(cond) <- class[!duplicated(class, fromLast = TRUE)] + cond +} diff --git a/R/FutureRegistry.R b/R/FutureRegistry.R index d3f3a698..e3759e25 100644 --- a/R/FutureRegistry.R +++ b/R/FutureRegistry.R @@ -47,7 +47,7 @@ FutureRegistry <- local({ } ## collectValues() - function(where, action = c("add", "remove", "list", "collect-first", "collect-all", "reset"), future = NULL, earlySignal = TRUE, ...) { + function(where, action = c("add", "remove", "list", "contains", "collect-first", "collect-all", "reset"), future = NULL, earlySignal = TRUE, ...) 
{ stop_if_not(length(where) == 1, nzchar(where)) futures <- db[[where]] @@ -66,6 +66,9 @@ FutureRegistry <- local({ } futures[[length(futures)+1L]] <- future db[[where]] <<- futures + } else if (action == "contains") { + idx <- indexOf(futures, future = future) + return(!is.na(idx)) } else if (action == "remove") { idx <- indexOf(futures, future = future) if (is.na(idx)) { diff --git a/R/FutureResult-class.R b/R/FutureResult-class.R index a5b958a1..aa59651f 100644 --- a/R/FutureResult-class.R +++ b/R/FutureResult-class.R @@ -72,16 +72,17 @@ FutureResult <- local({ } structure(list( - value = value, - visible = visible, - stdout = stdout, - conditions = conditions, - rng = rng, + value = value, + visible = visible, + stdout = stdout, + conditions = conditions, + rng = rng, ..., - started = started, - finished = finished, - r_info = r_info, - version = version + started = started, + finished = finished, + session_uuid = session_uuid(), + r_info = r_info, + version = version ), class = "FutureResult") } }) diff --git a/R/MulticoreFuture-class.R b/R/MulticoreFuture-class.R index 456f8474..56a22bcb 100644 --- a/R/MulticoreFuture-class.R +++ b/R/MulticoreFuture-class.R @@ -51,18 +51,31 @@ run.MulticoreFuture <- function(future, ...) 
{ expr <- getExpression(future) envir <- future$envir + t_start <- Sys.time() + ## Assign globals envir <- new.env(parent = envir) if (length(future$globals) > 0L) { envir <- assign_globals(envir, globals = future$globals) } + ## Get a free worker reg <- sprintf("multicore-%s", session_uuid()) requestCore( await = function() FutureRegistry(reg, action = "collect-first", earlySignal = TRUE), workers = future$workers ) + if (inherits(future$.journal, "FutureJournal")) { + appendToFutureJournal(future, + event = "getWorker", + category = "other", + parent = "launch", + start = t_start, + stop = Sys.time() + ) + } + ## Add to registry FutureRegistry(reg, action = "add", future = future, earlySignal = TRUE) @@ -136,6 +149,12 @@ resolved.MulticoreFuture <- function(x, run = TRUE, timeout = NULL, ...) { #' @export result.MulticoreFuture <- function(future, ...) { + debug <- getOption("future.debug", FALSE) + if (debug) { + mdebugf("result() for %s ...", class(future)[1]) + on.exit(mdebugf("result() for %s ... done", class(future)[1])) + } + ## Has the result already been collected? result <- future$result if (!is.null(result)) { @@ -172,32 +191,39 @@ result.MulticoreFuture <- function(future, ...) { ## Sanity checks if (!inherits(result, "FutureResult")) { + if (debug) mdebugf("Detected non-FutureResult result ...") + alive <- NA + pid <- job$pid + if (is.numeric(pid)) { + pid_exists <- import_parallelly("pid_exists") + alive <- pid_exists(pid) + } + + ## AD HOC: Record whether the forked process is alive or not + job$alive <- alive + future$job <- job + ## SPECIAL: Check for fallback 'fatal error in wrapper code' ## try-error from parallel:::mcparallel(). If detected, then ## turn into an error with a more informative error message, cf. 
## https://github.com/HenrikBengtsson/future/issues/35 - if (identical(result, structure("fatal error in wrapper code", class = "try-error"))) { - label <- future$label - if (is.null(label)) label <- "" - msg <- result - ex <- FutureError(sprintf("Detected an error (%s) by the 'parallel' package while trying to retrieve the value of a %s (%s). This could be because the forked R process that evaluates the future was terminated before it was completed: %s", sQuote(msg), class(future)[1], sQuote(label), sQuote(hexpr(future$expr))), future = future) - } else if (is.null(result)) { + if (is.null(result) || identical(result, structure("fatal error in wrapper code", class = "try-error"))) { label <- future$label if (is.null(label)) label <- "" - pid <- job$pid pid_info <- if (is.numeric(pid)) sprintf("PID %.0f", pid) else NULL - info <- pid_info msg <- sprintf("Failed to retrieve the result of %s (%s) from the forked worker (on localhost; %s)", class(future)[1], label, info) + if (identical(result, structure("fatal error in wrapper code", class = "try-error"))) { + msg <- c(msg, sprintf("Error %s was reported by the 'parallel' package, which could be because the forked R process that evaluates the future was terminated before it was completed", sQuote(result))) + } + ## POST-MORTEM ANALYSIS: postmortem <- list() - - ## (a) Did a localhost worker process terminate? + + ## (a) Did the localhost worker process terminate? if (is.numeric(pid)) { - pid_exists <- import_parallelly("pid_exists") - alive <- pid_exists(pid) if (is.na(alive)) { msg2 <- "Failed to determined whether a process with this PID exists or not, i.e. cannot infer whether the forked localhost worker is alive or not" } else if (alive) { @@ -218,14 +244,30 @@ result.MulticoreFuture <- function(future, ...) { if (!is.null(postmortem)) { postmortem <- sprintf("Post-mortem diagnostic: %s", paste(postmortem, collapse = ". ")) - msg <- paste0(msg, ". 
", postmortem) + msg <- c(msg, postmortem) } - + msg <- paste(msg, collapse = ". ") + ex <- FutureError(msg, future = future) + } else { ex <- UnexpectedFutureResultError(future) + alive <- NA ## For now, don't remove future when there's an unexpected error /HB 2023-04-19 } future$result <- ex + + ## Remove future from FutureRegistry? + if (!is.na(alive) && !alive) { + reg <- sprintf("multicore-%s", session_uuid()) + exists <- FutureRegistry(reg, action = "contains", future = future) + if (exists) { + if (debug) mdebugf("Removing %s from FutureRegistry (%s)", class(future)[1], reg) + FutureRegistry(reg, action = "remove", future = future, earlySignal = TRUE) + } + } + + if (debug) mdebugf("Detected non-FutureResult result ... done") + stop(ex) } diff --git a/R/capture_journals.R b/R/capture_journals.R new file mode 100644 index 00000000..05d2cc3c --- /dev/null +++ b/R/capture_journals.R @@ -0,0 +1,35 @@ +#' Evaluate an R expression while collecting journals from completed futures +#' +#' @param expr The R expression to evaluate +#' +#' @param substitute If TRUE, then `expr` is substituted, otherwise not. +#' +#' @param envir The environment where `expr` should be evaluated +#' +#' @details +#' This function evaluates an R expression and captures the journals +#' signaled by futures as they are completed. A future [journal] comprises +#' a log of events appearing during the life-span of a future, e.g. +#' the timestamps when the future was created, launched, queried, +#' resolved, and when its results were collected. +#' +#' @return +#' A list of \link[=journal]{FutureJournal}:s. 
+#' +#' @example incl/capture_journals.R +#' +#' @keywords internal +#' @noRd +capture_journals <- function(expr, substitute = TRUE, envir = parent.frame()) { + oopts <- options(future.journal = TRUE) + on.exit(options(oopts)) + + journals <- NULL + withCallingHandlers({ + eval(if (substitute) substitute(expr) else expr, envir = envir) + }, FutureJournalCondition = function(cond) { + journals <<- c(journals, list(cond$journal)) + }) + + journals +} diff --git a/R/cluster.R b/R/cluster.R index d131db62..f9f1cb7e 100644 --- a/R/cluster.R +++ b/R/cluster.R @@ -28,7 +28,6 @@ #' ``` #' #' @inheritParams ClusterFuture-class -#' @inheritParams multiprocess #' @inheritParams Future-class #' @inheritParams future #' diff --git a/R/expressions.R b/R/expressions.R index b26200d4..46849461 100644 --- a/R/expressions.R +++ b/R/expressions.R @@ -114,7 +114,9 @@ makeExpression <- local({ } if (length(args) > 0) base::do.call(base::Sys.setenv, args = args) - base::rm(list = c("args", "names", "old_names", "NAMES", "envs", "common", "added", "removed")) + + ## Not needed anymore + args <- names <- old_names <- NAMES <- envs <- common <- added <- removed <- NULL } else { base::do.call(base::Sys.setenv, args = base::as.list(...future.oldEnvVars)) } @@ -217,6 +219,11 @@ makeExpression <- local({ ...future.conditions <- base::list() ...future.rng <- base::globalenv()$.Random.seed + if (.(globalenv)) { + ## Record names of variables in the global environment + ...future.globalenv.names <- c(base::names(base::.GlobalEnv), "...future.value", "...future.globalenv.names", ".Random.seed") + } + ## NOTE: We don't want to use local(body) w/ on.exit() because ## evaluation in a local is optional, cf. argument 'local'. ## If this was mandatory, we could. 
Instead we use @@ -224,7 +231,14 @@ makeExpression <- local({ ...future.result <- base::tryCatch({ base::withCallingHandlers({ ...future.value <- base::withVisible(.(expr)) - future::FutureResult(value = ...future.value$value, visible = ...future.value$visible, rng = !identical(base::globalenv()$.Random.seed, ...future.rng), started = ...future.startTime, version = "1.8") + future::FutureResult( + value = ...future.value$value, + visible = ...future.value$visible, + rng = !identical(base::globalenv()$.Random.seed, ...future.rng), + globalenv = if (.(globalenv)) list(added = base::setdiff(base::names(base::.GlobalEnv), ...future.globalenv.names)) else NULL, + started = ...future.startTime, + version = "1.8" + ) }, condition = base::local({ ## WORKAROUND: If the name of any of the below objects/functions ## coincides with a promise (e.g. a future assignment) then we @@ -334,12 +348,13 @@ makeExpression <- local({ } ...future.result$conditions <- ...future.conditions + ...future.result$finished <- base::Sys.time() ...future.result }) - function(expr, local = TRUE, immediateConditions = FALSE, stdout = TRUE, conditionClasses = NULL, split = FALSE, globals.onMissing = getOption("future.globals.onMissing", NULL), enter = NULL, exit = NULL, version = "1.8") { + function(expr, local = TRUE, immediateConditions = FALSE, stdout = TRUE, conditionClasses = NULL, split = FALSE, globals.onMissing = getOption("future.globals.onMissing", NULL), globalenv = (getOption("future.globalenv.onMisuse", "ignore") != "ignore"), enter = NULL, exit = NULL, version = "1.8") { conditionClassesExclude <- attr(conditionClasses, "exclude", exact = TRUE) muffleInclude <- attr(conditionClasses, "muffleInclude", exact = TRUE) if (is.null(muffleInclude)) muffleInclude <- "^muffle" @@ -369,7 +384,7 @@ makeExpression <- local({ enter <- bquote_apply(tmpl_enter_workdir) enter <- bquote_apply(tmpl_enter_optenvar) enter <- bquote_apply(tmpl_enter_future_opts) - + exit <- 
bquote_apply(tmpl_exit_future_opts) exit <- bquote_apply(tmpl_exit_optenvar) exit <- bquote_apply(tmpl_exit_workdir) diff --git a/R/future.R b/R/future.R index 26cb0447..f94451d3 100644 --- a/R/future.R +++ b/R/future.R @@ -188,6 +188,7 @@ #' @name future future <- function(expr, envir = parent.frame(), substitute = TRUE, lazy = FALSE, seed = FALSE, globals = TRUE, packages = NULL, stdout = TRUE, conditions = "condition", earlySignal = FALSE, label = NULL, gc = FALSE, ...) { if (substitute) expr <- substitute(expr) + t_start <- Sys.time() gp <- getGlobalsAndPackages(expr, envir = envir, tweak = tweakExpression, globals = globals) expr <- gp$expr @@ -216,13 +217,18 @@ future <- function(expr, envir = parent.frame(), substitute = TRUE, lazy = FALSE ## Comment: Only allowed for persistent 'cluster' futures future$.defaultLocal <- !is.element("local", names(list(...))) + ## Enable journaling? + if (getOption("future.journal", FALSE)) { + future <- makeFutureJournal(future, event = "create", category = "overhead", start = t_start) + } + if (!lazy) { future <- run(future) future$lazy <- FALSE ## Assert that a future was returned stop_if_not(inherits(future, "Future"), !future$lazy) } - + future } diff --git a/R/futureSessionInfo.R b/R/futureSessionInfo.R index 6b9b4e3f..f10e133f 100644 --- a/R/futureSessionInfo.R +++ b/R/futureSessionInfo.R @@ -14,7 +14,7 @@ futureSessionInfo <- function(test = TRUE, anonymize = TRUE) { mprint0 <- function(...) mprint(..., prefix = NULL, debug = TRUE) mprintf0 <- function(...) mdebugf(..., prefix = NULL, debug = TRUE) mstr0 <- function(...) 
mstr(..., prefix = NULL, debug = TRUE) - + message("*** Package versions") p <- c("future", "parallelly", "parallel", "globals", "listenv") v <- vapply(p, FUN = function(pkg) { @@ -93,11 +93,14 @@ futureSessionInfo <- function(test = TRUE, anonymize = TRUE) { if (anonymize) vs <- anonymize_info(vs) message("Main R session details:") mprint0(vs) - + + delay <- getOption("future.futureSessionInfo.delay", 1.0) ## seconds + ## Information on the workers fs <- list() for (ii in seq_len(nbrOfWorkers())) { fs[[ii]] <- future({ + Sys.sleep(delay) data.frame( worker = ii, pid = Sys.getpid(), diff --git a/R/journal.R b/R/journal.R new file mode 100644 index 00000000..e524cf99 --- /dev/null +++ b/R/journal.R @@ -0,0 +1,341 @@ +#' Gets the logged journal of events for a future +#' +#' _WARNING: This function is under development. It can change at any time. +#' For now, please, do not depend on this function in a published R package._ +#' +#' @param x A [Future] object. +#' +## @param baseline (POSIXct; optional) A timestamp to serve as time zero +## for the relative start time (`at`). If `TRUE` (default), then the +## earliest timepoint observed is used as the baseline. +#' +#' @param \ldots Not used. +#' +#' @return +#' A data frame of class `FutureJournal` with columns: +#' +#' 1. `event` (character string) - type of event that took place +#' 2. `category` (character string) - the category of the event +#' 3. `parent` (character string) - (to be described) +#' 4. `start` (POSIXct) - the timestamp when the event started +#' 5. `at` (difftime) - the time when the event started relative to +#' first event +#' 6. `duration` (difftime) - the duration of the event +#' 7. `future_label` (character string) - the label of the future +#' 8. `future_uuid` (character string) - the UUID of the future +#' 9. 
`session_uuid` (character string) - the UUID of the R session +#' where the event took place +#' +#' The common events are: +#' +#' * `create` - the future was created (an `overhead`) +#' * `launch` - the future was launched (an `overhead`) +#' * `evaluate` - the future was evaluated (an `evaluation`) +#' * `resolved` - the future was queried (may occur multiple times) +#' (an `overhead`) +#' * `gather` - the results were retrieved (an `overhead`) +#' +#' but others may be added by other Future classes. +#' +#' Common event categories are: +#' +#' * `evaluation` - processing time is spent on evaluation +#' * `overhead` - processing time is spent on orchestrating the future +#' * `waiting` - processing time is spent on waiting to set up or +#' querying the future +#' +#' but others may be added by other Future classes. +#' +#' The data frame is sorted by the `at` time. +#' Note that the timestamps for the `evaluate` event are based on the local +#' time on the worker. The system clocks on the worker and the calling R +#' system may not be in perfect sync. +#' +#' @section Enabling and disabling event logging: +#' To enable logging of events, set option `future.journal` to TRUE. +#' To disable, set it to FALSE (default). +#' +#' @example incl/journal.R +#' +#' @seealso +#' Use [capture_journals()] to capture journals from all futures. +#' +#' @keywords internal +#' @noRd +journal <- function(x, ...) UseMethod("journal") + +#' @export +journal.Future <- function(x, ...) { + data <- x$.journal + if (is.null(data)) { + label <- x$label + if (is.null(label)) label <- "" + stop(sprintf("No journal is available for future ('%s'). 
Did you forget to enable journaling?", label)) + } + stop_if_not(inherits(data, "FutureJournal")) + session_uuid <- x$owner + stop_if_not(length(session_uuid) == 1L, is.character(session_uuid), !is.na(session_uuid)) + + session_uuid <- rep(session_uuid, times = nrow(data)) + + ## Backward compatibility (until all backends does this) + if (!is.element("evaluate", data$event) && !is.null(x$result)) { + stop_if_not(is.character(session_uuid)) + x <- appendToFutureJournal(x, + event = "evaluate", + category = "evaluation", + start = x$result$started, + stop = x$result$finished + ) + data <- x$.journal + stop_if_not(length(x$result$session_uuid) == 1L, is.character(x$result$session_uuid)) + session_uuid <- c(session_uuid, x$result$session_uuid) + stop_if_not(inherits(data, "FutureJournal")) + } + + ## Find relative time zero + baseline <- min(data$start, na.rm = TRUE) + + ## Append 'at' and 'duration' + data$at <- data$start - baseline + data$duration <- data$stop - data$start + data$stop <- NULL + + ## Append future 'label' + data$future_label <- if (is.null(x$label)) NA_character_ else x$label + + ## Append future UUID + data$future_uuid <- as.factor(x$uuid) + + ## Append session UUID + data$session_uuid <- as.factor(session_uuid) + + ## Coerce 'event' to a factor + known_levels <- c("lifespan", "create", "launch", "resolved", "gather", "evaluate") + extra_levels <- c("attachPackages", "eraseWorker", "exportGlobals", "receiveResult", "getWorker") + other_levels <- sort(setdiff(data$event, known_levels)) + levels <- c(known_levels, other_levels) + data$event <- factor(data$event, levels = levels) + + ## Sort by relative start time + if (nrow(data) > 1L) data <- data[order(data$at), ] + + data +} + +#' @export +journal.FutureJournal <- function(x, baseline = NULL, ...) { + ## Reset relative time zero? 
+ if (!is.null(baseline)) { + if (isTRUE(baseline)) baseline <- min(x$start, na.rm = TRUE) + x$at <- x$start - baseline + } + x +} + +#' @export +journal.list <- function(x, baseline = TRUE, ...) { + ## Reset relative time zero to the first observed timestamp? + if (isTRUE(baseline)) { + stop_if_not(baseline >= 1L, baseline <= length(x)) + x <- lapply(x, FUN = journal, ...) + start <- lapply(x, FUN = function(x) min(x$start, na.rm = TRUE)) + start <- Reduce(c, start) + baseline <- min(start, na.rm = TRUE) + } + + js <- lapply(x, FUN = journal, baseline = baseline, ...) + + class <- class(js[[1]]) + js <- Reduce(rbind, js) + class(js) <- class + + js +} + + +#' @export +print.FutureJournal <- function(x, digits.secs = 3L, ...) { + oopts <- options(digits.secs = digits.secs) + on.exit(options(oopts)) + NextMethod("print") +} + + +#' @export +summary.FutureJournal <- function(object, ...) { + ## To please 'R CMD check' + event <- future_uuid <- median <- parent <- category <- NULL + + dt_top <- subset(object, is.na(parent)) + + uuids <- unique(dt_top$future_uuid) + nbr_of_futures <- length(uuids) + + ## Calculate 'stop' times + dt_top$stop <- dt_top$start + dt_top$duration + + ## ------------------------------------------------------- + ## 1. 
Calculate the total walltime + ## ------------------------------------------------------- + ## (a) timestamp when the first event starts + t_begin <- subset(dt_top, event == "create")[["start"]] + ## (b) timestamp when 'gather' finishes + t_end <- subset(dt_top, event == "gather")[["stop"]] + ## (c) durations (per future) + t_delta <- t_end - t_begin + ## (d) total duration + t_total <- sum(t_delta, na.rm = TRUE) + + ## (e) build table + t <- NULL + if (length(uuids) > 1L) { + t <- c(t, min = min(t_delta, na.rm = TRUE)) + t <- c(t, mean = mean(t_delta, na.rm = TRUE)) + t <- c(t, median = median(t_delta, na.rm = TRUE)) + t <- c(t, max = max(t_delta, na.rm = TRUE)) + t <- as.difftime(t, units = "secs") + } + t <- c(t, total = t_total) + stats <- data.frame(walltime = t) + + ## ------------------------------------------------------- + ## 2. Calculate efficiency + ## ------------------------------------------------------- + ## (a) Per future + eff <- list() + for (kk in seq_along(uuids)) { + uuid <- uuids[[kk]] + dt_uuid <- subset(dt_top, future_uuid == uuid) + res <- data.frame( + evaluate = subset(dt_uuid, category == "evaluation")[["duration"]], + overhead = sum(subset(dt_uuid, category == "overhead")[["duration"]]) + ) + res[["duration"]] <- t_delta[kk] + eff[[uuid]] <- res + } + eff <- Reduce(rbind, eff) + + ## (b) Summary + res <- NULL + if (length(uuids) > 1L) { + t <- lapply(c("min", "mean", "median", "max"), FUN = function(fcn_name) { + fcn <- get(fcn_name, mode = "function") + t <- as.data.frame(lapply(eff, FUN = fcn)) + rownames(t) <- fcn_name + t + }) + t <- Reduce(rbind, t) + res <- t + } + + ## (c) Total + t <- as.data.frame(lapply(eff, FUN = sum)) + rownames(t) <- "total" + res <- rbind(res, t) + + ## (d) Combine + stats <- cbind(stats, res) + + ## (e) Fractions + stats[["evaluate_ratio"]] <- as.numeric(stats[["evaluate"]]) / as.numeric(stats[["duration"]]) + stats[["overhead_ratio"]] <- as.numeric(stats[["overhead"]]) / as.numeric(stats[["duration"]]) + 
+ + stats[["summary"]] <- rownames(stats) + rownames(stats) <- NULL + stats <- stats[, c("summary", "evaluate", "evaluate_ratio", "overhead", "overhead_ratio", "duration", "walltime")] + + attr(stats, "nbr_of_futures") <- length(uuids) + class(stats) <- c("FutureJournalSummary", class(stats)) + stats +} + + +#' @export +print.FutureJournalSummary <- function(x, ...) { + cat(sprintf("Number of futures: %d\n", attr(x, "nbr_of_futures"))) + NextMethod("print") +} + + +makeFutureJournal <- function(x, event = "create", category = "other", parent = NA_character_, start = stop, stop = Sys.time()) { + stop_if_not( + inherits(x, "Future"), + is.null(x$.journal), + length(event) == 1L, is.character(event), !is.na(event), + length(category) == 1L, is.character(category), !is.na(category), + length(parent) == 1L, is.character(parent), + length(start) == 1L, inherits(start, "POSIXct"), + length(stop) == 1L, inherits(stop, "POSIXct") + ) + + data <- data.frame(event = event, category = category, parent = parent, start = start, stop = stop) + class(data) <- c("FutureJournal", class(data)) + x$.journal <- data + invisible(x) +} + +updateFutureJournal <- function(x, event, start = NULL, stop = Sys.time()) { + ## Nothing to do? 
+ if (!inherits(x$.journal, "FutureJournal")) return(x) + + stop_if_not( + inherits(x, "Future"), + length(event) == 1L, is.character(event), !is.na(event), + is.null(start) || (length(start) == 1L && inherits(start, "POSIXct")), + is.null(stop) || (length(stop) == 1L && inherits(stop, "POSIXct")) + ) + + data <- x$.journal + stop_if_not(inherits(data, "FutureJournal")) + row <- which(data$event == event) + n <- length(row) + if (n == 0L) stop("No such 'event' entry in journal: ", sQuote(event)) + if (n > 1L) row <- row[n] + entry <- data[row, ] + if (!is.null(start)) entry$start <- start + if (!is.null(stop)) entry$stop <- stop + data[row, ] <- entry + stop_if_not(inherits(data, "FutureJournal")) + x$.journal <- data + invisible(x) +} + + +appendToFutureJournal <- function(x, event, category = "other", parent = NA_character_, start = Sys.time(), stop = as.POSIXct(NA_real_), skip = TRUE) { + ## Nothing to do? + if (!inherits(x$.journal, "FutureJournal")) return(x) + + if (skip && is.element(event, x$.journal$event)) return(x) + + stop_if_not( + inherits(x, "Future"), + length(event) == 1L, is.character(event), !is.na(event), + length(category) == 1L, is.character(category), !is.na(category), + length(parent) == 1L, is.character(parent), + length(start) == 1L, inherits(start, "POSIXct"), + length(stop) == 1L, inherits(stop, "POSIXct") + ) + + data <- data.frame(event = event, category = category, parent = parent, start = start, stop = stop) + x$.journal <- rbind(x$.journal, data) + invisible(x) +} + + + +#' @rdname FutureCondition +#' @export +FutureJournalCondition <- function(message, journal, call = NULL, uuid = future$uuid, future = NULL) { + stop_if_not(inherits(journal, "FutureJournal")) + cond <- FutureCondition(message = message, call = call, uuid = uuid, future = future) + cond$journal <- journal + class <- c("FutureJournalCondition", class(cond)) + class(cond) <- class[!duplicated(class, fromLast = TRUE)] + cond +} + +#' @export 
+journal.FutureJournalCondition <- function(x, ...) { + x$journal +} diff --git a/R/mandelbrot.R b/R/mandelbrot.R index 1204c0d0..e1a889b6 100644 --- a/R/mandelbrot.R +++ b/R/mandelbrot.R @@ -1,18 +1,5 @@ #' Mandelbrot convergence counts #' -#' @param Z A complex matrix for which convergence -#' counts should be calculated. -#' @param xmid,ymid,side,resolution Alternative specification of -#' the complex plane `Z`, where -#' `mean(Re(Z)) == xmid`, -#' `mean(Im(Z)) == ymid`, -#' `diff(range(Re(Z))) == side`, -#' `diff(range(Im(Z))) == side`, and -#' `dim(Z) == c(resolution, resolution)`. -#' @param maxIter Maximum number of iterations per bin. -#' @param tau A threshold; the radius when calling -#' divergence (Mod(z) > tau). -#' #' @return Returns an integer matrix (of class Mandelbrot) with #' non-negative counts. #' @@ -32,11 +19,29 @@ #' from ftp://stat.ethz.ch/U/maechler/R/ on 2005-02-18 (sic!). #' #' @aliases as.raster.Mandelbrot plot.Mandelbrot mandelbrot_tiles -#' @export -#' #' @keywords internal +#' +#' @export mandelbrot <- function(...) UseMethod("mandelbrot") + +#' @param Z A complex matrix for which convergence +#' counts should be calculated. +#' +#' @param maxIter Maximum number of iterations per bin. +#' +#' @param tau A threshold; the radius when calling +#' divergence (Mod(z) > tau). +#' +#' @param xmid,ymid,side,resolution Alternative specification of +#' the complex plane `Z`, where +#' `mean(Re(Z)) == xmid`, +#' `mean(Im(Z)) == ymid`, +#' `diff(range(Re(Z))) == side`, +#' `diff(range(Im(Z))) == side`, and +#' `dim(Z) == c(resolution, resolution)`. +#' +#' @rdname mandelbrot #' @export mandelbrot.matrix <- function(Z, maxIter = 200L, tau = 2.0, ...) { stop_if_not(is.matrix(Z), mode(Z) == "complex") @@ -82,6 +87,20 @@ mandelbrot.matrix <- function(Z, maxIter = 200L, tau = 2.0, ...) 
{ } +#' @param xmid,ymid,side,resolution Alternative specification of +#' the complex plane `Z`, where +#' `mean(Re(Z)) == xmid`, +#' `mean(Im(Z)) == ymid`, +#' `diff(range(Re(Z))) == side`, +#' `diff(range(Im(Z))) == side`, and +#' `dim(Z) == c(resolution, resolution)`. +#' +#' @param maxIter Maximum number of iterations per bin. +#' +#' @param tau A threshold; the radius when calling +#' divergence (Mod(z) > tau). +#' +#' @rdname mandelbrot #' @export mandelbrot.numeric <- function(xmid = -0.75, ymid = 0.0, side = 3.0, resolution = 400L, maxIter = 200L, diff --git a/R/multicore.R b/R/multicore.R index 509401e7..6d1476bc 100644 --- a/R/multicore.R +++ b/R/multicore.R @@ -17,9 +17,13 @@ #' plan(multicore, workers = 2) #' ``` #' -#' @inheritParams multiprocess -#' @inheritParams Future-class #' @inheritParams future +#' @inheritParams Future-class +#' @inheritParams MulticoreFuture-class +#' +#' @param workers The number of parallel processes to use. +#' If a function, it is called without arguments _when the future +#' is created_ and its value is used to configure the workers. #' #' @return #' A \link{MulticoreFuture}. diff --git a/R/multiprocess.R b/R/multiprocess.R deleted file mode 100644 index 16e81606..00000000 --- a/R/multiprocess.R +++ /dev/null @@ -1,46 +0,0 @@ -#' Create a multiprocess future whose value will be resolved asynchronously using multicore or a multisession evaluation -#' -#' **WARNING: The 'multiprocess' future plan is deprecated. -#' Instead, explicitly specify 'multisession' or 'multicore'. The former works -#' everywhere and is the recommended one between the two. _Forked processing_, -#' which 'multicore' uses, is unstable in various environment and setups. -#' The 'multiprocess' alias is therefore being phased out, and is now -#' equal to using 'sequential' (sic!)** -#' -#' @inheritParams ClusterFuture-class -#' @inheritParams future -#' @inheritParams Future-class -#' -#' @param workers Ignored in **future** (>= 1.31.0). 
-#' -#' @param \dots Additional arguments passed to [Future()]. -#' -#' @return -#' A [SequentialFuture] (sic!) since **future** 1.31.0. -#' -#' @keywords internal -#' -#' @export -multiprocess <- function(..., workers = availableCores(), envir = parent.frame()) { - - msg1 <- "Detected creation of a 'multiprocess' future. Strategy 'multiprocess' is deprecated in future (>= 1.20.0) [2020-10-30]." - msg2 <- "Instead, specify either 'multisession' (recommended) or 'multicore'." - defunct <- getOption("future.deprecated.defunct") - if (is.element("multiprocess", defunct)) { - msg <- paste(msg1, "It will soon become defunct, i.e. produce an error.", msg2) - ## Need to wrap .Defunct() in another frame to avoid: - ## Error in as.vector(x, "character") : - ## cannot coerce type 'closure' to vector of type 'character' - dfcn <- function(...) .Defunct(...) - } else { - msg <- paste(msg1, msg2, "Starting with future 1.31.0 [2023-01-31], 'multiprocess' is the same as 'sequential'.") - dfcn <- .Deprecated - } - dfcn(msg = msg, package = .packageName) - - sequential(..., envir = envir) -} -class(multiprocess) <- c("sequential", "uniprocess", "future", "function") -## future (> 1.30.0): 'multiprocess' always resolves to 'sequential' -class(multiprocess) <- c(class(multiprocess), "multiprocess") -attr(multiprocess, "init") <- FALSE diff --git a/R/multisession.R b/R/multisession.R index 064ac291..83347e3f 100644 --- a/R/multisession.R +++ b/R/multisession.R @@ -18,10 +18,12 @@ #' plan(multisession, workers = 2) #' ``` #' -#' @inheritParams multiprocess +#' @inheritParams multicore #' @inheritParams cluster #' @inheritParams Future-class #' @inheritParams future +#' +#' @param \dots Additional arguments passed to [Future()]. #' #' @param rscript_libs A character vector of \R package library folders that #' the workers should use. 
The default is `.libPaths()` so that multisession diff --git a/R/options.R b/R/options.R index 80900094..08cf6b9d 100644 --- a/R/options.R +++ b/R/options.R @@ -46,6 +46,10 @@ #' #' \item{\option{future.rng.onMisuse}: (_beta feature - may change_)}{(character string) If random numbers are used in futures, then parallel (L'Ecuyer-CMRG) RNG should be used in order to get statistical sound RNGs. The defaults in the future framework assume that _no_ random number generation (RNG) is taken place in the future expression because L'Ecuyer-CMRG RNGs come with an unnecessary overhead if not needed. To protect against mistakes, the future framework attempts to detect when random numbers are used despite L'Ecuyer-CMRG RNGs are not in place. If this is detected, and `future.rng.onMisuse = "error"`, then an informative error message is produced. If `"warning"`, then a warning message is produced. If `"ignore"`, no check is performed. (Default: `"warning"`)} #' +#' \item{\option{future.globalenv.onMisuse}: (_beta feature - may change_)}{(character string) Assigning variables to the global environment for the purpose of using the variable at a later time makes no sense with futures, because the next future may be evaluated in different R process. To protect against mistakes, the future framework attempts to detect when variables are added to the global environment. If this is detected, and `future.globalenv.onMisuse = "error"`, then an informative error message is produced. If `"warning"`, then a warning message is produced. If `"ignore"`, no check is performed. (Default: `"ignore"`)} +#' +#' \item{\option{future.onFutureCondition.keepFuture}:}{(logical) If `TRUE`, a `FutureCondition` keeps a copy of the `Future` object that triggered the condition. If `FALSE`, it is dropped. (Default: `TRUE`)} +#' #' \item{\option{future.wait.timeout}:}{(numeric) Maximum waiting time (in seconds) for a free worker before a timeout error is generated. 
(Default: `30 * 24 * 60 * 60` (= 30 days))} #' #' \item{\option{future.wait.interval}:}{(numeric) Initial interval (in @@ -74,7 +78,7 @@ #' \describe{ #' \item{\option{future.fork.multithreading.enable} (_beta feature - may change_):}{(logical) Enable or disable _multi-threading_ while using _forked_ parallel processing. If `FALSE`, different multi-thread library settings are overridden such that they run in single-thread mode. Specifically, multi-threading will be disabled for OpenMP (which requires the \pkg{RhpcBLASctl} package) and for **RcppParallel**. If `TRUE`, or not set (the default), multi-threading is allowed. Parallelization via multi-threaded processing (done in native code by some packages and external libraries) while at the same time using forked (aka "multicore") parallel processing is known to unstable. Note that this is not only true when using `plan(multicore)` but also when using, for instance, \code{\link[=mclapply]{mclapply}()} of the \pkg{parallel} package. (Default: not set)} #' -#' \item{\option{future.output.windows.reencode} (_beta feature - may change_):}{(logical) Enable or disable re-encoding of UTF-8 symbols that were incorrectly encoded while captured. On MS Windows, R cannot capture UTF-8 symbols as-is when they are captured from the standard output. For examples, a UTF-8 check mark symbol (`"\u2713"`) would be relayed as `""` (a string with eight ASCII characters). This option will cause `value()` to attempt to recover the intended UTF-8 symbols from `` string components, if, and only if, the string was captured by a future resolved on MS Windows. (Default: `TRUE`)} +#' \item{\option{future.output.windows.reencode}:}{(logical) Enable or disable re-encoding of UTF-8 symbols that were incorrectly encoded while captured. In R (< 4.2.0) and on older versions of MS Windows, R cannot capture UTF-8 symbols as-is when they are captured from the standard output. 
For examples, a UTF-8 check mark symbol (`"\u2713"`) would be relayed as `""` (a string with eight ASCII characters). Setting this option to `TRUE` will cause `value()` to attempt to recover the intended UTF-8 symbols from `` string components, if, and only if, the string was captured by a future resolved on MS Windows. (Default: `TRUE`)} #' } #' #' See also [parallelly::parallelly.options]. @@ -150,7 +154,13 @@ #' R_FUTURE_GLOBALS_ONREFERENCE #' future.plan #' R_FUTURE_PLAN +#' future.onFutureCondition.keepFuture +#' R_FUTURE_ONFUTURECONDITION_KEEPFUTURE #' future.resolve.recursive +#' R_FUTURE_RESOLVE_RECURSIVE +#' future.globalenv.onMisuse +#' R_FUTURE_GLOBALENV_ONMISUSE +#' future.rng.onMisuse #' R_FUTURE_RNG_ONMISUSE #' future.wait.alpha #' R_FUTURE_WAIT_ALPHA @@ -161,6 +171,8 @@ #' R_FUTURE_RESOLVED_TIMEOUT #' future.output.windows.reencode #' R_FUTURE_OUTPUT_WINDOWS_REENCODE +#' future.journal +#' R_FUTURE_JOURNAL #' #' @name future.options NULL @@ -262,7 +274,7 @@ update_package_options <- function(debug = FALSE) { update_package_option("future.deprecated.ignore", split = ",", debug = debug) - update_package_option("future.deprecated.defunct", mode = "character", split = ",", default = if (interactive()) "multiprocess" else NULL, debug = debug) + update_package_option("future.deprecated.defunct", mode = "character", split = ",", debug = debug) update_package_option("future.fork.multithreading.enable", mode = "logical", debug = debug) @@ -290,14 +302,22 @@ update_package_options <- function(debug = FALSE) { update_package_option("future.resolve.recursive", mode = "integer", debug = debug) + ## Introduced in future 1.33.0: + update_package_option("future.alive.timeout", mode = "numeric", debug = debug) + ## Introduced in future 1.22.0: for (name in c("future.resolved.timeout", "future.cluster.resolved.timeout", "future.multicore.resolved.timeout")) { update_package_option(name, mode = "numeric", debug = debug) } + ## Introduced in future 1.32.0: + 
update_package_option("future.onFutureCondition.keepFuture", mode = "logical", debug = debug) + update_package_option("future.rng.onMisuse", debug = debug) - update_package_option("future.rng.onMisuse.keepFuture", mode = "logical", debug = debug) + ## Prototyping in future 1.32.0: + update_package_option("future.globalenv.onMisuse", debug = debug) + update_package_option("future.wait.timeout", mode = "numeric", debug = debug) update_package_option("future.wait.interval", mode = "numeric", debug = debug) update_package_option("future.wait.alpha", mode = "numeric", debug = debug) @@ -312,6 +332,12 @@ update_package_options <- function(debug = FALSE) { ## Prototyping in future 1.26.0: update_package_option("future.globals.globalsOf.locals", mode = "logical", debug = debug) + ## future 1.32.0: + update_package_option("future.state.onInvalid", mode = "character", debug = debug) + + ## future 1.32.0: + update_package_option("future.journal", mode = "logical", debug = debug) + ## SETTINGS USED FOR DEPRECATING FEATURES ## future 1.22.0: update_package_option("future.globals.keepWhere", mode = "logical", debug = debug) diff --git a/R/resolve.R b/R/resolve.R index 55234cca..d4e4db10 100644 --- a/R/resolve.R +++ b/R/resolve.R @@ -14,9 +14,12 @@ #' should be done. If TRUE, an infinite recursion is used. If FALSE or zero, #' no recursion is performed. #' -#' @param result (internal) If TRUE, the results are retrieved, otherwise not. +#' @param result (internal) If TRUE, the results are _retrieved_, otherwise not. +#' Note that this only collects the results from the parallel worker, which +#' can help lower the overall latency if there are multiple concurrent futures. +#' This does _not_ return the collected results. #' -#' @param stdout (internal) If TRUE, captured standard output is relayed, otherwise note. +#' @param stdout (internal) If TRUE, captured standard output is relayed, otherwise not. 
#' #' @param signal (internal) If TRUE, captured \link[base]{conditions} are relayed, #' otherwise not. @@ -54,6 +57,21 @@ resolve.Future <- function(x, idxs = NULL, recursive = 0, result = FALSE, stdout .Defunct(msg = "Argument 'value' of resolve() is defunct. It was deprecated in future (>= 1.15.0) [2019-11-07]. Use 'result' instead.", package = .packageName) } + ## Automatically update journal entries for Future object + if (inherits(future, "Future") && + inherits(future$.journal, "FutureJournal")) { + t_start <- Sys.time() + on.exit({ + appendToFutureJournal(x, + event = "resolve", + category = "overhead", + start = t_start, + stop = Sys.time(), + skip = FALSE + ) + }) + } + if (is.logical(recursive)) { if (recursive) recursive <- getOption("future.resolve.recursive", 99) } diff --git a/R/resolved.R b/R/resolved.R index 5ecdb77c..d3f1ba0b 100644 --- a/R/resolved.R +++ b/R/resolved.R @@ -1,7 +1,7 @@ #' Check whether a future is resolved or not #' #' @param x A \link{Future}, a list, or an environment (which also -#' includes \link[listenv:listenv]{list environment}. +#' includes \link[listenv:listenv]{list environment}). #' #' @param \dots Not used. #' @@ -19,7 +19,23 @@ #' e.g. `while (!resolved(future)) Sys.sleep(5)`. #' #' @export -resolved <- function(x, ...) UseMethod("resolved") +resolved <- function(x, ...) { + ## Automatically update journal entries for Future object + if (inherits(x, "Future") && + inherits(x$.journal, "FutureJournal")) { + start <- Sys.time() + on.exit({ + appendToFutureJournal(x, + event = "resolved", + category = "querying", + start = start, + stop = Sys.time(), + skip = FALSE + ) + }) + } + UseMethod("resolved") +} #' @export resolved.default <- function(x, ...) 
TRUE diff --git a/R/sequential.R b/R/sequential.R index 11106216..7344fd82 100644 --- a/R/sequential.R +++ b/R/sequential.R @@ -15,7 +15,6 @@ #' ``` #' #' @inheritParams future -#' @inheritParams multiprocess #' @inheritParams Future-class #' #' @return diff --git a/R/signalConditions.R b/R/signalConditions.R index c0acfa66..1bc2f07d 100644 --- a/R/signalConditions.R +++ b/R/signalConditions.R @@ -26,8 +26,7 @@ #' @keywords internal signalConditions <- function(future, include = "condition", exclude = NULL, resignal = TRUE, ...) { ## Future is not yet launched - ## FIXME: civis::CivisFuture uses 'succeeded' /HB 2019-06-18 - if (!future$state %in% c("finished", "failed", "succeeded")) { + if (!future$state %in% c("finished", "failed")) { stop(FutureError( sprintf( "Internal error: Cannot resignal future conditions. %s has not yet been resolved (state = %s)", diff --git a/R/value.R b/R/value.R index 973c1692..dc7045e2 100644 --- a/R/value.R +++ b/R/value.R @@ -73,6 +73,40 @@ value.Future <- function(future, stdout = TRUE, signal = TRUE, ...) { } + ## Were there any variables added to the global environment?
+ if (length(result$globalenv$added) > 0L) { + onMisuse <- getOption("future.globalenv.onMisuse", "ignore") + if (onMisuse != "ignore") { + if (onMisuse == "error") { + cond <- GlobalEnvFutureError(globalenv = result$globalenv, future = future) + } else if (onMisuse == "warning") { + cond <- GlobalEnvFutureWarning(globalenv = result$globalenv, future = future) + } else { + cond <- NULL + warnf("Unknown value on option 'future.globalenv.onMisuse': %s", + sQuote(onMisuse)) + } + + if (!is.null(cond)) { + ## FutureCondition to stack of captured conditions + new <- list(condition = cond, signaled = FALSE) + conditions <- result$conditions + n <- length(conditions) + + ## An existing run-time error takes precedence + if (n > 0L && inherits(conditions[[n]]$condition, "error")) { + conditions[[n + 1L]] <- conditions[[n]] + conditions[[n]] <- new + } else { + conditions[[n + 1L]] <- new + } + + result$conditions <- conditions + future$result <- result + } + } + } + ## Was RNG used without requesting RNG seeds? if (!isTRUE(future$.rng_checked) && isFALSE(future$seed) && isTRUE(result$rng)) { @@ -87,41 +121,33 @@ value.Future <- function(future, stdout = TRUE, signal = TRUE, ...) 
{ } else { onMisuse <- getOption("future.rng.onMisuse", "warning") if (onMisuse != "ignore") { - label <- future$label - if (is.null(label)) label <- "" - cond <- RngFutureCondition(future = future) - msg <- conditionMessage(cond) - uuid <- future$uuid - if (getOption("future.rng.onMisuse.keepFuture", TRUE)) { - f <- future - } else { - f <- NULL - } if (onMisuse == "error") { - cond <- RngFutureError(msg, uuid = uuid, future = f) + cond <- RngFutureError(future = future) } else if (onMisuse == "warning") { - cond <- RngFutureWarning(msg, uuid = uuid, future = f) + cond <- RngFutureWarning(future = future) } else { cond <- NULL warnf("Unknown value on option 'future.rng.onMisuse': %s", sQuote(onMisuse)) } - ## RngFutureCondition to stack of captured conditions - new <- list(condition = cond, signaled = FALSE) - conditions <- result$conditions - n <- length(conditions) - - ## An existing run-time error takes precedence - if (n > 0L && inherits(conditions[[n]]$condition, "error")) { - conditions[[n + 1L]] <- conditions[[n]] - conditions[[n]] <- new - } else { - conditions[[n + 1L]] <- new + if (!is.null(cond)) { + ## RngFutureCondition to stack of captured conditions + new <- list(condition = cond, signaled = FALSE) + conditions <- result$conditions + n <- length(conditions) + + ## An existing run-time error takes precedence + if (n > 0L && inherits(conditions[[n]]$condition, "error")) { + conditions[[n + 1L]] <- conditions[[n]] + conditions[[n]] <- new + } else { + conditions[[n + 1L]] <- new + } + + result$conditions <- conditions + future$result <- result } - - result$conditions <- conditions - future$result <- result } } } diff --git a/R/zzz.plan.R b/R/zzz.plan.R index 32766b65..c9795f11 100644 --- a/R/zzz.plan.R +++ b/R/zzz.plan.R @@ -39,7 +39,7 @@ #' @section Built-in evaluation strategies: #' The \pkg{future} package provides the following built-in backends: #' -#' \itemize{ +#' \describe{ #' \item{[`sequential`]:}{ #' Resolves futures sequentially in the 
current \R process, e.g. #' `plan(sequential)`. @@ -75,16 +75,6 @@ #' are available on high-performance compute (HPC) clusters, e.g. LSF, #' Slurm, TORQUE/PBS, Sun Grid Engine, and OpenLava. #' -#' The following future strategies are _deprecated_ and must not be used: -#' -#' \itemize{ -#' \item{[`multiprocess`]:}{ (DEPRECATED since future 1.20.0) -#' If multicore evaluation is supported, that will be used, -#' otherwise multisession evaluation will be used. -#' _Please use `multisession`, or possibly `multicore` instead._ -#' } -#' } -#' #' To "close" any background workers (e.g. `multisession`), change #' the plan to something different; `plan(sequential)` is recommended #' for this. @@ -167,37 +157,6 @@ plan <- local({ FALSE } - warn_about_deprecated <- function(stack, strategy, fmtstr, ignore = NULL, defunct = NULL) { - for (kk in seq_along(stack)) { - if (evaluator_uses(stack[[kk]], strategy)) { - if (is.null(ignore)) ignore <- getOption("future.deprecated.ignore") - if (!is.element(strategy, ignore)) { - if (is.null(defunct)) defunct <- getOption("future.deprecated.defunct") - if (is.element(strategy, defunct)) { - msg <- sprintf(fmtstr, strategy, "defunct") - dfcn <- .Defunct - } else { - msg <- sprintf(fmtstr, strategy, "deprecated") - dfcn <- .Deprecated - } - dfcn(msg = msg, package = .packageName) - } - } - } - } - - warn_about_multiprocess <- function(stack) { - warn_about_deprecated(stack, strategy = "multiprocess", fmtstr = sprintf("Strategy '%%s' is %%s in future (>= 1.20.0) [2020-10-30]. Instead, explicitly specify either 'multisession' (recommended) or 'multicore'. Starting with future 1.31.0 [2023-01-31], 'multiprocess' is the same as 'sequential'.")) - } - - warn_about_remote <- function(stack) { - warn_about_deprecated(stack, strategy = "remote", fmtstr = "Strategy '%s' is %s in future (>= 1.30.0) [2022-12-15]. 
Instead, use plan(cluster, ..., persistent = TRUE).", ignore = "", defunct = "remote") - } - - warn_about_transparent <- function(stack) { - warn_about_deprecated(stack, strategy = "transparent", fmtstr = "Strategy '%s' is %s in future (>= 1.28.0) [2022-09-02]. It was designed to simplify interactive troubleshooting, but is now superseded by plan(sequential, split = TRUE).", defunct = "transparent") - } - warn_about_multicore <- local({ .warn <- TRUE @@ -304,9 +263,7 @@ plan <- local({ assert_no_disallowed_strategies(newStack) - warn_about_multiprocess(newStack) - warn_about_remote(newStack) - warn_about_transparent(newStack) + ## Warn about 'multicore' on certain systems warn_about_multicore(newStack) stack <<- newStack diff --git a/README.md b/README.md index daf0d607..52d7361f 100644 --- a/README.md +++ b/README.md @@ -13,6 +13,7 @@ The purpose of the [future] package is to provide a very simple and uniform way In programming, a _future_ is an abstraction for a _value_ that may be available at some point in the future. The state of a future can either be _unresolved_ or _resolved_. As soon as it is resolved, the value is available instantaneously. If the value is queried while the future is still unresolved, the current process is _blocked_ until the future is resolved. It is possible to check whether a future is resolved or not without blocking. Exactly how and when futures are resolved depends on what strategy is used to evaluate them. For instance, a future can be resolved using a sequential strategy, which means it is resolved in the current R session. Other strategies may be to resolve futures asynchronously, for instance, by evaluating expressions in parallel on the current machine or concurrently on a compute cluster. Here is an example illustrating how the basics of futures work. First, consider the following code snippet that uses plain R code: + ```r > v <- { + cat("Hello world!\n") @@ -22,9 +23,11 @@ Hello world! 
> v [1] 3.14 ``` + It works by assigning the value of an expression to variable `v` and we then print the value of `v`. Moreover, when the expression for `v` is evaluated we also print a message. Here is the same code snippet modified to use futures instead: + ```r > library(future) > v %<-% { @@ -35,9 +38,11 @@ Here is the same code snippet modified to use futures instead: Hello world! [1] 3.14 ``` + The difference is in how `v` is constructed; with plain R we use `<-` whereas with futures we use `%<-%`. The other difference is that output is relayed _after_ the future is resolved (not during) and when the value is queried (see Vignette 'Outputting Text'). So why are futures useful? Because we can choose to evaluate the future expression in a separate R process asynchronously by simply switching settings as: + ```r > library(future) > plan(multisession) @@ -49,6 +54,7 @@ So why are futures useful? Because we can choose to evaluate the future express Hello world! [1] 3.14 ``` + With asynchronous futures, the current/main R process does _not_ block, which means it is available for further processing while the futures are being resolved in separate processes running in the background. In other words, futures provide a simple but yet powerful construct for parallel and / or distributed processing in R. @@ -60,6 +66,7 @@ Now, if you cannot be bothered to read all the nitty-gritty details about future ## Implicit or Explicit Futures Futures can be created either _implicitly_ or _explicitly_. In the introductory example above we used _implicit futures_ created via the `v %<-% { expr }` construct. An alternative is _explicit futures_ using the `f <- future({ expr })` and `v <- value(f)` constructs. 
With these, our example could alternatively be written as: + ```r > library(future) > f <- future({ @@ -104,8 +111,6 @@ The future package implements the following types of futures: | `multicore` | not Windows/not RStudio | forked R processes (on current machine) | `cluster` | all | external R sessions on current, local, and/or remote machines -_Comment:_ The alias strategy `multiprocess` was deprecated in future (>= 1.20.0) in favor of `multisession` and `multicore`. - The future package is designed such that support for additional strategies can be implemented as well. For instance, the [future.callr] package provides future backends that evaluates futures in a background R process utilizing the [callr] package - they work similarly to `multisession` futures but has a few advantages. Continuing, the [future.batchtools] package provides futures for all types of _cluster functions_ ("backends") that the [batchtools] package supports. Specifically, futures for evaluating R expressions via job schedulers such as Slurm, TORQUE/PBS, Oracle/Sun Grid Engine (SGE) and Load Sharing Facility (LSF) are also available. By default, future expressions are evaluated eagerly (= instantaneously) and synchronously (in the current R session). This evaluation strategy is referred to as "sequential". In this section, we will go through each of these strategies and discuss what they have in common and how they differ. @@ -126,6 +131,7 @@ Because of this, the defaults of the different strategies are such that the resu * Future _expressions are only evaluated once_. As soon as the value (or an error) has been collected it will be available for all succeeding requests. Here is an example illustrating that all assignments are done to a local environment: + ```r > plan(sequential) > a <- 1 @@ -151,11 +157,12 @@ Synchronous futures are resolved one after another and most commonly by the R pr #### Sequential Futures Sequential futures are the default unless otherwise specified. 
They were designed to behave as similar as possible to regular R evaluation while still fulfilling the Future API and its behaviors. Here is an example illustrating their properties: + ```r > plan(sequential) > pid <- Sys.getpid() > pid -[1] 262086 +[1] 1437557 > a %<-% { + pid <- Sys.getpid() + cat("Future 'a' ...\n") @@ -173,15 +180,16 @@ Sequential futures are the default unless otherwise specified. They were design Future 'a' ... > b Future 'b' ... -[1] 262086 +[1] 1437557 > c Future 'c' ... [1] 6.28 > a [1] 3.14 > pid -[1] 262086 +[1] 1437557 ``` + Since eager sequential evaluation is taking place, each of the three futures is resolved instantaneously in the moment it is created. Note also how `pid` in the calling environment, which was assigned the process ID of the current process, is neither overwritten nor removed. This is because futures are evaluated in a local environment. Since synchronous (uni-)processing is used, future `b` is resolved by the main R process (still in a local environment), which is why the value of `b` and `pid` are the same. @@ -193,11 +201,12 @@ Next, we will turn to asynchronous futures, which are futures that are resolved #### Multisession Futures We start with multisession futures because they are supported by all operating systems. A multisession future is evaluated in a background R session running on the same machine as the calling R process. Here is our example with multisession evaluation: + ```r > plan(multisession) > pid <- Sys.getpid() > pid -[1] 262086 +[1] 1437557 > a %<-% { + pid <- Sys.getpid() + cat("Future 'a' ...\n") @@ -215,24 +224,27 @@ We start with multisession futures because they are supported by all operating s Future 'a' ... > b Future 'b' ... -[1] 262148 +[1] 1437616 > c Future 'c' ... [1] 6.28 > a [1] 3.14 > pid -[1] 262086 +[1] 1437557 ``` + The first thing we observe is that the values of `a`, `c` and `pid` are the same as previously. However, we notice that `b` is different from before. 
This is because future `b` is evaluated in a different R process and therefore it returns a different process ID. When multisession evaluation is used, the package launches a set of R sessions in the background that will serve multisession futures by evaluating their expressions as they are created. If all background sessions are busy serving other futures, the creation of the next multisession future is _blocked_ until a background session becomes available again. The total number of background processes launched is decided by the value of `availableCores()`, e.g. + ```r > availableCores() mc.cores 2 ``` + This particular result tells us that the `mc.cores` option was set such that we are allowed to use in total two (2) processes including the main process. In other words, with these settings, there will be two (2) background processes serving the multisession futures. The `availableCores()` is also agile to different options and system environment variables. For instance, if compute cluster schedulers are used (e.g. TORQUE/PBS and Slurm), they set specific environment variable specifying the number of cores that was allotted to any given job; `availableCores()` acknowledges these as well. If nothing else is specified, all available cores on the machine will be utilized, cf. `parallel::detectCores()`. For more details, please see `help("availableCores", package = "parallelly")`. @@ -254,11 +266,12 @@ On the other hand, process forking is also considered unstable in some R environ #### Cluster Futures Cluster futures evaluate expressions on an ad-hoc cluster (as implemented by the parallel package). 
For instance, assume you have access to three nodes `n1`, `n2` and `n3`, you can then use these for asynchronous evaluation as: + ```r > plan(cluster, workers = c("n1", "n2", "n3")) > pid <- Sys.getpid() > pid -[1] 262086 +[1] 1437557 > a %<-% { + pid <- Sys.getpid() + cat("Future 'a' ...\n") @@ -276,21 +289,23 @@ Cluster futures evaluate expressions on an ad-hoc cluster (as implemented by the Future 'a' ... > b Future 'b' ... -[1] 262265 +[1] 1437715 > c Future 'c' ... [1] 6.28 > a [1] 3.14 > pid -[1] 262086 +[1] 1437557 ``` Any types of clusters that `parallel::makeCluster()` creates can be used for cluster futures. For instance, the above cluster can be explicitly set up as: + ```r cl <- parallel::makeCluster(c("n1", "n2", "n3")) plan(cluster, workers = cl) ``` + Also, it is considered good style to shut down cluster `cl` when it is no longer needed, that is, calling `parallel::stopCluster(cl)`. However, it will shut itself down if the main process is terminated. For more information on how to set up and manage such clusters, see `help("makeCluster", package = "parallel")`. Clusters created implicitly using `plan(cluster, workers = hosts)` where `hosts` is a character vector will also be shut down when the main R session terminates, or when the future strategy is changed, e.g. by calling `plan(sequential)`. @@ -311,6 +326,7 @@ will run three workers on `n1`, one on `n2`, and five on `n3`, in total nine par This far we have discussed what can be referred to as "flat topology" of futures, that is, all futures are created in and assigned to the same environment. However, there is nothing stopping us from using a "nested topology" of futures, where one set of futures may, in turn, create another set of futures internally and so on. 
For instance, here is an example of two "top" futures (`a` and `b`) that uses multisession evaluation and where the second future (`b`) in turn uses two internal futures: + ```r > plan(multisession) > pid <- Sys.getpid() @@ -331,96 +347,105 @@ For instance, here is an example of two "top" futures (`a` and `b`) that uses mu + c(b.pid = Sys.getpid(), b1.pid = b1, b2.pid = b2) + } > pid -[1] 262086 +[1] 1437557 > a Future 'a' ... -[1] 262386 +[1] 1437804 > b Future 'b' ... Future 'b1' ... Future 'b2' ... - b.pid b1.pid b2.pid -262385 262385 262385 + b.pid b1.pid b2.pid +1437805 1437805 1437805 ``` + By inspection the process IDs, we see that there are in total three different processes involved for resolving the futures. There is the main R process -(pid 262086), +(pid 1437557), and there are the two processes used by `a` -(pid 262386) +(pid 1437804) and `b` -(pid 262385). +(pid 1437805). However, the two futures (`b1` and `b2`) that is nested by `b` are evaluated by the same R process as `b`. This is because nested futures use sequential evaluation unless otherwise specified. There are a few reasons for this, but the main reason is that it protects us from spawning off a large number of background processes by mistake, e.g. via recursive calls. To specify a different type of _evaluation topology_, other than the first level of futures being resolved by multisession evaluation and the second level by sequential evaluation, we can provide a list of evaluation strategies to `plan()`. First, the same evaluation strategies as above can be explicitly specified as: + ```r plan(list(multisession, sequential)) ``` + We would actually get the same behavior if we try with multiple levels of multisession evaluations; + ```r > plan(list(multisession, multisession)) [...] > pid -[1] 262086 +[1] 1437557 > a Future 'a' ... -[1] 262517 +[1] 1437901 > b Future 'b' ... Future 'b1' ... Future 'b2' ... 
- b.pid b1.pid b2.pid -262516 262516 262516 + b.pid b1.pid b2.pid +1437902 1437902 1437902 ``` + The reason for this is, also here, to protect us from launching more processes than what the machine can support. Internally, this is done by setting `mc.cores = 1` such that functions like `parallel::mclapply()` will fall back to run sequentially. This is the case for both multisession and multicore evaluation. Continuing, if we start off by sequential evaluation and then use multisession evaluation for any nested futures, we get: + ```r > plan(list(sequential, multisession)) [...] > pid -[1] 262086 +[1] 1437557 > a Future 'a' ... -[1] 262086 +[1] 1437557 > b Future 'b' ... Future 'b1' ... Future 'b2' ... - b.pid b1.pid b2.pid -262086 262664 262665 + b.pid b1.pid b2.pid +1437557 1438017 1438016 ``` + which clearly show that `a` and `b` are resolved in the calling process -(pid 262086) +(pid 1437557) whereas the two nested futures (`b1` and `b2`) are resolved in two separate R processes -(pids 262664 and 262665). +(pids 1438017 and 1438016). Having said this, it is indeed possible to use nested multisession evaluation strategies, if we explicitly specify (read _force_) the number of cores available at each level. In order to do this we need to "tweak" the default settings, which can be done as follows: + ```r > plan(list(tweak(multisession, workers = 2), tweak(multisession, + workers = 2))) [...] > pid -[1] 262086 +[1] 1437557 > a Future 'a' ... -[1] 262772 +[1] 1438105 > b Future 'b' ... Future 'b1' ... Future 'b2' ... - b.pid b1.pid b2.pid -262773 262883 262882 + b.pid b1.pid b2.pid +1438106 1438211 1438212 ``` + First, we see that both `a` and `b` are resolved in different processes -(pids 262772 and 262773) +(pids 1438105 and 1438106) than the calling process -(pid 262086). +(pid 1437557). Second, the two nested futures (`b1` and `b2`) are resolved in yet two other R processes -(pids 262883 and 262882). +(pids 1438211 and 1438212). 
For more details on working with nested futures and different evaluation strategies at each level, see Vignette '[Futures in R: Future Topologies]'. @@ -429,6 +454,7 @@ For more details on working with nested futures and different evaluation strateg ### Checking A Future without Blocking It is possible to check whether a future has been resolved or not without blocking. This can be done using the `resolved(f)` function, which takes an explicit future `f` as input. If we work with implicit futures (as in all the examples above), we can use the `f <- futureOf(a)` function to retrieve the explicit future from an implicit one. For example, + ```r > plan(multisession) > a %<-% { @@ -456,18 +482,18 @@ Waiting for 'a' to be resolved ... 8 9 10 -11 > cat("Waiting for 'a' to be resolved ... DONE\n") Waiting for 'a' to be resolved ... DONE > a Future 'a' ...done -[1] 262969 +[1] 1438287 ``` ## Failed Futures Sometimes the future is not what you expected. If an error occurs while evaluating a future, the error is propagated and thrown as an error in the calling environment _when the future value is requested_. For example, if we use lazy evaluation on a future that generates an error, we might see something like + ```r > plan(sequential) > b <- "hello" @@ -481,7 +507,9 @@ Everything is still ok although we have created a future that will fail. Future 'a' ... Error in log(b) : non-numeric argument to mathematical function ``` + The error is thrown each time the value is requested, that is, if we try to get the value again will generate the same error (and output): + ```r > a Future 'a' ... @@ -489,7 +517,9 @@ Error in log(b) : non-numeric argument to mathematical function In addition: Warning message: restarting interrupted promise evaluation ``` + To see the _last_ call in the call stack that gave the error, we can use the `backtrace()` function(\*) on the future, i.e. 
+ ```r > backtrace(a) [[1]] @@ -512,6 +542,7 @@ Finally, it should be clarified that identifying globals from static code inspec ## Constraints when using Implicit Futures There is one limitation with implicit futures that does not exist for explicit ones. Because an explicit future is just like any other object in R it can be assigned anywhere/to anything. For instance, we can create several of them in a loop and assign them to a list, e.g. + ```r > plan(multisession) > f <- list() @@ -523,11 +554,13 @@ There is one limitation with implicit futures that does not exist for explicit o > v <- lapply(f, FUN = value) > str(v) List of 3 - $ : int 263104 - $ : int 263105 - $ : int 263104 + $ : int 1438377 + $ : int 1438378 + $ : int 1438377 ``` + This is _not_ possible to do when using implicit futures. This is because the `%<-%` assignment operator _cannot_ be used in all cases where the regular `<-` assignment operator can be used. It can only be used to assign future values to _environments_ (including the calling environment) much like how `assign(name, value, envir)` works. However, we can assign implicit futures to environments using _named indices_, e.g. + ```r > plan(multisession) > v <- new.env() @@ -539,13 +572,15 @@ This is _not_ possible to do when using implicit futures. This is because the ` > v <- as.list(v) > str(v) List of 3 - $ a: int 263220 - $ b: int 263221 - $ c: int 263220 + $ a: int 1438485 + $ b: int 1438486 + $ c: int 1438485 ``` + Here `as.list(v)` blocks until all futures in the environment `v` have been resolved. Then their values are collected and returned as a regular list. If _numeric indices_ are required, then _list environments_ can be used. List environments, which are implemented by the [listenv] package, are regular environments with customized subsetting operators making it possible to index them much like how lists can be indexed. 
By using list environments where we otherwise would use lists, we can also assign implicit futures to list-like objects using numeric indices. For example, + ```r > library(listenv) > plan(multisession) @@ -558,10 +593,11 @@ If _numeric indices_ are required, then _list environments_ can be used. List e > v <- as.list(v) > str(v) List of 3 - $ : int 263336 - $ : int 263335 - $ : int 263336 + $ : int 1438582 + $ : int 1438583 + $ : int 1438582 ``` + As previously, `as.list(v)` blocks until all futures are resolved. @@ -569,17 +605,22 @@ As previously, `as.list(v)` blocks until all futures are resolved. ## Demos To see a live illustration how different types of futures are evaluated, run the Mandelbrot demo of this package. First, try with the sequential evaluation, + ```r library(future) plan(sequential) demo("mandelbrot", package = "future", ask = FALSE) ``` + which resembles how the script would run if futures were not used. Then, try multisession evaluation, which calculates the different Mandelbrot planes using parallel R processes running in the background. Try, + ```r plan(multisession) demo("mandelbrot", package = "future", ask = FALSE) ``` + Finally, if you have access to multiple machines you can try to set up a cluster of workers and use them, e.g. 
+ ```r plan(cluster, workers = c("n2", "n5", "n6", "n6", "n9")) demo("mandelbrot", package = "future", ask = FALSE) diff --git a/README_ja.md b/README_ja.md index 34c06ccf..3ea5cd32 100644 --- a/README_ja.md +++ b/README_ja.md @@ -1,4 +1,4 @@ -_This is a translation of [README.md](https://github.com/HenrikBengtsson/future/blob/develop/README.md) as of [2021-05-24](https://github.com/HenrikBengtsson/future/blob/74aea903791cbead5be7341766004571d4e0135b/README.md) done by [hoxo_m](https://github.com/hoxo-m)._ +_This is a translation of [README.md](https://github.com/HenrikBengtsson/future/blob/develop/README.md) as of [2023-06-17](https://github.com/HenrikBengtsson/future/blob/2a675abff2e3a729e6108d40710d0dcb22cc705b/README.md) done by [hoxo_m](https://github.com/hoxo-m)._
CRAN check status R CMD check status Top reverse-dependency checks status future.tests checks status Coverage Status @@ -69,7 +69,7 @@ Here is the same code snippet modified to use futures instead: 上のコードをフューチャを使った式に書き換えよう。 ``` r -> library("future") +> library(future) > v %<-% { + cat("Hello world!\n") + 3.14 @@ -99,7 +99,7 @@ Because we can choose to evaluate the future expression in a separate R process フューチャは何が便利なのだろうか? 式をフューチャにしておくと、式の評価を非同期実行したいときに、次のように簡単に切り替えることができる。 ``` r -> library("future") +> library(future) > plan(multisession) > v %<-% { + cat("Hello world!\n") @@ -140,7 +140,7 @@ future パッケージには、フューチャを作成する方法として、* <- value(f)` という2つの関数を使うスタイルがある。 上記の例を明示的なスタイルに書き換えると次のようになる。 ``` r -> library("future") +> library(future) > f <- future({ + cat("Hello world!\n") + 3.14 @@ -213,7 +213,7 @@ sequential all sequentially and in the current R process asynchronous: parallel: multisession all background R sessions (on current machine) multicore not Windows forked R processes (on current machine) -cluster all external R sessions on current, local, and/or remote machines +cluster all external R sessions on current, local, and remote machines --> | 名前 | OS | 説明 | @@ -225,12 +225,6 @@ cluster all external R sessions on current, local, and/or remote machines | `multicore` | Windows以外/RStudio以外 | フォークされた R プロセス(現行のマシン上) | | `cluster` | すべて | 外部 R セッション(現行、ローカル、リモートマシン上) | - - -**注意:** future (>= 1.20.0) では、`multiprocess` は非推奨となり、`multisession` または `multicore` の明確な指定が推奨される。 - 同期的フューチャはフューチャを作成した R プロセスで一つひとつ解決される。 同期的フューチャが解決されている間、メインプロセスはブロックされる。 -future パッケージの同期的フューチャには2つの種類がある。 **逐次的フューチャ**と**透過的フューチャ**である。 + #### 逐次的フューチャ (Sequential Future) @@ -369,7 +362,7 @@ Here is an example illustrating their properties: > plan(sequential) > pid <- Sys.getpid() > pid -[1] 23153 +[1] 1427324 > a %<-% { + pid <- Sys.getpid() + cat("Future 'a' ...\n") @@ -387,14 +380,14 @@ Here is an example illustrating their properties: Future 'a' ... > b Future 'b' ... 
-[1] 23153 +[1] 1427324 > c Future 'c' ... [1] 6.28 > a [1] 3.14 > pid -[1] 23153 +[1] 1427324 ``` - -透過的フューチャは `plan(transparent)` を指定することで利用できる。 -このフューチャは、評価時にエラーや警告などの通知を即時的に行う逐次的フューチャであり、代入は呼び出し環境で行われる。 -透過的フューチャは他の戦略では絞り込むことが難しいエラーのトラブルシューティングに役立つ。 ### 非同期的フューチャ @@ -451,7 +433,7 @@ Here is our example with multisession evaluation: > plan(multisession) > pid <- Sys.getpid() > pid -[1] 23153 +[1] 1427324 > a %<-% { + pid <- Sys.getpid() + cat("Future 'a' ...\n") @@ -469,14 +451,14 @@ Here is our example with multisession evaluation: Future 'a' ... > b Future 'b' ... -[1] 23246 +[1] 1427382 > c Future 'c' ... [1] 6.28 > a [1] 3.14 > pid -[1] 23153 +[1] 1427324 ``` + +各ノードで複数のワーカを実行したい場合は、次のようにノード名を複数回書けばよい。 + +``` +> plan(cluster, workers = c(rep("n1", times = 3), "n2", rep("n3", times = 5))) +``` + + + +この例では、`n1` では3つ、`n2` では1つ、`n3` では5つと、合計9つのワーカにより並列実行される。 + + ### フューチャのネストと評価トポロジー -プロセスIDを見ると、3つの異なるプロセスがフューチャの解決に使われていることがわかる。 メインプロセス (pid 23153)、`a` -に使われるプロセス (pid 23429)、`b` に使われるプロセス (pid 23430) である。 しかし、`b` +プロセスIDを見ると、3つの異なるプロセスがフューチャの解決に使われていることがわかる。 メインプロセス (pid 1427324)、`a` +に使われるプロセス (pid 1427606)、`b` に使われるプロセス (pid 1427607) である。 しかし、`b` にネストされている2つのフューチャ `b1` と `b2` は `b` と同じプロセスで評価されている。 これは、特に指定しない限り、ネストされたフューチャは逐次戦略を使って評価されるためである。 これにはいくつかの理由があるが、主な理由は、再帰呼び出しなどによって、誤って多くのバックグラウンドプロセスが発生するのを防ぐためである。 @@ -734,16 +734,16 @@ We would actually get the same behavior if we try with multiple levels of multis > plan(list(multisession, multisession)) [...] > pid -[1] 23153 +[1] 1427324 > a Future 'a' ... -[1] 23431 +[1] 1427721 > b Future 'b' ... Future 'b1' ... Future 'b2' ... 
- b.pid b1.pid b2.pid - 23432 23432 23432 + b.pid b1.pid b2.pid +1427722 1427722 1427722 ``` -`a` と `b` は呼び出しプロセス (pid 23153) で解決され、ネストされた2つのフューチャ(`b1` と -`b2`)はそれぞれ別のプロセス (pid 23433 と 23434) で解決されることがわかる。 +`a` と `b` は呼び出しプロセス (pid 1427324) で解決され、ネストされた2つのフューチャ(`b1` と +`b2`)はそれぞれ別のプロセス (pid 1427855 と 1427854) で解決されることがわかる。 -まず、`a` と `b` は呼び出しプロセス (pid 23153) とは異なるプロセス(pid 23435 と 23436)で解決される。 -次に、2つのネストされたフューチャ(`b1` と `b2`)もまた異なるプロセス(pid 23437 と 23438)で解決される。 +まず、`a` と `b` は呼び出しプロセス (pid 1427324) とは異なるプロセス(pid 1427973 と 1427972)で解決される。 +次に、2つのネストされたフューチャ(`b1` と `b2`)もまた異なるプロセス(pid 1428098 と 1428099)で解決される。