diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 5f216f1..d67279c 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -2,7 +2,7 @@ # You can, however, change the list of files that comprise this variable. include_directories(include) -set(SOURCES src/main.cpp src/perfect_link.cpp) +set(SOURCES src/main.cpp src/perfect_link.cpp src/best_effort_broadcast.cpp) # DO NOT EDIT THE FOLLOWING LINES find_package(Threads) diff --git a/src/include/best_effort_broadcast.hpp b/src/include/best_effort_broadcast.hpp new file mode 100644 index 0000000..dd84141 --- /dev/null +++ b/src/include/best_effort_broadcast.hpp @@ -0,0 +1,50 @@ +#pragma once + +#include +#include +#include +#include "perfect_link.hpp" + +/// Enforces 3 properties for broadcast communication: +/// 1. Validity - if pi and pj are correct, then every message broadcast by pi +/// is eventually delivered to pj +/// 2. No duplication - no message is delivered more than once +/// 3. No creation - no message is delivered unless it was broadcast +class BestEffortBroadcast { + public: + BestEffortBroadcast( + const PerfectLink::ProcessIdType id, + const std::vector> processes); + + /// @brief Binds this broadcast link to a host and port. Once done cannot be + /// done again. + auto bind(const in_addr_t host, const in_port_t port) -> void; + + /// @brief Starts listening to incoming broadcast messages. Sends ACKs for new + /// messages. Receives ACKs and resends messages with missing ACKs. Thread + /// safe. + /// @param callback Function that will be called when a message is delivered. + auto listen(PerfectLink::ListenCallback callback) -> void; + + /// @brief Broadcasts a message to all processes. The data has to be smaller + /// than about 64KiB. Sending is possible only after performing a bind. At + /// most 8 messages can be packed in a single packet. + template < + typename... Data, + class = std::enable_if_t< + are_equal, Data...>::value>, + class = std::enable_if_t<(sizeof...(Data) <= + PerfectLink::MAX_MESSAGE_COUNT_IN_PACKET)>> + auto send(Data... datas) -> void; + + private: + PerfectLink _link; + const std::vector> _processes; +}; + +template +auto BestEffortBroadcast::send(Data... datas) -> void { + for (auto& [host, port] : _processes) { + _link.send(host, port, datas...); + } +} diff --git a/src/src/best_effort_broadcast.cpp b/src/src/best_effort_broadcast.cpp new file mode 100644 index 0000000..2b296c4 --- /dev/null +++ b/src/src/best_effort_broadcast.cpp @@ -0,0 +1,16 @@ +#include "best_effort_broadcast.hpp" +#include "perfect_link.hpp" + +BestEffortBroadcast::BestEffortBroadcast( + const PerfectLink::ProcessIdType id, + const std::vector> processes) + : _link(id), _processes(processes) {} + +auto BestEffortBroadcast::bind(const in_addr_t host, const in_port_t port) + -> void { + _link.bind(host, port); +} + +auto BestEffortBroadcast::listen(PerfectLink::ListenCallback callback) -> void { + _link.listen(callback); +} diff --git a/src/src/main.cpp b/src/src/main.cpp index 1c33a48..bb66b26 100644 --- a/src/src/main.cpp +++ b/src/src/main.cpp @@ -5,6 +5,7 @@ #include #include #include +#include "best_effort_broadcast.hpp" #include "common.hpp" #include "parser.hpp" #include "perfect_link.hpp"