diff --git a/pkgs/default.nix b/pkgs/default.nix index 822f204a..861a9058 100644 --- a/pkgs/default.nix +++ b/pkgs/default.nix @@ -64,5 +64,8 @@ with prev; # EPNix specific packages book = callPackage ./book {}; manpages = callPackage ./manpages {}; + + # Documentation support packages + psu-simulator = callPackage ./doc-support/psu-simulator {}; }; } diff --git a/pkgs/doc-support/psu-simulator/default.nix b/pkgs/doc-support/psu-simulator/default.nix new file mode 100644 index 00000000..2f233396 --- /dev/null +++ b/pkgs/doc-support/psu-simulator/default.nix @@ -0,0 +1,14 @@ +{ + poetry2nix, + lib, + epnixLib, +}: +poetry2nix.mkPoetryApplication { + projectDir = ./.; + + meta = { + homepage = "https://epics-extensions.github.io/EPNix/"; + license = lib.licenses.asl20; + maintainers = with epnixLib.maintainers; [minijackson]; + }; +} diff --git a/pkgs/doc-support/psu-simulator/poetry.lock b/pkgs/doc-support/psu-simulator/poetry.lock new file mode 100644 index 00000000..fde1b804 --- /dev/null +++ b/pkgs/doc-support/psu-simulator/poetry.lock @@ -0,0 +1,83 @@ +# This file is automatically @generated by Poetry and should not be changed by hand. + +[[package]] +name = "click" +version = "8.1.7" +description = "Composable command line interface toolkit" +category = "main" +optional = false +python-versions = ">=3.7" +files = [ + {file = "click-8.1.7-py3-none-any.whl", hash = "sha256:ae74fb96c20a0277a1d615f1e4d73c8414f5a98db8b799a7931d1582f3390c28"}, + {file = "click-8.1.7.tar.gz", hash = "sha256:ca9853ad459e787e2192211578cc907e7594e294c7ccc834310722b41b9ca6de"}, +] + +[package.dependencies] +colorama = {version = "*", markers = "platform_system == \"Windows\""} +importlib-metadata = {version = "*", markers = "python_version < \"3.8\""} + +[[package]] +name = "colorama" +version = "0.4.6" +description = "Cross-platform colored terminal text." +category = "main" +optional = false +python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*,!=3.6.*,>=2.7" +files = [ + {file = "colorama-0.4.6-py2.py3-none-any.whl", hash = "sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6"}, + {file = "colorama-0.4.6.tar.gz", hash = "sha256:08695f5cb7ed6e0531a20572697297273c47b8cae5a63ffc6d6ed5c201be6e44"}, +] + +[[package]] +name = "importlib-metadata" +version = "6.7.0" +description = "Read metadata from Python packages" +category = "main" +optional = false +python-versions = ">=3.7" +files = [ + {file = "importlib_metadata-6.7.0-py3-none-any.whl", hash = "sha256:cb52082e659e97afc5dac71e79de97d8681de3aa07ff18578330904a9d18e5b5"}, + {file = "importlib_metadata-6.7.0.tar.gz", hash = "sha256:1aaf550d4f73e5d6783e7acb77aec43d49da8017410afae93822cc9cca98c4d4"}, +] + +[package.dependencies] +typing-extensions = {version = ">=3.6.4", markers = "python_version < \"3.8\""} +zipp = ">=0.5" + +[package.extras] +docs = ["furo", "jaraco.packaging (>=9)", "jaraco.tidelift (>=1.4)", "rst.linker (>=1.9)", "sphinx (>=3.5)", "sphinx-lint"] +perf = ["ipython"] +testing = ["flufl.flake8", "importlib-resources (>=1.3)", "packaging", "pyfakefs", "pytest (>=6)", "pytest-black (>=0.3.7)", "pytest-checkdocs (>=2.4)", "pytest-cov", "pytest-enabler (>=1.3)", "pytest-mypy (>=0.9.1)", "pytest-perf (>=0.9.2)", "pytest-ruff"] + +[[package]] +name = "typing-extensions" +version = "4.7.1" +description = "Backported and Experimental Type Hints for Python 3.7+" +category = "main" +optional = false +python-versions = ">=3.7" +files = [ + {file = "typing_extensions-4.7.1-py3-none-any.whl", hash = "sha256:440d5dd3af93b060174bf433bccd69b0babc3b15b1a8dca43789fd7f61514b36"}, + {file = "typing_extensions-4.7.1.tar.gz", hash = "sha256:b75ddc264f0ba5615db7ba217daeb99701ad295353c45f9e95963337ceeeffb2"}, +] + +[[package]] +name = "zipp" +version = "3.15.0" +description = "Backport of pathlib-compatible object wrapper for zip files" +category = "main" +optional = false +python-versions = ">=3.7" +files = [ + {file = "zipp-3.15.0-py3-none-any.whl", hash = "sha256:48904fc76a60e542af151aded95726c1a5c34ed43ab4134b597665c86d7ad556"}, + {file = "zipp-3.15.0.tar.gz", hash = "sha256:112929ad649da941c23de50f356a2b5570c954b65150642bccdd66bf194d224b"}, +] + +[package.extras] +docs = ["furo", "jaraco.packaging (>=9)", "jaraco.tidelift (>=1.4)", "rst.linker (>=1.9)", "sphinx (>=3.5)", "sphinx-lint"] +testing = ["big-O", "flake8 (<5)", "jaraco.functools", "jaraco.itertools", "more-itertools", "pytest (>=6)", "pytest-black (>=0.3.7)", "pytest-checkdocs (>=2.4)", "pytest-cov", "pytest-enabler (>=1.3)", "pytest-flake8", "pytest-mypy (>=0.9.1)"] + +[metadata] +lock-version = "2.0" +python-versions = ">=3.7.0" +content-hash = "1e70b172657bdbef13c5d18991199035640ab2ff4e638ec047db7a4c7a18602c" diff --git a/pkgs/doc-support/psu-simulator/psu_simulator/__init__.py b/pkgs/doc-support/psu-simulator/psu_simulator/__init__.py new file mode 100644 index 00000000..65ac5ecc --- /dev/null +++ b/pkgs/doc-support/psu-simulator/psu_simulator/__init__.py @@ -0,0 +1,231 @@ +"""A simple power supply simulator.""" + +import random +import socketserver + +import click + +__version__ = "0.1.0" + +_current = 0 +_voltage = 0 +_resistance = 0 + + +def get_current() -> float: + """Get the current current.""" + return _current + + +def set_current(val: float) -> None: + """Set the current.""" + global _current + global _voltage + _current = val + _voltage = _current * get_resistance() + + +def get_voltage() -> float: + """Get the current voltage.""" + return _voltage + + +def set_voltage(val: float) -> None: + """Set the voltage.""" + global _voltage + global _current + _voltage = val + _current = _voltage / get_resistance() + + +def get_resistance() -> float: + """Get the resistance.""" + return _resistance + + +def set_resistance(val: float) -> None: + """Set the resistance.""" + global _resistance + _resistance = val + + +# TODO: add limits + + +class Server(socketserver.ThreadingMixIn, socketserver.TCPServer): + """TCP server.""" + + allow_reuse_address = True + + +class PowerSupply(socketserver.StreamRequestHandler): + """The power supply protocol handler.""" + + def handle(self: "PowerSupply") -> None: + """Handle incoming connections.""" + print("received connection") + + self._dispatch = { + b"help": self.cmd_help, + b":idn?": self.cmd_get_identification, + b"meas:curr?": self.cmd_get_measured_current, + b":curr?": self.cmd_get_current, + b":curr": self.cmd_set_current, + b"meas:volt?": self.cmd_get_measured_voltage, + b":volt?": self.cmd_get_voltage, + b":volt": self.cmd_set_voltage, + } + + while True: + try: + args = self.rfile.readline().strip().split() + except BrokenPipeError: + return + + if args == []: + try: + self.wfile.write(b".\n") + except BrokenPipeError: + return + continue + + command = args[0].lower() + params = args[1:] + + decoded_params = [param.decode() for param in params] + print(f"received command: {command.decode()}{decoded_params}") + + if command in self._dispatch: + result = self._dispatch[command](*params) + self.wfile.write(str(result).encode()) + self.wfile.write(b"\n") + else: + self.wfile.write(f"command not found: {command.decode()}\n".encode()) + + def finish(self: "PowerSupply") -> None: + """Clean up connections.""" + print("closed connection") + + def cmd_help(self: "PowerSupply", *args: str) -> str: + """Get help about various commands. + + Usage: help . + """ + if len(args) >= 1: + command = args[0] + if command in self._dispatch: + doc = self._dispatch[command].__doc__ + self.wfile.write(doc.encode()) + else: + self.wfile.write(f"command not found: {command!s}".encode()) + return "" + + self.wfile.write(b"Available commands:\n") + for command, func in self._dispatch.items(): + doc = func.__doc__.splitlines()[0].encode() + self.wfile.write(b" - '" + command + b"': " + doc + b"\n") + + return "" + + def cmd_get_identification(self: "PowerSupply", *_args: str) -> int: + """Return the identification of the power supply. + + Usage: :idn? + Returns: string + """ + return f"psu-simulator {__version__}" + + def cmd_get_measured_current(self: "PowerSupply", *_args: str) -> int: + """Return the measured current, in Amps. + + Usage: meas:curr? + Returns: float + """ + return get_current() + random.uniform(-1.5, 1.5) + + def cmd_get_current(self: "PowerSupply", *_args: str) -> int: + """Return the current current command, in Amps. + + Usage: :curr? + Returns: float + """ + return get_current() + + def cmd_set_current(self: "PowerSupply", *args: str) -> str: + """Set the current, in Amps. + + Usage: :curr + Returns: 'OK' | 'ERR' + """ + try: + val = float(args[0]) + except ValueError: + return "ERR" + else: + set_current(val) + return "OK" + + def cmd_get_measured_voltage(self: "PowerSupply", *_args: str) -> int: + """Return the measured voltage, in Volts. + + Usage: meas:volt? + Returns: float + """ + return get_voltage() + random.uniform(-1.5, 1.5) + + def cmd_get_voltage(self: "PowerSupply", *_args: str) -> int: + """Return the voltage voltage command, in Volts. + + Usage: :volt? + Returns: float + """ + return get_voltage() + + def cmd_set_voltage(self: "PowerSupply", *args: str) -> str: + """Set the voltage, in Volts. + + Usage: :volt + Returns: 'OK' | 'ERR' + """ + try: + val = float(args[0]) + except ValueError: + return "ERR" + else: + set_voltage(val) + return "OK" + + +@click.command() +@click.option( + "-l", + "--listen-address", + default="localhost", + show_default=True, + help="Listening address", +) +@click.option( + "-p", + "--port", + default=8727, + show_default=True, + help="Listening TCP port", +) +@click.option( + "--resistance", + default=20, + show_default=True, + help="Resistance of the circuit connected to the power supply, in Ohms.", +) +def main(listen_address: str, port: int, resistance: int) -> None: + """Start a power supply simulator server.""" + set_resistance(resistance) + + with Server((listen_address, port), PowerSupply) as server: + print(f"Listening on {listen_address}:{port}") + print(f"Resistance is {resistance} Ohms") + + try: + server.serve_forever() + except KeyboardInterrupt: + return diff --git a/pkgs/doc-support/psu-simulator/pyproject.toml b/pkgs/doc-support/psu-simulator/pyproject.toml new file mode 100644 index 00000000..cc599d2f --- /dev/null +++ b/pkgs/doc-support/psu-simulator/pyproject.toml @@ -0,0 +1,19 @@ +[tool.poetry] +name = "psu-simulator" +version = "0.1.0" +description = "A power supply simulator for the StreamDevice tutorial" +authors = ["RĂ©mi NICOLE "] + +[tool.poetry.scripts] +psu-simulator = "psu_simulator:main" + +[tool.poetry.dependencies] +python = ">=3.7.0" +click = "^8.1.7" + +[build-system] +requires = ["poetry-core>=1.0.0"] +build-backend = "poetry.core.masonry.api" + +[tool.ruff] +select = ["ALL"]