From 7fa12991b71f5a791c45cb406d7cc934580e5b48 Mon Sep 17 00:00:00 2001 From: Hu Yueh-Wei Date: Sun, 10 Nov 2024 17:37:54 +0800 Subject: [PATCH 1/4] feat: add immediately return result mode --- .vscode/launch.json | 2 +- core/include_internal/ten_runtime/path/path.h | 2 + .../ten_runtime/path/path_group.h | 11 +- .../engine/msg_interface/start_graph.c | 5 +- core/src/ten_runtime/extension/extension.c | 5 +- .../ten_runtime/extension/internal/close.c | 3 + core/src/ten_runtime/extension/msg_handling.c | 19 +-- core/src/ten_runtime/path/path.c | 16 +- core/src/ten_runtime/path/path_group.c | 26 ++- core/src/ten_runtime/path/path_table.c | 149 ++++++++---------- 10 files changed, 107 insertions(+), 131 deletions(-) diff --git a/.vscode/launch.json b/.vscode/launch.json index 04a5442513..37c3f66dcd 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -137,7 +137,7 @@ "request": "launch", "program": "${workspaceFolder}/out/linux/x64/tests/standalone/ten_runtime_smoke_test", "args": [ - "--gtest_filter=ExtensionTest.OneEngineConcurrent" + "--gtest_filter=ExtensionTest.MultiDestSendInStopPeriod" ], "cwd": "${workspaceFolder}/out/linux/x64/tests/standalone/", "env": { diff --git a/core/include_internal/ten_runtime/path/path.h b/core/include_internal/ten_runtime/path/path.h index 72bde25c1b..9c860c7222 100644 --- a/core/include_internal/ten_runtime/path/path.h +++ b/core/include_internal/ten_runtime/path/path.h @@ -83,3 +83,5 @@ TEN_RUNTIME_PRIVATE_API void ten_path_set_result(ten_path_t *path, TEN_RUNTIME_PRIVATE_API void ten_path_set_expired_time( ten_path_t *path, uint64_t expired_time_us); + +TEN_RUNTIME_PRIVATE_API ten_path_group_t *ten_path_get_group(ten_path_t *self); diff --git a/core/include_internal/ten_runtime/path/path_group.h b/core/include_internal/ten_runtime/path/path_group.h index b3d8934b8c..bba57ef883 100644 --- a/core/include_internal/ten_runtime/path/path_group.h +++ b/core/include_internal/ten_runtime/path/path_group.h @@ -135,10 +135,13 @@ typedef enum TEN_PATH_GROUP_POLICY { // If receive a fail result, return it, otherwise, when all OK results are // received, return the first received one. Clear the group after returning // the result. - TEN_PATH_GROUP_POLICY_ONE_FAIL_RETURN_AND_ALL_OK_RETURN_FIRST, + TEN_PATH_GROUP_POLICY_RETURN_FIRST_OK_OR_FAIL, // Similar to the above, except return the last received one. - TEN_PATH_GROUP_POLICY_ONE_FAIL_RETURN_AND_ALL_OK_RETURN_LAST, + TEN_PATH_GROUP_POLICY_RETURN_LAST_OK_OR_FAIL, + + // Return each result immediately as it is received. + TEN_PATH_GROUP_POLICY_RETURN_EACH_IMMEDIATELY, // More modes is allowed, and could be added here in case needed. } TEN_PATH_GROUP_POLICY; @@ -150,7 +153,9 @@ typedef struct ten_path_group_t { ten_path_table_t *table; TEN_PATH_GROUP_POLICY policy; - ten_list_t members; // Contain the members of the group. + + // Contain the members of the group. + ten_list_t members; // ten_path_t // If this flag is set, none of the paths in the path_group can be used to // trace back cmd results anymore. diff --git a/core/src/ten_runtime/engine/msg_interface/start_graph.c b/core/src/ten_runtime/engine/msg_interface/start_graph.c index b7245efdce..1dc90a841b 100644 --- a/core/src/ten_runtime/engine/msg_interface/start_graph.c +++ b/core/src/ten_runtime/engine/msg_interface/start_graph.c @@ -124,9 +124,8 @@ void ten_engine_handle_cmd_start_graph(ten_engine_t *self, if (ten_list_size(&new_works) > 1) { // Create path group for these newly submitted 'start_graph' commands. - ten_paths_create_group( - &new_works, - TEN_PATH_GROUP_POLICY_ONE_FAIL_RETURN_AND_ALL_OK_RETURN_LAST); + ten_paths_create_group(&new_works, + TEN_PATH_GROUP_POLICY_RETURN_LAST_OK_OR_FAIL); } ten_list_clear(&new_works); diff --git a/core/src/ten_runtime/extension/extension.c b/core/src/ten_runtime/extension/extension.c index 5bab43f205..13aad138ba 100644 --- a/core/src/ten_runtime/extension/extension.c +++ b/core/src/ten_runtime/extension/extension.c @@ -711,9 +711,8 @@ bool ten_extension_handle_out_msg(ten_extension_t *self, ten_shared_ptr_t *msg, if (ten_list_size(&result_out_paths) > 1) { // Create a path group in this case. - ten_paths_create_group( - &result_out_paths, - TEN_PATH_GROUP_POLICY_ONE_FAIL_RETURN_AND_ALL_OK_RETURN_LAST); + ten_paths_create_group(&result_out_paths, + TEN_PATH_GROUP_POLICY_RETURN_LAST_OK_OR_FAIL); } ten_list_clear(&result_out_paths); diff --git a/core/src/ten_runtime/extension/internal/close.c b/core/src/ten_runtime/extension/internal/close.c index 94dc0024f5..169fd77e5c 100644 --- a/core/src/ten_runtime/extension/internal/close.c +++ b/core/src/ten_runtime/extension/internal/close.c @@ -53,6 +53,8 @@ static void ten_extension_thread_process_remaining_paths( ten_extension_get_name(extension, true), out_paths_cnt); ten_list_t cmd_result_list = TEN_LIST_INIT_VAL; + + // Generate an error result for each remaining out path. ten_list_foreach (out_paths, iter) { ten_path_t *path = (ten_path_t *)ten_ptr_listnode_get(iter.node); TEN_ASSERT(path && ten_path_check_integrity(path, true), @@ -72,6 +74,7 @@ static void ten_extension_thread_process_remaining_paths( ten_shared_ptr_destroy(cmd_result); } + // Send these newly generated error results to the extension. ten_list_foreach (&cmd_result_list, iter) { ten_shared_ptr_t *cmd_result = ten_smart_ptr_listnode_get(iter.node); TEN_ASSERT(cmd_result && ten_cmd_base_check_integrity(cmd_result), diff --git a/core/src/ten_runtime/extension/msg_handling.c b/core/src/ten_runtime/extension/msg_handling.c index 8f7e20d143..e4fa1c3e70 100644 --- a/core/src/ten_runtime/extension/msg_handling.c +++ b/core/src/ten_runtime/extension/msg_handling.c @@ -101,6 +101,8 @@ void ten_extension_handle_in_msg(ten_extension_t *self, ten_shared_ptr_t *msg) { // 'commands' before sending it to the extension. if (msg_is_cmd_result) { + // =-=-= + // Set the cmd result to the corresponding OUT path to indicate that // there has been a cmd result flow through that OUT path. ten_path_t *out_path = @@ -152,7 +154,6 @@ void ten_extension_handle_in_msg(ten_extension_t *self, ten_shared_ptr_t *msg) { // TODO(Xilin): Currently, there is no mechanism for auto return, so the // relevant codes should be able to be disabled. ten_extension_cache_cmd_result_to_in_path_for_auto_return(self, msg); - delete_msg = true; } } @@ -199,8 +200,6 @@ void ten_extension_handle_in_msg(ten_extension_t *self, ten_shared_ptr_t *msg) { if (!msg_is_cmd_result) { // Create the corresponding IN paths for the input commands. - ten_list_t in_paths = TEN_LIST_INIT_VAL; - ten_list_foreach (&converted_msgs, iter) { ten_msg_and_its_result_conversion_t *msg_and_result_conversion = ten_ptr_listnode_get(iter.node); @@ -213,24 +212,12 @@ void ten_extension_handle_in_msg(ten_extension_t *self, ten_shared_ptr_t *msg) { if (ten_msg_is_cmd(actual_cmd)) { if (ten_msg_info[ten_msg_get_type(actual_cmd)].create_in_path) { // Add a path entry to the IN path table of this extension. - ten_path_t *in_path = (ten_path_t *)ten_path_table_add_in_path( + ten_path_table_add_in_path( self->path_table, actual_cmd, msg_and_result_conversion->result_conversion); - - ten_list_push_ptr_back(&in_paths, in_path, NULL); } } } - - if (ten_list_size(&in_paths) > 1) { - // Create a path group in this case. - - ten_paths_create_group( - &in_paths, - TEN_PATH_GROUP_POLICY_ONE_FAIL_RETURN_AND_ALL_OK_RETURN_LAST); - } - - ten_list_clear(&in_paths); } // The path table processing is completed, it's time to check the schema. And diff --git a/core/src/ten_runtime/path/path.c b/core/src/ten_runtime/path/path.c index 97a2c9dc83..ec0c9ccbaf 100644 --- a/core/src/ten_runtime/path/path.c +++ b/core/src/ten_runtime/path/path.c @@ -170,10 +170,7 @@ void ten_path_set_result(ten_path_t *path, ten_shared_ptr_t *cmd_result) { if (ten_path_is_in_a_group(path)) { // Move the current path to the last of the members of the group, so that we // can know which one should be returned in different policies. - ten_path_group_t *path_group = - (ten_path_group_t *)ten_shared_ptr_get_data(path->group); - TEN_ASSERT(path_group && ten_path_group_check_integrity(path_group, true), - "Invalid argument."); + ten_path_group_t *path_group = ten_path_get_group(path); ten_list_t *members = &path_group->members; TEN_ASSERT(members, "Should not happen."); @@ -191,3 +188,14 @@ void ten_path_set_expired_time(ten_path_t *path, uint64_t expired_time_us) { path->expired_time_us = expired_time_us; } + +ten_path_group_t *ten_path_get_group(ten_path_t *self) { + TEN_ASSERT(self && ten_path_check_integrity(self, true), "Invalid argument."); + + ten_path_group_t *path_group = + (ten_path_group_t *)ten_shared_ptr_get_data(self->group); + TEN_ASSERT(path_group && ten_path_group_check_integrity(path_group, true), + "Invalid argument."); + + return path_group; +} diff --git a/core/src/ten_runtime/path/path_group.c b/core/src/ten_runtime/path/path_group.c index 8d5c38a678..d56f0b5e1e 100644 --- a/core/src/ten_runtime/path/path_group.c +++ b/core/src/ten_runtime/path/path_group.c @@ -140,12 +140,11 @@ static ten_path_t *ten_path_group_resolve_in_one_fail_and_all_ok_return( "Invalid argument."); ten_shared_ptr_t *cmd_result = path->cached_cmd_result; + if (cmd_result) { TEN_ASSERT(ten_msg_get_type(cmd_result) == TEN_MSG_TYPE_CMD_RESULT, "Invalid argument."); - } - if (cmd_result) { if (ten_cmd_result_get_status_code(cmd_result) != TEN_STATUS_CODE_OK) { // Receive a fail result, return it. return path; @@ -178,10 +177,7 @@ ten_list_t *ten_path_group_get_members(ten_path_t *path) { TEN_ASSERT(path && ten_path_check_integrity(path, true), "Invalid argument."); TEN_ASSERT(ten_path_is_in_a_group(path), "Invalid argument."); - ten_path_group_t *path_group = - (ten_path_group_t *)ten_shared_ptr_get_data(path->group); - TEN_ASSERT(path_group && ten_path_group_check_integrity(path_group, true), - "Invalid argument."); + ten_path_group_t *path_group = ten_path_get_group(path); ten_list_t *members = &path_group->members; TEN_ASSERT(members && ten_list_check_integrity(members), @@ -198,30 +194,30 @@ ten_list_t *ten_path_group_get_members(ten_path_t *path) { * group to a final cmd result. It checks the policy of the path group and calls * the appropriate function to resolve the path group status. * - * Policy `TEN_PATH_GROUP_POLICY_ONE_FAIL_RETURN_AND_ALL_OK_RETURN_FIRST` - * returns the first path in the list if all paths have succeeded, and returns - * the first path that has failed otherwise. + * Policy `TEN_PATH_GROUP_POLICY_RETURN_FIRST_OK_OR_FAIL` returns the first path + * in the list if all paths have succeeded, and returns the first path that has + * failed otherwise. */ ten_path_t *ten_path_group_resolve(ten_path_t *path, TEN_PATH_TYPE type) { TEN_ASSERT(path && ten_path_check_integrity(path, true), "Invalid argument."); TEN_ASSERT(ten_path_is_in_a_group(path), "Invalid argument."); - ten_path_group_t *path_group = - (ten_path_group_t *)ten_shared_ptr_get_data(path->group); - TEN_ASSERT(path_group && ten_path_group_check_integrity(path_group, true), - "Invalid argument."); + ten_path_group_t *path_group = ten_path_get_group(path); ten_list_t *members = &path_group->members; TEN_ASSERT(members && ten_list_check_integrity(members), "Should not happen."); switch (path_group->policy) { - case TEN_PATH_GROUP_POLICY_ONE_FAIL_RETURN_AND_ALL_OK_RETURN_FIRST: + case TEN_PATH_GROUP_POLICY_RETURN_FIRST_OK_OR_FAIL: return ten_path_group_resolve_in_one_fail_and_all_ok_return(members, type, false); - case TEN_PATH_GROUP_POLICY_ONE_FAIL_RETURN_AND_ALL_OK_RETURN_LAST: + case TEN_PATH_GROUP_POLICY_RETURN_LAST_OK_OR_FAIL: return ten_path_group_resolve_in_one_fail_and_all_ok_return(members, type, true); + case TEN_PATH_GROUP_POLICY_RETURN_EACH_IMMEDIATELY: + // In this policy, we return the current path immediately. + return path; default: TEN_ASSERT(0, "Should not happen."); return NULL; diff --git a/core/src/ten_runtime/path/path_table.c b/core/src/ten_runtime/path/path_table.c index c8e76bf481..e83b3f2287 100644 --- a/core/src/ten_runtime/path/path_table.c +++ b/core/src/ten_runtime/path/path_table.c @@ -378,40 +378,17 @@ static void ten_path_mark_belonging_group_processed(ten_path_t *path) { group->has_been_processed = true; } -static bool ten_path_table_remove_group_and_all_its_paths_if_needed( +static void ten_path_table_remove_group_and_all_its_paths( ten_path_table_t *self, TEN_PATH_TYPE type, ten_path_t *path) { TEN_ASSERT(self, "Invalid argument."); TEN_ASSERT(ten_path_table_check_integrity(self, true), "Invalid argument."); TEN_ASSERT(path && ten_path_check_integrity(path, true), "Invalid argument."); TEN_ASSERT(ten_path_is_in_a_group(path), "Invalid argument."); - ten_list_t *list = type == TEN_PATH_IN ? &self->in_paths : &self->out_paths; - - ten_path_group_t *path_group = - (ten_path_group_t *)ten_shared_ptr_get_data(path->group); - TEN_ASSERT(path_group && ten_path_group_check_integrity(path_group, true), - "Invalid argument."); - - if (!path_group->has_been_processed) { - // This path group has not yet completed its task, so it cannot be removed. - return false; - } + ten_list_t *paths = type == TEN_PATH_IN ? &self->in_paths : &self->out_paths; ten_list_t *group_members = ten_path_group_get_members(path); - ten_list_foreach (group_members, iter) { - ten_path_t *group_path = ten_ptr_listnode_get(iter.node); - TEN_ASSERT(group_path && ten_path_check_integrity(group_path, true), - "Invalid argument."); - - if (group_path->cached_cmd_result == NULL) { - // If there is a path in the group that has not received the cmd result, - // it means this group is not finished, therefore, we should not remove - // the group. - return false; - } - } - // If all paths in the group have received the cmd result, we should remove // the group, and all its contained paths. ten_list_foreach (group_members, iter) { @@ -419,13 +396,11 @@ static bool ten_path_table_remove_group_and_all_its_paths_if_needed( TEN_ASSERT(group_path && ten_path_check_integrity(group_path, true), "Invalid argument."); - ten_listnode_t *group_path_node = ten_list_find_ptr(list, group_path); + ten_listnode_t *group_path_node = ten_list_find_ptr(paths, group_path); TEN_ASSERT(group_path_node, "Should not happen."); - ten_list_remove_node(list, group_path_node); + ten_list_remove_node(paths, group_path_node); } - - return true; } // Search the specified path table for the corresponding path entry. @@ -480,6 +455,9 @@ static ten_path_t *ten_path_table_find_path(ten_path_table_t *self, // > If yes, decide the resulting cmd result from the cmd results in // the path group, and transmit the determined cmd result backward. // > If no, do nothing. +// +// Note: This function will be called after the cmd result is linked to the +// corresponding path. ten_shared_ptr_t *ten_path_table_determine_actual_cmd_result( ten_path_table_t *self, TEN_PATH_TYPE path_type, ten_path_t *path, bool remove_path) { @@ -492,59 +470,67 @@ ten_shared_ptr_t *ten_path_table_determine_actual_cmd_result( path = ten_path_group_resolve(path, path_type); } - ten_shared_ptr_t *cmd_result = NULL; + if (!path) { + // The return path has not been decided, so no cmd result needs to be sent + // to the extension. + return NULL; + } - if (path) { - cmd_result = path->cached_cmd_result; - TEN_ASSERT(cmd_result && ten_cmd_base_check_integrity(cmd_result), - "Invalid argument."); + ten_shared_ptr_t *cmd_result = path->cached_cmd_result; + TEN_ASSERT(cmd_result && ten_cmd_base_check_integrity(cmd_result), + "Invalid argument."); - // The `cached_cmd_result` is the only criterion used to determine whether a - // path has completed its task. Therefore, it is set here to ensure that - // other validation logic can function properly. - path->cached_cmd_result = ten_shared_ptr_clone(cmd_result); - - // We need to use the original cmd name to find the schema definition of the - // cmd result. - ten_cmd_result_set_original_cmd_name( - cmd_result, ten_string_get_raw_str(&path->cmd_name)); - - // The command ID should be reverted to an old one when flows through this - // path. - if (!ten_string_is_empty(&path->original_cmd_id)) { - ten_cmd_base_set_cmd_id(cmd_result, - ten_string_get_raw_str(&path->original_cmd_id)); - } + // The `cached_cmd_result` is the only criterion used to determine whether a + // path has completed its task. Therefore, it is set here to ensure that + // other validation logic can function properly. + path->cached_cmd_result = ten_shared_ptr_clone(cmd_result); + + // We need to use the original cmd name to find the schema definition of the + // cmd result. + ten_cmd_result_set_original_cmd_name(cmd_result, + ten_string_get_raw_str(&path->cmd_name)); + + // The command ID should be reverted to an old one when flows through this + // path. + if (!ten_string_is_empty(&path->original_cmd_id)) { + ten_cmd_base_set_cmd_id(cmd_result, + ten_string_get_raw_str(&path->original_cmd_id)); + } - // Set the dest of the cmd result to the source location of the path. - ten_msg_clear_and_set_dest_to_loc(cmd_result, &path->src_loc); + // Set the dest of the cmd result to the source location of the path. + ten_msg_clear_and_set_dest_to_loc(cmd_result, &path->src_loc); - if (path_type == TEN_PATH_OUT) { - // Restore the settings of the result handler, so that the extension - // could call the result handler for the result. - ten_cmd_base_set_result_handler( - cmd_result, ((ten_path_out_t *)path)->result_handler, - ((ten_path_out_t *)path)->result_handler_data); - } + if (path_type == TEN_PATH_OUT) { + // Restore the settings of the result handler, so that the extension + // could call the result handler for the result. + ten_cmd_base_set_result_handler( + cmd_result, ((ten_path_out_t *)path)->result_handler, + ((ten_path_out_t *)path)->result_handler_data); + } + + if (ten_path_is_in_a_group(path)) { + ten_path_group_t *path_group = ten_path_get_group(path); + + // =-=-= - if (ten_path_is_in_a_group(path)) { + if (path_group->policy != TEN_PATH_GROUP_POLICY_RETURN_EACH_IMMEDIATELY) { // When execution reaches this point, it means that the path group has // completed its task, so the flag is marked accordingly. ten_path_mark_belonging_group_processed(path); - // Remove the path group from the path table if all paths in the group - // have been processed. - ten_path_table_remove_group_and_all_its_paths_if_needed(self, path_type, - path); - } else { + // The path group has completed its task, so clean up the path group and + // all paths it contains. + ten_path_table_remove_group_and_all_its_paths(self, path_type, path); + } + } else { + if (remove_path) { + // This path is not in any group, and we have already decided on the cmd + // result to send to the extension, so this path can be deleted. + // // Remove the corresponding path from the path table, because the // purpose of that path is completed. - - if (remove_path) { - ten_list_remove_ptr( - path_type == TEN_PATH_IN ? &self->in_paths : &self->out_paths, - path); - } + ten_list_remove_ptr( + path_type == TEN_PATH_IN ? &self->in_paths : &self->out_paths, path); } } @@ -562,23 +548,14 @@ ten_path_t *ten_path_table_set_result(ten_path_table_t *self, "Invalid argument."); ten_path_t *path = ten_path_table_find_path(self, path_type, cmd_result); - - if (path) { - ten_path_set_result(path, cmd_result); - - // Since `cached_cmd_result` can indicate whether a path has been processed, - // every time `cached_cmd_result` is assigned a value, it should check - // whether this group can be removed from the path table, thus avoiding - // resource leakage. - if (ten_path_is_in_a_group(path)) { - bool removed = ten_path_table_remove_group_and_all_its_paths_if_needed( - self, path_type, path); - if (removed) { - return NULL; - } - } + if (!path) { + // The path for the cmd result to return is no longer available. + return NULL; } + // Associate the cmd result with the corresponding path entry. + ten_path_set_result(path, cmd_result); + return path; } From 5f21fd41fe50b3850e5e346203ed5b0e7e5ab1fd Mon Sep 17 00:00:00 2001 From: Hu Yueh-Wei Date: Sun, 10 Nov 2024 17:54:37 +0800 Subject: [PATCH 2/4] feat: add immediately return result mode --- .../ten_runtime/path/path_group.h | 9 ---- core/src/ten_runtime/extension/msg_handling.c | 2 - core/src/ten_runtime/path/path_group.c | 1 - core/src/ten_runtime/path/path_table.c | 42 ++++++++++++------- 4 files changed, 28 insertions(+), 26 deletions(-) diff --git a/core/include_internal/ten_runtime/path/path_group.h b/core/include_internal/ten_runtime/path/path_group.h index bba57ef883..3b0387433a 100644 --- a/core/include_internal/ten_runtime/path/path_group.h +++ b/core/include_internal/ten_runtime/path/path_group.h @@ -156,15 +156,6 @@ typedef struct ten_path_group_t { // Contain the members of the group. ten_list_t members; // ten_path_t - - // If this flag is set, none of the paths in the path_group can be used to - // trace back cmd results anymore. - // - // For example, if the policy is ONE_FAIL_RETURN_AND_ALL_OK_RETURN_FIRST - // and one of the paths in the group has received a fail cmd result, then - // the 'has_been_processed' flag will be set to true to prevent the left - // paths in the group from transmitting cmd results. - bool has_been_processed; } ten_path_group_t; TEN_RUNTIME_PRIVATE_API bool ten_path_group_check_integrity( diff --git a/core/src/ten_runtime/extension/msg_handling.c b/core/src/ten_runtime/extension/msg_handling.c index e4fa1c3e70..517d6fd9f5 100644 --- a/core/src/ten_runtime/extension/msg_handling.c +++ b/core/src/ten_runtime/extension/msg_handling.c @@ -101,8 +101,6 @@ void ten_extension_handle_in_msg(ten_extension_t *self, ten_shared_ptr_t *msg) { // 'commands' before sending it to the extension. if (msg_is_cmd_result) { - // =-=-= - // Set the cmd result to the corresponding OUT path to indicate that // there has been a cmd result flow through that OUT path. ten_path_t *out_path = diff --git a/core/src/ten_runtime/path/path_group.c b/core/src/ten_runtime/path/path_group.c index d56f0b5e1e..0b08f89d35 100644 --- a/core/src/ten_runtime/path/path_group.c +++ b/core/src/ten_runtime/path/path_group.c @@ -70,7 +70,6 @@ static ten_path_group_t *ten_path_group_create(ten_path_table_t *table, self->table = table; self->policy = policy; - self->has_been_processed = false; ten_list_init(&self->members); return self; diff --git a/core/src/ten_runtime/path/path_table.c b/core/src/ten_runtime/path/path_table.c index e83b3f2287..11b25d2dd3 100644 --- a/core/src/ten_runtime/path/path_table.c +++ b/core/src/ten_runtime/path/path_table.c @@ -366,16 +366,25 @@ static ten_path_in_t *ten_path_table_find_in_path(ten_path_table_t *self, return ten_ptr_listnode_get(old_node); } -static void ten_path_mark_belonging_group_processed(ten_path_t *path) { +static void ten_path_table_remove_path_from_group(ten_path_table_t *self, + TEN_PATH_TYPE type, + ten_path_t *path) { + TEN_ASSERT(self, "Invalid argument."); + TEN_ASSERT(ten_path_table_check_integrity(self, true), "Invalid argument."); TEN_ASSERT(path && ten_path_check_integrity(path, true), "Invalid argument."); TEN_ASSERT(ten_path_is_in_a_group(path), "Invalid argument."); - ten_path_group_t *group = - (ten_path_group_t *)ten_shared_ptr_get_data(path->group); - TEN_ASSERT(group && ten_path_group_check_integrity(group, true), - "Invalid argument."); + // Remove path from group members. + ten_list_t *group_members = ten_path_group_get_members(path); + ten_listnode_t *group_member_node = ten_list_find_ptr(group_members, path); + TEN_ASSERT(group_member_node, "Should not happen."); + ten_list_remove_node(group_members, group_member_node); - group->has_been_processed = true; + // Remove path from the path table. + ten_list_t *paths = type == TEN_PATH_IN ? &self->in_paths : &self->out_paths; + ten_listnode_t *paths_node = ten_list_find_ptr(paths, path); + TEN_ASSERT(paths_node, "Should not happen."); + ten_list_remove_node(paths, paths_node); } static void ten_path_table_remove_group_and_all_its_paths( @@ -511,16 +520,21 @@ ten_shared_ptr_t *ten_path_table_determine_actual_cmd_result( if (ten_path_is_in_a_group(path)) { ten_path_group_t *path_group = ten_path_get_group(path); - // =-=-= + switch (path_group->policy) { + case TEN_PATH_GROUP_POLICY_RETURN_EACH_IMMEDIATELY: + ten_path_table_remove_path_from_group(self, path_type, path); + break; - if (path_group->policy != TEN_PATH_GROUP_POLICY_RETURN_EACH_IMMEDIATELY) { - // When execution reaches this point, it means that the path group has - // completed its task, so the flag is marked accordingly. - ten_path_mark_belonging_group_processed(path); + case TEN_PATH_GROUP_POLICY_RETURN_FIRST_OK_OR_FAIL: + case TEN_PATH_GROUP_POLICY_RETURN_LAST_OK_OR_FAIL: + // The path group has completed its task, so clean up the path group and + // all paths it contains. + ten_path_table_remove_group_and_all_its_paths(self, path_type, path); + break; - // The path group has completed its task, so clean up the path group and - // all paths it contains. - ten_path_table_remove_group_and_all_its_paths(self, path_type, path); + default: + TEN_ASSERT(0, "Should not happen."); + break; } } else { if (remove_path) { From 9a26b80f62e5a6a4caaa92a0f6b4a5fd791dfd90 Mon Sep 17 00:00:00 2001 From: Hu Yueh-Wei Date: Sun, 10 Nov 2024 17:55:12 +0800 Subject: [PATCH 3/4] feat: add immediately return result mode --- tests/ten_runtime/smoke/msg_test/msg_1.cc | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/ten_runtime/smoke/msg_test/msg_1.cc b/tests/ten_runtime/smoke/msg_test/msg_1.cc index 958784ecc8..726544a322 100644 --- a/tests/ten_runtime/smoke/msg_test/msg_1.cc +++ b/tests/ten_runtime/smoke/msg_test/msg_1.cc @@ -6,7 +6,6 @@ // #include #include -#include #include "gtest/gtest.h" #include "include_internal/ten_runtime/binding/cpp/ten.h" From 3c71fd441d0d3fc1ffaa950c77ec0503a8219c4c Mon Sep 17 00:00:00 2001 From: Hu Yueh-Wei Date: Sun, 10 Nov 2024 18:03:21 +0800 Subject: [PATCH 4/4] feat: add immediately return result mode --- ...ultiple_result.cc => multiple_result_1.cc} | 10 +- .../cmd_result_test/multiple_result_2.cc | 178 ++++++++++++++++++ 2 files changed, 183 insertions(+), 5 deletions(-) rename tests/ten_runtime/smoke/cmd_result_test/{multiple_result.cc => multiple_result_1.cc} (93%) create mode 100644 tests/ten_runtime/smoke/cmd_result_test/multiple_result_2.cc diff --git a/tests/ten_runtime/smoke/cmd_result_test/multiple_result.cc b/tests/ten_runtime/smoke/cmd_result_test/multiple_result_1.cc similarity index 93% rename from tests/ten_runtime/smoke/cmd_result_test/multiple_result.cc rename to tests/ten_runtime/smoke/cmd_result_test/multiple_result_1.cc index 597c13cb63..1cdb34f7b8 100644 --- a/tests/ten_runtime/smoke/cmd_result_test/multiple_result.cc +++ b/tests/ten_runtime/smoke/cmd_result_test/multiple_result_1.cc @@ -90,14 +90,14 @@ void *test_app_thread_main(TEN_UNUSED void *args) { return nullptr; } -TEN_CPP_REGISTER_ADDON_AS_EXTENSION(multiple_result__test_extension_1, +TEN_CPP_REGISTER_ADDON_AS_EXTENSION(multiple_result_1__test_extension_1, test_extension_1); -TEN_CPP_REGISTER_ADDON_AS_EXTENSION(multiple_result__test_extension_2, +TEN_CPP_REGISTER_ADDON_AS_EXTENSION(multiple_result_1__test_extension_2, test_extension_2); } // namespace -TEST(CmdResultTest, MultipleResult) { // NOLINT +TEST(CmdResultTest, MultipleResult1) { // NOLINT // Start app. auto *app_thread = ten_thread_create("app thread", test_app_thread_main, nullptr); @@ -114,13 +114,13 @@ TEST(CmdResultTest, MultipleResult) { // NOLINT "nodes": [{ "type": "extension", "name": "test_extension_1", - "addon": "multiple_result__test_extension_1", + "addon": "multiple_result_1__test_extension_1", "extension_group": "basic_extension_group", "app": "msgpack://127.0.0.1:8001/" },{ "type": "extension", "name": "test_extension_2", - "addon": "multiple_result__test_extension_2", + "addon": "multiple_result_1__test_extension_2", "extension_group": "basic_extension_group", "app": "msgpack://127.0.0.1:8001/" }], diff --git a/tests/ten_runtime/smoke/cmd_result_test/multiple_result_2.cc b/tests/ten_runtime/smoke/cmd_result_test/multiple_result_2.cc new file mode 100644 index 0000000000..a830e08e5a --- /dev/null +++ b/tests/ten_runtime/smoke/cmd_result_test/multiple_result_2.cc @@ -0,0 +1,178 @@ +// +// Copyright © 2024 Agora +// This file is part of TEN Framework, an open source project. +// Licensed under the Apache License, Version 2.0, with certain conditions. +// Refer to the "LICENSE" file in the root directory for more information. +// +#include +#include + +#include "gtest/gtest.h" +#include "include_internal/ten_runtime/binding/cpp/ten.h" +#include "ten_utils/lib/thread.h" +#include "tests/common/client/cpp/msgpack_tcp.h" +#include "tests/ten_runtime/smoke/extension_test/util/binding/cpp/check.h" + +namespace { + +class test_extension_1 : public ten::extension_t { + public: + explicit test_extension_1(const std::string &name) : ten::extension_t(name) {} + + void on_cmd(ten::ten_env_t &ten_env, + std::unique_ptr cmd) override { + if (std::string(cmd->get_name()) == "hello_world") { + ten_env.send_cmd(std::move(cmd), + [this](ten::ten_env_t &ten_env, + std::unique_ptr cmd_result) { + ++received_result_cnt; + + cmd_result->set_property("detail", "hello world, too"); + ten_env.return_result_directly(std::move(cmd_result)); + }); + } + } + + private: + int received_result_cnt{0}; +}; + +class test_extension_2 : public ten::extension_t { + public: + explicit test_extension_2(const std::string &name) : ten::extension_t(name) {} + + void on_cmd(ten::ten_env_t &ten_env, + std::unique_ptr cmd) override { + if (std::string(cmd->get_name()) == "hello_world") { + auto cmd_result = ten::cmd_result_t::create(TEN_STATUS_CODE_OK); + cmd_result->set_property("detail", "result from 2"); + ten_env.return_result(std::move(cmd_result), std::move(cmd)); + } + } +}; + +class test_extension_3 : public ten::extension_t { + public: + explicit test_extension_3(const std::string &name) : ten::extension_t(name) {} + + void on_cmd(ten::ten_env_t &ten_env, + std::unique_ptr cmd) override { + if (std::string(cmd->get_name()) == "hello_world") { + auto cmd_result = ten::cmd_result_t::create(TEN_STATUS_CODE_OK); + cmd_result->set_property("detail", "result from 3"); + ten_env.return_result(std::move(cmd_result), std::move(cmd)); + } + } +}; + +class test_app : public ten::app_t { + public: + void on_configure(ten::ten_env_t &ten_env) override { + bool rc = ten_env.init_property_from_json( + // clang-format off + R"({ + "_ten": { + "uri": "msgpack://127.0.0.1:8001/", + "log_level": 2 + } + })" + // clang-format on + , + nullptr); + ASSERT_EQ(rc, true); + + ten_env.on_configure_done(); + } +}; + +void *test_app_thread_main(TEN_UNUSED void *args) { + auto *app = new test_app(); + app->run(); + delete app; + + return nullptr; +} + +TEN_CPP_REGISTER_ADDON_AS_EXTENSION(multiple_result_2__test_extension_1, + test_extension_1); +TEN_CPP_REGISTER_ADDON_AS_EXTENSION(multiple_result_2__test_extension_2, + test_extension_2); +TEN_CPP_REGISTER_ADDON_AS_EXTENSION(multiple_result_2__test_extension_3, + test_extension_3); + +} // namespace + +TEST(CmdResultTest, MultipleResult2) { // NOLINT + // Start app. + auto *app_thread = + ten_thread_create("app thread", test_app_thread_main, nullptr); + + // Create a client and connect to the app. + auto *client = new ten::msgpack_tcp_client_t("msgpack://127.0.0.1:8001/"); + + // Send graph. + nlohmann::json resp = client->send_json_and_recv_resp_in_json( + R"({ + "_ten": { + "type": "start_graph", + "seq_id": "55", + "nodes": [{ + "type": "extension", + "name": "test_extension_1", + "addon": "multiple_result_2__test_extension_1", + "extension_group": "basic_extension_group", + "app": "msgpack://127.0.0.1:8001/" + },{ + "type": "extension", + "name": "test_extension_2", + "addon": "multiple_result_2__test_extension_2", + "extension_group": "basic_extension_group", + "app": "msgpack://127.0.0.1:8001/" + },{ + "type": "extension", + "name": "test_extension_3", + "addon": "multiple_result_2__test_extension_3", + "extension_group": "basic_extension_group", + "app": "msgpack://127.0.0.1:8001/" + }], + "connections": [{ + "app": "msgpack://127.0.0.1:8001/", + "extension_group": "basic_extension_group", + "extension": "test_extension_1", + "cmd": [{ + "name": "hello_world", + "dest": [{ + "app": "msgpack://127.0.0.1:8001/", + "extension_group": "basic_extension_group", + "extension": "test_extension_2" + },{ + "app": "msgpack://127.0.0.1:8001/", + "extension_group": "basic_extension_group", + "extension": "test_extension_3" + }] + }] + }] + } + })"_json); + ten_test::check_status_code_is(resp, TEN_STATUS_CODE_OK); + + // Send a user-defined 'hello world' command. + resp = client->send_json_and_recv_resp_in_json( + R"({ + "_ten": { + "name": "hello_world", + "seq_id": "137", + "dest": [{ + "app": "msgpack://127.0.0.1:8001/", + "extension_group": "basic_extension_group", + "extension": "test_extension_1" + }] + } + })"_json); + ten_test::check_result_is(resp, "137", TEN_STATUS_CODE_OK, + "hello world, too"); + + delete client; + + ten_thread_join(app_thread, -1); +}