Tuning/reducing worker overhead costs #437

Closed
jeffkeller87 opened this issue Oct 24, 2020 · 11 comments

@jeffkeller87

I have an R application with extremely high performance requirements: compute budgets in the 10s of milliseconds in total. In the past, I've tended to initialize workers with parallel and keep them hot (pre-loaded with data) for when they are needed. I was glad to see that a similar consideration was made in future, and I've been exploring the same tactic using future:::clusterExportSticky (discussed here).

I really enjoy the ease of use of future and how it abstracts away a lot of the details of managing a background process/session, but I noticed that its compute overhead was significantly greater than that of the alternatives. Of course, the abstraction comes at some cost, but I didn't expect the cost to be this high.

I ran some benchmarks in an attempt to isolate just the data transfer between processes (see below). I compared

  • parallel
  • callr
  • future

I ran into some garbage collection issues in the main process running the benchmark and probably also in the worker processes. In an attempt to minimize the impact of garbage collection, I ran the benchmark code via Rscript, set the R_GC_MEM_GROW environment variable to its highest available setting, and am not considering the max benchmark times because these are almost certainly tainted by garbage collection.

export R_GC_MEM_GROW=3
Rscript io_testing.R

For simplicity, I'm assuming that the compute cost is symmetrical, so am attempting to create a situation where the only meaningful data transfer is the return trip from the worker back to the main R process--the up-front data transfer should just be the input command.

# io_testing.R

library(parallel)
library(future)
library(callr)
library(microbenchmark)

gcinfo(FALSE) # Set this to TRUE to get an idea for how often gc triggers
options(width = 10000)

rs <- r_session$new()

cl_parallel <- makeCluster(1)
cl_future <- makeCluster(1)
plan(cluster, workers = cl_future)

(mb <- microbenchmark(
  `parallel:::send/recv` = {parallel:::sendCall(cl_parallel[[1]], fun = function() LETTERS, args = list());parallel:::recvResult(cl_parallel[[1]])},
  `r_session$call` = {rs$call(function() LETTERS);while (is.null(rs$read())) {}},
  `future` = value(future(LETTERS, globals = FALSE)),
  times = 100
))
# Unit: microseconds
#                  expr      min        lq       mean    median         uq       max neval
#  parallel:::send/recv    67.05   136.245   254.4019   195.525   276.5705   703.681   100
#        r_session$call 13293.44 13522.477 17383.0562 17819.488 20886.6350 29380.724   100
#                future 44827.92 47052.707 47797.4893 47718.788 48085.4330 53802.986   100

Note: As far as I know, parallel does not offer non-blocking variants of its functions (e.g., clusterCall, clusterEvalQ), so I'm using unexported functions directly as I would if I were to create this non-blocking behavior for myself. I also used separate clusters to further isolate the parallel and future code. At any rate, in order to benchmark these functions, the main process needs to be blocked anyway.

Conclusion: future seems to be two orders of magnitude slower than parallel and takes about twice as long as callr.

Are there options available to users of future to tune this overhead? If not, are there obvious opportunities to optimize future code to bring its overhead closer to parallel? I believe that future uses parallel internally.

The above results are obviously very hardware dependent. Below are the details regarding my R session, CPU, and RAM timings.

> sessionInfo()
# R version 3.6.3 (2020-02-29)
# Platform: x86_64-pc-linux-gnu (64-bit)
# Running under: Linux Mint 20
# 
# Matrix products: default
# BLAS:   /usr/lib/x86_64-linux-gnu/blas/libblas.so.3.9.0
# LAPACK: /usr/lib/x86_64-linux-gnu/lapack/liblapack.so.3.9.0
# 
# locale:
#  [1] LC_CTYPE=en_US.UTF-8       LC_NUMERIC=C               LC_TIME=en_US.UTF-8        LC_COLLATE=en_US.UTF-8    
#  [5] LC_MONETARY=en_US.UTF-8    LC_MESSAGES=en_US.UTF-8    LC_PAPER=en_US.UTF-8       LC_NAME=C                 
#  [9] LC_ADDRESS=C               LC_TELEPHONE=C             LC_MEASUREMENT=en_US.UTF-8 LC_IDENTIFICATION=C       
# 
# attached base packages:
# [1] parallel  stats     graphics  grDevices utils     datasets  methods   base     
# 
# other attached packages:
# [1] microbenchmark_1.4-7 callr_3.2.0          future_1.19.1       
# 
# loaded via a namespace (and not attached):
#  [1] processx_3.3.1   compiler_3.6.3   R6_2.4.0         rsconnect_0.8.16 tools_3.6.3      listenv_0.8.0    codetools_0.2-16
#  [8] digest_0.6.23    ps_1.3.0         globals_0.13.1 
# CPU is not overclocked
$ cat /proc/cpuinfo | head -n5
processor	: 0
vendor_id	: AuthenticAMD
cpu family	: 23
model		: 113
model name	: AMD Ryzen 7 3700X 8-Core Processor
# RAM modules are running @ 1600MHz with slightly looser timings than the stock XMP
$ decode-dimms --bodyonly --side-by-side
Decoding EEPROM                                  0-0050           0-0051           0-0052           0-0053
Guessing DIMM is in                              bank 1           bank 2           bank 3           bank 4

---=== SPD EEPROM Information ===---
EEPROM CRC of bytes 0-125                        OK (0x242D)
# of bytes written to SDRAM EEPROM               384
Total number of bytes in EEPROM                  512
Fundamental Memory type                          DDR4 SDRAM
SPD Revision                                     1.1
Module Type                                      UDIMM
EEPROM CRC of bytes 128-253                      OK (0xA01C)

---=== Memory Characteristics ===---
Maximum module speed                             2132 MHz (PC4-17000)
Size                                             8192 MB
Banks x Rows x Columns x Bits                    16 x 16 x 10 x 64
SDRAM Device Width                               8 bits
Ranks                                            1
AA-RCD-RP-RAS (cycles)                           15-15-15-36
Supported CAS Latencies                          16T, 15T, 14T, 13T, 12T, 11T, 10T

---=== Timings at Standard Speeds ===---
AA-RCD-RP-RAS (cycles) as DDR4-1866              13-13-13-31
AA-RCD-RP-RAS (cycles) as DDR4-1600              11-11-11-27

---=== Timing Parameters ===---
Minimum Cycle Time (tCKmin)                      0.938 ns
Maximum Cycle Time (tCKmax)                      1.600 ns
Minimum CAS Latency Time (tAA)                   13.750 ns
Minimum RAS to CAS Delay (tRCD)                  13.750 ns
Minimum Row Precharge Delay (tRP)                13.750 ns
Minimum Active to Precharge Delay (tRAS)         33.000 ns
Minimum Active to Auto-Refresh Delay (tRC)       46.750 ns
Minimum Recovery Delay (tRFC1)                   350.000 ns
Minimum Recovery Delay (tRFC2)                   260.000 ns
Minimum Recovery Delay (tRFC4)                   160.000 ns
Minimum Four Activate Window Delay (tFAW)        21.000 ns
Minimum Row Active to Row Active Delay (tRRD_S)  3.700 ns
Minimum Row Active to Row Active Delay (tRRD_L)  5.300 ns
Minimum CAS to CAS Delay (tCCD_L)                5.625 ns
Minimum Write Recovery Time (tWR)                15.000 ns
Minimum Write to Read Time (tWTR_S)              2.500 ns
Minimum Write to Read Time (tWTR_L)              7.500 ns

---=== Other Information ===---
Package Type                                     Monolithic
Maximum Activate Count                           Unlimited
Post Package Repair                              One row per bank group
Soft PPR                                         Supported
Module Nominal Voltage                           1.2 V
Thermal Sensor                                   No

---=== Physical Characteristics ===---
Module Height                                    32 mm
Module Thickness                                 2 mm front, 2 mm back
Module Reference Card                            A revision 1

---=== Manufacturer Data ===---
Module Manufacturer                              Undefined
Part Number                                      Undefined


Number of SDRAM DIMMs detected and decoded: 4
@HenrikBengtsson
Collaborator

Hi.

... compute budgets in the 10s of milliseconds in total.

For these levels of turnaround/latency, I think you want to stick with as bare-bones a parallelization framework as possible, where I expect something like persistent PSOCK cluster workers to be the fastest. Looking at the PSOCK code, I think one can squeeze out a little more (e.g. different serialization protocols), but it's already pretty low-level with minimal overhead.
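
To illustrate what I mean by a persistent worker, here's a minimal sketch using only parallel; big_data and score() are made-up placeholders, not anything from your benchmark:

library(parallel)

cl <- makeCluster(1)

# Pay the export cost once, up front, while latency doesn't matter
big_data <- data.frame(x = rnorm(1e6))
score <- function(d) mean(d$x)
clusterExport(cl, c("big_data", "score"))

# Later, each request only ships the call over and the (small) result back
clusterEvalQ(cl, score(big_data))[[1]]

stopCluster(cl)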

In contrast, the future framework does quite a bit more when it comes to orchestrating the parallelization. Here are some sources of overhead that I can think of:

  • Main: globals and packages
  • Main: Validating maximum global size
  • Main: Asserting no references
  • Main: Locate a free, available worker and collecting results from finished workers, relaying output, etc.
  • Main: Exporting globals to workers with useXDR=TRUE/FALSE
  • Worker: Setting up R options
  • Worker: Capturing and relaying of (i) stdout and (ii) conditions
  • Worker: Try catch when evaluating the future expression
  • Main: Overhead from waiting and polling for results
  • Worker: Check for RNG changes
  • Worker: Sending results
  • Worker: Cleanup and garbage collection
  • Main: Relaying stdout and conditions

Some of the above overhead can be controlled by the developer, e.g. seed = FALSE, globals = FALSE, stdout = NA, conditions = NULL (in develop branch).
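
For instance, a sketch of a future with most of that handling switched off (conditions = NULL requires the develop branch at the time of writing, and exact behavior may differ between versions):

library(future)
plan(multisession, workers = 2)

f <- future(
  LETTERS,
  globals = FALSE,    # skip the search for and export of globals
  seed = FALSE,       # don't set up a parallel-safe RNG seed
  stdout = NA,        # don't intercept standard output
  conditions = NULL   # don't capture and relay conditions
)
value(f)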

Important: In case someone else reads this - the overhead is basically a constant. The run-time of the actual code that we run in parallel is effectively the same for all parallelization frameworks. So, for longer-running tasks, the relative impact of the overhead can often be ignored.

I haven't done proper profiling, so I don't know which of these are the dominant ones here (*). I also haven't attempted to optimize any of them beyond trying not to do silly things while writing the code. A wild guess is that one might bring the current overhead down to ~50% of what it is now. It might be possible to push it even further if one allows for running in "risky" mode, i.e. dropping validation and sanity checks that protect against common and corner-case developer mistakes. However, I doubt that the orchestration taking place in the future framework will ever be able to compete with a bare-bones setup with near-zero validation and almost no condition or output handling.

(*) I'd like to get to the point where one can enable internal journaling that logs the different steps performed on a future, on the main R process, and on the R worker. This will make it possible to generate flame graphs and similar visualizations of the lifespan of a future. That can be particularly handy when studying CPU utilization across multiple futures, e.g. in map-reduce calls. The internal journaling framework should be optional and should have zero overhead when not in use. The latter can be achieved by code injection during package load (the most famous example being debugme). This in turn requires some type of "pragma" syntax to be invented. So, in summary, I'm interested in good profiling to be able to narrow down obvious things that can be improved, but several things need to be in place first for that to happen. With a good profiling framework, we'll also be able to troubleshoot and optimize other future backends, e.g. future.batchtools.

@jeffkeller87
Author

Thank you for the thoughtful response, I really appreciate it.

I fear I may have stumbled on a separate issue that makes future's overhead appear greater than it truly is. If I change the object being passed from the worker process to the main process in my tests from LETTERS to iris (much larger) then the overhead advantage of the lower-level parallel approach shrinks.

Linux (CPU = AMD Ryzen 7 3700X)

microbenchmark(
  `parallel:::send/recv` = {parallel:::sendCall(cl_parallel[[1]], fun = function() iris, args = list());parallel:::recvResult(cl_parallel[[1]])},
  `r_session$call` = {rs$call(function() iris);while (is.null(rs$read())) {}},
  `future` = value(future(iris, globals = FALSE)),
  times = 100
)
#Unit: microseconds
#                 expr       min         lq     mean   median       uq      max neval cld
# parallel:::send/recv   239.379   303.2235 19901.29 18728.37 39707.16 40193.50   100 a 
#       r_session$call 39939.493 40068.1780 44392.58 40204.60 40601.45 77935.59   100  b
#               future 79647.501 79978.8595 80927.29 80002.85 80231.84 88276.01   100   c 

But this all seemed far too slow for such small data, so I ran the same benchmarks on Windows and got very different results--the parallel and future approaches are much faster, with future having much more reasonable overhead compared to raw parallel, in my opinion. The Windows machine is running a different processor, but it's from the same Zen 2 architecture generation and is a laptop processor, so it should be less capable than the CPU in the Linux tests. I see similar results running the benchmark on a 2018 MacBook Pro.

Windows (CPU = AMD Ryzen 7 4800H)

Unit: microseconds
                 expr       min        lq       mean    median         uq       max neval
 parallel:::send/recv   217.501   361.501   532.3019   478.751   533.3005  7326.001   100
       r_session$call 45902.501 46942.651 51566.8450 47727.751 51883.0505 92967.701   100
               future  8376.401  9366.501 11825.5280 11036.752 13393.6515 24709.701   100

Have you experienced such discrepancies when working with PSOCK connections across platforms? On the Linux side, I tested with Ubuntu 18.04 and 20.04 derivatives on a couple different processors (including Intel) and got the same poor results.

Regarding serialization protocols, the simplest tweak I could think of was to adjust how serialize/unserialize is used, which is what parallel ultimately relies on. Assuming little-endianness (useXDR = FALSE) is an easy change that seems to scale well as the size of the data increases.

cl_bigendian <- makeCluster(1)
cl_lilendian <- makeCluster(1, useXDR = FALSE)
microbenchmark(
  clusterEvalQ(cl_bigendian, iris),
  clusterEvalQ(cl_lilendian, iris),
  clusterEvalQ(cl_bigendian, rep(iris, 10)),
  clusterEvalQ(cl_lilendian, rep(iris, 10)),
  clusterEvalQ(cl_bigendian, rep(iris, 100)),
  clusterEvalQ(cl_lilendian, rep(iris, 100)),
  clusterEvalQ(cl_bigendian, rep(iris, 1000)),
  clusterEvalQ(cl_lilendian, rep(iris, 1000))
)
#Unit: microseconds
#                                        expr       min         lq       mean      median          uq        max neval
#            clusterEvalQ(cl_bigendian, iris)   298.201   512.7510    668.385    641.3010    832.2510   1102.502   100
#            clusterEvalQ(cl_lilendian, iris)   294.301   489.8005    605.130    581.6015    705.4515   1104.400   100
#   clusterEvalQ(cl_bigendian, rep(iris, 10))   563.001   941.1510   1397.917   1054.6015   1245.1010  21638.101   100
#   clusterEvalQ(cl_lilendian, rep(iris, 10))   454.700   760.9015    890.981    884.5510   1063.0510   1287.401   100
#  clusterEvalQ(cl_bigendian, rep(iris, 100))  3048.901  3980.4015  14271.212   5332.9005   5544.6510 318265.501   100
#  clusterEvalQ(cl_lilendian, rep(iris, 100))  2226.801  2986.3510  16206.174   3881.2010   4108.5010 317174.301   100
# clusterEvalQ(cl_bigendian, rep(iris, 1000)) 23705.901 46664.3000 220999.680 342916.4015 348548.1010 675678.602   100
# clusterEvalQ(cl_lilendian, rep(iris, 1000)) 15136.101 20645.3010  71831.841  32377.1005  34967.1015 655633.900   100

@HenrikBengtsson
Collaborator

That's interesting; no, I haven't noticed/explored PSOCK performance differences across platforms. That (= the parallel results) might be worth bringing up on the R-devel mailing list - it could be that someone there has a good explanation for it.

Regarding XDR: In the next release of future, all functions related to PSOCK clusters have been moved to the parallelly package, and future will use that package for setting up PSOCK clusters. Changing the default to useXDR = FALSE will take place in parallelly, cf. futureverse/parallelly#27.

@HenrikBengtsson
Collaborator

For the record, @jeffkeller87 posted 'parallel PSOCK connection latency is greater on Linux?' to R-devel on 2020-11-01 (https://stat.ethz.ch/pipermail/r-devel/2020-November/080060.html). The slowness on Linux described above has since been explained there, and suggestions for improvements have also been posted (including from one R Core member).

@HenrikBengtsson
Collaborator

HenrikBengtsson commented Mar 11, 2021

Some good news. I've finally gotten around to speeding up the creation of the R expression that is compiled from the future expression and then sent to the worker. It's in the develop branch. You should see a significant improvement; probably something like twice as fast compared with future 1.21.0. There may be room for further improvements in expression creation - this is just the first iteration of a new approach.

The gist is that previously I relied on expr <- bquote({ ... .(inject_this) }) for each future, whereas now the template is pre-compiled (when the package is installed) via tmpl <- bquote_compile({ ... .(inject_this) }) (slow), and then expr <- bquote_apply(tmpl) (super fast) is used for each future. This has been on my to-do list for several years, and it turned out much better than I anticipated.
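
To illustrate the general idea (a sketch only - not the actual bquote_compile()/bquote_apply() implementation): the expression skeleton is built once, and per future only the injected parts are substituted into a copy of that template:

inject_this <- quote(Sys.time())

# Old approach: rebuild the whole skeleton with bquote() for every future
build_each_time <- function(inject_this) {
  bquote({
    local({
      .(inject_this)
    })
  })
}

# New approach: build the skeleton once, with a placeholder symbol ...
tmpl <- quote({
  local({
    .INJECT_THIS
  })
})

# ... and per future only swap the placeholder for the injected expression
apply_template <- function(tmpl, inject_this) {
  eval(call("substitute", tmpl, list(.INJECT_THIS = inject_this)))
}

build_each_time(inject_this)
apply_template(tmpl, inject_this)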

@jeffkeller87
Author

FYI, it seems like there has been some progress on the TCP_NODELAY front in wch/r-source@82369f7 (thanks to Katie at RStudio for pointing this out).

I tried it out but didn't see any difference in latency.

library(parallel)
library(microbenchmark)
options(socketOptions = "no-delay")
cl <- makeCluster(1)
(x <- microbenchmark(clusterEvalQ(cl, iris), times = 100, unit = "us"))
# Unit: microseconds
#                    expr     min       lq     mean   median       uq      max neval
#  clusterEvalQ(cl, iris) 172.297 43987.13 44104.53 43998.25 44011.02 518496.6  100

@HenrikBengtsson
Collaborator

Thanks for the followup.

I tried it out but didn't see any difference in latency.

I see that it was added back on 2021-03-30. Maybe it's worth following up with a note on your https://stat.ethz.ch/pipermail/r-devel/2020-November/080060.html thread about this, to sort out what to expect from those changes. Hopefully, it can be resolved in time for the R 4.2.0 feature freeze around mid-March 2022.

@jeffkeller87
Author

I misunderstood the usage of the new socketOptions option: it is meant to be set in the worker processes at creation time. Performance is then as expected, and it can be combined with other tweaks like useXDR = FALSE, as previously noted.

library(parallel)
library(microbenchmark)

cl <- makeCluster(1)
cl_nd <- makeCluster(1, rscript_args="-e 'options(socketOptions=\"no-delay\")'")
cl_nd_nxdr <- makeCluster(1, rscript_args="-e 'options(socketOptions=\"no-delay\")'", useXDR = FALSE)

(x <- microbenchmark(
  clusterEvalQ(cl, iris),
  clusterEvalQ(cl_nd, iris),
  clusterEvalQ(cl_nd_nxdr, iris),
  times = 100, unit = "us"
))

# Unit: microseconds
#                            expr    min        lq       mean     median       uq       max neval
#          clusterEvalQ(cl, iris) 137.64 42928.099 42324.3028 43636.5300 43993.72 48007.906   100
#       clusterEvalQ(cl_nd, iris) 106.89   287.115   313.6334   325.3805   367.76   432.181   100
#  clusterEvalQ(cl_nd_nxdr, iris)  93.69   259.905   285.2995   302.2800   347.33   386.740   100

Looks like it is now possible to have highly responsive pre-heated workers on Linux!
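
For completeness, such a tuned cluster can also be handed back to future via plan(cluster, workers = ...) - a sketch reusing cl_nd_nxdr from the snippet above (not benchmarked here):

library(future)
plan(cluster, workers = cl_nd_nxdr)
value(future(iris, globals = FALSE))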

@HenrikBengtsson
Collaborator

That's great - thanks for this. FWIW, parallelly uses useXDR = FALSE by default and provides the argument rscript_startup, so an alternative is:

library(parallelly)
cl_nd_nxdr <- makeClusterPSOCK(1, rscript_startup=quote(options(socketOptions="no-delay")))

BTW, I think you've made the case for adding an argument rscript_options, analogous to rscript_envs, so one can write something like:

cl <- makeClusterPSOCK(1, rscript_options=list(socketOptions="no-delay"))

but also

options(socketOptions="no-delay")
cl <- makeClusterPSOCK(1, rscript_options="socketOptions")

This is not just neater but also less error prone than doing it via a code string.

Added to the to-do list: futureverse/parallelly#70

HenrikBengtsson added a commit that referenced this issue Nov 9, 2021
…ers = 2, rscript_startup = quote(options(socketOptions="no-delay"))) [#437]
@HenrikBengtsson
Collaborator

A small update here regarding the rscript_startup argument of parallelly::makeClusterPSOCK(). As shown above, we can already use:

cl <- makeClusterPSOCK(n, rscript_startup = quote(options(socketOptions = "no-delay")))

It turned out that some work was needed in future to get:

plan(cluster, workers = n, rscript_startup = quote(options(socketOptions = "no-delay")))
plan(multisession, workers = n, rscript_startup = quote(options(socketOptions = "no-delay")))

to work, but it now works in future (>= 1.23.0-9002).

@HenrikBengtsson
Collaborator

FYI, parallelly 1.29.0 is now on CRAN, and it sets options(socketOptions = "no-delay") by default on cluster nodes (and uses useXDR=FALSE by default), so:

cl <- parallelly::makeClusterPSOCK(n)

is like:

cl <- parallel::makeCluster(n, rscript_args=c("-e", shQuote('options(socketOptions="no-delay")')), useXDR = FALSE)
