Non-block batch assigns #307

Open
dipterix opened this issue May 12, 2019 · 12 comments
Comments

@dipterix

Hi, I have a project that requires loading ~30 GB of data (~100 segments of ~300 MB each) into RAM, which is very time-consuming. The application only needs 5 segments to start the analysis. I was wondering whether the future package could support non-blocking batch assignment: basically lots of future::futureAssign calls in an lapply-style loop, but non-blocking.

Right now, if I use a multicore plan and set the maximum number of workers to 8, the 9th future blocks the session. It would be best if this procedure were non-blocking. I implemented an ugly version using the future and later packages, but you probably have a better idea of how to do this.

The idea:

future::plan(future::multiprocess)

a = new.env(parent = baseenv())
b = new.env(parent = baseenv())
a$plus = 10

# For each element el of x, assign el + plus to the corresponding letter name in environment b
NONBLOCKING_lapply(
  x = 1:26, varnames = LETTERS[1:26], {
    Sys.sleep(3)
    return(el + plus)
  }, nworkers = 2, envir = a, assign.env = b
)
Sys.sleep(3)

# Yay, I can start to view A in environment b at once
b$A
# >> 11
@nbenn commented May 13, 2019

Disclaimer: I haven't really looked at your code, nor have I really taken the time to understand the specifics of your problem.

Maybe promises can help solve your issue.

@dipterix (Author)

@nbenn I'm so sorry I didn't state the issue clearly. Is there any way to plan multicore with 4 workers, and queue 100 futures without blocking the main process?

Basically is there any way to run the following code without blocking the main R process?

# 4 cores
plan(multicore)
lapply(1:100, function(i){
  future::future({
    Sys.sleep(3)
    return(i)
  })
})
print('I am printed out!')

@HenrikBengtsson (Collaborator)

Hi. No, there's no concept of queuing in the Future API. As soon as all workers are occupied, where the number of workers depends on the future plan you have set, the creation of a new future will block and wait for a worker to be freed up.

Some flavor of a queuing mechanism could probably be built on top of the Future API, but I don't think it should be part of the core, minimal Future API. Orchestrating queues can be rather complicated and opens the door to other types of feature requests, such as workload balancing. It would also require asynchronous processing, i.e. a mechanism for automatically launching queued futures in the background. @nbenn mentioned the 'promises' package, which in turn relies on the 'later' package for async processing.

Related: Issue #264 is a feature request for a simpler problem: provide a function for checking whether the next future creation will block. Even that might not be obvious to implement, because the problem might not be well defined.

Hope this helps/clarifies
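The queue-on-top-of-the-Future-API idea could be sketched roughly as follows. This is a hypothetical illustration, not part of 'future': queue_future() and flush_queue() are made-up names, 'later' is used for the background polling, and future::nbrOfFreeWorkers() (available in newer versions of 'future') is assumed for checking worker availability.

```r
library(future)
library(later)

plan(multisession, workers = 2)

.queue <- list()  # tasks waiting for a free worker

# Enqueue an expression; it is only launched as a future once a worker is free.
queue_future <- function(expr, envir = parent.frame()) {
  .queue[[length(.queue) + 1L]] <<- list(expr = substitute(expr), envir = envir)
  flush_queue()
  invisible(NULL)
}

flush_queue <- function() {
  # Launch queued tasks while there are free workers ...
  while (length(.queue) > 0L && nbrOfFreeWorkers() > 0L) {
    task <- .queue[[1L]]
    .queue[[1L]] <<- NULL
    future(task$expr, envir = task$envir, substitute = FALSE)
  }
  # ... and poll again shortly if anything is still queued.
  if (length(.queue) > 0L) later::later(flush_queue, delay = 0.1)
}
```

Note this sketch launches futures but leaves collecting their values (and error handling, and load balancing) to the caller, which is exactly the kind of complexity the comment above argues should live outside the core API.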

@dipterix (Author)

@HenrikBengtsson Thanks. I think I'll write my own "promise" function then. I've noticed that in the new version of the future package, plan() resets the workers. If my workers are not fully occupied and I call plan() again, the previously created future objects crash.

> future::plan(future::multiprocess)
> f = future::future({Sys.sleep(10);1});future::plan(future::sequential);future::value(f)
Error: Cannot receive results for MultisessionFuture (<none>), because the connection to the worker is corrupt: Connection (connection: description="NA", class="NA", mode="NA", text="NA", opened="NA", can read="NA", can write="NA", id=3846) is no longer valid. There is currently no registered R connection with that index 3

It used to work, though. I guess plan() kills the active R sessions. Is there any way not to terminate them? I think that would actually be a good feature.

@HenrikBengtsson (Collaborator)

I've created Issue #309 separately for this. Please note that the behavior of resetting/shutting down cluster/multisession workers is not new and was not introduced in future 1.13.0; it's been there since future 1.3.0 (2017-01-18). The difference between future 1.12.0 and future 1.13.0 is most likely due to you running RStudio, where plan(multiprocess) is no longer plan(multicore) but plan(multisession).

@dipterix (Author)

@HenrikBengtsson Does the future package plan to deprecate forked clusters at some point? I'm developing a package that requires the multicore feature; PSOCK clusters are too slow at serializing the data.

@HenrikBengtsson (Collaborator)

No, it'll always be available as long as R/parallel itself supports it. The latest move is just to lower the risk of using it by mistake. It can always be re-enabled by overriding the default value of the option future.fork.enable or the environment variable R_FUTURE_FORK_ENABLE.
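For completeness, explicitly re-enabling forked processing could look like the following minimal sketch; the option and environment variable are the ones named above.

```r
# Re-enable forked (multicore) futures explicitly, overriding the default.
options(future.fork.enable = TRUE)

# Alternatively, set this in the environment before R starts:
#   export R_FUTURE_FORK_ENABLE=true

future::plan(future::multicore)
```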

@schloerke

Is there any way to plan multicore with 4 workers, and queue 100 futures without blocking the main process?

Basically is there any way to run the following code without blocking the main R process?

# 4 cores
plan(multicore)
lapply(1:100, function(i){
  future::future({
    Sys.sleep(3)
    return(i)
  })
})
print('I am printed out!')

I am hoping to standardize promising a future computation in rstudio/promises#60 (similar to @dipterix's approach). It will most likely be a combination of promises, later, and fastmap, which eventually calls future::future().

Assuming the function in rstudio/promises#60 is called promises::future_promise(), your code could be:

# 4 cores
plan(multicore)

print("start")
fps <- lapply(1:100, function(i){
  promises::future_promise({
    Sys.sleep(3)
    return(i)
  })
})
print('I am printed out! 1') # printed immediately

# Add a _then_ statement to the first promise
fps[[1]] %...>% {
  print('Process 1 completed!')
}

print('I am printed out! 2') # printed immediately

With output looking like:

[1] "start"
[1] "I am printed out! 1"
[1] "I am printed out! 2"
[1] "Process 1 completed!"

@dipterix (Author)

Hi @schloerke, this is cool. How do you handle the situation where variables within the captured environments change while the future is in the queue? (I'm not the maintainer, just curious.)

@schloerke

@dipterix

How do you handle the situation where variables within the captured environments change while the future is in the queue?

future::getGlobalsAndPackages(expr) is a wonderful function that allows authors to capture the values, globals, and packages needed to evaluate the expression at submission time. (This information will then be used in future_promise() at a later time, when the future::future(expr) call is actually submitted.)
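A small sketch of that capture-at-submission behavior, assuming only that getGlobalsAndPackages() returns its expr/globals/packages components as documented:

```r
library(future)

a <- 1
# Capture the expression together with the *current* values of its globals.
gp <- getGlobalsAndPackages(quote(a + 1))

a <- 2  # a later change does not affect the captured value
gp$globals$a  # still the value captured at submission time
```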

@dipterix (Author) commented Dec 1, 2020

Sorry @schloerke, I wasn't making myself clear. In the current implementation, a future with lazy = TRUE executes immediately. If you want non-blocking future instances, there must be a queue for futures (I assume you want to put them into a fastmap). There is a window of time between scheduling a future and evaluating it.

What if the globals contain an environment, and that environment is changed while the future object waits to execute?

plan(multisession, workers = 2)

env <- new.env()
env$a <- 1
replicate(3, future_promise({ Sys.sleep(10); env$a }))
env$a <- 2

Then what's returned from the third promise?

@schloerke

Good test! ... Moving conversation to rstudio/promises#60
