Skip to content

Commit

Permalink
Merge pull request #60 from epics-extensions/no-kafka-elasticsearch-m…
Browse files Browse the repository at this point in the history
…agic

No Kafka ElasticSearch enable magic
  • Loading branch information
minijackson authored Mar 8, 2024
2 parents e040bce + 4d3c896 commit e968026
Show file tree
Hide file tree
Showing 7 changed files with 138 additions and 121 deletions.
51 changes: 42 additions & 9 deletions doc/nixos/guides/phoebus-alarm.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ The Phoebus Alarm Logging Service can also be called the Phoebus Alarm Logger.

{{< include _pre-requisites.md >}}

# Enabling the Phoebus Alarm services
# Single server Phoebus Alarm setup

To enable the Phoebus Alarm server and the Phoebus Alarm Logger,
To configure Phoebus Alarm, Phoebus Alarm Logger, Apache Kafka, and ElasticSearch on a single server,
add this to your configuration:

``` nix
Expand All @@ -37,20 +37,53 @@ add this to your configuration:
# Replace this with your machine's IP address
# or DNS domain name
ip = "192.168.1.42";
kafkaSock = "${ip}:${kafkaPort}";
kafkaListenSockAddr = "${ip}:${kafkaPort}";
in {
# The Phoebus Alarm server also automatically enables the Phoebus Alarm Logger
services.phoebus-alarm-server = {
enable = true;
openFirewall = true;
settings."org.phoebus.applications.alarm/server" = "${kafkaSock}";
settings."org.phoebus.applications.alarm/server" = kafkaListenSockAddr;
};
# Tell Apache Kafka to listen on this IP address
# If you don't have a DNS domain name, it's best to set a specific, non-local IP address.
services.apache-kafka.extraProperties = ''
listeners=PLAINTEXT://${kafkaSock}
'';
services.phoebus-alarm-logger.settings."bootstrap.servers" = kafkaListenSockAddr;
services.elasticsearch = {
enable = true;
package = pkgs.elasticsearch7;
};
# Single-server Kafka setup
services.apache-kafka = {
enable = true;
logDirs = ["/var/lib/apache-kafka"];
# Tell Apache Kafka to listen on this IP address
# If you don't have a DNS domain name, it's best to set a specific, non-local IP address.
extraProperties = ''
listeners=PLAINTEXT://${kafkaListenSockAddr}
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
'';
};
systemd.services.apache-kafka = {
after = ["zookeeper.service"];
unitConfig.StateDirectory = "apache-kafka";
};
services.zookeeper = {
enable = true;
extraConf = ''
# Port conflicts by default with phoebus-alarm-logger's port
admin.enableServer=false
'';
};
# Open kafka to the outside world
networking.firewall.allowedTCPPorts = [
config.services.apache-kafka.port
];
# Elasticsearch, needed by Phoebus Alarm Logger, is not free software (SSPL | Elastic License).
# To accept the license, add the code below:
Expand Down
1 change: 0 additions & 1 deletion nixos/module-list.nix
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
./modules/ca-gateway.nix
./modules/phoebus/alarm-logger.nix
./modules/phoebus/alarm-server.nix
./modules/phoebus/local-kafka.nix
./modules/phoebus/olog.nix
./modules/phoebus/save-and-restore.nix
]
30 changes: 2 additions & 28 deletions nixos/modules/phoebus/alarm-logger.nix
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,10 @@
lib,
pkgs,
...
} @ moduleAttrs: let
}: let
cfg = config.services.phoebus-alarm-logger;
settingsFormat = pkgs.formats.javaProperties {};
configFile = settingsFormat.generate "phoebus-alarm-logger.properties" cfg.settings;

localKafka = lib.hasPrefix "localhost:" cfg.settings."bootstrap.servers";
localElasticsearch = cfg.settings.es_host == "localhost";
in {
options.services.phoebus-alarm-logger = {
enable = lib.mkEnableOption ''
Expand Down Expand Up @@ -141,10 +138,6 @@ in {
description = "Phoebus Alarm Logger";

wantedBy = ["multi-user.target"];
after = lib.mkMerge [
(lib.mkIf localKafka ["apache-kafka.service"])
(lib.mkIf localElasticsearch ["elasticsearch.service"])
];

# Weirdly not "phoebus.user"
environment.JAVA_OPTS = "-Djava.util.prefs.userRoot=/var/lib/phoebus-alarm-logger";
Expand All @@ -162,29 +155,10 @@ in {
};
};

services.apache-kafka = lib.mkIf localKafka {
enable = true;
localSetup.enable = true;
};
# Port conflicts by default with phoebus-alarm-logger's port
services.zookeeper.extraConf = lib.mkIf localKafka ''
admin.enableServer=false
'';

services.elasticsearch = lib.mkIf localElasticsearch {
enable = true;
# Should be kept in sync with the phoebus-olog and phoebus-save-and-restore services
package = pkgs.elasticsearch7;
};

networking.firewall.allowedTCPPorts = lib.mkIf cfg.openFirewall [
(lib.toInt cfg.settings."server.port")
];
};

meta = {
maintainers = with epnixLib.maintainers; [minijackson];
# TODO:
# doc = ./alarm-logger.md;
};
meta.maintainers = with epnixLib.maintainers; [minijackson];
}
21 changes: 2 additions & 19 deletions nixos/modules/phoebus/alarm-server.nix
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,10 @@
lib,
pkgs,
...
} @ moduleAttrs: let
}: let
cfg = config.services.phoebus-alarm-server;
settingsFormat = pkgs.formats.javaProperties {};
configFile = settingsFormat.generate "phoebus-alarm-server.properties" cfg.settings;

localKafka = lib.hasPrefix "localhost:" cfg.settings."org.phoebus.applications.alarm/server";
in {
options.services.phoebus-alarm-server = {
enable = lib.mkEnableOption ''
Expand All @@ -34,7 +32,6 @@ in {
Includes services:
- Apache Kafka (if configured locally)
- Phoebus Alarm Logger (if not disabled)
Warning: this opens the firewall on all network interfaces.
Expand Down Expand Up @@ -168,7 +165,6 @@ in {
description = "Phoebus Alarm Server";

wantedBy = ["multi-user.target"];
after = lib.mkIf localKafka ["apache-kafka.service"];

environment.JAVA_OPTS = "-Dphoebus.user=/var/lib/phoebus-alarm-server";

Expand All @@ -191,20 +187,7 @@ in {
enable = lib.mkDefault true;
openFirewall = lib.mkIf cfg.openFirewall (lib.mkDefault true);
};

services.apache-kafka = lib.mkIf localKafka {
enable = true;
localSetup.enable = true;
};

networking.firewall.allowedTCPPorts = lib.mkIf cfg.openFirewall [
config.services.apache-kafka.port
];
};

meta = {
maintainers = with epnixLib.maintainers; [minijackson];
# TODO:
# doc = ./alarm-server.md;
};
meta.maintainers = with epnixLib.maintainers; [minijackson];
}
28 changes: 0 additions & 28 deletions nixos/modules/phoebus/local-kafka.nix

This file was deleted.

92 changes: 70 additions & 22 deletions nixos/tests/phoebus/alarm.nix
Original file line number Diff line number Diff line change
Expand Up @@ -9,25 +9,11 @@
meta.maintainers = with epnixLib.maintainers; [minijackson];

nodes = {
server = {config, ...}: {
services.phoebus-alarm-server = {
enable = true;
openFirewall = true;
settings = {
"org.phoebus.pv.ca/addr_list" = ["ioc"];
"org.phoebus.pv.ca/auto_addr_list" = false;
};
client = {
environment = {
sessionVariables.EPICS_CA_ADDR_LIST = ["ioc"];
systemPackages = [pkgs.kcat pkgs.epnix.epics-base];
};

nixpkgs.config.allowUnfreePredicate = pkg:
builtins.elem (lib.getName pkg) [
# Elasticsearch can be used as an SSPL-licensed software, which is
# not open-source. But as we're using it run tests, not exposing
# any service, this should be fine.
"elasticsearch"
];

virtualisation.memorySize = 3072;
};

ioc = {
Expand All @@ -44,11 +30,73 @@
};
};

client = {
environment = {
sessionVariables.EPICS_CA_ADDR_LIST = ["ioc"];
systemPackages = [pkgs.kcat pkgs.epnix.epics-base];
server = {
config,
pkgs,
...
}: let
kafkaPort = toString config.services.apache-kafka.port;
serverAddr = "192.168.1.3";
kafkaListenSockAddr = "${serverAddr}:${kafkaPort}";
in {
services.phoebus-alarm-server = {
enable = true;
openFirewall = true;
settings = {
"org.phoebus.pv.ca/addr_list" = ["ioc"];
"org.phoebus.pv.ca/auto_addr_list" = false;
"org.phoebus.applications.alarm/server" = kafkaListenSockAddr;
};
};

services.phoebus-alarm-logger.settings."bootstrap.servers" = kafkaListenSockAddr;

services.elasticsearch = {
enable = true;
package = pkgs.elasticsearch7;
};

# Single-server Kafka setup
services.apache-kafka = {
enable = true;
logDirs = ["/var/lib/apache-kafka"];
# Tell Apache Kafka to listen on this IP address
# If you don't have a DNS domain name, it's best to set a specific, non-local IP address.
extraProperties = ''
listeners=PLAINTEXT://${kafkaListenSockAddr}
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
'';
};

systemd.services.apache-kafka = {
after = ["zookeeper.service"];
unitConfig.StateDirectory = "apache-kafka";
};

services.zookeeper = {
enable = true;
extraConf = ''
# Port conflicts by default with phoebus-alarm-logger's port
admin.enableServer=false
'';
};

# Open kafka to the outside world
networking.firewall.allowedTCPPorts = [
config.services.apache-kafka.port
];

nixpkgs.config.allowUnfreePredicate = pkg:
builtins.elem (lib.getName pkg) [
# Elasticsearch can be used as an SSPL-licensed software, which is
# not open-source. But as we're using it run tests, not exposing
# any service, this should be fine.
"elasticsearch"
];

virtualisation.memorySize = 3072;
};
};

Expand Down
36 changes: 22 additions & 14 deletions nixos/tests/phoebus/alarm.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,25 @@

start_all()

server.wait_for_unit("apache-kafka.service")
server.wait_for_unit("zookeeper.service")
server.wait_for_unit("phoebus-alarm-server.service")
server.wait_for_unit("phoebus-alarm-logger.service")
server.wait_for_open_port(8080)

ioc.wait_for_unit("ioc.service")
def wait_for_boot():
with subtest("Machines boot correctly"):
server.wait_for_unit("apache-kafka.service")
server.wait_for_unit("elasticsearch.service")
server.wait_for_unit("phoebus-alarm-server.service")
server.wait_for_unit("phoebus-alarm-logger.service")
server.wait_for_open_port(9092, "192.168.1.3")
server.wait_for_open_port(9200)
server.wait_for_open_port(8080)

ioc.wait_for_unit("ioc.service")

client.wait_for_unit("multi-user.target")

with subtest("Alarm logger is connected to Elasticsearch"):
status = get_logger("/")
assert status["elastic"]["status"] == "Connected"

client.wait_for_unit("multi-user.target")

alarm_path = "/Accelerator/ALARM_TEST"
alarm_config = f"config:{alarm_path}"
Expand Down Expand Up @@ -43,9 +53,7 @@ def get_logger(uri: str):

# -----

with subtest("Alarm logger is connected to Elasticsearch"):
status = get_logger("/")
assert status["elastic"]["status"] == "Connected"
wait_for_boot()

with subtest("We initialize the PV"):
# This is done so that the PV is processed at least once, else the
Expand Down Expand Up @@ -135,6 +143,7 @@ def logger_has_config(_):
def logger_has_latest_state(_):
global logger_alarms
logger_alarms = get_logger("/search/alarm/pv/ALARM_TEST")
logger_alarms.sort(key=lambda event: event.get("time", ""), reverse=True)
return (
logger_alarms[0]["current_severity"] == "OK"
and logger_alarms[0]["severity"] == "OK"
Expand Down Expand Up @@ -165,16 +174,15 @@ def logger_has_latest_state(_):
with subtest("The data is still here after a server reboot"):
server.shutdown()
server.start()
server.wait_for_unit("apache-kafka.service")

wait_for_boot()

alarm = get_alarm()
assert alarm["current_severity"] == "OK"
assert alarm["severity"] == "OK"

server.wait_for_unit("phoebus-alarm-logger.service")
server.wait_for_open_port(8080)

logger_alarms = get_logger("/search/alarm/pv/ALARM_TEST")
logger_alarms.sort(key=lambda event: event.get("time", ""), reverse=True)
alarm_states = [
alarm for alarm in logger_alarms if alarm["config"].startswith("state:")
]
Expand Down

0 comments on commit e968026

Please sign in to comment.