diff --git a/flake.nix b/flake.nix index 602a3481f..ec352a60f 100644 --- a/flake.nix +++ b/flake.nix @@ -95,6 +95,7 @@ boxfort clang-tools criterion + gdb jq libffi libgit2 diff --git a/include/villas/nodes/c37_118.hpp b/include/villas/nodes/c37_118.hpp index f8fd6be16..35e1706cd 100644 --- a/include/villas/nodes/c37_118.hpp +++ b/include/villas/nodes/c37_118.hpp @@ -1,48 +1,77 @@ -/** - * @file - * @author Philipp Jungkamp - * @copyright 2014-2022, Institute for Automation of Complex Power Systems, EONERC - * @license Apache 2.0 - *********************************************************************************/ +/* Node type: C37-118. + * + * Author: Philipp Jungkamp + * SPDX-FileCopyrightText: 2014-2024 Institute for Automation of Complex Power Systems, RWTH Aachen University + * SPDX-License-Identifier: Apache-2.0 + */ #pragma once +#include #include #include -#include -#include +#include +#include #include +#include +#include +#include #include +#include +#include -namespace villas { -namespace node { -namespace c37_118 { +namespace villas::node::c37_118 { -class C37_118 : public Node { -protected: - struct Input { - std::string address; - } input; +using parser::Parser; +using types::Frame; +using types::Config; - virtual - int _read(struct Sample *smps[], unsigned cnt) override; +class C37_118 final : public Node { +private: + struct Server { + std::string addr; + uint16_t port; + int socktype; + Config config; -public: - C37_118(const std::string &name = ""); + int listener_fd; + std::vector connection_fds; + }; + + struct Client { + std::string addr; + uint16_t port; + uint16_t idcode; + int socktype; - virtual - ~C37_118() override; + std::optional config; + int connection_fd; + std::vector recv_buf = std::vector(1024); + Parser parser; - virtual - int parse(json_t *json, const uuid_t sn_uuid) override; + void send(Frame::Variant message, timespec ts = time_now()); + std::optional recv(); + }; - virtual - int start() override; + std::variant role; + + void parseServer(json_t *json); + void parseClient(json_t *json); + + void prepareServer(Server &server); + void prepareClient(Client &client); + + virtual int _read(struct Sample *smps[], unsigned cnt) override; + +public: + C37_118(const uuid_t &id = {}, const std::string &name = "") : Node{id, name} {} - virtual - int stop() override; + virtual ~C37_118() override; + virtual int parse(json_t *json) override; + virtual int prepare() override; + virtual int start() override; + virtual int stop() override; + virtual std::vector getPollFDs() override; }; -} /* namespace c37_118 */ -} /* namespace node */ -} /* namespace villas */ +} // namespace villas::node::c37_118 diff --git a/lib/nodes/c37_118.cpp b/lib/nodes/c37_118.cpp index dc61e3e9b..9098faa65 100644 --- a/lib/nodes/c37_118.cpp +++ b/lib/nodes/c37_118.cpp @@ -1,3 +1,338 @@ +/* Node type: C37-118. + * + * Author: Philipp Jungkamp + * SPDX-FileCopyrightText: 2014-2024 Institute for Automation of Complex Power Systems, RWTH Aachen University + * SPDX-License-Identifier: Apache-2.0 + */ + +#include +#include +#include +#include +#include + +#include #include +#include +#include +#include +using namespace std::literals::string_view_literals; +using namespace villas::node; using namespace villas::node::c37_118; +using namespace villas::node::c37_118::types; + +void C37_118::Client::send(Frame::Variant message, timespec ts) { + std::vector send_buf = parser.serialize({ + .idcode = idcode, + .soc = static_cast(ts.tv_sec & 0xFFFFFF), + .fracsec = static_cast(ts.tv_nsec * static_cast(config->time_base)) / 1'000'000'000, + .message = message, + }, config ? &*config : nullptr); + + size_t sum = 0; + int ret = 0; + if (socktype == SOCK_DGRAM) { + do { + ret = ::send(connection_fd, send_buf.data() + sum, send_buf.size() - sum, 0); + sum += ret; + } while (sum < send_buf.size() && ret > 0); + } else { + ret = ::write(connection_fd, send_buf.data(), send_buf.size()); + } + + if (ret == 0) + throw RuntimeError{"end of stream"}; + + if (ret < 0) + throw SystemError{"send error {}", ::strerror(errno)}; + + sum += ret; +} + +std::optional C37_118::Client::recv() { + int ret; + + if (socktype == SOCK_DGRAM) + ret = ::recv(connection_fd, recv_buf.data(), recv_buf.size(), 0); + else + ret = ::read(connection_fd, recv_buf.data(), recv_buf.size()); + + if (ret == 0) + throw RuntimeError{"end of stream"}; + + if (ret == -EWOULDBLOCK) + return std::nullopt; + + if (ret < 0) + throw SystemError{"socket error {}", ::strerror(errno)}; + + auto frame = parser.deserialize(recv_buf.data(), ret, config ? &*config : nullptr); + if (!frame.has_value()) + return std::nullopt; + + std::copy(recv_buf.begin() + frame->framesize, recv_buf.end(), recv_buf.begin()); + return *frame; +} + +int C37_118::_read(struct Sample *smps[], unsigned cnt) { + if (auto *client = std::get_if(&role)) { + auto frame = client->recv(); + if (!frame.has_value()) + return 0; + + return std::visit(villas::utils::overloaded { + [&](Config2 &config2){ + client->config = config2; + auto &sigs = in.signals; + auto &pmu = config2->pmus[0]; + + sigs->clear(); + for (auto &p : pmu.phinfo) + sigs->emplace_back(std::make_shared(p.nam, std::to_string(p.unit), SignalType::COMPLEX)); + for (auto &a : pmu.aninfo) + sigs->emplace_back(std::make_shared(a.nam, std::to_string(a.unit), SignalType::INTEGER)); + + client->send(Command{Command::DATA_START}); + return 0; + }, + + [&](Data &data){ + auto smp = smps[0]; + auto &pmu = data.pmus[0]; + + std::size_t s = 0; + for (auto p = pmu.phasor.begin(); p != pmu.phasor.end() && s < smp->capacity; ++p) + smp->data[s++].z = p->to_complex(); + for (auto a = pmu.analog.begin(); a != pmu.analog.end() && s < smp->capacity; ++a) + smp->data[s++].f = a->to_float(); + smp->length = s; + smp->ts.origin = { + .tv_sec = frame->soc, + .tv_nsec = static_cast(frame->fracsec & 0xFFFFFF) * 1'000'000'000 / client->config->time_base, + }; + smp->flags = (int) SampleFlags::HAS_DATA | (int) SampleFlags::HAS_TS_ORIGIN; + + return 1; + }, + + [&](auto &other){ + logger->warn("received unknown frame"); + return 0; + }, + }, frame->message); + } + + return 0; +} + +C37_118::~C37_118() { + +} + +void C37_118::parseServer(json_t *json) { + json_error_t err; + + char *addr = nullptr; + char *transport = nullptr; + int port = 0; + int idcode = 0; + json_t *config_json = nullptr; + if (json_unpack_ex(json, &err, 0, "{ s:s, s:s, s:i, s:o }", + "transport", &transport, "address", &addr, "port", &port, "idcode", &idcode, + "config", &config_json)) + throw ConfigError(json, err, "node-config-node-c37.118-server"); + + int socktype; + if (transport == "tcp"sv) + socktype = SOCK_STREAM; + else if (transport == "udp"sv) + socktype = SOCK_DGRAM; + else + throw RuntimeError{"invalid transport {}", transport}; + + throw RuntimeError{"unimplemented"}; + + this->role = Server { + .addr = std::string(addr), + .port = static_cast(port), + }; +} + +void C37_118::parseClient(json_t *json) { + json_error_t err; + char *addr = nullptr; + char *transport = nullptr; + int port = 0; + int idcode = 0; + if (json_unpack_ex(json, &err, 0, "{ s:s, s:s, s:i, s:i }", + "transport", &transport, "address", &addr, "port", &port, "idcode", &idcode)) + throw ConfigError(json, err, "node-config-node-c37.118-client"); + + int socktype; + if (transport == "tcp"sv) + socktype = SOCK_STREAM; + else if (transport == "udp"sv) + socktype = SOCK_DGRAM; + else + throw RuntimeError{"invalid transport {}", transport}; + + this->role = Client { + .addr = std::string(addr), + .port = static_cast(port), + .idcode = static_cast(idcode), + .socktype = socktype, + }; +} + +int C37_118::parse(json_t *json) { + int ret = Node::parse(json); + if (ret) + return ret; + + json_error_t err; + json_t *server = nullptr; + json_t *client = nullptr; + if (json_unpack_ex(json, &err, 0, "{ s?:o, s?:o }", + "server", &server, "client", &client)) + throw ConfigError(json, err, "node-config-node-c37.118"); + + if (server && client) throw RuntimeError{"c37_118: can't be both server and client"}; + else if (server) parseServer(server); + else if (client) parseClient(client); + + return 0; +} + +void C37_118::prepareServer(Server &server) { + addrinfo hints; + std::memset(&hints, 0, sizeof(hints)); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = server.socktype; + hints.ai_flags = AI_PASSIVE; + + addrinfo *addrs_ret; + std::string service = std::to_string(server.port); + if (auto err = ::getaddrinfo(server.addr.c_str(), service.c_str(), &hints, + &addrs_ret)) // TODO: make configurable + throw SystemError{"c37_118: getaddrinfo {}", ::gai_strerror(err)}; + + auto addrs = std::unique_ptr{ + addrs_ret, &freeaddrinfo}; + int sock; + addrinfo *addr; + for (addr = addrs.get(); addr != nullptr; addr = addr->ai_next) { + sock = ::socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol); + + if (sock < 0) + continue; + + if (::bind(sock, addr->ai_addr, addr->ai_addrlen) == 0) { + server.listener_fd = sock; + break; + } + + ::close(sock); + } + + if (addr == nullptr) + throw RuntimeError{"could not bind port {} on {}", server.port, server.addr}; + + throw RuntimeError{"unimplemented"}; +} + +void C37_118::prepareClient(Client &client) { + addrinfo hints; + std::memset(&hints, 0, sizeof(hints)); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = client.socktype; + + addrinfo *addrs_ret; + std::string service = std::to_string(client.port); + if (auto err = ::getaddrinfo(client.addr.c_str(), service.c_str(), &hints, + &addrs_ret)) + throw SystemError{"c37_118: getaddrinfo {}", ::gai_strerror(err)}; + + auto addrs = std::unique_ptr{ + addrs_ret, &freeaddrinfo}; + addrinfo *addr; + for (addr = addrs.get(); addr != nullptr; addr = addr->ai_next) { + int sock = ::socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol); + + if (sock < 0) + continue; + + if (::connect(sock, addr->ai_addr, addr->ai_addrlen) == 0) { + client.connection_fd = sock; + break; + } + + ::close(sock); + } + + if (addr == nullptr) + throw RuntimeError{"could not connect to port {} on {}", client.port, client.addr}; +} + +int C37_118::prepare() { + int ret = Node::prepare(); + if (ret) + return ret; + + std::visit(villas::utils::overloaded { + [&](Server &server){ prepareServer(server); }, + [&](Client &client){ prepareClient(client); }, + }, role); + + return 0; +} + +int C37_118::start() { + int ret = Node::start(); + if (ret) + return ret; + + std::visit(villas::utils::overloaded { + [&](Server &server){ }, + [&](Client &client){ + client.send(Command{Command::GET_CONFIG2}); + }, + }, role); + + return 0; +} + +int C37_118::stop() { + int ret = Node::stop(); + if (ret) + return ret; + + std::visit(villas::utils::overloaded { + [&](Server &server){ }, + [&](Client &client){ + client.send(Command{Command::DATA_STOP}); + }, + }, role); + + return 0; +} + +std::vector C37_118::getPollFDs() { + return std::visit(villas::utils::overloaded { + [&](Server &server){ + return std::vector { }; + }, + [&](Client &client){ + return std::vector { client.connection_fd }; + }, + }, role); +} + +// Register node +static char n[] = "c37.118"; +static char d[] = "A node for a C37.118 TCP/UDP server/client"; +static NodePlugin + p;