Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: rename cb functions in protocol #266

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions core/include/ten_runtime/protocol/protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,14 +115,14 @@ typedef void (*ten_protocol_on_output_func_t)(ten_protocol_t *self,
typedef void (*ten_protocol_listen_func_t)(ten_protocol_t *self,
const char *uri);

typedef ten_connection_t *(*ten_protocol_on_accepted_func_t)(
ten_protocol_t *new_protocol);
typedef ten_connection_t *(*ten_protocol_on_client_accepted_func_t)(
ten_protocol_t *self, ten_protocol_t *new_protocol);

typedef bool (*ten_protocol_connect_to_func_t)(ten_protocol_t *self,
const char *uri);

typedef void (*ten_protocol_on_connected_func_t)(ten_protocol_t *self,
bool success);
typedef void (*ten_protocol_on_server_connected_func_t)(ten_protocol_t *self,
bool success);

typedef void (*ten_protocol_migrate_func_t)(ten_protocol_t *self,
ten_engine_t *engine,
Expand Down Expand Up @@ -190,3 +190,5 @@ TEN_RUNTIME_API ten_protocol_context_store_t *ten_protocol_get_context_store(
ten_protocol_t *self);

TEN_RUNTIME_API bool ten_protocol_role_is_communication(ten_protocol_t *self);

TEN_RUNTIME_API bool ten_protocol_role_is_listening(ten_protocol_t *self);
15 changes: 11 additions & 4 deletions core/include_internal/ten_runtime/protocol/protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -143,11 +143,18 @@ typedef struct ten_protocol_t {
ten_protocol_on_output_func_t on_output;

// This is the callback function when a client connects to this protocol.
ten_protocol_on_accepted_func_t on_accepted;
// Note that this function pointer can only be set in 'ten_protocol_listen'
// and the 'listen' method should be able to call back this function when
// the client successfully establishes a connection.
ten_protocol_on_client_accepted_func_t on_client_accepted;

// This is the callback function when this protocol connected to the remote
// server.
ten_protocol_on_connected_func_t on_connected;
// Note that this function pointer can only be set in
// 'ten_protocol_connect_to' and the 'connect_to' method should be able to
// call back this function when the connection to the remote server is
// established.
ten_protocol_on_server_connected_func_t on_server_connected;

// This is the callback function when this protocol is migrated to the new
// runloop.
Expand Down Expand Up @@ -218,11 +225,11 @@ TEN_RUNTIME_PRIVATE_API bool ten_protocol_cascade_close_upward(

TEN_RUNTIME_PRIVATE_API void ten_protocol_listen(
ten_protocol_t *self, const char *uri,
ten_protocol_on_accepted_func_t on_accepted);
ten_protocol_on_client_accepted_func_t on_client_accepted);

TEN_RUNTIME_PRIVATE_API bool ten_protocol_connect_to(
ten_protocol_t *self, const char *uri,
ten_protocol_on_connected_func_t on_connected);
ten_protocol_on_server_connected_func_t on_server_connected);

TEN_RUNTIME_PRIVATE_API void ten_protocol_migrate(
ten_protocol_t *self, ten_engine_t *engine, ten_connection_t *connection,
Expand Down
5 changes: 4 additions & 1 deletion core/src/ten_runtime/app/endpoint.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@
#include "ten_utils/macro/check.h"

static ten_connection_t *create_connection_when_client_accepted(
ten_protocol_t *protocol) {
ten_protocol_t *listening_protocol, ten_protocol_t *protocol) {
TEN_ASSERT(listening_protocol &&
ten_protocol_check_integrity(listening_protocol, true),
"Should not happen.");
TEN_ASSERT(protocol && ten_protocol_check_integrity(protocol, true),
"Should not happen.");
TEN_ASSERT(ten_protocol_attach_to(protocol) == TEN_PROTOCOL_ATTACH_TO_APP,
Expand Down
13 changes: 8 additions & 5 deletions core/src/ten_runtime/protocol/asynced/protocol_asynced.c
Original file line number Diff line number Diff line change
Expand Up @@ -346,8 +346,8 @@ static void ten_protocol_asynced_on_connected(void *self_, void *arg) {
TEN_ASSERT(protocol && ten_protocol_check_integrity(&protocol->base, true),
"Should not happen.");

if (protocol->base.on_connected) {
protocol->base.on_connected(&protocol->base, (bool)arg);
if (protocol->base.on_server_connected) {
protocol->base.on_server_connected(&protocol->base, (bool)arg);
}

// The task is completed, so delete a reference to the 'protocol' to reflect
Expand Down Expand Up @@ -427,12 +427,12 @@ static void ten_app_thread_on_client_protocol_created(ten_env_t *ten_env,
// object) might need to be migrated, so set the value to 'INIT' as the
// default value is 'DONE'. Refer to 'ten_protocol_asynced_init()'.
protocol->migration_state = TEN_CONNECTION_MIGRATION_STATE_INIT;
protocol->base.on_accepted = listening_protocol->base.on_accepted;

ten_protocol_attach_to_app_and_thread(&protocol->base, app);

if (protocol->base.on_accepted) {
protocol->base.on_accepted(&protocol->base);
if (listening_protocol->base.on_client_accepted) {
listening_protocol->base.on_client_accepted(&listening_protocol->base,
&protocol->base);
}

info->on_created(protocol, info);
Expand Down Expand Up @@ -506,6 +506,9 @@ bool ten_protocol_asynced_on_client_accepted_async(
// executed.
ten_ref_inc_ref(&listening_protocol->base.ref);

// TODO(xilin): Replace pushing a task to the runloop with wrapping it into
// 'ten_env' apis.

ten_runloop_t *loop =
ten_protocol_get_attached_runloop(&listening_protocol->base);
TEN_ASSERT(loop,
Expand Down
19 changes: 6 additions & 13 deletions core/src/ten_runtime/protocol/integrated/protocol_integrated.c
Original file line number Diff line number Diff line change
Expand Up @@ -301,17 +301,14 @@ static void ten_app_thread_on_client_protocol_created(ten_env_t *ten_env,
ten_protocol_check_integrity(new_communication_base_protocol, true),
"Should not happen.");

// Setup important fields of the newly created protocol.
new_communication_base_protocol->on_connected =
listening_base_protocol->on_connected;

// Attach the newly created protocol to app first.
ten_protocol_attach_to_app(new_communication_base_protocol,
listening_base_protocol->attached_target.app);

TEN_ASSERT(listening_base_protocol->on_accepted, "Should not happen.");
TEN_ASSERT(listening_base_protocol->on_client_accepted, "Should not happen.");
TEN_UNUSED ten_connection_t *connection =
listening_base_protocol->on_accepted(new_communication_base_protocol);
listening_base_protocol->on_client_accepted(
listening_base_protocol, new_communication_base_protocol);
TEN_ASSERT(connection && ten_connection_check_integrity(connection, true),
"Should not happen.");

Expand Down Expand Up @@ -427,11 +424,7 @@ static void ten_protocol_integrated_on_output_async(
TEN_ASSERT(msgs, "Invalid argument.");

ten_protocol_t *protocol = &self->base;
TEN_ASSERT(protocol &&
// TEN_NOLINTNEXTLINE(thread-check)
// thread-check: This function is intended to be used in
// different threads.
ten_protocol_check_integrity(protocol, false) &&
TEN_ASSERT(protocol && ten_protocol_check_integrity(protocol, true) &&
ten_protocol_role_is_communication(protocol),
"Should not happen.");

Expand Down Expand Up @@ -474,8 +467,8 @@ static void on_server_connected(ten_transport_t *transport,
ten_protocol_integrated_set_stream(protocol, stream);
}

if (protocol->base.on_connected) {
protocol->base.on_connected(&protocol->base, success);
if (protocol->base.on_server_connected) {
protocol->base.on_server_connected(&protocol->base, success);
}

if (success) {
Expand Down
32 changes: 23 additions & 9 deletions core/src/ten_runtime/protocol/protocol.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "ten_runtime/addon/addon.h"
#include "ten_runtime/app/app.h"
#include "ten_runtime/protocol/close.h"
#include "ten_runtime/protocol/protocol.h"
#include "ten_utils/lib/error.h"
#include "ten_utils/lib/mutex.h"
#include "ten_utils/lib/ref.h"
Expand Down Expand Up @@ -120,10 +121,10 @@ void ten_protocol_init(ten_protocol_t *self, const char *name,
self->on_output = on_output;

self->listen = listen;
self->on_accepted = NULL;
self->on_client_accepted = NULL;

self->connect_to = connect_to;
self->on_connected = NULL;
self->on_server_connected = NULL;

self->migrate = migrate;
self->on_migrated = NULL;
Expand Down Expand Up @@ -192,11 +193,14 @@ void ten_protocol_deinit(ten_protocol_t *self) {
ten_sanitizer_thread_check_deinit(&self->thread_check);
}

void ten_protocol_listen(ten_protocol_t *self, const char *uri,
ten_protocol_on_accepted_func_t on_accepted) {
void ten_protocol_listen(
ten_protocol_t *self, const char *uri,
ten_protocol_on_client_accepted_func_t on_client_accepted) {
TEN_ASSERT(self && ten_protocol_check_integrity(self, true),
"Should not happen.");
TEN_ASSERT(self->listen && uri && on_accepted, "Should not happen.");
TEN_ASSERT(ten_protocol_role_is_listening(self),
"Only the listening protocol could listen.");
TEN_ASSERT(self->listen && uri && on_client_accepted, "Should not happen.");

TEN_ASSERT(self->attach_to == TEN_PROTOCOL_ATTACH_TO_APP,
"Should not happen.");
Expand All @@ -209,7 +213,7 @@ void ten_protocol_listen(ten_protocol_t *self, const char *uri,
TEN_ASSERT(self->context_store,
"The protocol context store in app is not ready.");

self->on_accepted = on_accepted;
self->on_client_accepted = on_client_accepted;
self->listen(self, uri);
}

Expand Down Expand Up @@ -386,10 +390,13 @@ ten_protocol_get_context_store_in_connect_to(ten_protocol_t *self) {
return ten_app_get_protocol_context_store(app);
}

bool ten_protocol_connect_to(ten_protocol_t *self, const char *uri,
ten_protocol_on_connected_func_t on_connected) {
bool ten_protocol_connect_to(
ten_protocol_t *self, const char *uri,
ten_protocol_on_server_connected_func_t on_server_connected) {
TEN_ASSERT(self && ten_protocol_check_integrity(self, true),
"Should not happen.");
TEN_ASSERT(ten_protocol_role_is_communication(self),
"Only the communication protocol could connect to remote.");
TEN_ASSERT(uri, "Should not happen.");

if (self->attach_to == TEN_PROTOCOL_ATTACH_TO_CONNECTION &&
Expand All @@ -406,7 +413,7 @@ bool ten_protocol_connect_to(ten_protocol_t *self, const char *uri,
TEN_ASSERT(self->context_store,
"The protocol context store is not ready in 'connect_to'.");

self->on_connected = on_connected;
self->on_server_connected = on_server_connected;
if (self->connect_to) {
return self->connect_to(self, uri);
}
Expand Down Expand Up @@ -554,3 +561,10 @@ bool ten_protocol_role_is_communication(ten_protocol_t *self) {

return self->role > TEN_PROTOCOL_ROLE_LISTEN;
}

bool ten_protocol_role_is_listening(ten_protocol_t *self) {
TEN_ASSERT(self && ten_protocol_check_integrity(self, true),
"Access across threads.");

return self->role == TEN_PROTOCOL_ROLE_LISTEN;
}
Loading