Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add immediately return result mode #252

Merged
merged 4 commits into from
Nov 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@
"request": "launch",
"program": "${workspaceFolder}/out/linux/x64/tests/standalone/ten_runtime_smoke_test",
"args": [
"--gtest_filter=ExtensionTest.OneEngineConcurrent"
"--gtest_filter=ExtensionTest.MultiDestSendInStopPeriod"
],
"cwd": "${workspaceFolder}/out/linux/x64/tests/standalone/",
"env": {
Expand Down
2 changes: 2 additions & 0 deletions core/include_internal/ten_runtime/path/path.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,5 @@ TEN_RUNTIME_PRIVATE_API void ten_path_set_result(ten_path_t *path,

TEN_RUNTIME_PRIVATE_API void ten_path_set_expired_time(
ten_path_t *path, uint64_t expired_time_us);

TEN_RUNTIME_PRIVATE_API ten_path_group_t *ten_path_get_group(ten_path_t *self);
20 changes: 8 additions & 12 deletions core/include_internal/ten_runtime/path/path_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,10 +135,13 @@ typedef enum TEN_PATH_GROUP_POLICY {
// If receive a fail result, return it, otherwise, when all OK results are
// received, return the first received one. Clear the group after returning
// the result.
TEN_PATH_GROUP_POLICY_ONE_FAIL_RETURN_AND_ALL_OK_RETURN_FIRST,
TEN_PATH_GROUP_POLICY_RETURN_FIRST_OK_OR_FAIL,

// Similar to the above, except return the last received one.
TEN_PATH_GROUP_POLICY_ONE_FAIL_RETURN_AND_ALL_OK_RETURN_LAST,
TEN_PATH_GROUP_POLICY_RETURN_LAST_OK_OR_FAIL,

// Return each result immediately as it is received.
TEN_PATH_GROUP_POLICY_RETURN_EACH_IMMEDIATELY,

// More modes is allowed, and could be added here in case needed.
} TEN_PATH_GROUP_POLICY;
Expand All @@ -150,16 +153,9 @@ typedef struct ten_path_group_t {
ten_path_table_t *table;

TEN_PATH_GROUP_POLICY policy;
ten_list_t members; // Contain the members of the group.

// If this flag is set, none of the paths in the path_group can be used to
// trace back cmd results anymore.
//
// For example, if the policy is ONE_FAIL_RETURN_AND_ALL_OK_RETURN_FIRST
// and one of the paths in the group has received a fail cmd result, then
// the 'has_been_processed' flag will be set to true to prevent the left
// paths in the group from transmitting cmd results.
bool has_been_processed;

// Contain the members of the group.
ten_list_t members; // ten_path_t
} ten_path_group_t;

TEN_RUNTIME_PRIVATE_API bool ten_path_group_check_integrity(
Expand Down
5 changes: 2 additions & 3 deletions core/src/ten_runtime/engine/msg_interface/start_graph.c
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,8 @@ void ten_engine_handle_cmd_start_graph(ten_engine_t *self,

if (ten_list_size(&new_works) > 1) {
// Create path group for these newly submitted 'start_graph' commands.
ten_paths_create_group(
&new_works,
TEN_PATH_GROUP_POLICY_ONE_FAIL_RETURN_AND_ALL_OK_RETURN_LAST);
ten_paths_create_group(&new_works,
TEN_PATH_GROUP_POLICY_RETURN_LAST_OK_OR_FAIL);
}
ten_list_clear(&new_works);

Expand Down
5 changes: 2 additions & 3 deletions core/src/ten_runtime/extension/extension.c
Original file line number Diff line number Diff line change
Expand Up @@ -711,9 +711,8 @@ bool ten_extension_handle_out_msg(ten_extension_t *self, ten_shared_ptr_t *msg,

if (ten_list_size(&result_out_paths) > 1) {
// Create a path group in this case.
ten_paths_create_group(
&result_out_paths,
TEN_PATH_GROUP_POLICY_ONE_FAIL_RETURN_AND_ALL_OK_RETURN_LAST);
ten_paths_create_group(&result_out_paths,
TEN_PATH_GROUP_POLICY_RETURN_LAST_OK_OR_FAIL);
}

ten_list_clear(&result_out_paths);
Expand Down
3 changes: 3 additions & 0 deletions core/src/ten_runtime/extension/internal/close.c
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ static void ten_extension_thread_process_remaining_paths(
ten_extension_get_name(extension, true), out_paths_cnt);

ten_list_t cmd_result_list = TEN_LIST_INIT_VAL;

// Generate an error result for each remaining out path.
ten_list_foreach (out_paths, iter) {
ten_path_t *path = (ten_path_t *)ten_ptr_listnode_get(iter.node);
TEN_ASSERT(path && ten_path_check_integrity(path, true),
Expand All @@ -72,6 +74,7 @@ static void ten_extension_thread_process_remaining_paths(
ten_shared_ptr_destroy(cmd_result);
}

// Send these newly generated error results to the extension.
ten_list_foreach (&cmd_result_list, iter) {
ten_shared_ptr_t *cmd_result = ten_smart_ptr_listnode_get(iter.node);
TEN_ASSERT(cmd_result && ten_cmd_base_check_integrity(cmd_result),
Expand Down
17 changes: 1 addition & 16 deletions core/src/ten_runtime/extension/msg_handling.c
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,6 @@ void ten_extension_handle_in_msg(ten_extension_t *self, ten_shared_ptr_t *msg) {
// TODO(Xilin): Currently, there is no mechanism for auto return, so the
// relevant codes should be able to be disabled.
ten_extension_cache_cmd_result_to_in_path_for_auto_return(self, msg);

delete_msg = true;
}
}
Expand Down Expand Up @@ -199,8 +198,6 @@ void ten_extension_handle_in_msg(ten_extension_t *self, ten_shared_ptr_t *msg) {
if (!msg_is_cmd_result) {
// Create the corresponding IN paths for the input commands.

ten_list_t in_paths = TEN_LIST_INIT_VAL;

ten_list_foreach (&converted_msgs, iter) {
ten_msg_and_its_result_conversion_t *msg_and_result_conversion =
ten_ptr_listnode_get(iter.node);
Expand All @@ -213,24 +210,12 @@ void ten_extension_handle_in_msg(ten_extension_t *self, ten_shared_ptr_t *msg) {
if (ten_msg_is_cmd(actual_cmd)) {
if (ten_msg_info[ten_msg_get_type(actual_cmd)].create_in_path) {
// Add a path entry to the IN path table of this extension.
ten_path_t *in_path = (ten_path_t *)ten_path_table_add_in_path(
ten_path_table_add_in_path(
self->path_table, actual_cmd,
msg_and_result_conversion->result_conversion);

ten_list_push_ptr_back(&in_paths, in_path, NULL);
}
}
}

if (ten_list_size(&in_paths) > 1) {
// Create a path group in this case.

ten_paths_create_group(
&in_paths,
TEN_PATH_GROUP_POLICY_ONE_FAIL_RETURN_AND_ALL_OK_RETURN_LAST);
}

ten_list_clear(&in_paths);
}

// The path table processing is completed, it's time to check the schema. And
Expand Down
16 changes: 12 additions & 4 deletions core/src/ten_runtime/path/path.c
Original file line number Diff line number Diff line change
Expand Up @@ -170,10 +170,7 @@ void ten_path_set_result(ten_path_t *path, ten_shared_ptr_t *cmd_result) {
if (ten_path_is_in_a_group(path)) {
// Move the current path to the last of the members of the group, so that we
// can know which one should be returned in different policies.
ten_path_group_t *path_group =
(ten_path_group_t *)ten_shared_ptr_get_data(path->group);
TEN_ASSERT(path_group && ten_path_group_check_integrity(path_group, true),
"Invalid argument.");
ten_path_group_t *path_group = ten_path_get_group(path);

ten_list_t *members = &path_group->members;
TEN_ASSERT(members, "Should not happen.");
Expand All @@ -191,3 +188,14 @@ void ten_path_set_expired_time(ten_path_t *path, uint64_t expired_time_us) {

path->expired_time_us = expired_time_us;
}

ten_path_group_t *ten_path_get_group(ten_path_t *self) {
TEN_ASSERT(self && ten_path_check_integrity(self, true), "Invalid argument.");

ten_path_group_t *path_group =
(ten_path_group_t *)ten_shared_ptr_get_data(self->group);
TEN_ASSERT(path_group && ten_path_group_check_integrity(path_group, true),
"Invalid argument.");

return path_group;
}
27 changes: 11 additions & 16 deletions core/src/ten_runtime/path/path_group.c
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ static ten_path_group_t *ten_path_group_create(ten_path_table_t *table,

self->table = table;
self->policy = policy;
self->has_been_processed = false;
ten_list_init(&self->members);

return self;
Expand Down Expand Up @@ -140,12 +139,11 @@ static ten_path_t *ten_path_group_resolve_in_one_fail_and_all_ok_return(
"Invalid argument.");

ten_shared_ptr_t *cmd_result = path->cached_cmd_result;

if (cmd_result) {
TEN_ASSERT(ten_msg_get_type(cmd_result) == TEN_MSG_TYPE_CMD_RESULT,
"Invalid argument.");
}

if (cmd_result) {
if (ten_cmd_result_get_status_code(cmd_result) != TEN_STATUS_CODE_OK) {
// Receive a fail result, return it.
return path;
Expand Down Expand Up @@ -178,10 +176,7 @@ ten_list_t *ten_path_group_get_members(ten_path_t *path) {
TEN_ASSERT(path && ten_path_check_integrity(path, true), "Invalid argument.");
TEN_ASSERT(ten_path_is_in_a_group(path), "Invalid argument.");

ten_path_group_t *path_group =
(ten_path_group_t *)ten_shared_ptr_get_data(path->group);
TEN_ASSERT(path_group && ten_path_group_check_integrity(path_group, true),
"Invalid argument.");
ten_path_group_t *path_group = ten_path_get_group(path);

ten_list_t *members = &path_group->members;
TEN_ASSERT(members && ten_list_check_integrity(members),
Expand All @@ -198,30 +193,30 @@ ten_list_t *ten_path_group_get_members(ten_path_t *path) {
* group to a final cmd result. It checks the policy of the path group and calls
* the appropriate function to resolve the path group status.
*
* Policy `TEN_PATH_GROUP_POLICY_ONE_FAIL_RETURN_AND_ALL_OK_RETURN_FIRST`
* returns the first path in the list if all paths have succeeded, and returns
* the first path that has failed otherwise.
* Policy `TEN_PATH_GROUP_POLICY_RETURN_FIRST_OK_OR_FAIL` returns the first path
* in the list if all paths have succeeded, and returns the first path that has
* failed otherwise.
*/
ten_path_t *ten_path_group_resolve(ten_path_t *path, TEN_PATH_TYPE type) {
TEN_ASSERT(path && ten_path_check_integrity(path, true), "Invalid argument.");
TEN_ASSERT(ten_path_is_in_a_group(path), "Invalid argument.");

ten_path_group_t *path_group =
(ten_path_group_t *)ten_shared_ptr_get_data(path->group);
TEN_ASSERT(path_group && ten_path_group_check_integrity(path_group, true),
"Invalid argument.");
ten_path_group_t *path_group = ten_path_get_group(path);

ten_list_t *members = &path_group->members;
TEN_ASSERT(members && ten_list_check_integrity(members),
"Should not happen.");

switch (path_group->policy) {
case TEN_PATH_GROUP_POLICY_ONE_FAIL_RETURN_AND_ALL_OK_RETURN_FIRST:
case TEN_PATH_GROUP_POLICY_RETURN_FIRST_OK_OR_FAIL:
return ten_path_group_resolve_in_one_fail_and_all_ok_return(members, type,
false);
case TEN_PATH_GROUP_POLICY_ONE_FAIL_RETURN_AND_ALL_OK_RETURN_LAST:
case TEN_PATH_GROUP_POLICY_RETURN_LAST_OK_OR_FAIL:
return ten_path_group_resolve_in_one_fail_and_all_ok_return(members, type,
true);
case TEN_PATH_GROUP_POLICY_RETURN_EACH_IMMEDIATELY:
// In this policy, we return the current path immediately.
return path;
default:
TEN_ASSERT(0, "Should not happen.");
return NULL;
Expand Down
Loading
Loading