Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Merge pull request #56 from anyscale/fix-ray-remote-options
Browse files Browse the repository at this point in the history
Fix ray remote options
  • Loading branch information
ahuang11 authored Nov 1, 2022
2 parents 6089262 + 097f404 commit 5e6521e
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 3 deletions.
10 changes: 7 additions & 3 deletions prefect_ray/task_runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,13 @@ async def submit(
remote_options = RemoteOptionsContext.get().current_remote_options
# Ray does not support the submission of async functions and we must create a
# sync entrypoint
self._ray_refs[key] = ray.remote(
sync_compatible(call.func), **remote_options
).remote(**call_kwargs)
if remote_options:
ray_decorator = ray.remote(**remote_options)
else:
ray_decorator = ray.remote
self._ray_refs[key] = ray_decorator(sync_compatible(call.func)).remote(
**call_kwargs
)

def _optimize_futures(self, expr):
"""
Expand Down
14 changes: 14 additions & 0 deletions tests/test_task_runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import tests
from prefect_ray import RayTaskRunner
from prefect_ray.context import remote_options


@pytest.fixture(scope="session")
Expand Down Expand Up @@ -230,3 +231,16 @@ def base_flow():

base_flow()
assert tmp_file.read_text() == "d"

def test_ray_options(self):
@task
def process(x):
return x + 1

@flow(task_runner=RayTaskRunner())
def my_flow():
# equivalent to setting @ray.remote(max_calls=1)
with remote_options(max_calls=1):
process.submit(42)

my_flow()

0 comments on commit 5e6521e

Please sign in to comment.