Skip to content

Commit

Permalink
new CallModule script
Browse files Browse the repository at this point in the history
  • Loading branch information
thusser committed Dec 24, 2023
1 parent b6d2bfb commit 7b6c43c
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 0 deletions.
1 change: 1 addition & 0 deletions pyobs/robotic/scripts/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@
from .parallel import ParallelRunner
from .selector import SelectorScript
from .conditional import ConditionalRunner
from .callmodule import CallModule
51 changes: 51 additions & 0 deletions pyobs/robotic/scripts/callmodule.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
from __future__ import annotations
import logging
from typing import Any, Dict, Optional, List, TYPE_CHECKING

if TYPE_CHECKING:
from pyobs.robotic import TaskRunner, TaskSchedule, TaskArchive
from pyobs.robotic.scripts import Script

log = logging.getLogger(__name__)


class CallModule(Script):
"""Script for calling method on a module."""

__module__ = "pyobs.modules.robotic"

def __init__(
self,
module: str,
method: str,
params: Optional[List[Any]],
**kwargs: Any,
):
"""Initialize a new SequentialRunner.
Args:
script: list or dict of scripts to run in a sequence.
"""
Script.__init__(self, **kwargs)
self.module = module
self.method = method
self.params = params or []

async def can_run(self) -> bool:
try:
self.comm.proxy(self.module)
return True
except ValueError:
return False

async def run(
self,
task_runner: TaskRunner,
task_schedule: Optional[TaskSchedule] = None,
task_archive: Optional[TaskArchive] = None,
) -> None:
proxy = self.comm.proxy(self.module)
await proxy.execute(self.method, *self.params)


__all__ = ["CallModule"]

0 comments on commit 7b6c43c

Please sign in to comment.