From 42b6673a424f20208c0e3463396d5b479e70eea9 Mon Sep 17 00:00:00 2001 From: shilangyu Date: Sun, 12 Nov 2023 18:14:43 +0100 Subject: [PATCH] Start impl of URB --- .gitignore | 1 + src/CMakeLists.txt | 2 +- src/include/perfect_link.hpp | 2 + src/include/uniform_reliable_broadcast.hpp | 45 ++++++++++++++++++++++ src/src/uniform_reliable_broadcast.cpp | 11 ++++++ 5 files changed, 60 insertions(+), 1 deletion(-) create mode 100644 src/include/uniform_reliable_broadcast.hpp create mode 100644 src/src/uniform_reliable_broadcast.cpp diff --git a/.gitignore b/.gitignore index f9b618e..54b5f58 100644 --- a/.gitignore +++ b/.gitignore @@ -8,6 +8,7 @@ __pycache__/ bin/da_proc target/ +.vscode ### C ### # Prerequisites diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index d67279c..6c223b6 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -2,7 +2,7 @@ # You can, however, change the list of files that comprise this variable. include_directories(include) -set(SOURCES src/main.cpp src/perfect_link.cpp src/best_effort_broadcast.cpp) +set(SOURCES src/main.cpp src/perfect_link.cpp src/best_effort_broadcast.cpp src/uniform_reliable_broadcast.cpp) # DO NOT EDIT THE FOLLOWING LINES find_package(Threads) diff --git a/src/include/perfect_link.hpp b/src/include/perfect_link.hpp index 8447962..771986c 100644 --- a/src/include/perfect_link.hpp +++ b/src/include/perfect_link.hpp @@ -181,6 +181,8 @@ auto PerfectLink::send(const in_addr_t host, addr.sin_port = port; { + // TODO: if all messages inflight were sent to a process that crashed, this + // will lock forever std::unique_lock lock(_pending_for_ack_mutex); _pending_for_ack_cv.wait( lock, [this] { return _pending_for_ack.size() < MAX_IN_FLIGHT; }); diff --git a/src/include/uniform_reliable_broadcast.hpp b/src/include/uniform_reliable_broadcast.hpp new file mode 100644 index 0000000..623811b --- /dev/null +++ b/src/include/uniform_reliable_broadcast.hpp @@ -0,0 +1,45 @@ +#pragma once + +#include +#include +#include +#include "best_effort_broadcast.hpp" +#include "perfect_link.hpp" + +/// Enforces 3 properties for broadcast communication: +/// 1. Validity - if pi and pj are correct, then every message broadcast by pi +/// is eventually delivered to pj +/// 2. No duplication - no message is delivered more than once +/// 3. No creation - no message is delivered unless it was broadcast +/// 4. Uniform agreement - if any process delivers m, then all correct processes +/// eventually deliver m +class UniformReliableBroadcast { + public: + UniformReliableBroadcast( + const PerfectLink::ProcessIdType id, + const std::vector> processes); + + /// @brief Binds this broadcast link to a host and port. Once done cannot be + /// done again. + auto bind(const in_addr_t host, const in_port_t port) -> void; + + /// @brief Starts listening to incoming broadcast messages. Sends ACKs for new + /// messages. Receives ACKs and resends messages with missing ACKs. Thread + /// safe. + /// @param callback Function that will be called when a message is delivered. + auto listen(PerfectLink::ListenCallback callback) -> void; + + /// @brief Broadcasts a message to all processes. The data has to be smaller + /// than about 64KiB. Sending is possible only after performing a bind. At + /// most 8 messages can be packed in a single packet. + template < + typename... Data, + class = std::enable_if_t< + are_equal, Data...>::value>, + class = std::enable_if_t<(sizeof...(Data) <= + PerfectLink::MAX_MESSAGE_COUNT_IN_PACKET)>> + auto broadcast(Data... datas) -> void; + + private: + BestEffortBroadcast _link; +}; diff --git a/src/src/uniform_reliable_broadcast.cpp b/src/src/uniform_reliable_broadcast.cpp new file mode 100644 index 0000000..e9c04ef --- /dev/null +++ b/src/src/uniform_reliable_broadcast.cpp @@ -0,0 +1,11 @@ +#include "uniform_reliable_broadcast.hpp" + +UniformReliableBroadcast::UniformReliableBroadcast( + const PerfectLink::ProcessIdType id, + const std::vector> processes) + : _link(id, processes) {} + +auto UniformReliableBroadcast::bind(const in_addr_t host, const in_port_t port) + -> void { + _link.bind(host, port); +}