Skip to content

Commit

Permalink
nixos/tests/phoebus/alarm: use KRaft-mode Kafka, improve reliability
Browse files Browse the repository at this point in the history
  • Loading branch information
minijackson committed Mar 8, 2024
1 parent dcab862 commit f8f085b
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 43 deletions.
95 changes: 67 additions & 28 deletions nixos/tests/phoebus/alarm.nix
Original file line number Diff line number Diff line change
@@ -1,24 +1,84 @@
# This tests both the phoebus-alarm-server, and phoebus-alarm-logger services
{
epnixLib,
lib,
pkgs,
...
}: {
{epnixLib, ...}: {
name = "phoebus-alarm-server-simple-check";
meta.maintainers = with epnixLib.maintainers; [minijackson];

nodes = {
server = {config, ...}: {
# Client VM: carries the command-line tools that the test script runs.
client = {pkgs, ...}: {
environment = {
# Point Channel Access clients at the "ioc" node when searching for PVs.
sessionVariables.EPICS_CA_ADDR_LIST = ["ioc"];
# kcat is used by the test script to produce Kafka messages; epics-base
# presumably supplies the caget/caput tools -- TODO confirm against alarm.py.
systemPackages = [pkgs.kcat pkgs.epnix.epics-base];
};
};

# IOC VM: runs a soft IOC serving the records defined in ./ioc.db.
ioc = {pkgs, ...}: {
systemd.services.ioc = {
description = "Test IOC to be monitored with the Phoebus Alarm server";
# -d loads the database file. -S presumably stops softIoc from starting its
# interactive shell, which has no terminal under systemd -- TODO confirm.
serviceConfig.ExecStart = "${pkgs.epnix.epics-base}/bin/softIoc -S -d ${./ioc.db}";
wantedBy = ["multi-user.target"];
after = ["network.target"];
};

# 5064 is the default Channel Access server port. Both the alarm server and
# the client are configured with "ioc" in their CA address list.
networking.firewall = {
allowedTCPPorts = [5064];
allowedUDPPorts = [5064];
};
};

server = {config, lib, pkgs, ...}: let
serverAddr = "192.168.1.3";
kafkaListenSockAddr = "${serverAddr}:9092";
kafkaControllerListenSockAddr = "${serverAddr}:9093";
in {
services.phoebus-alarm-server = {
enable = true;
openFirewall = true;
settings = {
"org.phoebus.pv.ca/addr_list" = ["ioc"];
"org.phoebus.pv.ca/auto_addr_list" = false;
"org.phoebus.applications.alarm/server" = kafkaListenSockAddr;
};
};

services.phoebus-alarm-logger.settings."bootstrap.servers" = kafkaListenSockAddr;

# Single-node Kafka in KRaft mode: this one process takes both the broker and
# the controller role, so no separate ZooKeeper service is configured.
services.apache-kafka = {
enable = true;
# A fixed cluster ID together with formatLogDirs presumably lets the module
# format the storage directory on first start -- TODO confirm in the NixOS
# apache-kafka module documentation.
clusterId = "Wwbk0wwKTueL2hJD0IGGdQ";
formatLogDirs = true;
settings = {
# Client traffic goes to port 9092 and controller traffic to port 9093,
# both bound to the server's fixed test-network address.
listeners = [
"PLAINTEXT://${kafkaListenSockAddr}"
"CONTROLLER://${kafkaControllerListenSockAddr}"
];
# Both listeners are mapped to the unencrypted PLAINTEXT protocol.
"listener.security.protocol.map" = [
"PLAINTEXT:PLAINTEXT"
"CONTROLLER:PLAINTEXT"
];
# The quorum has a single voter: this node (id 1, matching "node.id").
"controller.quorum.voters" = [
"1@${kafkaControllerListenSockAddr}"
];
"controller.listener.names" = ["CONTROLLER"];

"node.id" = 1;
"process.roles" = ["broker" "controller"];

"log.dirs" = ["/var/lib/apache-kafka"];
# There is only one broker, so internal topics are kept at a replication
# factor (and minimum in-sync replica count) of 1.
"offsets.topic.replication.factor" = 1;
"transaction.state.log.replication.factor" = 1;
"transaction.state.log.min.isr" = 1;
};
};

# StateDirectory= is a [Service] directive (see systemd.exec(5)). Under
# unitConfig it is written to the [Unit] section, where systemd ignores it,
# so it is set through serviceConfig instead. It makes systemd create
# /var/lib/apache-kafka (the "log.dirs" location) for the service.
systemd.services.apache-kafka.serviceConfig.StateDirectory = ["apache-kafka"];

networking.firewall.allowedTCPPorts = [9092];

services.elasticsearch = {
enable = true;
package = pkgs.elasticsearch7;
};

nixpkgs.config.allowUnfreePredicate = pkg:
builtins.elem (lib.getName pkg) [
# Elasticsearch can be used as an SSPL-licensed software, which is
Expand All @@ -29,27 +89,6 @@

virtualisation.memorySize = 3072;
};

ioc = {
systemd.services.ioc = {
description = "Test IOC to be monitored with the Phoebus Alarm server";
serviceConfig.ExecStart = "${pkgs.epnix.epics-base}/bin/softIoc -S -d ${./ioc.db}";
wantedBy = ["multi-user.target"];
after = ["network.target"];
};

networking.firewall = {
allowedTCPPorts = [5064];
allowedUDPPorts = [5064];
};
};

client = {
environment = {
sessionVariables.EPICS_CA_ADDR_LIST = ["ioc"];
systemPackages = [pkgs.kcat pkgs.epnix.epics-base];
};
};
};

testScript = builtins.readFile ./alarm.py;
Expand Down
38 changes: 23 additions & 15 deletions nixos/tests/phoebus/alarm.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,25 @@

start_all()

server.wait_for_unit("apache-kafka.service")
server.wait_for_unit("zookeeper.service")
server.wait_for_unit("phoebus-alarm-server.service")
server.wait_for_unit("phoebus-alarm-logger.service")
server.wait_for_open_port(8080)

ioc.wait_for_unit("ioc.service")
# Block until the services on all three VMs are up. Called once after the
# initial start_all() and again after the server has been rebooted.
def wait_for_boot():
with subtest("Machines boot correctly"):
server.wait_for_unit("apache-kafka.service")
server.wait_for_unit("elasticsearch.service")
server.wait_for_unit("phoebus-alarm-server.service")
server.wait_for_unit("phoebus-alarm-logger.service")
# Kafka listens on the server's fixed address (see the listeners in
# alarm.nix), so the address to probe is passed explicitly.
server.wait_for_open_port(9092, "192.168.1.3")
# Presumably 9200 is Elasticsearch's HTTP port and 8080 the alarm logger's
# REST API -- TODO confirm against the service defaults.
server.wait_for_open_port(9200)
server.wait_for_open_port(8080)

ioc.wait_for_unit("ioc.service")

client.wait_for_unit("multi-user.target")

# NOTE(review): this check presumably belongs to wait_for_boot() as well, so
# that it is repeated after the reboot -- confirm the nesting.
with subtest("Alarm logger is connected to Elasticsearch"):
status = get_logger("/")
assert status["elastic"]["status"] == "Connected"

client.wait_for_unit("multi-user.target")

alarm_path = "/Accelerator/ALARM_TEST"
alarm_config = f"config:{alarm_path}"
Expand All @@ -22,7 +32,7 @@
# Publish one keyed message to the "Accelerator" topic by running kcat on the
# client VM. `value` is serialized to JSON, and the shell line fed to kcat has
# the form "<key>=<json>", with "=" declared as the key delimiter via -K.
def send_kafka(key: str, value: dict[str, Any]):
value_s = json.dumps(value)
client.succeed(
# NOTE(review): two command literals follow back to back, one without and
# one with -T. Python concatenates adjacent f-strings, so only one of them
# should remain -- confirm which one is intended. -T presumably makes kcat
# echo the sent message to stdout, which would show up in the test log.
f"echo '{key}={value_s}' | kcat -P -b server:9092 -t Accelerator -K="
f"echo '{key}={value_s}' | kcat -P -b server:9092 -t Accelerator -T -K="
)


Expand All @@ -43,9 +53,7 @@ def get_logger(uri: str):

# -----

with subtest("Alarm logger is connected to Elasticsearch"):
status = get_logger("/")
assert status["elastic"]["status"] == "Connected"
wait_for_boot()

with subtest("We initialize the PV"):
# This is done so that the PV is processed at least once, else the
Expand Down Expand Up @@ -135,6 +143,7 @@ def logger_has_config(_):
def logger_has_latest_state(_):
global logger_alarms
logger_alarms = get_logger("/search/alarm/pv/ALARM_TEST")
logger_alarms.sort(key=lambda event: event.get("time", ""), reverse=True)
return (
logger_alarms[0]["current_severity"] == "OK"
and logger_alarms[0]["severity"] == "OK"
Expand Down Expand Up @@ -165,16 +174,15 @@ def logger_has_latest_state(_):
with subtest("The data is still here after a server reboot"):
server.shutdown()
server.start()
server.wait_for_unit("apache-kafka.service")

wait_for_boot()

alarm = get_alarm()
assert alarm["current_severity"] == "OK"
assert alarm["severity"] == "OK"

server.wait_for_unit("phoebus-alarm-logger.service")
server.wait_for_open_port(8080)

logger_alarms = get_logger("/search/alarm/pv/ALARM_TEST")
logger_alarms.sort(key=lambda event: event.get("time", ""), reverse=True)
alarm_states = [
alarm for alarm in logger_alarms if alarm["config"].startswith("state:")
]
Expand Down

0 comments on commit f8f085b

Please sign in to comment.