Skip to content

Commit

Permalink
pkgs/psu-simulator: init
Browse files Browse the repository at this point in the history
  • Loading branch information
minijackson committed Jan 9, 2024
1 parent 96269b2 commit 67251ac
Show file tree
Hide file tree
Showing 5 changed files with 350 additions and 0 deletions.
3 changes: 3 additions & 0 deletions pkgs/default.nix
Original file line number Diff line number Diff line change
Expand Up @@ -64,5 +64,8 @@ with prev;
# EPNix specific packages
book = callPackage ./book {};
manpages = callPackage ./manpages {};

# Documentation support packages
psu-simulator = callPackage ./doc-support/psu-simulator {};
};
}
14 changes: 14 additions & 0 deletions pkgs/doc-support/psu-simulator/default.nix
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
poetry2nix,
lib,
epnixLib,
}:
poetry2nix.mkPoetryApplication {
projectDir = ./.;

meta = {
homepage = "https://epics-extensions.github.io/EPNix/";
license = lib.licenses.asl20;
maintainers = with epnixLib.maintainers; [minijackson];
};
}
83 changes: 83 additions & 0 deletions pkgs/doc-support/psu-simulator/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

231 changes: 231 additions & 0 deletions pkgs/doc-support/psu-simulator/psu_simulator/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
"""A simple power supply simulator."""

import random
import socketserver

import click

__version__ = "0.1.0"

_current = 0
_voltage = 0
_resistance = 0


def get_current() -> float:
"""Get the current current."""
return _current


def set_current(val: float) -> None:
"""Set the current."""
global _current
global _voltage
_current = val
_voltage = _current * get_resistance()


def get_voltage() -> float:
"""Get the current voltage."""
return _voltage


def set_voltage(val: float) -> None:
"""Set the voltage."""
global _voltage
global _current
_voltage = val
_current = _voltage / get_resistance()


def get_resistance() -> float:
"""Get the resistance."""
return _resistance


def set_resistance(val: float) -> None:
"""Set the resistance."""
global _resistance
_resistance = val


# TODO: add limits


class Server(socketserver.ThreadingMixIn, socketserver.TCPServer):
"""TCP server."""

allow_reuse_address = True


class PowerSupply(socketserver.StreamRequestHandler):
"""The power supply protocol handler."""

def handle(self: "PowerSupply") -> None:
"""Handle incoming connections."""
print("received connection")

self._dispatch = {
b"help": self.cmd_help,
b":idn?": self.cmd_get_identification,
b"meas:curr?": self.cmd_get_measured_current,
b":curr?": self.cmd_get_current,
b":curr": self.cmd_set_current,
b"meas:volt?": self.cmd_get_measured_voltage,
b":volt?": self.cmd_get_voltage,
b":volt": self.cmd_set_voltage,
}

while True:
try:
args = self.rfile.readline().strip().split()
except BrokenPipeError:
return

if args == []:
try:
self.wfile.write(b".\n")
except BrokenPipeError:
return
continue

command = args[0].lower()
params = args[1:]

decoded_params = [param.decode() for param in params]
print(f"received command: {command.decode()}{decoded_params}")

if command in self._dispatch:
result = self._dispatch[command](*params)
self.wfile.write(str(result).encode())
self.wfile.write(b"\n")
else:
self.wfile.write(f"command not found: {command.decode()}\n".encode())

def finish(self: "PowerSupply") -> None:
"""Clean up connections."""
print("closed connection")

def cmd_help(self: "PowerSupply", *args: str) -> str:
"""Get help about various commands.
Usage: help <command>.
"""
if len(args) >= 1:
command = args[0]
if command in self._dispatch:
doc = self._dispatch[command].__doc__
self.wfile.write(doc.encode())
else:
self.wfile.write(f"command not found: {command!s}".encode())
return ""

self.wfile.write(b"Available commands:\n")
for command, func in self._dispatch.items():
doc = func.__doc__.splitlines()[0].encode()
self.wfile.write(b" - '" + command + b"': " + doc + b"\n")

return ""

def cmd_get_identification(self: "PowerSupply", *_args: str) -> int:
"""Return the identification of the power supply.
Usage: :idn?
Returns: string
"""
return f"psu-simulator {__version__}"

def cmd_get_measured_current(self: "PowerSupply", *_args: str) -> int:
"""Return the measured current, in Amps.
Usage: meas:curr?
Returns: float
"""
return get_current() + random.uniform(-1.5, 1.5)

def cmd_get_current(self: "PowerSupply", *_args: str) -> int:
"""Return the current current command, in Amps.
Usage: :curr?
Returns: float
"""
return get_current()

def cmd_set_current(self: "PowerSupply", *args: str) -> str:
"""Set the current, in Amps.
Usage: :curr <current(float)>
Returns: 'OK' | 'ERR'
"""
try:
val = float(args[0])
except ValueError:
return "ERR"
else:
set_current(val)
return "OK"

def cmd_get_measured_voltage(self: "PowerSupply", *_args: str) -> int:
"""Return the measured voltage, in Volts.
Usage: meas:volt?
Returns: float
"""
return get_voltage() + random.uniform(-1.5, 1.5)

def cmd_get_voltage(self: "PowerSupply", *_args: str) -> int:
"""Return the voltage voltage command, in Volts.
Usage: :volt?
Returns: float
"""
return get_voltage()

def cmd_set_voltage(self: "PowerSupply", *args: str) -> str:
"""Set the voltage, in Volts.
Usage: :volt <voltage(float)>
Returns: 'OK' | 'ERR'
"""
try:
val = float(args[0])
except ValueError:
return "ERR"
else:
set_voltage(val)
return "OK"


@click.command()
@click.option(
"-l",
"--listen-address",
default="localhost",
show_default=True,
help="Listening address",
)
@click.option(
"-p",
"--port",
default=8727,
show_default=True,
help="Listening TCP port",
)
@click.option(
"--resistance",
default=20,
show_default=True,
help="Resistance of the circuit connected to the power supply, in Ohms.",
)
def main(listen_address: str, port: int, resistance: int) -> None:
"""Start a power supply simulator server."""
set_resistance(resistance)

with Server((listen_address, port), PowerSupply) as server:
print(f"Listening on {listen_address}:{port}")
print(f"Resistance is {resistance} Ohms")

try:
server.serve_forever()
except KeyboardInterrupt:
return
19 changes: 19 additions & 0 deletions pkgs/doc-support/psu-simulator/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
[tool.poetry]
name = "psu-simulator"
version = "0.1.0"
description = "A power supply simulator for the StreamDevice tutorial"
authors = ["Rémi NICOLE <[email protected]>"]

[tool.poetry.scripts]
psu-simulator = "psu_simulator:main"

[tool.poetry.dependencies]
python = ">=3.7.0"
click = "^8.1.7"

[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"

[tool.ruff]
select = ["ALL"]

0 comments on commit 67251ac

Please sign in to comment.