Skip to content

Commit

Permalink
Handle reconnect gracefully (#7147)
Browse files Browse the repository at this point in the history
* Handle reconnect gracefully

* Remove comment

* Pair initialize with destroy, for full tear-down

* Use new initialize call without thread

* add debug logs to socketio server

* start socketio server again

* Update index.ts

Signed-off-by: Ming <[email protected]>

* Update index.ts

Signed-off-by: Ming <[email protected]>

* Update index.ts

Signed-off-by: Ming <[email protected]>

* Mark ret as UNUSED, but keep variable

Signed-off-by: Ming <[email protected]>
Co-authored-by: Ming <[email protected]>
  • Loading branch information
npip99 and Ming authored Aug 31, 2022
1 parent 50937d2 commit 88fb011
Show file tree
Hide file tree
Showing 12 changed files with 161 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,44 @@ void HTMLWhistElement::whistPlay() {
WHIST_VIRTUAL_INTERFACE_CALL(video.set_video_playing, whist_window_id_, true);
}

bool HTMLWhistElement::isWhistConnected() {
bool is_whist_connected = WHIST_VIRTUAL_INTERFACE_CALL(lifecycle.is_connected);
return is_whist_connected;
}

void HTMLWhistElement::whistConnect(const String& whist_parameters) {
// Initiate a new whist connection
bool new_connection = WHIST_VIRTUAL_INTERFACE_CALL(lifecycle.connect);

// Send parameters for this new whist connection
if (new_connection) {
WhistClient::WhistFrontendEvent event = {};
event.type = WhistClient::FRONTEND_EVENT_STARTUP_PARAMETER;
event.startup_parameter.error = false;
// TODO: Validate that this is actually a valid JSONObject so that we don't die on bad parameter values..
auto json_object = JSONObject::From(ParseJSON(whist_parameters));
for (wtf_size_t i = 0; i < json_object->size(); ++i) {
JSONObject::Entry entry = json_object->at(i);
String wtf_key = entry.first;
String wtf_value;
entry.second->AsString(&wtf_value);
char* key = strdup(wtf_key.Utf8().c_str());
char* value = strdup(wtf_value.Utf8().c_str());

event.startup_parameter.key = key;
event.startup_parameter.value = value;

// key and value are freed by the WhistClient event handler
WHIST_VIRTUAL_INTERFACE_CALL(events.send, &event);
}

// Mark as finished, so that whist may connect
event.startup_parameter.key = strdup("finished");
event.startup_parameter.value = NULL;
WHIST_VIRTUAL_INTERFACE_CALL(events.send, &event);
}
}

Node::InsertionNotificationRequest HTMLWhistElement::InsertedInto(
ContainerNode& insertion_point) {
SetSrcObject(nullptr); // placeholder for launching the functions that create the whistplayer
Expand All @@ -413,34 +451,6 @@ void HTMLWhistElement::RemovedFrom(ContainerNode& insertion_point) {

void HTMLWhistElement::ParseAttribute(
const AttributeModificationParams& params) {

const QualifiedName& name = params.name;
WhistClient::WhistFrontendEvent event;
event.type = WhistClient::FRONTEND_EVENT_STARTUP_PARAMETER;
event.startup_parameter.error = false;
if (name == html_names::kSrcAttr) {
// TODO: Validate that this is actually a valid JSONObject so that we don't die on bad src tags.
auto json_object = JSONObject::From(ParseJSON(params.new_value));
for (wtf_size_t i = 0; i < json_object->size(); ++i) {
JSONObject::Entry entry = json_object->at(i);
String wtf_key = entry.first;
String wtf_value;
entry.second->AsString(&wtf_value);
char* key = strdup(wtf_key.Utf8().c_str());
char* value = strdup(wtf_value.Utf8().c_str());

event.startup_parameter.key = key;
event.startup_parameter.value = value;

// key and value are freed by the WhistClient event handler
WHIST_VIRTUAL_INTERFACE_CALL(events.send, &event);
}
event.startup_parameter.key = strdup("finished");
event.startup_parameter.value = NULL;
WHIST_VIRTUAL_INTERFACE_CALL(events.send, &event);
} else {
HTMLElement::ParseAttribute(params);
}
}

void HTMLWhistElement::OpenFileChooser() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ class CORE_EXPORT HTMLWhistElement final
// These functions are called from JavaScript -- see html_whist_element.idl for bindings.
void whistPause();
void whistPlay();
bool isWhistConnected();
void whistConnect(const String& whist_parameters);

private:
class HTMLWhistElementResizeObserverDelegate;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,6 @@
[ImplementedAs=whistPause] void freeze();
// TODO: Should this be a promise instead?
[ImplementedAs=whistPlay] void thaw();
// TODO: Expose a whistInit(any data) function which takes a
// JSON object of parameters to pass to the protocol.
[ImplementedAs=whistConnect] void whistConnect(DOMString whist_parameters);
[ImplementedAs=isWhistConnected] boolean isWhistConnected();
};
11 changes: 3 additions & 8 deletions browser/hybrid/third_party/whist/protocol_client_interface.cc
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,7 @@ void InitializeWhistClient() {
whist_virtual_interface = get_virtual_interface();
}

WHIST_VIRTUAL_INTERFACE_CALL(lifecycle.connect);
// TODO: lifecycle.disconnect at some point? How to do this safely?

std::thread client_main([]() {
int ret = WHIST_VIRTUAL_INTERFACE_CALL(lifecycle.start, protocol_argc, protocol_argv);
DLOG(INFO) << "whist_client_main exited with: " << ret;
});
client_main.detach();
// Initialize whist, so that connections can be made from javascript later
WHIST_VIRTUAL_INTERFACE_CALL(lifecycle.initialize, protocol_argc, protocol_argv);
// TODO: lifecycle.destroy sometime? If necessary?
}
92 changes: 75 additions & 17 deletions protocol/client/frontend/virtual/interface.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include <whist/core/whist.h>
#include <mutex>
#include <thread>
extern "C" {
#include "common.h"
// This is a crutch. Once video is callback-ized we won't need it anymore.
Expand All @@ -14,7 +15,7 @@ extern "C" {

static WhistMutex lock;
static AVFrame* pending = NULL;
static bool connected = false;
static std::atomic<bool> protocol_alive = false;
static int requested_width;
static int requested_height;

Expand Down Expand Up @@ -55,10 +56,69 @@ OnNotificationCallback on_notification_callback_ptr = NULL;
GetModifierKeyState get_modifier_key_state = NULL;
}

static void vi_api_connect(void) {
lock = whist_create_mutex();
events_queue = fifo_queue_create(sizeof(WhistFrontendEvent), MAX_EVENTS_QUEUED);
connected = true;
static WhistSemaphore connection_semaphore = whist_create_semaphore(0);
static std::thread whist_main_thread;

static int vi_api_initialize(int argc, const char* argv[]) {
// Create variables, if not already existant
if (lock == NULL) {
lock = whist_create_mutex();
}
if (events_queue == NULL) {
events_queue = fifo_queue_create(sizeof(WhistFrontendEvent), MAX_EVENTS_QUEUED);
}
// Main whist loop
whist_main_thread = std::thread([=]() -> void {
while (true) {
whist_wait_semaphore(connection_semaphore);
// If the semaphore was hit with protocol marked as dead, exit
if (protocol_alive == false) {
break;
}
// Start the protocol if valid arguments were given
if (argc > 0 && argv != NULL) {
int ret = whist_client_main(argc, argv);
UNUSED(ret);
}
// Mark the protocol as dead when main exits
protocol_alive = false;
}
});
return 0;
}

static void vi_api_destroy() {
// Verify that the protocol isn't alive
FATAL_ASSERT(protocol_alive == false);
// Kill the whist main thread by hitting the semaphore while protocl is marked-as-dead
whist_post_semaphore(connection_semaphore);
whist_main_thread.join();
}

static bool vi_api_connect() {
// Mark the protocol as alive
bool protocol_was_alive = protocol_alive.exchange(true);
// But if the protocol wasn't just alive, hit the semaphore to start it
if (protocol_was_alive == false) {
// Drain the event queue
WhistFrontendEvent event;
while (fifo_queue_dequeue_item(events_queue, &event) == 0)
;
// Hit the semaphore to start the protocol again
whist_post_semaphore(connection_semaphore);
return true;
} else {
// Do nothing, the protocol is already alive
return false;
}
}

static bool vi_api_is_connected() { return protocol_alive; }

static void vi_api_disconnect() {
// TODO: Actually force a disconnect when this happens
LOG_ERROR("Forceful disconnect!");
protocol_alive = false;
}

static void vi_api_set_on_cursor_change_callback(int window_id, OnCursorChangeCallback cb) {
Expand All @@ -70,7 +130,7 @@ static void vi_api_set_on_notification_callback(OnNotificationCallback cb) {
on_notification_callback_ptr = cb;
}

static void* vi_api_get_frame_ref(void) {
static void* vi_api_get_frame_ref() {
std::lock_guard<std::mutex> guard(whist_window_mutex);
// Consume the pending AVFrame, and return it
void* frame_ref = pending;
Expand Down Expand Up @@ -117,7 +177,7 @@ static void vi_api_set_video_playing(int window_id, bool playing) {
}

void virtual_interface_send_frame(AVFrame* frame) {
if (!connected) return;
if (!protocol_alive) return;
std::lock_guard<std::mutex> guard(whist_window_mutex);

// Update the pending frame
Expand Down Expand Up @@ -148,12 +208,6 @@ static void vi_api_set_video_frame_callback(int window_id, VideoFrameCallback ca
whist_windows[window_id].video_frame_callback_ptr = callback_ptr;
}

static void vi_api_disconnect(void) {
connected = false;
whist_destroy_mutex(lock);
fifo_queue_destroy(events_queue);
}

static void vi_api_send_event(const WhistFrontendEvent* frontend_event) {
if (frontend_event->type == FRONTEND_EVENT_RESIZE) {
requested_width = frontend_event->resize.width;
Expand Down Expand Up @@ -189,7 +243,7 @@ static void vi_api_set_get_modifier_key_state(GetModifierKeyState cb) {
get_modifier_key_state = cb;
}

static int vi_api_create_window(void) {
static int vi_api_create_window() {
std::lock_guard<std::mutex> guard(whist_window_mutex);
// Use serial window IDs, so that each window gets a unique ID
static int serial_window_ids = 1;
Expand Down Expand Up @@ -219,12 +273,16 @@ static void vi_api_destroy_window(int window_id) {
static const VirtualInterface vi = {
.lifecycle =
{
.initialize = vi_api_initialize,
.destroy = vi_api_destroy,

.connect = vi_api_connect,
.is_connected = vi_api_is_connected,
.disconnect = vi_api_disconnect,

.create_window = vi_api_create_window,
.register_context = vi_api_register_context,
.destroy_window = vi_api_destroy_window,
.start = whist_client_main,
.disconnect = vi_api_disconnect,
},
.video =
{
Expand All @@ -251,4 +309,4 @@ static const VirtualInterface vi = {
},
};

const VirtualInterface* get_virtual_interface(void) { return &vi; }
const VirtualInterface* get_virtual_interface() { return &vi; }
10 changes: 7 additions & 3 deletions protocol/client/frontend/virtual/interface.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,16 @@ typedef int (*GetModifierKeyState)(void);

typedef struct VirtualInterface {
struct {
void (*connect)(void);
int (*initialize)(int argc, const char* argv[]);
void (*destroy)(void);

bool (*connect)(void);
bool (*is_connected)(void);
void (*disconnect)(void);

int (*create_window)(void);
void (*register_context)(int window_id, void* context);
void (*destroy_window)(int window_id);
int (*start)(int argc, const char* argv[]);
void (*disconnect)(void);
} lifecycle;
struct {
void* (*get_frame_ref)(void);
Expand Down
5 changes: 4 additions & 1 deletion protocol/client/whist_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -359,8 +359,8 @@ int whist_client_main(int argc, const char* argv[]) {
// Initialize logger error monitor
whist_error_monitor_initialize(true);

print_system_info();
LOG_INFO("Whist client revision %s", whist_git_revision());
WhistThread system_info_thread = print_system_info();

client_exiting = false;
WhistExitCode exit_code = WHIST_EXIT_SUCCESS;
Expand Down Expand Up @@ -450,6 +450,9 @@ int whist_client_main(int argc, const char* argv[]) {
exit_code = WHIST_EXIT_FAILURE;
}

// Wait on system info thread before destroying logger
whist_wait_thread(system_info_thread, NULL);

// Destroy any resources being used by the client
LOG_INFO("Closing Client...");

Expand Down
3 changes: 3 additions & 0 deletions protocol/server/network.c
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,9 @@ int client_connect_socket(WhistServerState *state, Client *client,
return -1;
}

// Since a new connection has occurred, the stream needs a reset
state->stream_needs_restart = true;

WhistTimer connection_timer;
start_timer(&connection_timer);
bool successful_handshake = false;
Expand Down
3 changes: 3 additions & 0 deletions protocol/test/protocol_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1962,6 +1962,8 @@ int test_virtual_intr(void* arg) {

TEST_F(ProtocolTest, VirtualEventTest) {
const VirtualInterface* vi = get_virtual_interface();
// argc of 0 means protocol will not be called
vi->lifecycle.initialize(0, NULL);
vi->lifecycle.connect();
WhistFrontendEvent sent_event;
sent_event.type = FRONTEND_EVENT_RESIZE; // Can be anything. We just want this to
Expand Down Expand Up @@ -1989,6 +1991,7 @@ TEST_F(ProtocolTest, VirtualEventTest) {
whist_wait_thread(thread_context, NULL);
virtual_destroy(&frontend);
vi->lifecycle.disconnect();
vi->lifecycle.destroy();
}

TEST_F(ProtocolTest, WCCTest) {
Expand Down
4 changes: 2 additions & 2 deletions protocol/whist/core/whist.c
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,14 @@ void whist_init_subsystems(void) {
whist_init_networking();
}

void print_system_info(void) {
WhistThread print_system_info(void) {
/*
Print the system info of the computer
*/

WhistThread sysinfo_thread =
whist_create_thread(multithreaded_print_system_info, "print_system_info", NULL);
whist_detach_thread(sysinfo_thread);
return sysinfo_thread;
}

int runcmd(const char *cmdline, char **response) {
Expand Down
9 changes: 3 additions & 6 deletions protocol/whist/core/whist.h
Original file line number Diff line number Diff line change
Expand Up @@ -644,15 +644,12 @@ Public Functions
*/
void whist_init_subsystems(void);

/**
* @brief Print the memory trace of a process
*/
void print_memory_info(void);

/**
* @brief Print the system info of the computer
*
* @returns The thread that will be printing the system info
*/
void print_system_info(void);
WhistThread print_system_info(void);

/**
* @brief Run a system command via command prompt or
Expand Down
Loading

0 comments on commit 88fb011

Please sign in to comment.