Skip to content

Commit

Permalink
make tcp send queue per-socket, add blocking enqueue impl
Browse files Browse the repository at this point in the history
  • Loading branch information
Suriya Kandaswamy committed Sep 13, 2022
1 parent 3685fcc commit b0e52b7
Show file tree
Hide file tree
Showing 10 changed files with 94 additions and 58 deletions.
2 changes: 1 addition & 1 deletion protocol/client/frontend/virtual/impl.c
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ bool virtual_wait_event(WhistFrontend* frontend, WhistFrontendEvent* event, int
void virtual_interrupt(WhistFrontend* frontend) {
WhistFrontendEvent event;
event.type = FRONTEND_EVENT_INTERRUPT;
if (fifo_queue_enqueue_item(events_queue, &event) != 0) {
if (fifo_queue_enqueue_item(events_queue, &event, false) != 0) {
LOG_ERROR("Virtual frontend interrupt failed");
}
}
Expand Down
2 changes: 1 addition & 1 deletion protocol/client/frontend/virtual/interface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ static void vi_api_send_event(const WhistFrontendEvent* frontend_event) {
requested_width = frontend_event->resize.width;
requested_height = frontend_event->resize.height;
}
if (fifo_queue_enqueue_item(events_queue, frontend_event) != 0) {
if (fifo_queue_enqueue_item(events_queue, frontend_event, false) != 0) {
LOG_ERROR("Virtual event queuing failed");
}
}
Expand Down
2 changes: 0 additions & 2 deletions protocol/client/whist_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -494,8 +494,6 @@ int whist_client_main(int argc, const char* argv[]) {
// Destroy any resources being used by the client
LOG_INFO("Closing Client...");

destroy_tcp_sender();

destroy_frontend(frontend);

LOG_INFO("Client frontend has exited...");
Expand Down
2 changes: 0 additions & 2 deletions protocol/server/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -706,8 +706,6 @@ int main(int argc, char* argv[]) {
// Mark the client as permanently deactivated, which lets the threads reap
permanently_deactivate_client(server_state.client);

destroy_tcp_sender();

destroy_input_device(server_state.input_device);
server_state.input_device = NULL;

Expand Down
1 change: 0 additions & 1 deletion protocol/whist/network/network.c
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,6 @@ void set_tos(SOCKET socket, WhistTOSValue tos) {
}

void whist_init_networking(void) {
init_tcp_sender();
// Initialize any uninitialized port mappings with the identity
for (int i = 0; i <= USHRT_MAX; i++) {
if (port_mappings[i] == 0) {
Expand Down
87 changes: 55 additions & 32 deletions protocol/whist/network/tcp.c
Original file line number Diff line number Diff line change
Expand Up @@ -89,17 +89,19 @@ typedef struct {
// Only recvp every RECV_INTERVAL_MS, to keep CPU usage low.
// This is because a recvp takes ~8ms sometimes
WhistTimer last_recvp;

// TCP send is not atomic, so we have to hold packets in a queue and send on a separate thread
WhistThread send_thread;
QueueContext* send_queue;
WhistSemaphore send_semaphore;
bool run_sender;
} TCPContext;

// TCP send is not atomic, so we have to hold packets in a queue and send on a separate thread
// Struct for holding packets on queue
typedef struct TCPQueueItem {
TCPContext* context;
TCPNetworkPacket* packet;
int packet_size;
} TCPQueueItem;
WhistThread tcp_send_thread = NULL;
QueueContext* tcp_send_queue = NULL;
bool run_tcp_sender = false;

// Time between consecutive pings
#define TCP_PING_INTERVAL_SEC 2.0
Expand Down Expand Up @@ -173,11 +175,12 @@ int tcp_send_constructed_packet(TCPContext* context, TCPPacket* packet);

/**
* @brief Multithreaded function to asynchronously
* send all TCP packets on the same thread.
* send all TCP packets for one socket context
* on the same thread.
* This prevents garbled TCP messages from
* being sent since large TCP sends are not atomic.
*
* @param opaque Unused pointer, pass NULL
* @param opaque Pointer to associated socket context
*
* @returns 0 on exit
*/
Expand Down Expand Up @@ -515,6 +518,14 @@ static void tcp_destroy_socket_context(void* raw_context) {
FATAL_ASSERT(raw_context != NULL);
TCPContext* context = raw_context;

// Destroy TCP send queue resources
context->run_sender = false;

// Any pending TCP packets will be dropped
whist_wait_thread(context->send_thread, NULL);
TCPQueueItem queue_item;
fifo_queue_destroy(context->send_queue);

closesocket(context->socket);
closesocket(context->listen_socket);
whist_destroy_mutex(context->mutex);
Expand Down Expand Up @@ -583,6 +594,9 @@ bool create_tcp_socket_context(SocketContext* network_context, char* destination
context->last_pong_id = -1;
start_timer(&context->last_ping_timer);
context->connection_lost = false;
context->send_queue = NULL;
context->send_semaphore = NULL;
context->send_thread = NULL;
start_timer(&context->last_recvp);

int ret;
Expand All @@ -601,6 +615,21 @@ bool create_tcp_socket_context(SocketContext* network_context, char* destination
return false;
}

// Set up TCP send queue
context->run_sender = true;
if ((context->send_queue = fifo_queue_create(sizeof(TCPQueueItem), 16)) == NULL ||
(context->send_semaphore = whist_create_semaphore(0)) == NULL ||
(context->send_thread = whist_create_thread(multithreaded_tcp_send, "multithreaded_tcp_send", context)) == NULL) {
// If any of the created resources are NULL, there was a failure and we need to clean up and return false
if (context->send_queue)
fifo_queue_destroy(context->send_queue);
if (context->send_semaphore)
whist_destroy_semaphore(context->send_semaphore);
free(context);
network_context->context = NULL;
return false;
}

// Restore the original timeout
set_timeout(context->socket, context->timeout);

Expand Down Expand Up @@ -648,18 +677,6 @@ int create_tcp_listen_socket(SOCKET* sock, int port, int timeout_ms) {
return 0;
}

void init_tcp_sender() {
run_tcp_sender = true;
tcp_send_queue = fifo_queue_create(sizeof(TCPQueueItem), 16);
tcp_send_thread = whist_create_thread(multithreaded_tcp_send, "multithreaded_tcp_send", NULL);
}

void destroy_tcp_sender() {
run_tcp_sender = false;
whist_wait_thread(tcp_send_thread, NULL);
fifo_queue_destroy(tcp_send_queue);
}

/*
============================
Private Function Implementations
Expand Down Expand Up @@ -799,29 +816,35 @@ int tcp_send_constructed_packet(TCPContext* context, TCPPacket* packet) {
}

// Add TCPNetworkPacket to the queue to be sent on the TCP send thread
TCPQueueItem* queue_item = safe_malloc(sizeof(TCPQueueItem));
queue_item->context = context;
queue_item->packet = network_packet;
queue_item->packet_size = packet_size;
return fifo_queue_enqueue_item(tcp_send_queue, queue_item);
TCPQueueItem queue_item;
queue_item.packet = network_packet;
queue_item.packet_size = packet_size;
if (fifo_queue_enqueue_item(context->send_queue, &queue_item, false) < 0)
return -1;
whist_post_semaphore(context->send_semaphore);
return 0;
}

int multithreaded_tcp_send(void* opaque) {
UNUSED(opaque);

TCPQueueItem queue_item;
TCPNetworkPacket* network_packet = NULL;
TCPContext* context = NULL;
TCPContext* context = (TCPContext*) opaque;
while (true) {
if (fifo_queue_dequeue_item_timeout(tcp_send_queue, &queue_item, 5000) < 0) {
if (!run_tcp_sender) {
break;
}
whist_wait_semaphore(context->send_semaphore);
// Check to see if the sender thread needs to stop running
if (!context->run_sender)
break;
// If the connection is lost, re-increment the semaphore and continue
// to try again later
if (context->connection_lost) {
whist_post_semaphore(context->send_semaphore);
continue;
}
// If there is no item to be dequeued, continue
if (fifo_queue_dequeue_item(context->send_queue, &queue_item) < 0)
continue;

network_packet = queue_item.packet;
context = queue_item.context;

int tcp_packet_size = get_tcp_network_packet_size(network_packet);

Expand Down
12 changes: 0 additions & 12 deletions protocol/whist/network/tcp.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,4 @@ bool create_tcp_socket_context(SocketContext* context, char* destination, int po
* @return 0 on success, otherwise failure.
*/
int create_tcp_listen_socket(SOCKET* sock, int port, int timeout_ms);

/**
* @brief Create TCP send thread and resources
*
*/
void init_tcp_sender(void);

/**
* @brief Destroy TCP send thread and resources
*
*/
void destroy_tcp_sender(void);
#endif // WHIST_TCP_H
6 changes: 3 additions & 3 deletions protocol/whist/network/udp.c
Original file line number Diff line number Diff line change
Expand Up @@ -1765,7 +1765,7 @@ void udp_handle_message(UDPContext* context, UDPPacket* packet) {
nack_id.frame_id = packet->udp_nack_data.id;
if ((short)packet->udp_nack_data.index >= 0) {
nack_id.packet_index = packet->udp_nack_data.index;
if (fifo_queue_enqueue_item(context->nack_queue, &nack_id) < 0) {
if (fifo_queue_enqueue_item(context->nack_queue, &nack_id, false) < 0) {
LOG_ERROR("Failed to enqueue NACK request");
}
} else {
Expand All @@ -1786,7 +1786,7 @@ void udp_handle_message(UDPContext* context, UDPPacket* packet) {
if (LOG_NACKING) {
LOG_INFO("Generating Nack for Frame ID %d, index %d", nack_id.frame_id, i);
}
if (fifo_queue_enqueue_item(context->nack_queue, &nack_id) < 0) {
if (fifo_queue_enqueue_item(context->nack_queue, &nack_id, false) < 0) {
LOG_ERROR("Failed to enqueue NACK request");
}
}
Expand All @@ -1809,7 +1809,7 @@ void udp_handle_message(UDPContext* context, UDPPacket* packet) {
i < packet->udp_bitarray_nack_data.numBits; i++) {
if (bit_array_test_bit(bit_arr, i)) {
nack_id.packet_index = i;
if (fifo_queue_enqueue_item(context->nack_queue, &nack_id) < 0) {
if (fifo_queue_enqueue_item(context->nack_queue, &nack_id, false) < 0) {
LOG_ERROR("Failed to enqueue NACK request");
}
}
Expand Down
35 changes: 32 additions & 3 deletions protocol/whist/utils/queue.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ typedef struct QueueContext {
int max_items;
WhistMutex mutex;
WhistCondition cond;
WhistSemaphore sem;
void *data;
bool destroying;
} QueueContext;

static void increment_idx(QueueContext *context, int *idx) {
Expand Down Expand Up @@ -55,26 +57,46 @@ QueueContext *fifo_queue_create(size_t item_size, int max_items) {
return NULL;
}

context->sem = whist_create_semaphore(max_items);
if (context->sem == NULL) {
fifo_queue_destroy(context);
return NULL;
}

context->item_size = item_size;
context->max_items = max_items;
context->destroying = false;
return context;
}

int fifo_queue_enqueue_item(QueueContext *context, const void *item) {
int fifo_queue_enqueue_item(QueueContext *context, const void *item, bool blocking) {
if (context == NULL) {
return -1;
}
whist_lock_mutex(context->mutex);
if (context->num_items >= context->max_items) {
while (context->num_items >= context->max_items) {
whist_unlock_mutex(context->mutex);
return -1;
if (blocking) {
whist_wait_semaphore(context->sem);
if (context->destroying) {
return -1;
}
whist_lock_mutex(context->mutex);
} else {
return -1;
}
}
context->num_items++;
void *target_item = (uint8_t *)context->data + (context->item_size * context->write_idx);
memcpy(target_item, item, context->item_size);
increment_idx(context, &context->write_idx);
whist_broadcast_cond(context->cond);
whist_unlock_mutex(context->mutex);
// If this enqueue call is not blocking, we still need to decrement the semaphore.
// Since we just successfully dequeued an element, we know that the semaphore
// count is greater than 0 and that this wait will be successful.
if (!blocking)
whist_wait_timeout_semaphore(context->sem, 0);
return 0;
}

Expand All @@ -89,6 +111,7 @@ int fifo_queue_dequeue_item(QueueContext *context, void *item) {
}
dequeue_item(context, item);
whist_unlock_mutex(context->mutex);
whist_post_semaphore(context->sem);
return 0;
}

Expand Down Expand Up @@ -118,6 +141,12 @@ void fifo_queue_destroy(QueueContext *context) {
if (context == NULL) {
return;
}
if (context->sem != NULL) {
// This ensures that a blocking enqueue will release
context->destroying = true;
whist_post_semaphore(context->sem);
whist_destroy_semaphore(context->sem);
}
if (context->data != NULL) {
free(context->data);
}
Expand Down
3 changes: 2 additions & 1 deletion protocol/whist/utils/queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@ QueueContext *fifo_queue_create(size_t item_size, int max_items);
*
* @param queue_context Queue's context pointer
* @param item Pointer to the item that needs to be enqueued
* @param blocking Whether the enqueue operation should be blocking
*
* @returns 0 on success, -1 on failure
*/
int fifo_queue_enqueue_item(QueueContext *queue_context, const void *item);
int fifo_queue_enqueue_item(QueueContext *queue_context, const void *item, bool blocking);

/**
* @brief Dequeue an item from the FIFO queue. If an item is not available,
Expand Down

0 comments on commit b0e52b7

Please sign in to comment.