Skip to content

Commit

Permalink
If thats not good enough then i give up
Browse files Browse the repository at this point in the history
  • Loading branch information
shilangyu committed Nov 1, 2023
1 parent 51628e6 commit 405478d
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 84 deletions.
9 changes: 4 additions & 5 deletions src/include/perfect_link.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
#include <limits>
#include <mutex>
#include <optional>
#include <thread>
#include <type_traits>
#include <unordered_map>
#include <unordered_set>
Expand Down Expand Up @@ -43,10 +42,9 @@ class PerfectLink {
auto(ProcessIdType process_id, OwnedSlice<std::uint8_t>& data)->void>;

/// @brief Starts listening to incoming messages. Sends ACKs for new messages.
/// Receives ACKs and resends messages with missing ACKs.
/// Receives ACKs and resends messages with missing ACKs. Thread safe.
/// @param callback Function that will be called when a message is delivered.
/// @return The created thread handle.
auto listen(ListenCallback callback) -> std::thread;
auto listen(ListenCallback callback) -> void;

/// @brief Sends a message from this link to a chosen host and port. The
/// data has to be smaller than about 64KiB. Sending is possible only
Expand All @@ -70,7 +68,7 @@ class PerfectLink {
static constexpr std::size_t MAX_MESSAGE_SIZE =
std::numeric_limits<MessageSizeType>::max();
static constexpr timeval RESEND_TIMEOUT = {0, 200000};
static constexpr std::uint16_t MAX_IN_FLIGHT = 512;
static constexpr std::uint16_t MAX_IN_FLIGHT = 64;

/// @brief Data structure to hold temporary data of a message that was sent
/// but where no ACK for it was yet received.
Expand Down Expand Up @@ -106,6 +104,7 @@ class PerfectLink {
/// @brief A map of messages that have been delivered.
std::unordered_set<std::tuple<ProcessIdType, MessageIdType>, hash_delivered>
_delivered = {};
std::mutex _delivered_mutex;
/// @brief Flag indicating whether this link should do no more work.
std::atomic_bool _done = false;

Expand Down
25 changes: 16 additions & 9 deletions src/src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,10 @@ struct Logger {
}
}

inline auto freeze() -> void { _frozen = {_sent_amount, _delivered_size}; }
inline auto freeze() -> void {
_frozen = {_sent_amount, _delivered_size};
_mutex.lock();
}

inline auto set_sent_amount(const SendType sent_amount) -> void {
_sent_amount = sent_amount;
Expand Down Expand Up @@ -126,22 +129,26 @@ int main(int argc, char** argv) {
// we are the receiver process
// preallocate about 16MiB for delivery logs
logger.reserve_delivered_memory(16 * (1 << 20));
auto listen_handle = link.listen([](auto process_id, auto& data) {
SendType msg = 0;
for (size_t i = 0; i < sizeof(SendType); i++) {
msg |= static_cast<SendType>(data[i]) << (i * 8);
}
auto listen_handle = std::thread([&] {
link.listen([](auto process_id, auto& data) {
SendType msg = 0;
for (size_t i = 0; i < sizeof(SendType); i++) {
msg |= static_cast<SendType>(data[i]) << (i * 8);
}

logger.deliver(process_id, msg);
logger.deliver(process_id, msg);
});
});
listen_handle.join();
} else {
auto receiverHost = parser.hostById(i);
if (!receiverHost.has_value()) {
throw std::runtime_error("Receiver host not defined in hosts file");
}
auto resend_handle = link.listen(
[]([[maybe_unused]] auto process_id, [[maybe_unused]] auto& data) {});
auto resend_handle = std::thread([&] {
link.listen(
[]([[maybe_unused]] auto process_id, [[maybe_unused]] auto& data) {});
});

// we are a sender process
// pack 8 datas in one message
Expand Down
143 changes: 73 additions & 70 deletions src/src/perfect_link.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,92 +73,95 @@ inline auto PerfectLink::_decode_message(
return {is_ack, seq_nr, process_id};
}

auto PerfectLink::listen(ListenCallback callback) -> std::thread {
auto PerfectLink::listen(ListenCallback callback) -> void {
if (!_sock_fd.has_value()) {
throw std::runtime_error("Cannot listen if not bound");
}
auto sock_fd = _sock_fd.value();

std::thread listener([sock_fd, callback, this]() {
std::array<uint8_t, MAX_MESSAGE_SIZE> message;
std::vector<Slice<std::uint8_t>> data_buffer;
data_buffer.reserve(MAX_MESSAGE_COUNT_IN_PACKET);
std::array<uint8_t, MAX_MESSAGE_SIZE> message;
std::vector<Slice<std::uint8_t>> data_buffer;
data_buffer.reserve(MAX_MESSAGE_COUNT_IN_PACKET);

sockaddr_in sender_addr;
std::memset(&sender_addr, 0, sizeof(sender_addr));
socklen_t sender_addr_len = sizeof(sender_addr);
sockaddr_in sender_addr;
std::memset(&sender_addr, 0, sizeof(sender_addr));
socklen_t sender_addr_len = sizeof(sender_addr);

while (true) {
// wait for a message
auto message_size =
recvfrom(sock_fd, message.data(), message.size(), MSG_WAITALL,
reinterpret_cast<sockaddr*>(&sender_addr), &sender_addr_len);
while (true) {
// wait for a message
auto message_size =
recvfrom(sock_fd, message.data(), message.size(), MSG_WAITALL,
reinterpret_cast<sockaddr*>(&sender_addr), &sender_addr_len);

if (_done) {
return;
}
if (_done) {
return;
}

if (message_size < 0 && errno == EINTR) {
// got interrupted, try again
continue;
if (message_size < 0 && errno == EINTR) {
// got interrupted, try again
continue;
}

if (message_size < 0 && errno == EAGAIN) {
// timed out, resend messages without ACKs
std::lock_guard<std::mutex> guard(_pending_for_ack_mutex);
for (auto& [seq_nr, pending] : _pending_for_ack) {
perror_check<ssize_t>(
[&, &seq_nr = seq_nr, &pending = pending] {
return sendto(sock_fd, pending.message.data(),
pending.message_size, 0,
reinterpret_cast<const sockaddr*>(&pending.addr),
sizeof(pending.addr));
},
[](auto res) { return res < 0; }, "failed to resend message");
}
continue;
}

if (message_size < 0) {
perror("failed to receive message");
continue;
}

auto [is_ack, seq_nr, process_id] = _decode_message(
message, static_cast<size_t>(message_size), data_buffer);

if (message_size < 0 && errno == EAGAIN) {
// timed out, resend messages without ACKs
if (is_ack) {
// mark a sent message as being acknowledged, we will no longer be
// sending it
{
std::lock_guard<std::mutex> guard(_pending_for_ack_mutex);
for (auto& [seq_nr, pending] : _pending_for_ack) {
perror_check<ssize_t>(
[&, &seq_nr = seq_nr, &pending = pending] {
return sendto(sock_fd, pending.message.data(),
pending.message_size, 0,
reinterpret_cast<const sockaddr*>(&pending.addr),
sizeof(pending.addr));
},
[](auto res) { return res < 0; }, "failed to resend message");
}
continue;
_pending_for_ack.erase(seq_nr);
}

if (message_size < 0) {
perror("failed to receive message");
continue;
_pending_for_ack_cv.notify_one();
} else {
// we received a potentially new message
_delivered_mutex.lock();
auto has_not_been_delivered =
_delivered.find({process_id, seq_nr}) == _delivered.end();
if (has_not_been_delivered) {
_delivered.emplace(process_id, seq_nr);
}
_delivered_mutex.unlock();

auto [is_ack, seq_nr, process_id] = _decode_message(
message, static_cast<size_t>(message_size), data_buffer);

if (is_ack) {
// mark a sent message as being acknowledged, we will no longer be
// sending it
{
std::lock_guard<std::mutex> guard(_pending_for_ack_mutex);
_pending_for_ack.erase(seq_nr);
if (has_not_been_delivered) {
// we have not yet delivered the message, do it now
for (auto& data : data_buffer) {
OwnedSlice owned = data;
callback(process_id, owned);
}
_pending_for_ack_cv.notify_one();
} else {
// we received a potentially new message
if (_delivered.find({process_id, seq_nr}) == _delivered.end()) {
// we have not yet delivered the message, do it now
for (auto& data : data_buffer) {
OwnedSlice owned = data;
callback(process_id, owned);
}
_delivered.emplace(process_id, seq_nr);
}

// send an ACK
auto [ack_message, ack_message_size] = _prepare_message(seq_nr, true);
perror_check<ssize_t>(
[&, &ack_message = ack_message,
&ack_message_size = ack_message_size] {
return sendto(sock_fd, ack_message.data(), ack_message_size, 0,
reinterpret_cast<sockaddr*>(&sender_addr),
sender_addr_len);
},
[](auto res) { return res < 0; }, "failed to send ack");
}
}
});

return listener;
// send an ACK
auto [ack_message, ack_message_size] = _prepare_message(seq_nr, true);
perror_check<ssize_t>(
[&, &ack_message = ack_message,
&ack_message_size = ack_message_size] {
return sendto(sock_fd, ack_message.data(), ack_message_size, 0,
reinterpret_cast<sockaddr*>(&sender_addr),
sender_addr_len);
},
[](auto res) { return res < 0; }, "failed to send ack");
}
}
}

0 comments on commit 405478d

Please sign in to comment.