From 61315c24546b6eb4ad0f5398a95d5610b5a1fa24 Mon Sep 17 00:00:00 2001 From: sunxilin Date: Wed, 13 Nov 2024 21:32:34 +0800 Subject: [PATCH] chore: rename cb functions in protocol --- core/include/ten_runtime/protocol/protocol.h | 10 +++--- .../ten_runtime/protocol/protocol.h | 15 ++++++--- core/src/ten_runtime/app/endpoint.c | 5 ++- .../protocol/asynced/protocol_asynced.c | 13 +++++--- .../protocol/integrated/protocol_integrated.c | 19 ++++------- core/src/ten_runtime/protocol/protocol.c | 32 +++++++++++++------ 6 files changed, 58 insertions(+), 36 deletions(-) diff --git a/core/include/ten_runtime/protocol/protocol.h b/core/include/ten_runtime/protocol/protocol.h index a3b79343cb..4af588d956 100644 --- a/core/include/ten_runtime/protocol/protocol.h +++ b/core/include/ten_runtime/protocol/protocol.h @@ -115,14 +115,14 @@ typedef void (*ten_protocol_on_output_func_t)(ten_protocol_t *self, typedef void (*ten_protocol_listen_func_t)(ten_protocol_t *self, const char *uri); -typedef ten_connection_t *(*ten_protocol_on_accepted_func_t)( - ten_protocol_t *new_protocol); +typedef ten_connection_t *(*ten_protocol_on_client_accepted_func_t)( + ten_protocol_t *self, ten_protocol_t *new_protocol); typedef bool (*ten_protocol_connect_to_func_t)(ten_protocol_t *self, const char *uri); -typedef void (*ten_protocol_on_connected_func_t)(ten_protocol_t *self, - bool success); +typedef void (*ten_protocol_on_server_connected_func_t)(ten_protocol_t *self, + bool success); typedef void (*ten_protocol_migrate_func_t)(ten_protocol_t *self, ten_engine_t *engine, @@ -190,3 +190,5 @@ TEN_RUNTIME_API ten_protocol_context_store_t *ten_protocol_get_context_store( ten_protocol_t *self); TEN_RUNTIME_API bool ten_protocol_role_is_communication(ten_protocol_t *self); + +TEN_RUNTIME_API bool ten_protocol_role_is_listening(ten_protocol_t *self); diff --git a/core/include_internal/ten_runtime/protocol/protocol.h b/core/include_internal/ten_runtime/protocol/protocol.h index 937dec9e63..5ce48846d5 100644 --- a/core/include_internal/ten_runtime/protocol/protocol.h +++ b/core/include_internal/ten_runtime/protocol/protocol.h @@ -143,11 +143,18 @@ typedef struct ten_protocol_t { ten_protocol_on_output_func_t on_output; // This is the callback function when a client connects to this protocol. - ten_protocol_on_accepted_func_t on_accepted; + // Note that this function pointer can only be set in 'ten_protocol_listen' + // and the 'listen' method should be able to call back this function when + // the client successfully establishes a connection. + ten_protocol_on_client_accepted_func_t on_client_accepted; // This is the callback function when this protocol connected to the remote // server. - ten_protocol_on_connected_func_t on_connected; + // Note that this function pointer can only be set in + // 'ten_protocol_connect_to' and the 'connect_to' method should be able to + // call back this function when the connection to the remote server is + // established. + ten_protocol_on_server_connected_func_t on_server_connected; // This is the callback function when this protocol is migrated to the new // runloop. @@ -218,11 +225,11 @@ TEN_RUNTIME_PRIVATE_API bool ten_protocol_cascade_close_upward( TEN_RUNTIME_PRIVATE_API void ten_protocol_listen( ten_protocol_t *self, const char *uri, - ten_protocol_on_accepted_func_t on_accepted); + ten_protocol_on_client_accepted_func_t on_client_accepted); TEN_RUNTIME_PRIVATE_API bool ten_protocol_connect_to( ten_protocol_t *self, const char *uri, - ten_protocol_on_connected_func_t on_connected); + ten_protocol_on_server_connected_func_t on_server_connected); TEN_RUNTIME_PRIVATE_API void ten_protocol_migrate( ten_protocol_t *self, ten_engine_t *engine, ten_connection_t *connection, diff --git a/core/src/ten_runtime/app/endpoint.c b/core/src/ten_runtime/app/endpoint.c index d484cf44e3..e8150ad854 100644 --- a/core/src/ten_runtime/app/endpoint.c +++ b/core/src/ten_runtime/app/endpoint.c @@ -16,7 +16,10 @@ #include "ten_utils/macro/check.h" static ten_connection_t *create_connection_when_client_accepted( - ten_protocol_t *protocol) { + ten_protocol_t *listening_protocol, ten_protocol_t *protocol) { + TEN_ASSERT(listening_protocol && + ten_protocol_check_integrity(listening_protocol, true), + "Should not happen."); TEN_ASSERT(protocol && ten_protocol_check_integrity(protocol, true), "Should not happen."); TEN_ASSERT(ten_protocol_attach_to(protocol) == TEN_PROTOCOL_ATTACH_TO_APP, diff --git a/core/src/ten_runtime/protocol/asynced/protocol_asynced.c b/core/src/ten_runtime/protocol/asynced/protocol_asynced.c index 9f6f534d4c..966cfe5979 100644 --- a/core/src/ten_runtime/protocol/asynced/protocol_asynced.c +++ b/core/src/ten_runtime/protocol/asynced/protocol_asynced.c @@ -346,8 +346,8 @@ static void ten_protocol_asynced_on_connected(void *self_, void *arg) { TEN_ASSERT(protocol && ten_protocol_check_integrity(&protocol->base, true), "Should not happen."); - if (protocol->base.on_connected) { - protocol->base.on_connected(&protocol->base, (bool)arg); + if (protocol->base.on_server_connected) { + protocol->base.on_server_connected(&protocol->base, (bool)arg); } // The task is completed, so delete a reference to the 'protocol' to reflect @@ -427,12 +427,12 @@ static void ten_app_thread_on_client_protocol_created(ten_env_t *ten_env, // object) might need to be migrated, so set the value to 'INIT' as the // default value is 'DONE'. Refer to 'ten_protocol_asynced_init()'. protocol->migration_state = TEN_CONNECTION_MIGRATION_STATE_INIT; - protocol->base.on_accepted = listening_protocol->base.on_accepted; ten_protocol_attach_to_app_and_thread(&protocol->base, app); - if (protocol->base.on_accepted) { - protocol->base.on_accepted(&protocol->base); + if (listening_protocol->base.on_client_accepted) { + listening_protocol->base.on_client_accepted(&listening_protocol->base, + &protocol->base); } info->on_created(protocol, info); @@ -506,6 +506,9 @@ bool ten_protocol_asynced_on_client_accepted_async( // executed. ten_ref_inc_ref(&listening_protocol->base.ref); + // TODO(xilin): Replace pushing a task to the runloop with wrapping it into + // 'ten_env' apis. + ten_runloop_t *loop = ten_protocol_get_attached_runloop(&listening_protocol->base); TEN_ASSERT(loop, diff --git a/core/src/ten_runtime/protocol/integrated/protocol_integrated.c b/core/src/ten_runtime/protocol/integrated/protocol_integrated.c index 4d18ea844f..c949fd70e9 100644 --- a/core/src/ten_runtime/protocol/integrated/protocol_integrated.c +++ b/core/src/ten_runtime/protocol/integrated/protocol_integrated.c @@ -301,17 +301,14 @@ static void ten_app_thread_on_client_protocol_created(ten_env_t *ten_env, ten_protocol_check_integrity(new_communication_base_protocol, true), "Should not happen."); - // Setup important fields of the newly created protocol. - new_communication_base_protocol->on_connected = - listening_base_protocol->on_connected; - // Attach the newly created protocol to app first. ten_protocol_attach_to_app(new_communication_base_protocol, listening_base_protocol->attached_target.app); - TEN_ASSERT(listening_base_protocol->on_accepted, "Should not happen."); + TEN_ASSERT(listening_base_protocol->on_client_accepted, "Should not happen."); TEN_UNUSED ten_connection_t *connection = - listening_base_protocol->on_accepted(new_communication_base_protocol); + listening_base_protocol->on_client_accepted( + listening_base_protocol, new_communication_base_protocol); TEN_ASSERT(connection && ten_connection_check_integrity(connection, true), "Should not happen."); @@ -427,11 +424,7 @@ static void ten_protocol_integrated_on_output_async( TEN_ASSERT(msgs, "Invalid argument."); ten_protocol_t *protocol = &self->base; - TEN_ASSERT(protocol && - // TEN_NOLINTNEXTLINE(thread-check) - // thread-check: This function is intended to be used in - // different threads. - ten_protocol_check_integrity(protocol, false) && + TEN_ASSERT(protocol && ten_protocol_check_integrity(protocol, true) && ten_protocol_role_is_communication(protocol), "Should not happen."); @@ -474,8 +467,8 @@ static void on_server_connected(ten_transport_t *transport, ten_protocol_integrated_set_stream(protocol, stream); } - if (protocol->base.on_connected) { - protocol->base.on_connected(&protocol->base, success); + if (protocol->base.on_server_connected) { + protocol->base.on_server_connected(&protocol->base, success); } if (success) { diff --git a/core/src/ten_runtime/protocol/protocol.c b/core/src/ten_runtime/protocol/protocol.c index bf0c2be010..cd13e278bc 100644 --- a/core/src/ten_runtime/protocol/protocol.c +++ b/core/src/ten_runtime/protocol/protocol.c @@ -22,6 +22,7 @@ #include "ten_runtime/addon/addon.h" #include "ten_runtime/app/app.h" #include "ten_runtime/protocol/close.h" +#include "ten_runtime/protocol/protocol.h" #include "ten_utils/lib/error.h" #include "ten_utils/lib/mutex.h" #include "ten_utils/lib/ref.h" @@ -120,10 +121,10 @@ void ten_protocol_init(ten_protocol_t *self, const char *name, self->on_output = on_output; self->listen = listen; - self->on_accepted = NULL; + self->on_client_accepted = NULL; self->connect_to = connect_to; - self->on_connected = NULL; + self->on_server_connected = NULL; self->migrate = migrate; self->on_migrated = NULL; @@ -192,11 +193,14 @@ void ten_protocol_deinit(ten_protocol_t *self) { ten_sanitizer_thread_check_deinit(&self->thread_check); } -void ten_protocol_listen(ten_protocol_t *self, const char *uri, - ten_protocol_on_accepted_func_t on_accepted) { +void ten_protocol_listen( + ten_protocol_t *self, const char *uri, + ten_protocol_on_client_accepted_func_t on_client_accepted) { TEN_ASSERT(self && ten_protocol_check_integrity(self, true), "Should not happen."); - TEN_ASSERT(self->listen && uri && on_accepted, "Should not happen."); + TEN_ASSERT(ten_protocol_role_is_listening(self), + "Only the listening protocol could listen."); + TEN_ASSERT(self->listen && uri && on_client_accepted, "Should not happen."); TEN_ASSERT(self->attach_to == TEN_PROTOCOL_ATTACH_TO_APP, "Should not happen."); @@ -209,7 +213,7 @@ void ten_protocol_listen(ten_protocol_t *self, const char *uri, TEN_ASSERT(self->context_store, "The protocol context store in app is not ready."); - self->on_accepted = on_accepted; + self->on_client_accepted = on_client_accepted; self->listen(self, uri); } @@ -386,10 +390,13 @@ ten_protocol_get_context_store_in_connect_to(ten_protocol_t *self) { return ten_app_get_protocol_context_store(app); } -bool ten_protocol_connect_to(ten_protocol_t *self, const char *uri, - ten_protocol_on_connected_func_t on_connected) { +bool ten_protocol_connect_to( + ten_protocol_t *self, const char *uri, + ten_protocol_on_server_connected_func_t on_server_connected) { TEN_ASSERT(self && ten_protocol_check_integrity(self, true), "Should not happen."); + TEN_ASSERT(ten_protocol_role_is_communication(self), + "Only the communication protocol could connect to remote."); TEN_ASSERT(uri, "Should not happen."); if (self->attach_to == TEN_PROTOCOL_ATTACH_TO_CONNECTION && @@ -406,7 +413,7 @@ bool ten_protocol_connect_to(ten_protocol_t *self, const char *uri, TEN_ASSERT(self->context_store, "The protocol context store is not ready in 'connect_to'."); - self->on_connected = on_connected; + self->on_server_connected = on_server_connected; if (self->connect_to) { return self->connect_to(self, uri); } @@ -554,3 +561,10 @@ bool ten_protocol_role_is_communication(ten_protocol_t *self) { return self->role > TEN_PROTOCOL_ROLE_LISTEN; } + +bool ten_protocol_role_is_listening(ten_protocol_t *self) { + TEN_ASSERT(self && ten_protocol_check_integrity(self, true), + "Access across threads."); + + return self->role == TEN_PROTOCOL_ROLE_LISTEN; +}