diff --git a/protocol/client/frontend/virtual/impl.c b/protocol/client/frontend/virtual/impl.c index 21ff1ef6c71..485fd1d85f4 100644 --- a/protocol/client/frontend/virtual/impl.c +++ b/protocol/client/frontend/virtual/impl.c @@ -223,7 +223,7 @@ bool virtual_wait_event(WhistFrontend* frontend, WhistFrontendEvent* event, int void virtual_interrupt(WhistFrontend* frontend) { WhistFrontendEvent event; event.type = FRONTEND_EVENT_INTERRUPT; - if (fifo_queue_enqueue_item(events_queue, &event) != 0) { + if (fifo_queue_enqueue_item(events_queue, &event, false) != 0) { LOG_ERROR("Virtual frontend interrupt failed"); } } diff --git a/protocol/client/frontend/virtual/interface.cpp b/protocol/client/frontend/virtual/interface.cpp index 9dc32a62ea9..eb089a3902a 100644 --- a/protocol/client/frontend/virtual/interface.cpp +++ b/protocol/client/frontend/virtual/interface.cpp @@ -209,7 +209,7 @@ static void vi_api_send_event(const WhistFrontendEvent* frontend_event) { requested_width = frontend_event->resize.width; requested_height = frontend_event->resize.height; } - if (fifo_queue_enqueue_item(events_queue, frontend_event) != 0) { + if (fifo_queue_enqueue_item(events_queue, frontend_event, false) != 0) { LOG_ERROR("Virtual event queuing failed"); } } diff --git a/protocol/client/whist_client.cpp b/protocol/client/whist_client.cpp index 778a4071460..227e8a7d0ef 100644 --- a/protocol/client/whist_client.cpp +++ b/protocol/client/whist_client.cpp @@ -494,8 +494,6 @@ int whist_client_main(int argc, const char* argv[]) { // Destroy any resources being used by the client LOG_INFO("Closing Client..."); - destroy_tcp_sender(); - destroy_frontend(frontend); LOG_INFO("Client frontend has exited..."); diff --git a/protocol/server/main.c b/protocol/server/main.c index cb856e6b3d8..d01d6ca07ab 100644 --- a/protocol/server/main.c +++ b/protocol/server/main.c @@ -706,8 +706,6 @@ int main(int argc, char* argv[]) { // Mark the client as permanently deactivated, which lets the threads reap permanently_deactivate_client(server_state.client); - destroy_tcp_sender(); - destroy_input_device(server_state.input_device); server_state.input_device = NULL; diff --git a/protocol/whist/network/network.c b/protocol/whist/network/network.c index dbb420825f5..6dcb5405862 100644 --- a/protocol/whist/network/network.c +++ b/protocol/whist/network/network.c @@ -250,7 +250,6 @@ void set_tos(SOCKET socket, WhistTOSValue tos) { } void whist_init_networking(void) { - init_tcp_sender(); // Initialize any uninitialized port mappings with the identity for (int i = 0; i <= USHRT_MAX; i++) { if (port_mappings[i] == 0) { diff --git a/protocol/whist/network/tcp.c b/protocol/whist/network/tcp.c index 57e8d7a7938..0f8f3df4f79 100644 --- a/protocol/whist/network/tcp.c +++ b/protocol/whist/network/tcp.c @@ -89,17 +89,19 @@ typedef struct { // Only recvp every RECV_INTERVAL_MS, to keep CPU usage low. // This is because a recvp takes ~8ms sometimes WhistTimer last_recvp; + + // TCP send is not atomic, so we have to hold packets in a queue and send on a separate thread + WhistThread send_thread; + QueueContext* send_queue; + WhistSemaphore send_semaphore; + bool run_sender; } TCPContext; -// TCP send is not atomic, so we have to hold packets in a queue and send on a separate thread +// Struct for holding packets on queue typedef struct TCPQueueItem { - TCPContext* context; TCPNetworkPacket* packet; int packet_size; } TCPQueueItem; -WhistThread tcp_send_thread = NULL; -QueueContext* tcp_send_queue = NULL; -bool run_tcp_sender = false; // Time between consecutive pings #define TCP_PING_INTERVAL_SEC 2.0 @@ -173,11 +175,12 @@ int tcp_send_constructed_packet(TCPContext* context, TCPPacket* packet); /** * @brief Multithreaded function to asynchronously - * send all TCP packets on the same thread. + * send all TCP packets for one socket context + * on the same thread. * This prevents garbled TCP messages from * being sent since large TCP sends are not atomic. * - * @param opaque Unused pointer, pass NULL + * @param opaque Pointer to associated socket context * * @returns 0 on exit */ @@ -515,6 +518,14 @@ static void tcp_destroy_socket_context(void* raw_context) { FATAL_ASSERT(raw_context != NULL); TCPContext* context = raw_context; + // Destroy TCP send queue resources + context->run_sender = false; + + // Any pending TCP packets will be dropped + whist_wait_thread(context->send_thread, NULL); + TCPQueueItem queue_item; + fifo_queue_destroy(context->send_queue); + closesocket(context->socket); closesocket(context->listen_socket); whist_destroy_mutex(context->mutex); @@ -583,6 +594,9 @@ bool create_tcp_socket_context(SocketContext* network_context, char* destination context->last_pong_id = -1; start_timer(&context->last_ping_timer); context->connection_lost = false; + context->send_queue = NULL; + context->send_semaphore = NULL; + context->send_thread = NULL; start_timer(&context->last_recvp); int ret; @@ -601,6 +615,21 @@ bool create_tcp_socket_context(SocketContext* network_context, char* destination return false; } + // Set up TCP send queue + context->run_sender = true; + if ((context->send_queue = fifo_queue_create(sizeof(TCPQueueItem), 16)) == NULL || + (context->send_semaphore = whist_create_semaphore(0)) == NULL || + (context->send_thread = whist_create_thread(multithreaded_tcp_send, "multithreaded_tcp_send", context)) == NULL) { + // If any of the created resources are NULL, there was a failure and we need to clean up and return false + if (context->send_queue) + fifo_queue_destroy(context->send_queue); + if (context->send_semaphore) + whist_destroy_semaphore(context->send_semaphore); + free(context); + network_context->context = NULL; + return false; + } + // Restore the original timeout set_timeout(context->socket, context->timeout); @@ -648,18 +677,6 @@ int create_tcp_listen_socket(SOCKET* sock, int port, int timeout_ms) { return 0; } -void init_tcp_sender() { - run_tcp_sender = true; - tcp_send_queue = fifo_queue_create(sizeof(TCPQueueItem), 16); - tcp_send_thread = whist_create_thread(multithreaded_tcp_send, "multithreaded_tcp_send", NULL); -} - -void destroy_tcp_sender() { - run_tcp_sender = false; - whist_wait_thread(tcp_send_thread, NULL); - fifo_queue_destroy(tcp_send_queue); -} - /* ============================ Private Function Implementations @@ -799,29 +816,35 @@ int tcp_send_constructed_packet(TCPContext* context, TCPPacket* packet) { } // Add TCPNetworkPacket to the queue to be sent on the TCP send thread - TCPQueueItem* queue_item = safe_malloc(sizeof(TCPQueueItem)); - queue_item->context = context; - queue_item->packet = network_packet; - queue_item->packet_size = packet_size; - return fifo_queue_enqueue_item(tcp_send_queue, queue_item); + TCPQueueItem queue_item; + queue_item.packet = network_packet; + queue_item.packet_size = packet_size; + if (fifo_queue_enqueue_item(context->send_queue, &queue_item, false) < 0) + return -1; + whist_post_semaphore(context->send_semaphore); + return 0; } int multithreaded_tcp_send(void* opaque) { - UNUSED(opaque); - TCPQueueItem queue_item; TCPNetworkPacket* network_packet = NULL; - TCPContext* context = NULL; + TCPContext* context = (TCPContext*) opaque; while (true) { - if (fifo_queue_dequeue_item_timeout(tcp_send_queue, &queue_item, 5000) < 0) { - if (!run_tcp_sender) { - break; - } + whist_wait_semaphore(context->send_semaphore); + // Check to see if the sender thread needs to stop running + if (!context->run_sender) + break; + // If the connection is lost, re-increment the semaphore and continue + // to try again later + if (context->connection_lost) { + whist_post_semaphore(context->send_semaphore); continue; } + // If there is no item to be dequeued, continue + if (fifo_queue_dequeue_item(context->send_queue, &queue_item) < 0) + continue; network_packet = queue_item.packet; - context = queue_item.context; int tcp_packet_size = get_tcp_network_packet_size(network_packet); diff --git a/protocol/whist/network/tcp.h b/protocol/whist/network/tcp.h index f2523e30daa..10821c5e958 100644 --- a/protocol/whist/network/tcp.h +++ b/protocol/whist/network/tcp.h @@ -62,16 +62,4 @@ bool create_tcp_socket_context(SocketContext* context, char* destination, int po * @return 0 on success, otherwise failure. */ int create_tcp_listen_socket(SOCKET* sock, int port, int timeout_ms); - -/** - * @brief Create TCP send thread and resources - * - */ -void init_tcp_sender(void); - -/** - * @brief Destroy TCP send thread and resources - * - */ -void destroy_tcp_sender(void); #endif // WHIST_TCP_H diff --git a/protocol/whist/network/udp.c b/protocol/whist/network/udp.c index 5e6efc41b21..70b0c9d7ffe 100644 --- a/protocol/whist/network/udp.c +++ b/protocol/whist/network/udp.c @@ -1765,7 +1765,7 @@ void udp_handle_message(UDPContext* context, UDPPacket* packet) { nack_id.frame_id = packet->udp_nack_data.id; if ((short)packet->udp_nack_data.index >= 0) { nack_id.packet_index = packet->udp_nack_data.index; - if (fifo_queue_enqueue_item(context->nack_queue, &nack_id) < 0) { + if (fifo_queue_enqueue_item(context->nack_queue, &nack_id, false) < 0) { LOG_ERROR("Failed to enqueue NACK request"); } } else { @@ -1786,7 +1786,7 @@ void udp_handle_message(UDPContext* context, UDPPacket* packet) { if (LOG_NACKING) { LOG_INFO("Generating Nack for Frame ID %d, index %d", nack_id.frame_id, i); } - if (fifo_queue_enqueue_item(context->nack_queue, &nack_id) < 0) { + if (fifo_queue_enqueue_item(context->nack_queue, &nack_id, false) < 0) { LOG_ERROR("Failed to enqueue NACK request"); } } @@ -1809,7 +1809,7 @@ void udp_handle_message(UDPContext* context, UDPPacket* packet) { i < packet->udp_bitarray_nack_data.numBits; i++) { if (bit_array_test_bit(bit_arr, i)) { nack_id.packet_index = i; - if (fifo_queue_enqueue_item(context->nack_queue, &nack_id) < 0) { + if (fifo_queue_enqueue_item(context->nack_queue, &nack_id, false) < 0) { LOG_ERROR("Failed to enqueue NACK request"); } } diff --git a/protocol/whist/utils/queue.c b/protocol/whist/utils/queue.c index 6d161b6e1c8..0fddb7145da 100644 --- a/protocol/whist/utils/queue.c +++ b/protocol/whist/utils/queue.c @@ -15,7 +15,9 @@ typedef struct QueueContext { int max_items; WhistMutex mutex; WhistCondition cond; + WhistSemaphore sem; void *data; + bool destroying; } QueueContext; static void increment_idx(QueueContext *context, int *idx) { @@ -55,19 +57,34 @@ QueueContext *fifo_queue_create(size_t item_size, int max_items) { return NULL; } + context->sem = whist_create_semaphore(max_items); + if (context->sem == NULL) { + fifo_queue_destroy(context); + return NULL; + } + context->item_size = item_size; context->max_items = max_items; + context->destroying = false; return context; } -int fifo_queue_enqueue_item(QueueContext *context, const void *item) { +int fifo_queue_enqueue_item(QueueContext *context, const void *item, bool blocking) { if (context == NULL) { return -1; } whist_lock_mutex(context->mutex); - if (context->num_items >= context->max_items) { + while (context->num_items >= context->max_items) { whist_unlock_mutex(context->mutex); - return -1; + if (blocking) { + whist_wait_semaphore(context->sem); + if (context->destroying) { + return -1; + } + whist_lock_mutex(context->mutex); + } else { + return -1; + } } context->num_items++; void *target_item = (uint8_t *)context->data + (context->item_size * context->write_idx); @@ -75,6 +92,11 @@ int fifo_queue_enqueue_item(QueueContext *context, const void *item) { increment_idx(context, &context->write_idx); whist_broadcast_cond(context->cond); whist_unlock_mutex(context->mutex); + // If this enqueue call is not blocking, we still need to decrement the semaphore. + // Since we just successfully dequeued an element, we know that the semaphore + // count is greater than 0 and that this wait will be successful. + if (!blocking) + whist_wait_timeout_semaphore(context->sem, 0); return 0; } @@ -89,6 +111,7 @@ int fifo_queue_dequeue_item(QueueContext *context, void *item) { } dequeue_item(context, item); whist_unlock_mutex(context->mutex); + whist_post_semaphore(context->sem); return 0; } @@ -118,6 +141,12 @@ void fifo_queue_destroy(QueueContext *context) { if (context == NULL) { return; } + if (context->sem != NULL) { + // This ensures that a blocking enqueue will release + context->destroying = true; + whist_post_semaphore(context->sem); + whist_destroy_semaphore(context->sem); + } if (context->data != NULL) { free(context->data); } diff --git a/protocol/whist/utils/queue.h b/protocol/whist/utils/queue.h index 55c05c46daf..5cf67569901 100644 --- a/protocol/whist/utils/queue.h +++ b/protocol/whist/utils/queue.h @@ -23,10 +23,11 @@ QueueContext *fifo_queue_create(size_t item_size, int max_items); * * @param queue_context Queue's context pointer * @param item Pointer to the item that needs to be enqueued + * @param blocking Whether the enqueue operation should be blocking * * @returns 0 on success, -1 on failure */ -int fifo_queue_enqueue_item(QueueContext *queue_context, const void *item); +int fifo_queue_enqueue_item(QueueContext *queue_context, const void *item, bool blocking); /** * @brief Dequeue an item from the FIFO queue. If an item is not available,