Commit: Rename everything to Globus Compute (#104)

WardLT authored May 15, 2023
1 parent a89fa07 commit 930860d
Showing 12 changed files with 89 additions and 368 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/CI.yml
@@ -17,7 +17,7 @@ jobs:
      - name: Install dependencies
        run: |
          pip install -U pip setuptools
-         pip install -e.[funcx,test]
+         pip install -e.[globus,test]
      - name: Start Redis
        uses: supercharge/[email protected]
      - name: Lint with flake8
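The `globus` extra installed here replaces the old `funcx` extra; the packaging metadata that defines it presumably changed elsewhere in this commit but is not shown in the loaded portion of the diff. A hypothetical sketch of what such a renamed extra could look like in a `setup.py` (the dependency lists are illustrative only):

```python
from setuptools import setup

# Hypothetical sketch: the real packaging file is not shown in this diff.
setup(
    name='colmena',
    extras_require={
        'globus': ['globus-compute-sdk'],  # formerly the 'funcx' extra
        'test': ['pytest', 'pytest-mock', 'pytest-timeout'],
    },
)
```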
2 changes: 1 addition & 1 deletion colmena/task_server/base.py
@@ -122,7 +122,7 @@ def perform_callback(self, future: Future, result: Result, topic: str):
        """Send a completed result back to queue. Used as a callback for complete tasks

        Args:
-            future: Future created by FuncX
+            future: Future for a task
            result: Initial result object. Used if the future throws an exception
            topic: Topic used to send back to the user
        """
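For context, the task server attaches `perform_callback` to each task's `Future` so that results flow back to the queue when the task finishes. A minimal, self-contained sketch of that pattern (the plain dict and print calls stand in for Colmena's `Result` object and queue):

```python
from concurrent.futures import Future
from functools import partial


def perform_callback(future: Future, result: dict, topic: str) -> None:
    # Stand-in for the real method: record the outcome and "send" it back
    if future.exception() is not None:
        result['success'] = False
        print(f'[{topic}] task failed: {future.exception()!r}')
    else:
        result['success'] = True
        result['value'] = future.result()
        print(f'[{topic}] task finished: {result}')


# Bind the result record and topic now; the executor supplies the Future later
future: Future = Future()
future.add_done_callback(partial(perform_callback, result={'task': 'demo'}, topic='default'))
future.set_result(42)  # completes the future and triggers the callback
```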
54 changes: 28 additions & 26 deletions colmena/task_server/funcx.py → colmena/task_server/globus.py
@@ -1,14 +1,15 @@
-"""Task server based on FuncX
+"""Task server based on Globus Compute
-FuncX provides the ability to execute functions on remote "endpoints" that provide access to computational resources (e.g., cloud providers, HPC).
+Globus Compute provides the ability to execute functions on remote "endpoints" that provide access to
+computational resources (e.g., cloud providers, HPC).
Tasks and results are communicated to/from the endpoint through a cloud service secured using Globus Auth."""

import logging
from functools import partial, update_wrapper
from typing import Dict, Callable, Optional, Tuple
from concurrent.futures import Future

-from funcx import FuncXClient, FuncXExecutor
+from globus_compute_sdk import Client, Executor

from colmena.task_server.base import run_and_record_timing, FutureBasedTaskServer
from colmena.queue.python import PipeQueues
@@ -18,50 +19,51 @@
logger = logging.getLogger(__name__)


-class FuncXTaskServer(FutureBasedTaskServer):
-    """Task server that uses FuncX to execute tasks on remote systems
+class GlobusComputeTaskServer(FutureBasedTaskServer):
+    """Task server that uses Globus Compute to execute tasks on remote systems
-    Create a FuncXTaskServer by providing a dictionary of functions along with a FuncX endpoint ID
+    Create a task server by providing a dictionary of functions
    mapped to the `endpoint <https://funcx.readthedocs.io/en/latest/endpoints.html>`_
-    on which it should run. The task server will wrap the provided function
+    on which each should run. The task server will wrap the provided function
    in an interface that tracks execution information (e.g., runtime) and
    `registers <https://funcx.readthedocs.io/en/latest/sdk.html#registering-functions>`_
-    the wrapped function with FuncX.
-    You must also provide a :class:`FuncXClient` that the task server can use to authenticate with the
-    FuncX web service.
-    The task server works using the :class:`FuncXExecutor` to communicate with FuncX via a RabbitMQ.
-    Once the task service process is created, the `FuncXClient` is used to instantiate a new
-    `FuncXExecutor` to perform work, and we use callbacks on the Python :class:`Future` objects
-    to send completed work back to the task queue.
+    the wrapped function with Globus Compute.
+    You must also provide a Globus Compute :class:`~globus_compute_sdk.client.Client`
+    that the task server will use to authenticate with the web service.
+    The task server works using Globus Compute's :class:`~globus_compute_sdk.executor.Executor`
+    to communicate to the web service over a web socket.
+    The functions used by the executor are registered when you create the task server,
+    and the Executor is launched when you start the task server.
    """

-    def __init__(self, methods: Dict[Callable, str],
-                 funcx_client: FuncXClient,
+    def __init__(self,
+                 methods: Dict[Callable, str],
+                 funcx_client: Client,
                  queues: PipeQueues,
                  timeout: Optional[int] = None,
                  batch_size: int = 128):
        """
        Args:
            methods: Map of functions to the endpoint on which it will run
-            funcx_client: Authenticated FuncX client
+            funcx_client: Authenticated Globus Compute client
            queues: Queues used to communicate with thinker
            timeout: Timeout for requests from the task queue
            batch_size: Maximum number of task request to receive before submitting
        """
        # Store the client that has already been authenticated.
        self.fx_client = funcx_client
-        self.fx_exec: FuncXExecutor = None
+        self.fx_exec: Optional[Executor] = None

        # Create a function with the latest version of the wrapper function
-        self.registered_funcs: Dict[str, Tuple[Callable, str]] = {}  # Function name -> (funcX id, endpoints)
+        self.registered_funcs: Dict[str, Tuple[str, str]] = {}  # Function name -> (funcX id, endpoints)
        for func, endpoint in methods.items():
            # Make a wrapped version of the function
            func_name = func.__name__
            new_func = partial(run_and_record_timing, func)
            update_wrapper(new_func, func)
            func_fxid = self.fx_client.register_function(new_func)
-            # Store the FuncX information for the function
+            # Store the information for the function
            self.registered_funcs[func_name] = (func_fxid, endpoint)

        self._batch_options = dict(
@@ -73,7 +75,7 @@ def __init__(self, methods: Dict[Callable, str],

    def perform_callback(self, future: Future, result: Result, topic: str):
        # Check if the failure was due to a ManagerLost
-        # TODO (wardlt): Remove when we have retry support in FuncX
+        # TODO (wardlt): Remove when we have retry support in Globus Compute
        exc = future.exception()
        if 'Task failure due to loss of manager' in str(exc):
            logger.info('Caught an task that failed due to a lost manager. Resubmitting')
@@ -97,9 +99,9 @@ def _submit(self, task: Result, topic: str) -> Future:
        return future

    def _setup(self):
-        # Create an executor to asynchronously transmit funcX tasks and recieve results
-        self.fx_exec = FuncXExecutor(funcx_client=self.fx_client,
-                                     batch_size=self._batch_options['batch_size'])
+        # Create an executor to asynchronously transmit funcX tasks and receive results
+        self.fx_exec = Executor(funcx_client=self.fx_client,
+                                batch_size=self._batch_options['batch_size'])

    def _cleanup(self):
-        self.fx_exec.shutdown()
+        self.fx_exec.shutdown(cancel_futures=True)
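Usage of the renamed class mirrors the old `FuncXTaskServer`. A minimal sketch, assuming an already configured Globus Compute endpoint (the endpoint UUID is a placeholder copied from the demo README, and the `simulate` function is illustrative):

```python
from globus_compute_sdk import Client

from colmena.queue.python import PipeQueues
from colmena.task_server.globus import GlobusComputeTaskServer


def simulate(x: float) -> float:
    """Toy task to execute on the remote endpoint."""
    return x ** 2


# Placeholder endpoint UUID; substitute the ID reported by your own endpoint
endpoint_id = 'c845e24a-154e-4340-abc1-f83948d9454b'

queues = PipeQueues()
client = Client()  # may prompt for a Globus Auth login on first use

# Map each function to the endpoint it should run on, then launch the server process
doer = GlobusComputeTaskServer({simulate: endpoint_id}, client, queues)
doer.start()
```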
@@ -6,13 +6,13 @@
from pytest import fixture, mark

from colmena.queue.python import PipeQueues
-from colmena.task_server.funcx import FuncXTaskServer
+from colmena.task_server.globus import GlobusComputeTaskServer

my_funcs: dict = {}


class FakeClient:
-    """Faked FuncXClient that allows you to register functions"""
+    """Faked Client that allows you to register functions"""

    def register_function(self, new_func, function_name: str = None, **kwargs):
        global my_funcs
@@ -46,12 +46,12 @@ def shutdown(self):


@fixture()
-def mock_funcx(mocker: MockFixture):
-    mocker.patch('colmena.task_server.funcx.FuncXExecutor', FakeExecutor)
+def mock_globus(mocker: MockFixture):
+    mocker.patch('colmena.task_server.globus.Executor', FakeExecutor)


@mark.timeout(10)
-def test_mocked_server(mock_funcx):
+def test_mocked_server(mock_globus):
    # Create the task server with a single, no-op function
    client = FakeClient()
    queues = PipeQueues()
@@ -61,7 +61,7 @@ def func(x):
            raise MemoryError()
        return x

-    fts = FuncXTaskServer({func: 'fake_endp'}, client, queues)
+    fts = GlobusComputeTaskServer({func: 'fake_endp'}, client, queues)
    fts.start()

    # Submit a task to the queue and see how it works
22 changes: 0 additions & 22 deletions demo_apps/funcx/README.md

This file was deleted.

25 changes: 25 additions & 0 deletions demo_apps/globus/README.md
@@ -0,0 +1,25 @@
# Globus Compute Task Server Demo

This demo recreates the "streaming" optimization application using the Globus Compute server.
Globus Compute allows the calculations to run on remote resources without any network configuration changes.

## Setup

Using Globus Compute requires a few additional steps beyond what is required for a standard Colmena application.

First, install and start a [Globus Compute endpoint](https://funcx.readthedocs.io/en/latest/endpoints.html) on the system where you would like tasks to run.
Make sure to install the Colmena python library in the Python environment used by your endpoint (e.g., by calling `pip install colmena` in the environment in which you are installing Globus Compute).
Record the endpoint ID given to you when you install the Globus Compute endpoint.

Colmena also requires additional libraries to run Globus Compute. When installing Colmena, add them using pip's extra dependencies mechanism: `pip install colmena[globus]`.

## Running the Example

The example requires you to specify the endpoint on which tasks will run.
Provide it as the only argument to the run script, which will look something like:

```commandline
python run.py c845e24a-154e-4340-abc1-f83948d9454b
```
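For illustration, a hypothetical sketch of how a script can consume the endpoint ID as its positional argument (the real `run.py`, shown in the next file, defines additional options):

```python
from argparse import ArgumentParser

parser = ArgumentParser()
parser.add_argument('endpoint', help='Globus Compute endpoint ID on which tasks will run')
args = parser.parse_args(['c845e24a-154e-4340-abc1-f83948d9454b'])  # placeholder UUID
print(f'Tasks will be submitted to endpoint {args.endpoint}')
```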


9 changes: 4 additions & 5 deletions demo_apps/funcx/run.py → demo_apps/globus/run.py
@@ -1,11 +1,10 @@
"""Perform GPR Active Learning where Bayesian optimization is used to select a new
calculation as soon as one calculation completes"""
-from funcx import FuncXClient
-
from colmena.queue.base import ColmenaQueues
from colmena.queue.python import PipeQueues
-from colmena.task_server.funcx import FuncXTaskServer
+from colmena.task_server.globus import GlobusComputeTaskServer
from colmena.thinker import BaseThinker, agent
+from globus_compute_sdk import Client
from sklearn.gaussian_process import GaussianProcessRegressor, kernels
from sklearn.preprocessing import MinMaxScaler
from sklearn.pipeline import Pipeline
@@ -176,12 +175,12 @@ def operate(self):
    queues = PipeQueues()

    # Log in to FuncX
-    fx_client = FuncXClient()
+    gc_client = Client()

    # Create the task server and task generator
    my_ackley = partial(ackley, mean_rt=args.runtime, std_rt=args.runtime_var)
    update_wrapper(my_ackley, ackley)
-    doer = FuncXTaskServer({my_ackley: args.endpoint}, fx_client, queues)
+    doer = GlobusComputeTaskServer({my_ackley: args.endpoint}, gc_client, queues)
    thinker = Thinker(queues, out_dir, dim=args.dim, n_guesses=args.num_guesses,
                      batch_size=args.num_parallel, opt_delay=args.opt_delay)
    logging.info('Created the task server and task generator')
22 changes: 11 additions & 11 deletions demo_apps/synthetic-data/synthetic.py
@@ -16,7 +16,7 @@
from proxystore.connectors.redis import RedisConnector
from proxystore.store import Store
from proxystore.store import register_store
-from funcx import FuncXClient
+from globus_compute_sdk import Client
from parsl import HighThroughputExecutor
from parsl.addresses import address_by_hostname
from parsl.config import Config
@@ -27,7 +27,7 @@
from colmena.queue.base import ColmenaQueues
from colmena.task_server import ParslTaskServer
from colmena.task_server.base import BaseTaskServer
-from colmena.task_server.funcx import FuncXTaskServer
+from colmena.task_server.globus import GlobusComputeTaskServer
from colmena.thinker import agent
from colmena.thinker import BaseThinker

@@ -39,9 +39,9 @@ def get_args():

    backend_group = parser.add_mutually_exclusive_group(required=True)
    backend_group.add_argument(
-        '--funcx',
+        '--globus',
        action='store_true',
-        help='Use the FuncX Colmena Task Server',
+        help='Use the Globus Compute Colmena Task Server',
    )
    backend_group.add_argument(
        '--parsl',
@@ -103,11 +103,11 @@ def get_args():
        help='output directory',
    )

-    funcx_group = parser.add_argument_group()
-    funcx_group.add_argument(
+    globus_compute = parser.add_argument_group()
+    globus_compute.add_argument(
        '--endpoint',
-        required='--funcx' in sys.argv,
-        help='FuncX endpoint for task execution',
+        required='--globus' in sys.argv,
+        help='Globus compute endpoint for task execution',
    )

    parsl_group = parser.add_argument_group()
@@ -290,9 +290,9 @@ def producer(self):

    # Create the task server
    doer: BaseTaskServer
-    if args.funcx:
-        fcx = FuncXClient()
-        doer = FuncXTaskServer(
+    if args.globus:
+        fcx = Client()
+        doer = GlobusComputeTaskServer(
            {target_function: args.endpoint},
            fcx,
            queues,