Skip to content

Commit

Permalink
Merge pull request #98 from panregedit:feature/v0.2.1/opt
Browse files Browse the repository at this point in the history
Feature/v0.2.1/opt
  • Loading branch information
panregedit authored Nov 25, 2024
2 parents 0f09cea + 3aadc6b commit 23ca7b9
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 2 deletions.
15 changes: 15 additions & 0 deletions docs/concepts/worker.md
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,21 @@ worker_config = build_from_file('path/to/config/directory')
```
Note: You must provide a ```workers``` directory in the configuration path which contains all configurations for the workers.

## Run workers
OmAgent provides a TaskHandler class to manage worker instance creation and management. Here's how to use TaskHandler:
```python
from omagent_core.engine.automator.task_handler import TaskHandler
task_handler = TaskHandler(worker_config=worker_config, workers=[MyWorker()])
task_handler.start_processes()
task_handler.stop_processes()
```
The `worker_config` parameter accepts a set of worker configurations and launches the corresponding number of processes based on each worker's concurrency attribute value.

You can also use the `workers` parameter to directly pass in instantiated worker objects. Instances of these workers are deepcopied based on the concurrency setting. If your worker instances contain objects that cannot be deepcopied, set the instance's concurrency property to 1 and actively expand the concurrency count in the workers list.

Then, use `start_processes` to start all workers and `stop_processes` to stop all workers.

## Important Notes
- Always use the @registry.register_worker() decorator to register the worker
- The ```_run``` method is mandatory and contains your core logic
Expand Down
21 changes: 20 additions & 1 deletion omagent-core/src/omagent_core/engine/automator/task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from multiprocessing import Process, freeze_support, Queue, set_start_method, get_context
from sys import platform
from typing import List
from copy import deepcopy

from omagent_core.engine.automator.task_runner import TaskRunner
from omagent_core.engine.configuration.configuration import Configuration
Expand Down Expand Up @@ -51,6 +52,17 @@ def __init__(
metrics_settings: MetricsSettings = None,
import_modules: List[str] = None
):
"""Initialize a new TaskHandler instance.
Args:
worker_config (List): List of worker configurations. Each config should be a dict containing
worker name, optional concurrency and other settings. The Worker replicates and starts the corresponding number of processes based on the concurrency parameter
workers (List[BaseWorker]): List of pre-configured worker instances. Instances of these workers are deepcopied based on the concurrency setting.If your worker instances contain objects that cannot be deepcopied, set the instance's concurrency property to 1 and actively expand the concurrency count in the workers list.
metrics_settings (MetricsSettings, optional): Configuration for metrics collection.
If None, metrics collection will be disabled.
import_modules (List[str], optional): List of module paths to import during initialization.
"""

self.logger_process, self.queue = _setup_logging_queue(container.conductor_config)

# imports
Expand All @@ -59,7 +71,14 @@ def __init__(
for module in import_modules:
logger.info(f'loading module {module}')
importlib.import_module(module)


existing_workers = []
for worker in workers:
concurrency = getattr(worker, 'concurrency', BaseWorker.model_fields['concurrency'].default)
if concurrency > 1:
existing_workers.extend([deepcopy(worker) for _ in range(concurrency - 1)])
workers.extend(existing_workers)

for config in worker_config:
worker_cls = registry.get_worker(config['name'])
concurrency = config.get('concurrency', BaseWorker.model_fields['concurrency'].default)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from omagent_core.engine.workflow.task.task import TaskInterface
from omagent_core.engine.workflow.task.task_type import TaskType
from omagent_core.engine.worker.base import BaseWorker


class SimpleTask(TaskInterface):
Expand All @@ -13,7 +14,9 @@ def __init__(self, task_def_name: str, task_reference_name: str) -> Self:
)


def simple_task(task_def_name: str, task_reference_name: str, inputs: dict[str, object] = {}) -> TaskInterface:
def simple_task(task_def_name: str|Type[BaseWorker], task_reference_name: str, inputs: dict[str, object] = {}) -> TaskInterface:
if issubclass(task_def_name, BaseWorker):
task_def_name = task_def_name.name
task = SimpleTask(task_def_name=task_def_name, task_reference_name=task_reference_name)
task.input_parameters.update(inputs)
return task

0 comments on commit 23ca7b9

Please sign in to comment.