Skip to content

Commit

Permalink
ENH: seed sim iocs with live control system info
Browse files Browse the repository at this point in the history
  • Loading branch information
ZLLentz committed Nov 13, 2024
1 parent 1bf1e43 commit 9c190f1
Show file tree
Hide file tree
Showing 5 changed files with 257 additions and 105 deletions.
65 changes: 62 additions & 3 deletions beams/bin/gen_test_ioc_main.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,86 @@
from __future__ import annotations

import json
import time
from dataclasses import dataclass
from functools import partial
from pathlib import Path
from typing import Iterator
from typing import Any, Iterable, Iterator

import jinja2
from caproto import ReadNotifyResponse
from caproto.threading.client import Batch, Context


@dataclass(frozen=True, eq=True, order=True)
class PVInfoForJ2:
python_name: str
pvname: str
value: Any
dtype: str
enum_strings: list[str]

@classmethod
def from_result(cls: type[PVInfoForJ2], pvname: str, response: ReadNotifyResponse):
try:
enum_strings = response.metadata.enum_strings
except AttributeError:
enum_strings = []
return cls(
python_name=pvname.lower().replace(":", "_").replace(".", "_"),
pvname=pvname,
value=response.data if response.data_count > 1 else response.data[0],
dtype=response.data_type.name.removeprefix("CTRL_"),
enum_strings=[bt.decode("utf8") for bt in enum_strings],
)


def collect_pv_info(pvnames: Iterable[str]) -> list[PVInfoForJ2]:
ctx = Context()
pvs = ctx.get_pvs(*pvnames)
results = []

for pv in pvs:
pv.wait_for_connection()

def stash_results(pvname: str, response: ReadNotifyResponse):
try:
results.append(PVInfoForJ2.from_result(pvname=pvname, response=response))
except Exception as exc:
print(exc)

with Batch() as batch:
for pv in pvs:
batch.read(pv, callback=partial(stash_results, pv.name), data_type="control")

start_time = time.monotonic()
while time.monotonic() - start_time < 3:
time.sleep(0.01)
if len(results) >= len(pvs):
break

return results


def main(
filepath: str,
):
with open(filepath, "r") as fd:
deser = json.load(fd)
all_pvnames = sorted(list(set(pv for pv in walk_dict_pvs(deser))))
all_pvnames = set(pv for pv in walk_dict_pvs(deser))

if not all_pvnames:
raise RuntimeError(f"Found zero PVs in {filepath}")

all_pvinfo = sorted(collect_pv_info(all_pvnames))

jinja_env = jinja2.Environment(
loader=jinja2.FileSystemLoader(Path(__file__).parent),
trim_blocks=True,
lstrip_blocks=True,
)
template = jinja_env.get_template("test_ioc.py.j2")
rendered = template.render(all_pvnames=all_pvnames)
rendered = template.render(all_pvinfo=all_pvinfo)
print(rendered)


Expand Down
16 changes: 11 additions & 5 deletions beams/bin/test_ioc.py.j2
Original file line number Diff line number Diff line change
@@ -1,17 +1,23 @@
import os
from textwrap import dedent

from caproto import ChannelType
from caproto.server import PVGroup, ioc_arg_parser, pvproperty, run


class BTSimIOC(PVGroup):
"""
An IOC to replicate the PVs used by your behavior tree.
"""
{% for pvname in all_pvnames %}
{{ pvname.lower().replace(":","_").replace(".","_") }} = pvproperty(
value=0,
name="{{ pvname }}",
doc="Fake {{ pvname }}",
{% for pv_info in all_pvinfo %}
{{ pv_info.python_name }} = pvproperty(
name="{{ pv_info.pvname }}",
doc="Fake {{ pv_info.pvname }}",
value={{ pv_info.value }},
dtype=ChannelType.{{ pv_info.dtype }},
{% if pv_info.enum_strings %}
enum_strings={{ pv_info.enum_strings }},
{% endif %}
)
{% endfor %}

Expand Down
Loading

0 comments on commit 9c190f1

Please sign in to comment.