diff --git a/.vscode/launch.json b/.vscode/launch.json index 515908cdf3..129e82db4d 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -137,7 +137,7 @@ "request": "launch", "program": "${workspaceFolder}/out/linux/x64/tests/standalone/ten_runtime_smoke_test", "args": [ - "--gtest_filter=LogTest.LogFile" + "--gtest_filter=ExtensionTest.FailedToConnectToRemote" ], "cwd": "${workspaceFolder}/out/linux/x64/tests/standalone/", "env": { diff --git a/core/include/ten_runtime/protocol/protocol.h b/core/include/ten_runtime/protocol/protocol.h index 202dd7bdd2..cd1a821dcb 100644 --- a/core/include/ten_runtime/protocol/protocol.h +++ b/core/include/ten_runtime/protocol/protocol.h @@ -111,18 +111,20 @@ typedef void (*ten_protocol_close_func_t)(ten_protocol_t *self); typedef void (*ten_protocol_on_output_func_t)(ten_protocol_t *self, ten_list_t *output); -typedef void (*ten_protocol_listen_func_t)(ten_protocol_t *self, - const char *uri); - typedef ten_connection_t *(*ten_protocol_on_client_accepted_func_t)( ten_protocol_t *self, ten_protocol_t *new_protocol); -typedef bool (*ten_protocol_connect_to_func_t)(ten_protocol_t *self, - const char *uri); +typedef void (*ten_protocol_listen_func_t)( + ten_protocol_t *self, const char *uri, + ten_protocol_on_client_accepted_func_t on_client_accepted); typedef void (*ten_protocol_on_server_connected_func_t)(ten_protocol_t *self, bool success); +typedef void (*ten_protocol_connect_to_func_t)( + ten_protocol_t *self, const char *uri, + ten_protocol_on_server_connected_func_t on_server_connected); + typedef void (*ten_protocol_migrate_func_t)(ten_protocol_t *self, ten_engine_t *engine, ten_connection_t *connection, diff --git a/core/include/ten_utils/io/transport.h b/core/include/ten_utils/io/transport.h index df9721e3c0..c38467721f 100644 --- a/core/include/ten_utils/io/transport.h +++ b/core/include/ten_utils/io/transport.h @@ -56,12 +56,14 @@ struct ten_transport_t { */ void (*on_server_connected)(ten_transport_t *transport, ten_stream_t *stream, int status); + void *on_server_connected_data; /** * Callback when a new rx stream is created */ void (*on_client_accepted)(ten_transport_t *transport, ten_stream_t *stream, int status); + void *on_client_accepted_data; /** * Callback when transport closed diff --git a/core/include_internal/ten_runtime/protocol/integrated/close.h b/core/include_internal/ten_runtime/protocol/integrated/close.h index f6c6206113..886b04f9f7 100644 --- a/core/include_internal/ten_runtime/protocol/integrated/close.h +++ b/core/include_internal/ten_runtime/protocol/integrated/close.h @@ -103,6 +103,9 @@ typedef struct ten_protocol_integrated_t ten_protocol_integrated_t; +TEN_RUNTIME_PRIVATE_API void ten_protocol_integrated_on_close( + ten_protocol_integrated_t *self); + TEN_RUNTIME_PRIVATE_API void ten_protocol_integrated_on_stream_closed( ten_protocol_integrated_t *self); diff --git a/core/include_internal/ten_runtime/protocol/integrated/protocol_integrated.h b/core/include_internal/ten_runtime/protocol/integrated/protocol_integrated.h index a1c90d6a38..fecc0a3a6d 100644 --- a/core/include_internal/ten_runtime/protocol/integrated/protocol_integrated.h +++ b/core/include_internal/ten_runtime/protocol/integrated/protocol_integrated.h @@ -8,11 +8,13 @@ #include "ten_runtime/ten_config.h" +#include "include_internal/ten_runtime/protocol/integrated/retry.h" #include "include_internal/ten_runtime/protocol/protocol.h" #include "ten_utils/io/stream.h" #include "ten_utils/io/transport.h" typedef struct ten_protocol_integrated_t ten_protocol_integrated_t; +typedef struct ten_timer_t ten_timer_t; typedef void (*ten_protocol_integrated_on_input_func_t)( ten_protocol_integrated_t *protocol, ten_buf_t buf, ten_list_t *input); @@ -20,6 +22,22 @@ typedef void (*ten_protocol_integrated_on_input_func_t)( typedef ten_buf_t (*ten_protocol_integrated_on_output_func_t)( ten_protocol_integrated_t *protocol, ten_list_t *output); +typedef struct ten_protocol_integrated_connect_to_context_t { + // The protocol which is trying to connect to the server. + ten_protocol_integrated_t *protocol; + + // The server URI to connect to. + ten_string_t server_uri; + + // The callback function to be called when the connection is established or + // failed. + // + // @note Set to NULL if the callback has been called. + ten_protocol_on_server_connected_func_t on_server_connected; + + void *user_data; +} ten_protocol_integrated_connect_to_context_t; + /** * @brief This is the base class of all the protocols which uses the event loop * inside the TEN world. @@ -43,9 +61,22 @@ struct ten_protocol_integrated_t { // Used to convert TEN runtime messages to a buffer. ten_protocol_integrated_on_output_func_t on_output; + + // Used to configure the retry mechanism. + ten_protocol_integrated_retry_config_t retry_config; + ten_timer_t *retry_timer; }; TEN_RUNTIME_API void ten_protocol_integrated_init( ten_protocol_integrated_t *self, const char *name, ten_protocol_integrated_on_input_func_t on_input, ten_protocol_integrated_on_output_func_t on_output); + +TEN_RUNTIME_PRIVATE_API ten_protocol_integrated_connect_to_context_t * +ten_protocol_integrated_connect_to_context_create( + ten_protocol_integrated_t *self, const char *server_uri, + ten_protocol_on_server_connected_func_t on_server_connected, + void *user_data); + +TEN_RUNTIME_PRIVATE_API void ten_protocol_integrated_connect_to_context_destroy( + ten_protocol_integrated_connect_to_context_t *context); diff --git a/core/include_internal/ten_runtime/protocol/integrated/retry.h b/core/include_internal/ten_runtime/protocol/integrated/retry.h new file mode 100644 index 0000000000..90f49ffa05 --- /dev/null +++ b/core/include_internal/ten_runtime/protocol/integrated/retry.h @@ -0,0 +1,28 @@ +// +// Copyright © 2024 Agora +// This file is part of TEN Framework, an open source project. +// Licensed under the Apache License, Version 2.0, with certain conditions. +// Refer to the "LICENSE" file in the root directory for more information. +// +#pragma once + +#include "ten_runtime/ten_config.h" + +#include +#include + +typedef struct ten_protocol_integrated_t ten_protocol_integrated_t; + +typedef struct ten_protocol_integrated_retry_config_t { + // Whether to enable the retry mechanism. + bool enable; + + // The max retry times. + uint32_t max_retries; + + // The interval between retries. + uint32_t interval_ms; +} ten_protocol_integrated_retry_config_t; + +TEN_RUNTIME_PRIVATE_API void ten_protocol_integrated_retry_config_init( + ten_protocol_integrated_retry_config_t *self); diff --git a/core/include_internal/ten_runtime/protocol/protocol.h b/core/include_internal/ten_runtime/protocol/protocol.h index 901c519866..9713abdec0 100644 --- a/core/include_internal/ten_runtime/protocol/protocol.h +++ b/core/include_internal/ten_runtime/protocol/protocol.h @@ -139,20 +139,6 @@ typedef struct ten_protocol_t { // Used to handle the output TEN messages to the remote. ten_protocol_on_output_func_t on_output; - // This is the callback function when a client connects to this protocol. - // Note that this function pointer can only be set in 'ten_protocol_listen' - // and the 'listen' method should be able to call back this function when - // the client successfully establishes a connection. - ten_protocol_on_client_accepted_func_t on_client_accepted; - - // This is the callback function when this protocol connected to the remote - // server. - // Note that this function pointer can only be set in - // 'ten_protocol_connect_to' and the 'connect_to' method should be able to - // call back this function when the connection to the remote server is - // established. - ten_protocol_on_server_connected_func_t on_server_connected; - // This is the callback function when this protocol is migrated to the new // runloop. ten_protocol_on_migrated_func_t on_migrated; @@ -211,7 +197,7 @@ TEN_RUNTIME_PRIVATE_API void ten_protocol_listen( ten_protocol_t *self, const char *uri, ten_protocol_on_client_accepted_func_t on_client_accepted); -TEN_RUNTIME_PRIVATE_API bool ten_protocol_connect_to( +TEN_RUNTIME_PRIVATE_API void ten_protocol_connect_to( ten_protocol_t *self, const char *uri, ten_protocol_on_server_connected_func_t on_server_connected); diff --git a/core/include_internal/ten_runtime/timer/timer.h b/core/include_internal/ten_runtime/timer/timer.h index b262575d81..fecde5b79d 100644 --- a/core/include_internal/ten_runtime/timer/timer.h +++ b/core/include_internal/ten_runtime/timer/timer.h @@ -44,6 +44,16 @@ struct ten_timer_t { int32_t requested_times; // TEN_TIMER_INFINITE means "forever" int32_t times; + // If the auto_restart flag is set to be 'false', it will __not__ + // automatically restart timing after each timeout. Instead, the user needs to + // manually restart the timer (ten_timer_enable). When the number of timeouts + // exceeds the specified times, the timer will automatically close. + // + // Conversely, if auto_restart is set to be 'true' (by default), the timer + // will automatically decide whether to restart timing or close the timer + // based on its policy after each timeout. + bool auto_restart; + ten_loc_t src_loc; ten_runloop_timer_t *backend; @@ -58,7 +68,8 @@ TEN_RUNTIME_PRIVATE_API ten_timer_t *ten_timer_create_with_cmd( TEN_RUNTIME_PRIVATE_API ten_timer_t *ten_timer_create(ten_runloop_t *runloop, uint64_t timeout_in_us, - int32_t requested_times); + int32_t requested_times, + bool auto_restart); TEN_RUNTIME_PRIVATE_API void ten_timer_destroy(ten_timer_t *self); diff --git a/core/include_internal/ten_utils/log/log.h b/core/include_internal/ten_utils/log/log.h index deb457e3c5..dbd04742f9 100644 --- a/core/include_internal/ten_utils/log/log.h +++ b/core/include_internal/ten_utils/log/log.h @@ -60,4 +60,6 @@ TEN_UTILS_API void ten_log_global_deinit(void); TEN_UTILS_API void ten_log_global_set_output_level(TEN_LOG_LEVEL level); +TEN_UTILS_API void ten_log_global_set_output_to_stderr(void); + TEN_UTILS_API void ten_log_global_set_output_to_file(const char *log_path); diff --git a/core/include_internal/ten_utils/log/output.h b/core/include_internal/ten_utils/log/output.h index 6a468cc112..ed5e959157 100644 --- a/core/include_internal/ten_utils/log/output.h +++ b/core/include_internal/ten_utils/log/output.h @@ -8,6 +8,8 @@ #include "ten_utils/ten_config.h" +#include + /** * @brief String to put in the end of each log line (can be empty). */ @@ -20,8 +22,15 @@ typedef struct ten_string_t ten_string_t; TEN_UTILS_API void ten_log_set_output_to_stderr(ten_log_t *self); +TEN_UTILS_PRIVATE_API void ten_log_output_to_file_cb(ten_string_t *msg, + void *user_data); + TEN_UTILS_PRIVATE_API void ten_log_output_to_stderr_cb(ten_string_t *msg, void *user_data); TEN_UTILS_PRIVATE_API void ten_log_set_output_to_file(ten_log_t *self, const char *log_path); + +TEN_UTILS_PRIVATE_API void ten_log_output_to_file_deinit(ten_log_t *self); + +TEN_UTILS_PRIVATE_API bool ten_log_is_output_to_file(ten_log_t *self); diff --git a/core/src/ten_runtime/app/metadata.c b/core/src/ten_runtime/app/metadata.c index b875ad5f51..50c3009a6e 100644 --- a/core/src/ten_runtime/app/metadata.c +++ b/core/src/ten_runtime/app/metadata.c @@ -180,6 +180,10 @@ bool ten_app_handle_ten_namespace_properties(ten_app_t *self) { self->one_event_loop_per_engine = false; self->long_running_mode = false; + // First, set the log-related configuration to default values. This way, if + // there are no log-related properties under the `ten` namespace, the default + // values will be used. + ten_log_global_set_output_to_stderr(); ten_log_global_set_output_level(DEFAULT_LOG_OUTPUT_LEVEL); if (!ten_app_determine_ten_namespace_properties(self, diff --git a/core/src/ten_runtime/connection/connection.c b/core/src/ten_runtime/connection/connection.c index e6b16d4db9..f0bd87912c 100644 --- a/core/src/ten_runtime/connection/connection.c +++ b/core/src/ten_runtime/connection/connection.c @@ -393,14 +393,13 @@ void ten_connection_connect_to(ten_connection_t *self, const char *uri, "Should not happen."); } - if (self->protocol) { - bool is_connected = - ten_protocol_connect_to(self->protocol, uri, on_server_connected); + TEN_ASSERT( + self->protocol && ten_protocol_check_integrity(self->protocol, true), + "Should not happen."); + TEN_ASSERT(ten_protocol_role_is_communication(self->protocol), + "Should not happen."); - if (!is_connected && on_server_connected) { - on_server_connected(self->protocol, false); - } - } + ten_protocol_connect_to(self->protocol, uri, on_server_connected); } void ten_connection_attach_to_remote(ten_connection_t *self, diff --git a/core/src/ten_runtime/extension/internal/path_timer.c b/core/src/ten_runtime/extension/internal/path_timer.c index 398f24878d..9f6d5c34e0 100644 --- a/core/src/ten_runtime/extension/internal/path_timer.c +++ b/core/src/ten_runtime/extension/internal/path_timer.c @@ -116,7 +116,7 @@ ten_timer_t *ten_extension_create_timer_for_in_path(ten_extension_t *self) { ten_timer_t *timer = ten_timer_create( ten_extension_thread_get_attached_runloop(extension_thread), - self->path_timeout_info.check_interval, TEN_TIMER_INFINITE); + self->path_timeout_info.check_interval, TEN_TIMER_INFINITE, true); ten_timer_set_on_triggered(timer, ten_extension_in_path_timer_on_triggered, self); @@ -136,7 +136,7 @@ ten_timer_t *ten_extension_create_timer_for_out_path(ten_extension_t *self) { ten_timer_t *timer = ten_timer_create( ten_extension_thread_get_attached_runloop(extension_thread), - self->path_timeout_info.check_interval, TEN_TIMER_INFINITE); + self->path_timeout_info.check_interval, TEN_TIMER_INFINITE, true); ten_timer_set_on_triggered(timer, ten_extension_out_path_timer_on_triggered, self); diff --git a/core/src/ten_runtime/protocol/integrated/close.c b/core/src/ten_runtime/protocol/integrated/close.c index 5f21568bcf..e003751bcb 100644 --- a/core/src/ten_runtime/protocol/integrated/close.c +++ b/core/src/ten_runtime/protocol/integrated/close.c @@ -10,6 +10,7 @@ #include "include_internal/ten_runtime/protocol/integrated/close.h" #include "include_internal/ten_runtime/protocol/integrated/protocol_integrated.h" #include "include_internal/ten_runtime/protocol/protocol.h" +#include "include_internal/ten_runtime/timer/timer.h" #include "ten_utils/io/stream.h" #include "ten_utils/log/log.h" #include "ten_utils/macro/check.h" @@ -39,6 +40,10 @@ static bool ten_protocol_integrated_could_be_close( if (self->role_facility.communication_stream) { return false; } + + if (self->retry_timer) { + return false; + } break; default: TEN_ASSERT(0, "Should not happen."); @@ -48,7 +53,7 @@ static bool ten_protocol_integrated_could_be_close( return true; } -static void ten_protocol_integrated_on_close(ten_protocol_integrated_t *self) { +void ten_protocol_integrated_on_close(ten_protocol_integrated_t *self) { TEN_ASSERT(self, "Should not happen."); ten_protocol_t *protocol = &self->base; @@ -141,6 +146,12 @@ void ten_protocol_integrated_close(ten_protocol_integrated_t *self) { ten_stream_close(self->role_facility.communication_stream); perform_any_closing_operation = true; } + + if (self->retry_timer) { + ten_timer_stop_async(self->retry_timer); + ten_timer_close_async(self->retry_timer); + perform_any_closing_operation = true; + } break; default: diff --git a/core/src/ten_runtime/protocol/integrated/protocol_integrated.c b/core/src/ten_runtime/protocol/integrated/protocol_integrated.c index ffb95474d3..bddf4c0712 100644 --- a/core/src/ten_runtime/protocol/integrated/protocol_integrated.c +++ b/core/src/ten_runtime/protocol/integrated/protocol_integrated.c @@ -33,6 +33,7 @@ #include "ten_utils/lib/mutex.h" #include "ten_utils/lib/ref.h" #include "ten_utils/lib/smart_ptr.h" +#include "ten_utils/log/log.h" #include "ten_utils/macro/check.h" #include "ten_utils/macro/mark.h" @@ -127,7 +128,7 @@ static void ten_stream_on_data(ten_stream_t *stream, void *data, int size) { if (size < 0) { // Something unexpected happened, close the protocol. - TEN_LOGV("Failed to receive data, close the protocol: %d", size); + TEN_LOGD("Failed to receive data, close the protocol: %d", size); // This branch means that the client side closes the physical connection // first, and then the corresponding protocol will be closed. An example of @@ -284,6 +285,9 @@ static void ten_app_thread_on_client_protocol_created(ten_env_t *ten_env, ten_stream_t *stream = cb_data; TEN_ASSERT(stream, "Should not happen."); + ten_protocol_on_client_accepted_func_t on_client_accepted = stream->user_data; + TEN_ASSERT(on_client_accepted, "Should not happen."); + ten_app_t *app = ten_env_get_attached_app(ten_env); TEN_ASSERT(app && ten_app_check_integrity(app, true), "Should not happen."); @@ -304,10 +308,8 @@ static void ten_app_thread_on_client_protocol_created(ten_env_t *ten_env, ten_protocol_attach_to_app(new_communication_base_protocol, listening_base_protocol->attached_target.app); - TEN_ASSERT(listening_base_protocol->on_client_accepted, "Should not happen."); - TEN_UNUSED ten_connection_t *connection = - listening_base_protocol->on_client_accepted( - listening_base_protocol, new_communication_base_protocol); + TEN_UNUSED ten_connection_t *connection = on_client_accepted( + listening_base_protocol, new_communication_base_protocol); TEN_ASSERT(connection && ten_connection_check_integrity(connection, true), "Should not happen."); @@ -319,13 +321,22 @@ static void ten_app_thread_on_client_protocol_created(ten_env_t *ten_env, TEN_ASSERT(!rc, "ten_stream_start_read() failed: %d", rc); } -static void ten_protocol_integrated_on_client_accepted( - ten_transport_t *transport, ten_stream_t *stream, TEN_UNUSED int status) { +static void ten_transport_on_client_accepted(ten_transport_t *transport, + ten_stream_t *stream, + TEN_UNUSED int status) { TEN_ASSERT(transport && stream, "Should not happen."); ten_protocol_integrated_t *listening_protocol = transport->user_data; TEN_ASSERT(listening_protocol, "Should not happen."); + // The `on_client_accepted_data` in transport stores the `on_client_accepted` + // callback function set by the TEN runtime. + ten_protocol_on_client_accepted_func_t on_client_accepted = + transport->on_client_accepted_data; + TEN_ASSERT(on_client_accepted, "Should not happen."); + + stream->user_data = on_client_accepted; + ten_protocol_t *listening_base_protocol = &listening_protocol->base; TEN_ASSERT(listening_base_protocol && ten_protocol_check_integrity(listening_base_protocol, true), @@ -353,8 +364,10 @@ static void ten_protocol_integrated_on_client_accepted( ten_error_deinit(&err); } -static void ten_protocol_integrated_listen(ten_protocol_integrated_t *self, - const char *uri) { +static void ten_protocol_integrated_listen( + ten_protocol_t *self_, const char *uri, + ten_protocol_on_client_accepted_func_t on_client_accepted) { + ten_protocol_integrated_t *self = (ten_protocol_integrated_t *)self_; TEN_ASSERT(self && ten_protocol_check_integrity(&self->base, true), "Should not happen."); TEN_ASSERT(uri, "Should not happen."); @@ -373,7 +386,14 @@ static void ten_protocol_integrated_listen(ten_protocol_integrated_t *self, self->role_facility.listening_transport = transport; transport->user_data = self; - transport->on_client_accepted = ten_protocol_integrated_on_client_accepted; + // When a client connects, it is first handled using + // `ten_transport_on_client_accepted`, and only afterward is the + // `on_client_accepted` defined from TEN runtime. This way, some tasks can be + // performed within the protocol at the transport/stream layer first, before + // switching to the TEN runtime's `on_client_accepted` callback. + transport->on_client_accepted = ten_transport_on_client_accepted; + transport->on_client_accepted_data = on_client_accepted; + ten_transport_set_close_cb(transport, ten_protocol_integrated_on_transport_closed, self); @@ -385,6 +405,8 @@ static void ten_protocol_integrated_listen(ten_protocol_integrated_t *self, if (rc) { TEN_LOGE("Failed to create a listening endpoint (%s): %d", ten_string_get_raw_str(transport_uri), rc); + + // TODO(xilin): Handle the error. } ten_string_destroy(transport_uri); @@ -447,38 +469,218 @@ static void ten_protocol_integrated_on_output_async( NULL); } -static void on_server_connected(ten_transport_t *transport, - ten_stream_t *stream, int status) { +static void ten_protocol_integrated_on_server_finally_connected( + ten_protocol_integrated_connect_to_context_t *cb_data, bool success) { + TEN_ASSERT(cb_data, "Should not happen."); + TEN_ASSERT(cb_data->on_server_connected, "Should not happen."); + + ten_protocol_integrated_t *protocol = cb_data->protocol; + TEN_ASSERT(protocol && ten_protocol_check_integrity(&protocol->base, true), + "Should not happen."); + + cb_data->on_server_connected(&cb_data->protocol->base, success); + cb_data->on_server_connected = NULL; + + ten_protocol_integrated_connect_to_context_destroy(cb_data); +} + +static void ten_transport_on_server_connected_after_retry( + ten_transport_t *transport, ten_stream_t *stream, int status) { ten_protocol_integrated_t *protocol = transport->user_data; + // Since the transport is created with the runloop of the engine, it is + // currently in the engine thread. TEN_ASSERT(protocol && ten_protocol_check_integrity(&protocol->base, true), "Should not happen."); TEN_ASSERT(ten_protocol_role_is_communication(&protocol->base), "Should not happen."); - // If the protocol is closing, do not handle this connect success event. + ten_protocol_integrated_connect_to_context_t *connect_to_context = + transport->on_server_connected_data; + TEN_ASSERT(connect_to_context, "Should not happen."); + TEN_ASSERT(connect_to_context->on_server_connected, "Should not happen."); + if (ten_protocol_is_closing(&protocol->base)) { + ten_stream_close(stream); + // The ownership of the 'connect_to_context' is transferred to the timer, so + // the 'connect_to_context' will be freed when the timer is closed. return; } + TEN_ASSERT(protocol->retry_timer, "Should not happen."); + bool success = status >= 0; if (success) { ten_protocol_integrated_set_stream(protocol, stream); + + connect_to_context->on_server_connected(&protocol->base, success); + transport->on_server_connected_data = NULL; + // Set 'on_server_connected' to NULL to indicate that this callback has + // already been called and to prevent it from being called again. + connect_to_context->on_server_connected = NULL; + + ten_stream_start_read(stream); + + TEN_LOGD("Connect to %s successfully after retry", + ten_string_get_raw_str(&connect_to_context->server_uri)); + + ten_timer_stop_async(protocol->retry_timer); + ten_timer_close_async(protocol->retry_timer); + } else { + ten_stream_close(stream); + + // Reset the timer to retry or close the timer if the retry times are + // exhausted. + ten_timer_enable(protocol->retry_timer); + + TEN_LOGD("Failed to connect to %s after retry", + ten_string_get_raw_str(&connect_to_context->server_uri)); + } +} + +static void ten_protocol_integrated_on_retry_timer_triggered( + TEN_UNUSED ten_timer_t *self, void *on_trigger_data) { + ten_protocol_integrated_connect_to_context_t *connect_to_context = + on_trigger_data; + TEN_ASSERT(connect_to_context, "Should not happen."); + + ten_protocol_integrated_t *protocol = connect_to_context->protocol; + TEN_ASSERT(protocol && ten_protocol_check_integrity(&protocol->base, true), + "Should not happen."); + + ten_runloop_t *loop = ten_protocol_get_attached_runloop(&protocol->base); + TEN_ASSERT(loop, "Should not happen."); + + ten_transport_t *transport = ten_transport_create(loop); + transport->user_data = protocol; + transport->on_server_connected = + ten_transport_on_server_connected_after_retry; + transport->on_server_connected_data = connect_to_context; + + int rc = ten_transport_connect(transport, &connect_to_context->server_uri); + if (rc) { + // If the 'ten_transport_connect' directly returns error, it could be due to + // invalid parameters or other errors which cannot be solved by retrying. + + TEN_LOGW( + "Failed to connect to %s due to invalid parameters or other fatal " + "errors.", + ten_string_get_raw_str(&connect_to_context->server_uri)); + + transport->on_server_connected_data = NULL; + ten_transport_close(transport); + + connect_to_context->on_server_connected(&protocol->base, false); + // Set 'on_server_connected' to NULL to indicate that this callback has + // already been called and to prevent it from being called again. + connect_to_context->on_server_connected = NULL; + + // If the 'ten_transport_connect' directly returns error, it could be due to + // invalid parameters or other errors which cannot be solved by retrying. So + // we close the timer here. + ten_timer_stop_async(protocol->retry_timer); + ten_timer_close_async(protocol->retry_timer); + } +} + +static void ten_protocol_integrated_on_retry_timer_closed(ten_timer_t *timer, + void *user_data) { + TEN_ASSERT(timer, "Should not happen."); + + ten_protocol_integrated_connect_to_context_t *connect_to_context = user_data; + TEN_ASSERT(connect_to_context, "Should not happen."); + + ten_protocol_integrated_t *protocol = connect_to_context->protocol; + TEN_ASSERT(protocol && ten_protocol_check_integrity(&protocol->base, true), + "Should not happen."); + + if (connect_to_context->on_server_connected) { + TEN_LOGD( + "Retry timer is closed, but the connection to %s is not established " + "yet", + ten_string_get_raw_str(&connect_to_context->server_uri)); + ten_protocol_integrated_on_server_finally_connected(connect_to_context, + false); + } else { + ten_protocol_integrated_connect_to_context_destroy(connect_to_context); } - if (protocol->base.on_server_connected) { - protocol->base.on_server_connected(&protocol->base, success); + protocol->retry_timer = NULL; + + if (ten_protocol_is_closing(&protocol->base)) { + ten_protocol_integrated_on_close(protocol); } +} + +static void ten_transport_on_server_connected(ten_transport_t *transport, + ten_stream_t *stream, + int status) { + ten_protocol_integrated_t *protocol = transport->user_data; + + // Since the transport is created with the runloop of the engine, it is + // currently in the engine thread. + TEN_ASSERT(protocol && ten_protocol_check_integrity(&protocol->base, true), + "Should not happen."); + TEN_ASSERT(ten_protocol_role_is_communication(&protocol->base), + "Should not happen."); + + TEN_ASSERT(!protocol->retry_timer, "Should not happen."); + + ten_protocol_integrated_connect_to_context_t *cb_data = + transport->on_server_connected_data; + TEN_ASSERT(cb_data, "Should not happen."); + TEN_ASSERT(cb_data->on_server_connected, "Should not happen."); + + if (ten_protocol_is_closing(&protocol->base)) { + ten_stream_close(stream); + + ten_protocol_integrated_on_server_finally_connected(cb_data, false); + return; + } + + bool success = status >= 0; if (success) { + ten_protocol_integrated_on_server_finally_connected(cb_data, success); + + ten_protocol_integrated_set_stream(protocol, stream); ten_stream_start_read(stream); } else { ten_stream_close(stream); + + bool need_retry = + protocol->retry_config.enable && protocol->retry_config.max_retries > 0; + + if (!need_retry) { + ten_protocol_integrated_on_server_finally_connected(cb_data, success); + return; + } + + ten_runloop_t *loop = ten_protocol_get_attached_runloop(&protocol->base); + TEN_ASSERT(loop, "Should not happen."); + + ten_timer_t *timer = ten_timer_create( + loop, (uint64_t)protocol->retry_config.interval_ms * 1000, + (int32_t)protocol->retry_config.max_retries, false); + TEN_ASSERT(timer, "Should not happen."); + + protocol->retry_timer = timer; + + // Note that the ownership of the 'cb_data' is transferred to the timer. + // The 'cb_data' will be freed when the timer is closed. + ten_timer_set_on_triggered( + timer, ten_protocol_integrated_on_retry_timer_triggered, cb_data); + ten_timer_set_on_closed( + timer, ten_protocol_integrated_on_retry_timer_closed, cb_data); + + ten_timer_enable(timer); } } -static bool ten_protocol_integrated_connect_to(ten_protocol_integrated_t *self, - const char *uri) { +static void ten_protocol_integrated_connect_to( + ten_protocol_t *self_, const char *uri, + ten_protocol_on_server_connected_func_t on_server_connected) { + ten_protocol_integrated_t *self = (ten_protocol_integrated_t *)self_; TEN_ASSERT(self && ten_protocol_check_integrity(&self->base, true), "Should not happen."); TEN_ASSERT(uri, "Should not happen."); @@ -493,25 +695,41 @@ static bool ten_protocol_integrated_connect_to(ten_protocol_integrated_t *self, self->base.attached_target.connection->attached_target.remote->engine, true), "Should not happen."); + TEN_ASSERT(!self->retry_timer, "Should not happen."); ten_runloop_t *loop = ten_remote_get_attached_runloop( self->base.attached_target.connection->attached_target.remote); TEN_ASSERT(loop, "Should not happen."); + ten_string_t *transport_uri = ten_protocol_uri_to_transport_uri(uri); + TEN_ASSERT(transport_uri, "Should not happen."); + + // Note that if connection fails, the transport needs to be closed. ten_transport_t *transport = ten_transport_create(loop); transport->user_data = self; - transport->on_server_connected = on_server_connected; + transport->on_server_connected = ten_transport_on_server_connected; - ten_string_t *transport_uri = ten_protocol_uri_to_transport_uri(uri); - TEN_ASSERT(transport_uri, "Should not happen."); - if (ten_transport_connect(transport, transport_uri)) { + // The 'connect_to_server_context' will be freed once the + // 'on_server_connected' callback is called. + ten_protocol_integrated_connect_to_context_t *connect_to_server_context = + ten_protocol_integrated_connect_to_context_create( + self, ten_string_get_raw_str(transport_uri), on_server_connected, + NULL); + transport->on_server_connected_data = connect_to_server_context; + + int rc = ten_transport_connect(transport, transport_uri); + ten_string_destroy(transport_uri); + + if (rc) { TEN_LOGW("Failed to connect to %s", ten_string_get_raw_str(transport_uri)); - // Something wrong when connecting. + // If the 'ten_transport_connect' directly returns error, it could be due to + // invalid parameters or other errors which cannot be solved by retrying. So + // we don't need to retry here. + ten_protocol_integrated_on_server_finally_connected( + connect_to_server_context, false); + ten_transport_close(transport); - return false; } - ten_string_destroy(transport_uri); - return true; } static void ten_protocol_integrated_on_stream_cleaned( @@ -662,8 +880,7 @@ void ten_protocol_integrated_init( &self->base, name, (ten_protocol_close_func_t)ten_protocol_integrated_close, (ten_protocol_on_output_func_t)ten_protocol_integrated_on_output_async, - (ten_protocol_listen_func_t)ten_protocol_integrated_listen, - (ten_protocol_connect_to_func_t)ten_protocol_integrated_connect_to, + ten_protocol_integrated_listen, ten_protocol_integrated_connect_to, (ten_protocol_migrate_func_t)ten_protocol_integrated_migrate, (ten_protocol_clean_func_t)ten_protocol_integrated_clean); @@ -676,4 +893,38 @@ void ten_protocol_integrated_init( self->on_input = on_input; self->on_output = on_output; + ten_protocol_integrated_retry_config_init(&self->retry_config); + self->retry_timer = NULL; +} + +ten_protocol_integrated_connect_to_context_t * +ten_protocol_integrated_connect_to_context_create( + ten_protocol_integrated_t *self, const char *server_uri, + ten_protocol_on_server_connected_func_t on_server_connected, + void *user_data) { + TEN_ASSERT(server_uri, "Invalid argument."); + TEN_ASSERT(on_server_connected, "Invalid argument."); + + ten_protocol_integrated_connect_to_context_t *context = + (ten_protocol_integrated_connect_to_context_t *)TEN_MALLOC( + sizeof(ten_protocol_integrated_connect_to_context_t)); + TEN_ASSERT(context, "Failed to allocate memory."); + + ten_string_init_from_c_str(&context->server_uri, server_uri, + strlen(server_uri)); + context->on_server_connected = on_server_connected; + context->user_data = user_data; + context->protocol = self; + + return context; +} + +void ten_protocol_integrated_connect_to_context_destroy( + ten_protocol_integrated_connect_to_context_t *context) { + TEN_ASSERT(context, "Invalid argument."); + // Ensure the callback has been called and reset to NULL. + TEN_ASSERT(!context->on_server_connected, "Invalid argument."); + + ten_string_deinit(&context->server_uri); + TEN_FREE(context); } diff --git a/core/src/ten_runtime/protocol/integrated/retry.c b/core/src/ten_runtime/protocol/integrated/retry.c new file mode 100644 index 0000000000..b3453968aa --- /dev/null +++ b/core/src/ten_runtime/protocol/integrated/retry.c @@ -0,0 +1,14 @@ +// +// Copyright © 2024 Agora +// This file is part of TEN Framework, an open source project. +// Licensed under the Apache License, Version 2.0, with certain conditions. +// Refer to the "LICENSE" file in the root directory for more information. +// +#include "include_internal/ten_runtime/protocol/integrated/retry.h" + +void ten_protocol_integrated_retry_config_init( + ten_protocol_integrated_retry_config_t *self) { + self->enable = false; + self->max_retries = 0; + self->interval_ms = 0; +} diff --git a/core/src/ten_runtime/protocol/protocol.c b/core/src/ten_runtime/protocol/protocol.c index 8557f89e72..1311b79f95 100644 --- a/core/src/ten_runtime/protocol/protocol.c +++ b/core/src/ten_runtime/protocol/protocol.c @@ -118,10 +118,7 @@ void ten_protocol_init(ten_protocol_t *self, const char *name, self->on_output = on_output; self->listen = listen; - self->on_client_accepted = NULL; - self->connect_to = connect_to; - self->on_server_connected = NULL; self->migrate = migrate; self->on_migrated = NULL; @@ -202,8 +199,7 @@ void ten_protocol_listen( TEN_ASSERT(app && ten_app_check_integrity(app, true), "Access across threads."); - self->on_client_accepted = on_client_accepted; - self->listen(self, uri); + self->listen(self, uri, on_client_accepted); } bool ten_protocol_cascade_close_upward(ten_protocol_t *self) { @@ -348,7 +344,7 @@ void ten_protocol_send_msg(ten_protocol_t *self, ten_shared_ptr_t *msg) { } } -bool ten_protocol_connect_to( +void ten_protocol_connect_to( ten_protocol_t *self, const char *uri, ten_protocol_on_server_connected_func_t on_server_connected) { TEN_ASSERT(self && ten_protocol_check_integrity(self, true), @@ -356,6 +352,7 @@ bool ten_protocol_connect_to( TEN_ASSERT(ten_protocol_role_is_communication(self), "Only the communication protocol could connect to remote."); TEN_ASSERT(uri, "Should not happen."); + TEN_ASSERT(on_server_connected, "Should not happen."); if (self->attach_to == TEN_PROTOCOL_ATTACH_TO_CONNECTION && ten_connection_attach_to(self->attached_target.connection) == @@ -367,11 +364,13 @@ bool ten_protocol_connect_to( "Should not happen."); } - self->on_server_connected = on_server_connected; if (self->connect_to) { - return self->connect_to(self, uri); + self->connect_to(self, uri, on_server_connected); + } else { + // The protocol doesn't implement the 'connect_to' function, so the + // 'on_server_connected' callback is called directly. + on_server_connected(self, false); } - return false; } void ten_protocol_migrate(ten_protocol_t *self, ten_engine_t *engine, diff --git a/core/src/ten_runtime/timer/timer.c b/core/src/ten_runtime/timer/timer.c index c6d8654f25..5413e40313 100644 --- a/core/src/ten_runtime/timer/timer.c +++ b/core/src/ten_runtime/timer/timer.c @@ -135,6 +135,14 @@ static void ten_timer_on_trigger(ten_timer_t *self, self->on_trigger(self, self->on_trigger_data); } + if (!self->auto_restart) { + // If the timer is _not_ auto_restart, it will not automatically start the + // next round of timing or close itself after each timeout trigger. + // Instead, it will only do so when the user manually enables the timer + // again. + return; + } + if (self->requested_times == TEN_TIMER_INFINITE || self->times < self->requested_times) { // Setup the next timeout. @@ -164,6 +172,7 @@ static ten_timer_t *ten_timer_create_internal(ten_runloop_t *runloop) { self->id = 0; self->times = 0; + self->auto_restart = true; ten_loc_init_empty(&self->src_loc); @@ -194,7 +203,7 @@ static ten_timer_t *ten_timer_create_internal(ten_runloop_t *runloop) { } ten_timer_t *ten_timer_create(ten_runloop_t *runloop, uint64_t timeout_in_us, - int32_t requested_times) { + int32_t requested_times, bool auto_restart) { TEN_ASSERT(runloop && ten_runloop_check_integrity(runloop, true), "Should not happen."); @@ -205,6 +214,7 @@ ten_timer_t *ten_timer_create(ten_runloop_t *runloop, uint64_t timeout_in_us, self->timeout_in_us = timeout_in_us; self->requested_times = requested_times; + self->auto_restart = auto_restart; return self; } @@ -258,6 +268,14 @@ void ten_timer_enable(ten_timer_t *self) { ten_runloop_check_integrity(self->runloop, true), "Should not happen."); + if (self->requested_times != TEN_TIMER_INFINITE && + self->times >= self->requested_times) { + // The timer has ended, so it should not be enabled again. + ten_timer_stop_async(self); + ten_timer_close_async(self); + return; + } + ten_runloop_timer_set_timeout(self->backend, self->timeout_in_us / 1000, 0); ten_runloop_timer_start(self->backend, self->runloop, diff --git a/core/src/ten_utils/io/general/transport/transport.c b/core/src/ten_utils/io/general/transport/transport.c index 98e8fcb3bc..442a2c72b5 100644 --- a/core/src/ten_utils/io/general/transport/transport.c +++ b/core/src/ten_utils/io/general/transport/transport.c @@ -9,11 +9,11 @@ #include #include "include_internal/ten_utils/io/runloop.h" -#include "ten_utils/macro/check.h" #include "ten_utils/io/general/transport/backend/base.h" #include "ten_utils/io/general/transport/backend/factory.h" #include "ten_utils/lib/mutex.h" #include "ten_utils/lib/string.h" +#include "ten_utils/macro/check.h" #include "ten_utils/macro/mark.h" // Destroy all the resources hold by this transport object. @@ -50,6 +50,10 @@ ten_transport_t *ten_transport_create(ten_runloop_t *loop) { self->loop = loop; self->user_data = NULL; self->backend = NULL; + self->on_server_connected = NULL; + self->on_server_connected_data = NULL; + self->on_client_accepted = NULL; + self->on_client_accepted_data = NULL; self->on_closed = NULL; self->on_closed_data = NULL; self->drop_type = TEN_TRANSPORT_DROP_NEW; diff --git a/core/src/ten_utils/log/global.c b/core/src/ten_utils/log/global.c index ea7d6dc9b7..966b365070 100644 --- a/core/src/ten_utils/log/global.c +++ b/core/src/ten_utils/log/global.c @@ -25,6 +25,13 @@ void ten_log_global_set_output_level(TEN_LOG_LEVEL level) { ten_log_set_output_level(&ten_global_log, level); } +void ten_log_global_set_output_to_stderr(void) { + if (ten_log_is_output_to_file(&ten_global_log)) { + ten_log_output_to_file_deinit(&ten_global_log); + } + ten_log_set_output_to_stderr(&ten_global_log); +} + void ten_log_global_set_output_to_file(const char *log_path) { ten_log_set_output_to_file(&ten_global_log, log_path); } diff --git a/core/src/ten_utils/log/output.c b/core/src/ten_utils/log/output.c index caacca2d7c..242ca099d9 100644 --- a/core/src/ten_utils/log/output.c +++ b/core/src/ten_utils/log/output.c @@ -169,7 +169,7 @@ static int *get_log_fd(const char *log_path) { return fd_ptr; } -static void ten_log_output_to_file_cb(ten_string_t *msg, void *user_data) { +void ten_log_output_to_file_cb(ten_string_t *msg, void *user_data) { assert(msg && "Invalid argument."); if (!user_data) { @@ -243,3 +243,16 @@ void ten_log_set_output_to_stderr(ten_log_t *self) { ten_log_set_formatter(self, ten_log_default_formatter, NULL); #endif } + +void ten_log_output_to_file_deinit(ten_log_t *self) { + assert(self && "Invalid argument."); + assert(self->output.output_cb == ten_log_output_to_file_cb && + "Invalid argument."); + + free(self->output.user_data); +} + +bool ten_log_is_output_to_file(ten_log_t *self) { + assert(self && "Invalid argument."); + return self->output.output_cb == ten_log_output_to_file_cb; +} diff --git a/packages/core_protocols/msgpack/protocol.c b/packages/core_protocols/msgpack/protocol.c index 3facd12e89..ba1d170922 100644 --- a/packages/core_protocols/msgpack/protocol.c +++ b/packages/core_protocols/msgpack/protocol.c @@ -78,6 +78,11 @@ static void ten_protocol_msgpack_on_create_instance( (ten_protocol_integrated_on_input_func_t)ten_protocol_msgpack_on_input, (ten_protocol_integrated_on_output_func_t)ten_protocol_msgpack_on_output); + // Configure the retry mechanism. + self->base.retry_config.enable = true; + self->base.retry_config.interval_ms = 500; + self->base.retry_config.max_retries = 5; + ten_msgpack_parser_init(&self->parser); ten_env_on_create_instance_done(ten_env, self, context, NULL); diff --git a/tests/ten_runtime/smoke/extension_test/basic/basic_multi_app.cc b/tests/ten_runtime/smoke/extension_test/basic/basic_multi_app.cc index 484171891d..a84ae7bea1 100644 --- a/tests/ten_runtime/smoke/extension_test/basic/basic_multi_app.cc +++ b/tests/ten_runtime/smoke/extension_test/basic/basic_multi_app.cc @@ -7,7 +7,6 @@ #include #include #include -#include #include "gtest/gtest.h" #include "include_internal/ten_runtime/binding/cpp/ten.h" @@ -137,8 +136,6 @@ TEST(ExtensionTest, BasicMultiApp) { // NOLINT app2_thread = ten_thread_create("app thread 2", app_thread_2_main, nullptr); app1_thread = ten_thread_create("app thread 1", app_thread_1_main, nullptr); - ten_sleep(300); - // In a scenario which contains multiple TEN app, the construction of a // graph might failed because not all TEN app has already been launched // successfully. diff --git a/tests/ten_runtime/smoke/extension_test/basic/basic_multi_app_close_through_engine.cc b/tests/ten_runtime/smoke/extension_test/basic/basic_multi_app_close_through_engine.cc index b92af23a9a..74084d8305 100644 --- a/tests/ten_runtime/smoke/extension_test/basic/basic_multi_app_close_through_engine.cc +++ b/tests/ten_runtime/smoke/extension_test/basic/basic_multi_app_close_through_engine.cc @@ -6,7 +6,6 @@ // #include #include -#include #include "gtest/gtest.h" #include "include_internal/ten_runtime/binding/cpp/ten.h" @@ -133,8 +132,6 @@ TEST(ExtensionTest, BasicMultiAppCloseThroughEngine) { // NOLINT auto *app_thread_1 = ten_thread_create("app thread 1", app_thread_1_main, nullptr); - ten_sleep(300); - ten::msgpack_tcp_client_t *client = nullptr; for (size_t i = 0; i < MULTIPLE_APP_SCENARIO_GRAPH_CONSTRUCTION_RETRY_TIMES; diff --git a/tests/ten_runtime/smoke/extension_test/basic/basic_throw_exception_in_extension.cc b/tests/ten_runtime/smoke/extension_test/basic/basic_throw_exception_in_extension.cc index 20aac24b31..5db6536961 100644 --- a/tests/ten_runtime/smoke/extension_test/basic/basic_throw_exception_in_extension.cc +++ b/tests/ten_runtime/smoke/extension_test/basic/basic_throw_exception_in_extension.cc @@ -6,7 +6,6 @@ // #include #include -#include #include "gtest/gtest.h" #include "include_internal/ten_runtime/binding/cpp/ten.h" @@ -160,14 +159,14 @@ void *app_thread_3_main(TEN_UNUSED void *args) { return nullptr; } -TEN_CPP_REGISTER_ADDON_AS_EXTENSION(basic_throw_exception_in_extension__extension_1, - test_extension_1); -TEN_CPP_REGISTER_ADDON_AS_EXTENSION(basic_throw_exception_in_extension__extension_2, - test_extension_2); -TEN_CPP_REGISTER_ADDON_AS_EXTENSION(basic_throw_exception_in_extension__extension_3, - test_extension_3); -TEN_CPP_REGISTER_ADDON_AS_EXTENSION(basic_throw_exception_in_extension__extension_4, - test_extension_4); +TEN_CPP_REGISTER_ADDON_AS_EXTENSION( + basic_throw_exception_in_extension__extension_1, test_extension_1); +TEN_CPP_REGISTER_ADDON_AS_EXTENSION( + basic_throw_exception_in_extension__extension_2, test_extension_2); +TEN_CPP_REGISTER_ADDON_AS_EXTENSION( + basic_throw_exception_in_extension__extension_3, test_extension_3); +TEN_CPP_REGISTER_ADDON_AS_EXTENSION( + basic_throw_exception_in_extension__extension_4, test_extension_4); } // namespace @@ -180,8 +179,6 @@ TEST(ExtensionTest, BasicThrowExceptionInExtension) { // NOLINT auto *app_1_thread = ten_thread_create("app thread 1", app_thread_1_main, nullptr); - ten_sleep(300); - // Create a client and connect to the app. ten::msgpack_tcp_client_t *client = nullptr; diff --git a/tests/ten_runtime/smoke/extension_test/command/command_stop_graph_actively.cc b/tests/ten_runtime/smoke/extension_test/command/command_stop_graph_actively.cc index 21cb74d11f..0260ae804f 100644 --- a/tests/ten_runtime/smoke/extension_test/command/command_stop_graph_actively.cc +++ b/tests/ten_runtime/smoke/extension_test/command/command_stop_graph_actively.cc @@ -6,7 +6,6 @@ // #include #include -#include #include "gtest/gtest.h" #include "include_internal/ten_runtime/binding/cpp/ten.h" @@ -191,8 +190,6 @@ TEST(ExtensionTest, CommandStopGraphActively) { // NOLINT auto *app_thread_1 = ten_thread_create("app thread 1", app_thread_1_main, nullptr); - ten_sleep(300); - // Create a client and connect to the app. ten::msgpack_tcp_client_t *client = nullptr; diff --git a/tests/ten_runtime/smoke/extension_test/command/command_stop_graph_actively_through_cmd.cc b/tests/ten_runtime/smoke/extension_test/command/command_stop_graph_actively_through_cmd.cc index ec1dc10d89..36bdb2df35 100644 --- a/tests/ten_runtime/smoke/extension_test/command/command_stop_graph_actively_through_cmd.cc +++ b/tests/ten_runtime/smoke/extension_test/command/command_stop_graph_actively_through_cmd.cc @@ -6,7 +6,6 @@ // #include #include -#include #include "gtest/gtest.h" #include "include_internal/ten_runtime/binding/cpp/ten.h" @@ -190,8 +189,6 @@ TEST(ExtensionTest, CommandStopGraphActivelyThroughCmd) { // NOLINT auto *app_thread_1 = ten_thread_create("app thread 1", app_thread_1_main, nullptr); - ten_sleep(300); - // Create a client and connect to the app. ten::msgpack_tcp_client_t *client = nullptr; diff --git a/tests/ten_runtime/smoke/extension_test/command/command_stop_graph_actively_through_cmd_dest.cc b/tests/ten_runtime/smoke/extension_test/command/command_stop_graph_actively_through_cmd_dest.cc index 2576e095c1..e3d10f6c8b 100644 --- a/tests/ten_runtime/smoke/extension_test/command/command_stop_graph_actively_through_cmd_dest.cc +++ b/tests/ten_runtime/smoke/extension_test/command/command_stop_graph_actively_through_cmd_dest.cc @@ -6,7 +6,6 @@ // #include #include -#include #include "gtest/gtest.h" #include "include_internal/ten_runtime/binding/cpp/ten.h" @@ -161,14 +160,18 @@ void *app_thread_3_main(TEN_UNUSED void *args) { return nullptr; } -TEN_CPP_REGISTER_ADDON_AS_EXTENSION(command_stop_graph_actively_through_cmd_dest__extension_1, - test_extension_1); -TEN_CPP_REGISTER_ADDON_AS_EXTENSION(command_stop_graph_actively_through_cmd_dest__extension_2, - test_extension_2); -TEN_CPP_REGISTER_ADDON_AS_EXTENSION(command_stop_graph_actively_through_cmd_dest__extension_3, - test_extension_3); -TEN_CPP_REGISTER_ADDON_AS_EXTENSION(command_stop_graph_actively_through_cmd_dest__extension_4, - test_extension_4); +TEN_CPP_REGISTER_ADDON_AS_EXTENSION( + command_stop_graph_actively_through_cmd_dest__extension_1, + test_extension_1); +TEN_CPP_REGISTER_ADDON_AS_EXTENSION( + command_stop_graph_actively_through_cmd_dest__extension_2, + test_extension_2); +TEN_CPP_REGISTER_ADDON_AS_EXTENSION( + command_stop_graph_actively_through_cmd_dest__extension_3, + test_extension_3); +TEN_CPP_REGISTER_ADDON_AS_EXTENSION( + command_stop_graph_actively_through_cmd_dest__extension_4, + test_extension_4); } // namespace @@ -181,8 +184,6 @@ TEST(ExtensionTest, CommandStopGraphActivelyThroughCmdDest) { // NOLINT auto *app_thread_1 = ten_thread_create("app thread 1", app_thread_1_main, nullptr); - ten_sleep(300); - // Create a client and connect to the app. ten::msgpack_tcp_client_t *client = nullptr; diff --git a/tests/ten_runtime/smoke/extension_test/concurrent/multi_app_concurrent.cc b/tests/ten_runtime/smoke/extension_test/concurrent/multi_app_concurrent.cc index 2369fd7dff..f708b37064 100644 --- a/tests/ten_runtime/smoke/extension_test/concurrent/multi_app_concurrent.cc +++ b/tests/ten_runtime/smoke/extension_test/concurrent/multi_app_concurrent.cc @@ -219,8 +219,6 @@ TEST(ExtensionTest, DISABLED_MultiAppConcurrent) { // NOLINT auto *app_thread_1 = ten_thread_create("app thread 1", app_thread_1_main, nullptr); - ten_sleep(300); - std::vector client_threads; for (size_t i = 0; i < ONE_ENGINE_ONE_CLIENT_CONCURRENT_CNT; ++i) { diff --git a/tests/ten_runtime/smoke/extension_test/concurrent/multi_app_sequential.cc b/tests/ten_runtime/smoke/extension_test/concurrent/multi_app_sequential.cc index fbb6d5f65a..075119a3da 100644 --- a/tests/ten_runtime/smoke/extension_test/concurrent/multi_app_sequential.cc +++ b/tests/ten_runtime/smoke/extension_test/concurrent/multi_app_sequential.cc @@ -6,7 +6,6 @@ // #include #include -#include #include "gtest/gtest.h" #include "include_internal/ten_runtime/binding/cpp/ten.h" @@ -131,8 +130,6 @@ TEST(ExtensionTest, MultiAppSequential) { // NOLINT auto *app_thread_1 = ten_thread_create("app thread 1", app_thread_1_main, nullptr); - ten_sleep(300); - for (size_t i = 0; i < SEQUENTIAL_CLIENT_CNT; ++i) { ten::msgpack_tcp_client_t *client = nullptr; diff --git a/tests/ten_runtime/smoke/extension_test/concurrent/one_engine_concurrent.cc b/tests/ten_runtime/smoke/extension_test/concurrent/one_engine_concurrent.cc index 40da8e75b7..b2cfbb7946 100644 --- a/tests/ten_runtime/smoke/extension_test/concurrent/one_engine_concurrent.cc +++ b/tests/ten_runtime/smoke/extension_test/concurrent/one_engine_concurrent.cc @@ -156,8 +156,6 @@ TEST(ExtensionTest, OneEngineConcurrent) { // NOLINT auto *app_thread_1 = ten_thread_create("app thread 1", app_thread_1_main, nullptr); - ten_sleep(300); - // Create a client and connect to the app. ten::msgpack_tcp_client_t *client = nullptr; diff --git a/tests/ten_runtime/smoke/extension_test/engine/engine_long_running_mode.cc b/tests/ten_runtime/smoke/extension_test/engine/engine_long_running_mode.cc index 5cdf99de77..e21295f98b 100644 --- a/tests/ten_runtime/smoke/extension_test/engine/engine_long_running_mode.cc +++ b/tests/ten_runtime/smoke/extension_test/engine/engine_long_running_mode.cc @@ -6,7 +6,6 @@ // #include #include -#include #include "gtest/gtest.h" #include "include_internal/ten_runtime/binding/cpp/ten.h" @@ -117,8 +116,6 @@ TEST(ExtensionTest, EngineLongRunningMode) { // NOLINT auto *app_thread_1 = ten_thread_create("app thread 1", app_thread_1_main, nullptr); - ten_sleep(300); - // Create a client and connect to the app. ten::msgpack_tcp_client_t *client = nullptr; std::string graph_id; diff --git a/tests/ten_runtime/smoke/extension_test/graph/graph_loop_in_multi_app.cc b/tests/ten_runtime/smoke/extension_test/graph/graph_loop_in_multi_app.cc index 8b6bcd7823..e41a64c8ce 100644 --- a/tests/ten_runtime/smoke/extension_test/graph/graph_loop_in_multi_app.cc +++ b/tests/ten_runtime/smoke/extension_test/graph/graph_loop_in_multi_app.cc @@ -6,7 +6,6 @@ // #include #include -#include #include "gtest/gtest.h" #include "include_internal/ten_runtime/binding/cpp/ten.h" @@ -181,8 +180,6 @@ TEST(ExtensionTest, GraphLoopInMultiApp) { // NOLINT auto *app_thread_1 = ten_thread_create("app thread 1", app_thread_1_main, nullptr); - ten_sleep(300); - // Create a client and connect to the app. ten::msgpack_tcp_client_t *client = nullptr; std::string graph_id; diff --git a/tests/ten_runtime/smoke/extension_test/graph/graph_multiple_polygon.cc b/tests/ten_runtime/smoke/extension_test/graph/graph_multiple_polygon.cc index 2f70360afb..72778b3cbd 100644 --- a/tests/ten_runtime/smoke/extension_test/graph/graph_multiple_polygon.cc +++ b/tests/ten_runtime/smoke/extension_test/graph/graph_multiple_polygon.cc @@ -215,8 +215,6 @@ TEST(ExtensionTest, GraphMultiplePolygon) { // NOLINT auto *app_thread1 = ten_thread_create("app thread 1", app_thread_1_main, nullptr); - ten_sleep(300); - // Create a client and connect to the app. ten::msgpack_tcp_client_t *client = nullptr; diff --git a/tests/ten_runtime/smoke/extension_test/graph/graph_y_shape_in_multi_app.cc b/tests/ten_runtime/smoke/extension_test/graph/graph_y_shape_in_multi_app.cc index a5b09cce19..fcabf8a4ea 100644 --- a/tests/ten_runtime/smoke/extension_test/graph/graph_y_shape_in_multi_app.cc +++ b/tests/ten_runtime/smoke/extension_test/graph/graph_y_shape_in_multi_app.cc @@ -6,7 +6,6 @@ // #include #include -#include #include "gtest/gtest.h" #include "include_internal/ten_runtime/binding/cpp/ten.h" @@ -180,8 +179,6 @@ TEST(ExtensionTest, GraphYShapeInMultiApp) { // NOLINT auto *app_thread_1 = ten_thread_create("app thread 1", app_thread_1_main, nullptr); - ten_sleep(300); - // Create a client and connect to the app. ten::msgpack_tcp_client_t *client = nullptr; std::string graph_id; diff --git a/tests/ten_runtime/smoke/extension_test/graph_name/graph_name_basic.cc b/tests/ten_runtime/smoke/extension_test/graph_name/graph_name_basic.cc index 9bdeabb4e0..89db8807d8 100644 --- a/tests/ten_runtime/smoke/extension_test/graph_name/graph_name_basic.cc +++ b/tests/ten_runtime/smoke/extension_test/graph_name/graph_name_basic.cc @@ -5,7 +5,6 @@ // Refer to the "LICENSE" file in the root directory for more information. // #include -#include #include "gtest/gtest.h" #include "include_internal/ten_runtime/binding/cpp/ten.h" @@ -125,8 +124,6 @@ TEST(ExtensionTest, GraphNameBasic) { // NOLINT auto *app_thread_1 = ten_thread_create("app thread 1", app_thread_1_main, nullptr); - ten_sleep(300); - // extension1(app1) --> extension3(app2) --> extension2(app1) --> return ten::msgpack_tcp_client_t *client = nullptr; std::string graph_id; diff --git a/tests/ten_runtime/smoke/extension_test/multi_dest/multi_dest_in_multi_app.cc b/tests/ten_runtime/smoke/extension_test/multi_dest/multi_dest_in_multi_app.cc index 2448e51dbc..bca5a0ce6a 100644 --- a/tests/ten_runtime/smoke/extension_test/multi_dest/multi_dest_in_multi_app.cc +++ b/tests/ten_runtime/smoke/extension_test/multi_dest/multi_dest_in_multi_app.cc @@ -6,7 +6,6 @@ // #include #include -#include #include "gtest/gtest.h" #include "include_internal/ten_runtime/binding/cpp/ten.h" @@ -57,8 +56,9 @@ return nullptr; \ } -#define REGISTER_EXTENSION(N) \ - TEN_CPP_REGISTER_ADDON_AS_EXTENSION(multi_dest_in_multi_app__extension_##N, test_extension_##N); +#define REGISTER_EXTENSION(N) \ + TEN_CPP_REGISTER_ADDON_AS_EXTENSION(multi_dest_in_multi_app__extension_##N, \ + test_extension_##N); #define START_APP(N) \ auto test_app_##N##_thread = \ @@ -156,14 +156,6 @@ TEST(ExtensionTest, MultiDestInMultiApp) { // NOLINT START_APP(4) START_APP(5) - // TODO(Wei): When apps are not started completely, and the client sends the - // 'start_graph' command to them, apps could not form a complete graph (ex: - // app 3 is not started completely yet, and app 2 tries to send the - // 'start_graph' command to it), so we need to add a delay here, or we need to - // design a mechanism which could tell us that the apps in question are all - // ready to accept incoming messages. - ten_sleep(1000); - // Create a client and connect to the app. ten::msgpack_tcp_client_t *client = nullptr; diff --git a/tests/ten_runtime/smoke/extension_test/multi_dest/multi_dest_in_multi_app_with_response_handler.cc b/tests/ten_runtime/smoke/extension_test/multi_dest/multi_dest_in_multi_app_with_response_handler.cc index f0fc6effb9..7ddd438e76 100644 --- a/tests/ten_runtime/smoke/extension_test/multi_dest/multi_dest_in_multi_app_with_response_handler.cc +++ b/tests/ten_runtime/smoke/extension_test/multi_dest/multi_dest_in_multi_app_with_response_handler.cc @@ -6,7 +6,6 @@ // #include #include -#include #include "gtest/gtest.h" #include "include_internal/ten_runtime/binding/cpp/ten.h" @@ -57,8 +56,10 @@ return nullptr; \ } -#define REGISTER_EXTENSION(N) \ - TEN_CPP_REGISTER_ADDON_AS_EXTENSION(multi_dest_in_multi_app_with_result_handler__extension_##N, test_extension_##N); +#define REGISTER_EXTENSION(N) \ + TEN_CPP_REGISTER_ADDON_AS_EXTENSION( \ + multi_dest_in_multi_app_with_result_handler__extension_##N, \ + test_extension_##N); #define START_APP(N) \ auto test_app_##N##_thread = \ @@ -156,14 +157,6 @@ TEST(ExtensionTest, MultiDestInMultiAppWithResponseHandler) { // NOLINT START_APP(4) START_APP(5) - // TODO(Wei): When apps are not started completely, and the client sends the - // 'start_graph' command to them, apps could not form a complete graph (ex: - // app 3 is not started completely yet, and app 2 tries to send the - // 'start_graph' command to it), so we need to add a delay here, or we need to - // design a mechanism which could tell us that the apps in question are all - // ready to accept incoming messages. - ten_sleep(1000); - // Create a client and connect to the app. ten::msgpack_tcp_client_t *client = nullptr; diff --git a/tests/ten_runtime/smoke/extension_test/multi_dest/multi_dest_in_multi_app_with_response_handler_lambda.cc b/tests/ten_runtime/smoke/extension_test/multi_dest/multi_dest_in_multi_app_with_response_handler_lambda.cc index fbdcec669e..3a19570348 100644 --- a/tests/ten_runtime/smoke/extension_test/multi_dest/multi_dest_in_multi_app_with_response_handler_lambda.cc +++ b/tests/ten_runtime/smoke/extension_test/multi_dest/multi_dest_in_multi_app_with_response_handler_lambda.cc @@ -6,7 +6,6 @@ // #include #include -#include #include "gtest/gtest.h" #include "include_internal/ten_runtime/binding/cpp/ten.h" @@ -59,8 +58,10 @@ return nullptr; \ } -#define REGISTER_EXTENSION(N) \ - TEN_CPP_REGISTER_ADDON_AS_EXTENSION(multi_dest_in_multi_app_with_result_handler_lambda__extension_##N, test_extension_##N); +#define REGISTER_EXTENSION(N) \ + TEN_CPP_REGISTER_ADDON_AS_EXTENSION( \ + multi_dest_in_multi_app_with_result_handler_lambda__extension_##N, \ + test_extension_##N); #define START_APP(N) \ auto test_app_##N##_thread = \ @@ -161,14 +162,6 @@ TEST(ExtensionTest, MultiDestInMultiAppWithResponseHandlerLambda) { // NOLINT START_APP(4) START_APP(5) - // TODO(Wei): When apps are not started completely, and the client sends the - // 'start_graph' command to them, apps could not form a complete graph (ex: - // app 3 is not started completely yet, and app 2 tries to send the - // 'start_graph' command to it), so we need to add a delay here, or we need to - // design a mechanism which could tell us that the apps in question are all - // ready to accept incoming messages. - ten_sleep(1000); - // Create a client and connect to the app. ten::msgpack_tcp_client_t *client = nullptr; diff --git a/tests/ten_runtime/smoke/extension_test/multi_dest/multi_dest_resp_when_all.cc b/tests/ten_runtime/smoke/extension_test/multi_dest/multi_dest_resp_when_all.cc index 4b11c6429a..0030c64549 100644 --- a/tests/ten_runtime/smoke/extension_test/multi_dest/multi_dest_resp_when_all.cc +++ b/tests/ten_runtime/smoke/extension_test/multi_dest/multi_dest_resp_when_all.cc @@ -10,7 +10,6 @@ #include "gtest/gtest.h" #include "include_internal/ten_runtime/binding/cpp/ten.h" #include "ten_utils/lib/thread.h" -#include "ten_utils/lib/time.h" #include "tests/common/client/cpp/msgpack_tcp.h" #include "tests/ten_runtime/smoke/extension_test/util/binding/cpp/check.h" @@ -167,10 +166,6 @@ TEST(ExtensionTest, MultiDestRespWhenAll) { ten_test::check_result_is(resp, "137", TEN_STATUS_CODE_OK, "hello world, too"); - // Wait for some time to ensure that the extension 1 will not receive any more - // results. - ten_sleep(100); - delete client; ten_thread_join(app_thread, -1); diff --git a/tests/ten_runtime/smoke/extension_test/multi_dest/multi_dest_resp_when_all_in_multi_app.cc b/tests/ten_runtime/smoke/extension_test/multi_dest/multi_dest_resp_when_all_in_multi_app.cc index ca50341d2a..a6fc45f6db 100644 --- a/tests/ten_runtime/smoke/extension_test/multi_dest/multi_dest_resp_when_all_in_multi_app.cc +++ b/tests/ten_runtime/smoke/extension_test/multi_dest/multi_dest_resp_when_all_in_multi_app.cc @@ -163,8 +163,6 @@ TEST(ExtensionTest, MultiDestRespWhenAllInMultiApp) { // NOLINT auto *app_1_thread = ten_thread_create("app thread 1", app_thread_1_main, nullptr); - ten_sleep(300); - // Create a client and connect to the app. ten::msgpack_tcp_client_t *client = nullptr; diff --git a/tests/ten_runtime/smoke/extension_test/predefined_graph/predefined_graph_multi_app.cc b/tests/ten_runtime/smoke/extension_test/predefined_graph/predefined_graph_multi_app.cc index ca8a9413be..b3f5ca017f 100644 --- a/tests/ten_runtime/smoke/extension_test/predefined_graph/predefined_graph_multi_app.cc +++ b/tests/ten_runtime/smoke/extension_test/predefined_graph/predefined_graph_multi_app.cc @@ -69,7 +69,7 @@ class test_app_1 : public ten::app_t { "log_level": 2, "predefined_graphs": [{ "name": "default", - "auto_start": false, + "auto_start": true, "singleton": true, "nodes": [{ "type": "extension", @@ -146,12 +146,13 @@ TEN_CPP_REGISTER_ADDON_AS_EXTENSION(predefined_graph_multi_app__extension_2, TEST(ExtensionTest, PredefinedGraphMultiApp) { // NOLINT // Start app. - auto *app_2_thread = - ten_thread_create("app thread 2", app_thread_2_main, nullptr); auto *app_1_thread = ten_thread_create("app thread 1", app_thread_1_main, nullptr); - ten_sleep(300); + // Used to verify the retry mechanism of the protocol. + ten_sleep(1000); + auto *app_2_thread = + ten_thread_create("app thread 2", app_thread_2_main, nullptr); // Create a client and connect to the app. auto *client = new ten::msgpack_tcp_client_t("msgpack://127.0.0.1:8001/"); diff --git a/tests/ten_runtime/smoke/extension_test/start_graph/start_predefined_graph_cross_app.cc b/tests/ten_runtime/smoke/extension_test/start_graph/start_predefined_graph_cross_app.cc index 778b96c0e5..2adf2e33cb 100644 --- a/tests/ten_runtime/smoke/extension_test/start_graph/start_predefined_graph_cross_app.cc +++ b/tests/ten_runtime/smoke/extension_test/start_graph/start_predefined_graph_cross_app.cc @@ -6,7 +6,6 @@ // #include "gtest/gtest.h" #include "include_internal/ten_runtime/binding/cpp/ten.h" -#include "ten_utils/lib/time.h" #include "tests/common/client/cpp/msgpack_tcp.h" #include "tests/ten_runtime/smoke/extension_test/util/binding/cpp/check.h" @@ -251,8 +250,6 @@ TEST(ExtensionTest, StartPredefinedGraphCrossApp) { // NOLINT auto *app_2_thread = ten_thread_create("app thread 2", app_thread_2_main, nullptr); - ten_sleep(300); - // Create a client and connect to the app. auto *client = new ten::msgpack_tcp_client_t("msgpack://127.0.0.1:8001/"); diff --git a/tests/ten_runtime/smoke/extension_test/start_graph/start_two_predefined_graphs.cc b/tests/ten_runtime/smoke/extension_test/start_graph/start_two_predefined_graphs.cc index ddac8cf46c..397163ffcd 100644 --- a/tests/ten_runtime/smoke/extension_test/start_graph/start_two_predefined_graphs.cc +++ b/tests/ten_runtime/smoke/extension_test/start_graph/start_two_predefined_graphs.cc @@ -6,7 +6,6 @@ // #include "gtest/gtest.h" #include "include_internal/ten_runtime/binding/cpp/ten.h" -#include "ten_utils/lib/time.h" #include "tests/common/client/cpp/msgpack_tcp.h" #include "tests/ten_runtime/smoke/extension_test/util/binding/cpp/check.h" @@ -264,8 +263,6 @@ TEST(ExtensionTest, StartTwoPredefinedGraphs) { // NOLINT auto *app_2_thread = ten_thread_create("app thread 2", app_thread_2_main, nullptr); - ten_sleep(300); - // Create a client and connect to the app. auto *client = new ten::msgpack_tcp_client_t("msgpack://127.0.0.1:8001/"); diff --git a/tests/ten_runtime/smoke/graph_test/group_node_missing_2_apps.cc b/tests/ten_runtime/smoke/graph_test/group_node_missing_2_apps.cc index 42007d8214..ce98469ac6 100644 --- a/tests/ten_runtime/smoke/graph_test/group_node_missing_2_apps.cc +++ b/tests/ten_runtime/smoke/graph_test/group_node_missing_2_apps.cc @@ -134,8 +134,6 @@ TEST(GraphTest, GroupNodeMissing2Apps) { // NOLINT app2_thread = ten_thread_create("app thread 2", app_thread_2_main, nullptr); app1_thread = ten_thread_create("app thread 1", app_thread_1_main, nullptr); - ten_sleep(300); - // In a scenario which contains multiple TEN app, the construction of a // graph might failed because not all TEN app has already been launched // successfully.