Skip to content

Commit

Permalink
Change stored processes type
Browse files Browse the repository at this point in the history
  • Loading branch information
shilangyu committed Nov 12, 2023
1 parent 42b6673 commit f251e25
Show file tree
Hide file tree
Showing 8 changed files with 37 additions and 27 deletions.
22 changes: 14 additions & 8 deletions src/include/best_effort_broadcast.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,16 @@
/// 3. No creation - no message is delivered unless it was broadcast
class BestEffortBroadcast {
public:
BestEffortBroadcast(
const PerfectLink::ProcessIdType id,
const std::vector<std::tuple<in_addr_t, in_port_t>> processes);
struct ProcessAddress {
in_addr_t host;
in_port_t port;
};

using AvailableProcesses =
std::unordered_map<PerfectLink::ProcessIdType, ProcessAddress>;

BestEffortBroadcast(const PerfectLink::ProcessIdType id,
const AvailableProcesses processes);

/// @brief Binds this broadcast link to a host and port. Once done cannot be
/// done again.
Expand All @@ -38,17 +45,16 @@ class BestEffortBroadcast {
auto broadcast(Data... datas) -> void;

/// @brief A list of processes this broadcast link knowns.
auto processes() const
-> const std::vector<std::tuple<in_addr_t, in_port_t>>&;
auto processes() const -> const AvailableProcesses&;

private:
PerfectLink _link;
const std::vector<std::tuple<in_addr_t, in_port_t>> _processes;
const AvailableProcesses _processes;
};

template <typename... Data, class, class>
auto BestEffortBroadcast::broadcast(Data... datas) -> void {
for (auto& [host, port] : _processes) {
_link.send(host, port, datas...);
for (const auto& [_, address] : _processes) {
_link.send(address.host, address.port, datas...);
}
}
10 changes: 5 additions & 5 deletions src/include/parser.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class Parser {

public:
struct Host {
Host(size_t id, std::string& ip_or_hostname, unsigned short port)
Host(uint8_t id, std::string& ip_or_hostname, unsigned short port)
: id{id}, port{htons(port)} {
if (isValidIpAddress(ip_or_hostname.c_str())) {
ip = inet_addr(ip_or_hostname.c_str());
Expand All @@ -55,7 +55,7 @@ class Parser {

unsigned short portReadable() const { return ntohs(port); }

unsigned long id;
uint8_t id;
in_addr_t ip;
in_port_t port;

Expand Down Expand Up @@ -122,7 +122,7 @@ class Parser {
std::cout << "List of resolved hosts is:\n";
std::cout << "==========================\n";
for (auto& host : hosts()) {
std::cout << host.id << "\n";
std::cout << +host.id << "\n";
std::cout << "Human-readable IP: " << host.ipReadable() << "\n";
std::cout << "Machine-readable IP: " << host.ip << "\n";
std::cout << "Human-readable Port: " << host.portReadable() << "\n";
Expand Down Expand Up @@ -228,7 +228,7 @@ class Parser {
continue;
}

unsigned long id;
int id;
std::string ip;
unsigned short port;

Expand All @@ -238,7 +238,7 @@ class Parser {
throw std::invalid_argument(os.str());
}

hosts.push_back(Host(id, ip, port));
hosts.push_back(Host(static_cast<uint8_t>(id), ip, port));
}

if (hosts.size() < 2UL) {
Expand Down
15 changes: 9 additions & 6 deletions src/include/perfect_link.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,11 @@ class PerfectLink {
/// @brief The type used to store ID of a process.
using ProcessIdType = std::uint8_t;

/// @brief The type used to store ID of a message.
using MessageIdType = std::uint32_t;

static constexpr std::uint8_t MAX_MESSAGE_COUNT_IN_PACKET = 8;
static constexpr ProcessIdType MAX_PROCESSES = 128;

PerfectLink(const ProcessIdType id);

Expand Down Expand Up @@ -59,9 +63,6 @@ class PerfectLink {
auto send(const in_addr_t host, const in_port_t port, Data... datas) -> void;

private:
/// @brief The type used to store ID of a message.
using MessageIdType = std::uint32_t;

/// @brief The type used to store the size of data.
using MessageSizeType = std::uint16_t;

Expand All @@ -85,20 +86,22 @@ class PerfectLink {
/// @brief Id of this process.
const ProcessIdType _id;

// TODO: std::tuple<ProcessIdType, MessageIdType> fits in a single uint64.
// Could be compressed to avoid hashing.
/// @brief Hash function for `_delivered`.
struct hash_delivered {
size_t operator()(
inline size_t operator()(
const std::tuple<ProcessIdType, MessageIdType>& arg) const noexcept {
return std::get<0>(arg) ^ std::get<1>(arg);
}
};

/// @brief Bound socket file descriptor. None if no bind was performed.
std::optional<int> _sock_fd = std::nullopt;
std::optional<int> _sock_fd;
/// @brief Current sequence number of messages.
MessageIdType _seq_nr = 1;
/// @brief Map of sent messages that have not yet sent back an ACK.
std::unordered_map<MessageIdType, PendingMessage> _pending_for_ack = {};
std::unordered_map<MessageIdType, PendingMessage> _pending_for_ack;
std::mutex _pending_for_ack_mutex;
std::condition_variable _pending_for_ack_cv;
/// @brief A map of messages that have been delivered.
Expand Down
2 changes: 1 addition & 1 deletion src/include/uniform_reliable_broadcast.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class UniformReliableBroadcast {
public:
UniformReliableBroadcast(
const PerfectLink::ProcessIdType id,
const std::vector<std::tuple<in_addr_t, in_port_t>> processes);
const BestEffortBroadcast::AvailableProcesses processes);

/// @brief Binds this broadcast link to a host and port. Once done cannot be
/// done again.
Expand Down
4 changes: 2 additions & 2 deletions src/src/best_effort_broadcast.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

BestEffortBroadcast::BestEffortBroadcast(
const PerfectLink::ProcessIdType id,
const std::vector<std::tuple<in_addr_t, in_port_t>> processes)
const BestEffortBroadcast::AvailableProcesses processes)
: _link(id), _processes(processes) {}

auto BestEffortBroadcast::bind(const in_addr_t host, const in_port_t port)
Expand All @@ -16,6 +16,6 @@ auto BestEffortBroadcast::listen(PerfectLink::ListenCallback callback) -> void {
}

auto BestEffortBroadcast::processes() const
-> const std::vector<std::tuple<in_addr_t, in_port_t>>& {
-> const BestEffortBroadcast::AvailableProcesses& {
return _processes;
}
8 changes: 4 additions & 4 deletions src/src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
#include <mutex>
#include <thread>
#include <vector>
#include "best_effort_broadcast.hpp"
#include "common.hpp"
#include "parser.hpp"
#include "perfect_link.hpp"
#include "uniform_reliable_broadcast.hpp"

using SendType = std::uint32_t;
using Delivered = std::tuple<PerfectLink::ProcessIdType, SendType>;
Expand Down Expand Up @@ -102,12 +102,12 @@ static void stop(int) {
}

auto map_hosts(std::vector<Parser::Host> hosts)
-> std::vector<std::tuple<in_addr_t, in_port_t>> {
std::vector<std::tuple<in_addr_t, in_port_t>> result;
-> BestEffortBroadcast::AvailableProcesses {
BestEffortBroadcast::AvailableProcesses result;
result.reserve(hosts.size());

for (const auto& host : hosts) {
result.emplace_back(host.ip, host.port);
result[host.id] = {host.ip, host.port};
}

return result;
Expand Down
1 change: 1 addition & 0 deletions src/src/perfect_link.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ auto PerfectLink::listen(ListenCallback callback) -> void {
}

if (message_size < 0 && errno == EAGAIN) {
// TODO: consider scoping resends to a single process
// timed out, resend messages without ACKs
std::lock_guard<std::mutex> guard(_pending_for_ack_mutex);
for (auto& [seq_nr, pending] : _pending_for_ack) {
Expand Down
2 changes: 1 addition & 1 deletion src/src/uniform_reliable_broadcast.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

UniformReliableBroadcast::UniformReliableBroadcast(
const PerfectLink::ProcessIdType id,
const std::vector<std::tuple<in_addr_t, in_port_t>> processes)
const BestEffortBroadcast::AvailableProcesses processes)
: _link(id, processes) {}

auto UniformReliableBroadcast::bind(const in_addr_t host, const in_port_t port)
Expand Down

0 comments on commit f251e25

Please sign in to comment.