Skip to content

Commit

Permalink
feat: add immediate return result (#254)
Browse files Browse the repository at this point in the history
  • Loading branch information
halajohn authored Nov 10, 2024
1 parent e0cd533 commit 91bf25d
Show file tree
Hide file tree
Showing 30 changed files with 606 additions and 68 deletions.
2 changes: 1 addition & 1 deletion .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@
"request": "launch",
"program": "${workspaceFolder}/out/linux/x64/tests/standalone/ten_runtime_smoke_test",
"args": [
"--gtest_filter=ExtensionTest.MultiDestSendInStopPeriod"
"--gtest_filter=GraphTest.GroupNodeMissing2Apps"
],
"cwd": "${workspaceFolder}/out/linux/x64/tests/standalone/",
"env": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ class cmd_result_t : public msg_t {
c_msg, err != nullptr ? err->get_internal_representation() : nullptr);
}

bool is_completed(error_t *err = nullptr) const {
return ten_cmd_result_is_completed(
c_msg, err != nullptr ? err->get_internal_representation() : nullptr);
}

bool set_final(bool final, error_t *err = nullptr) {
return ten_cmd_result_set_final(
c_msg, final,
Expand Down
3 changes: 2 additions & 1 deletion core/include/ten_runtime/binding/cpp/internal/ten_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "ten_runtime/binding/cpp/internal/msg/data.h"
#include "ten_runtime/binding/cpp/internal/msg/video_frame.h"
#include "ten_runtime/common/errno.h"
#include "ten_runtime/msg/cmd_result/cmd_result.h"
#include "ten_runtime/ten.h"
#include "ten_runtime/ten_env/internal/metadata.h"
#include "ten_runtime/ten_env/internal/on_xxx_done.h"
Expand Down Expand Up @@ -1184,7 +1185,7 @@ class ten_env_t {

(*result_handler)(*cpp_ten_env, std::move(cmd_result));

if (ten_cmd_result_is_final(c_cmd_result, nullptr)) {
if (ten_cmd_result_is_completed(c_cmd_result, nullptr)) {
// Only when is_final is true should the result handler be cleared.
// Otherwise, since more result handlers are expected, the result
// handler should not be cleared.
Expand Down
3 changes: 3 additions & 0 deletions core/include/ten_runtime/msg/cmd_result/cmd_result.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,8 @@ ten_cmd_result_get_status_code(ten_shared_ptr_t *self);
TEN_RUNTIME_API bool ten_cmd_result_is_final(ten_shared_ptr_t *self,
ten_error_t *err);

TEN_RUNTIME_API bool ten_cmd_result_is_completed(ten_shared_ptr_t *self,
ten_error_t *err);

TEN_RUNTIME_API bool ten_cmd_result_set_final(ten_shared_ptr_t *self,
bool is_final, ten_error_t *err);
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,6 @@ TEN_RUNTIME_PRIVATE_API PyObject *ten_py_cmd_result_set_final(PyObject *self,

TEN_RUNTIME_PRIVATE_API PyObject *ten_py_cmd_result_is_final(PyObject *self,
PyObject *args);

TEN_RUNTIME_PRIVATE_API PyObject *ten_py_cmd_result_is_completed(
PyObject *self, PyObject *args);
6 changes: 6 additions & 0 deletions core/include_internal/ten_runtime/common/constant_str.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,3 +161,9 @@

#define TEN_STR_DEFAULT_EXTENSION_GROUP "default_extension_group"
#define TEN_STR_TEN_TEST_EXTENSION "ten:test_extension"

// Result return policy.
#define TEN_STR_RESULT_RETURN_POLICY "result_return_policy"
#define TEN_STR_FIRST_ERROR_OR_FIRST_OK "first_error_or_first_ok"
#define TEN_STR_FIRST_ERROR_OR_LAST_OK "first_error_or_last_ok"
#define TEN_STR_EACH_IMMEDIATELY "each_immediately"
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#include "ten_runtime/ten_config.h"

#include "include_internal/ten_runtime/path/path_group.h"
#include "ten_utils/container/list.h"
#include "ten_utils/lib/error.h"
#include "ten_utils/lib/signature.h"
Expand All @@ -20,7 +21,8 @@
typedef struct ten_msg_dest_info_t {
ten_signature_t signature;
ten_string_t name; // The name of a message or an interface.
ten_list_t dest; // ten_weak_ptr_t of ten_extension_info_t
TEN_RESULT_RETURN_POLICY policy;
ten_list_t dest; // ten_weak_ptr_t of ten_extension_info_t
} ten_msg_dest_info_t;

TEN_RUNTIME_PRIVATE_API bool ten_msg_dest_info_check_integrity(
Expand Down
12 changes: 11 additions & 1 deletion core/include_internal/ten_runtime/msg/cmd_base/cmd_result/cmd.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ typedef struct ten_cmd_result_t {

ten_value_t status_code; // int32 (TEN_STATUS_CODE)

ten_value_t is_final; // bool
ten_value_t is_final; // bool
ten_value_t is_completed; // bool
} ten_cmd_result_t;

TEN_RUNTIME_PRIVATE_API bool ten_raw_cmd_result_validate_schema(
Expand Down Expand Up @@ -103,9 +104,18 @@ TEN_RUNTIME_API ten_json_t *ten_cmd_result_to_json(ten_shared_ptr_t *self,
TEN_RUNTIME_PRIVATE_API bool ten_raw_cmd_result_is_final(ten_cmd_result_t *self,
ten_error_t *err);

TEN_RUNTIME_PRIVATE_API bool ten_raw_cmd_result_is_completed(
ten_cmd_result_t *self, ten_error_t *err);

TEN_RUNTIME_PRIVATE_API bool ten_raw_cmd_result_set_final(
ten_cmd_result_t *self, bool is_final, ten_error_t *err);

TEN_RUNTIME_PRIVATE_API bool ten_raw_cmd_result_set_completed(
ten_cmd_result_t *self, bool is_completed, ten_error_t *err);

TEN_RUNTIME_PRIVATE_API bool ten_raw_cmd_result_loop_all_fields(
ten_msg_t *self, ten_raw_msg_process_one_field_func_t cb, void *user_data,
ten_error_t *err);

TEN_RUNTIME_PRIVATE_API bool ten_cmd_result_set_completed(
ten_shared_ptr_t *self, bool is_completed, ten_error_t *err);
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
//
// Copyright © 2024 Agora
// This file is part of TEN Framework, an open source project.
// Licensed under the Apache License, Version 2.0, with certain conditions.
// Refer to the "LICENSE" file in the root directory for more information.
//
#pragma once

#include "ten_runtime/ten_config.h"

#include <stdbool.h>

#include "include_internal/ten_runtime/msg/loop_fields.h"
#include "ten_utils/container/list.h"

typedef struct ten_msg_t ten_msg_t;
typedef struct ten_http_t ten_http_t;
typedef struct ten_error_t ten_error_t;

TEN_RUNTIME_PRIVATE_API void ten_cmd_result_copy_is_completed(
ten_msg_t *self, ten_msg_t *src, ten_list_t *excluded_field_ids);

TEN_RUNTIME_PRIVATE_API bool ten_cmd_result_process_is_completed(
ten_msg_t *self, ten_raw_msg_process_one_field_func_t cb, void *user_data,
ten_error_t *err);
22 changes: 3 additions & 19 deletions core/include_internal/ten_runtime/path/path_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

#include "include_internal/ten_runtime/path/common.h"
#include "include_internal/ten_runtime/path/path_table.h"
#include "include_internal/ten_runtime/path/result_return_policy.h"
#include "ten_utils/container/list.h"

// There is a possible group relationship among ten_path_t, and that group
Expand Down Expand Up @@ -129,30 +130,13 @@
typedef struct ten_path_t ten_path_t;
typedef struct ten_msg_conversion_t ten_msg_conversion_t;

typedef enum TEN_PATH_GROUP_POLICY {
TEN_PATH_GROUP_POLICY_INVALID,

// If receive a fail result, return it, otherwise, when all OK results are
// received, return the first received one. Clear the group after returning
// the result.
TEN_PATH_GROUP_POLICY_RETURN_FIRST_OK_OR_FAIL,

// Similar to the above, except return the last received one.
TEN_PATH_GROUP_POLICY_RETURN_LAST_OK_OR_FAIL,

// Return each result immediately as it is received.
TEN_PATH_GROUP_POLICY_RETURN_EACH_IMMEDIATELY,

// More modes is allowed, and could be added here in case needed.
} TEN_PATH_GROUP_POLICY;

typedef struct ten_path_group_t {
ten_signature_t signature;
ten_sanitizer_thread_check_t thread_check;

ten_path_table_t *table;

TEN_PATH_GROUP_POLICY policy;
TEN_RESULT_RETURN_POLICY policy;

// Contain the members of the group.
ten_list_t members; // ten_path_t
Expand All @@ -169,7 +153,7 @@ TEN_RUNTIME_PRIVATE_API ten_list_t *ten_path_group_get_members(
ten_path_t *path);

TEN_RUNTIME_PRIVATE_API void ten_paths_create_group(
ten_list_t *paths, TEN_PATH_GROUP_POLICY policy);
ten_list_t *paths, TEN_RESULT_RETURN_POLICY policy);

TEN_RUNTIME_PRIVATE_API ten_path_t *ten_path_group_resolve(ten_path_t *path,
TEN_PATH_TYPE type);
35 changes: 35 additions & 0 deletions core/include_internal/ten_runtime/path/result_return_policy.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
//
// Copyright © 2024 Agora
// This file is part of TEN Framework, an open source project.
// Licensed under the Apache License, Version 2.0, with certain conditions.
// Refer to the "LICENSE" file in the root directory for more information.
//
#pragma once

#include "ten_runtime/ten_config.h"

#define TEN_DEFAULT_RESULT_RETURN_POLICY \
TEN_RESULT_RETURN_POLICY_FIRST_ERROR_OR_LAST_OK

typedef enum TEN_RESULT_RETURN_POLICY {
TEN_RESULT_RETURN_POLICY_INVALID,

// If receive a fail result, return it, otherwise, when all OK results are
// received, return the first received one. Clear the group after returning
// the result.
TEN_RESULT_RETURN_POLICY_FIRST_ERROR_OR_FIRST_OK,

// Similar to the above, except return the last received one.
TEN_RESULT_RETURN_POLICY_FIRST_ERROR_OR_LAST_OK,

// Return each result immediately as it is received.
TEN_RESULT_RETURN_POLICY_EACH_IMMEDIATELY,

// More modes is allowed, and could be added here in case needed.
} TEN_RESULT_RETURN_POLICY;

TEN_RUNTIME_PRIVATE_API TEN_RESULT_RETURN_POLICY
ten_result_return_policy_from_string(const char *policy_str);

TEN_RUNTIME_PRIVATE_API const char *ten_result_return_policy_to_string(
TEN_RESULT_RETURN_POLICY policy);
3 changes: 3 additions & 0 deletions core/src/ten_runtime/binding/go/interface/ten/cmd.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ ten_go_status_t ten_go_cmd_result_set_final(uintptr_t bridge_addr,
ten_go_status_t ten_go_cmd_result_is_final(uintptr_t bridge_addr,
bool *is_final);

ten_go_status_t ten_go_cmd_result_is_completed(uintptr_t bridge_addr,
bool *is_completed);

ten_go_handle_t ten_go_cmd_result_get_detail(uintptr_t bridge_addr);

ten_go_status_t ten_go_cmd_result_get_detail_json_and_size(
Expand Down
18 changes: 18 additions & 0 deletions core/src/ten_runtime/binding/go/interface/ten/cmd_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type CmdResult interface {
GetStatusCode() (StatusCode, error)
SetFinal(isFinal bool) error
IsFinal() (bool, error)
IsCompleted() (bool, error)
}

type cmdResult struct {
Expand Down Expand Up @@ -117,3 +118,20 @@ func (p *cmdResult) IsFinal() (bool, error) {

return bool(isFinal), nil
}

func (p *cmdResult) IsCompleted() (bool, error) {
var isCompleted C.bool
err := withCGOLimiter(func() error {
apiStatus := C.ten_go_cmd_result_is_completed(
p.getCPtr(),
&isCompleted,
)
return withGoStatus(&apiStatus)
})

if err != nil {
return false, err
}

return bool(isCompleted), nil
}
30 changes: 30 additions & 0 deletions core/src/ten_runtime/binding/go/native/msg/cmd/cmd.c
Original file line number Diff line number Diff line change
Expand Up @@ -164,3 +164,33 @@ ten_go_status_t ten_go_cmd_result_is_final(uintptr_t bridge_addr,
ten_error_deinit(&err);
return status;
}

ten_go_status_t ten_go_cmd_result_is_completed(uintptr_t bridge_addr,
bool *is_completed) {
TEN_ASSERT(bridge_addr && is_completed, "Invalid argument.");

ten_go_msg_t *msg_bridge = ten_go_msg_reinterpret(bridge_addr);
TEN_ASSERT(msg_bridge && ten_go_msg_check_integrity(msg_bridge),
"Should not happen.");

ten_shared_ptr_t *c_cmd = ten_go_msg_c_msg(msg_bridge);
TEN_ASSERT(c_cmd, "Should not happen.");

ten_go_status_t status;
ten_go_status_init_with_errno(&status, TEN_ERRNO_OK);

ten_error_t err;
ten_error_init(&err);

bool is_completed_ =
ten_cmd_result_is_completed(ten_go_msg_c_msg(msg_bridge), &err);

if (!ten_error_is_success(&err)) {
ten_go_status_set(&status, ten_error_errno(&err), ten_error_errmsg(&err));
} else {
*is_completed = is_completed_;
}

ten_error_deinit(&err);
return status;
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,6 @@ def set_final(self, is_final: bool):

def is_final(self) -> bool:
return _CmdResult.is_final(self)

def is_completed(self) -> bool:
return _CmdResult.is_completed(self)
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class _CmdResult(_Msg):
def get_status_code(self) -> int: ...
def set_final(self, is_final_flag: int) -> None: ...
def is_final(self) -> bool: ...
def is_completed(self) -> bool: ...

class _Data(_Msg):
def alloc_buf(self, size: int) -> None: ...
Expand Down
23 changes: 23 additions & 0 deletions core/src/ten_runtime/binding/python/native/msg/cmd_result.c
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,29 @@ PyObject *ten_py_cmd_result_is_final(PyObject *self, PyObject *args) {
return PyBool_FromLong(is_final);
}

PyObject *ten_py_cmd_result_is_completed(PyObject *self, PyObject *args) {
ten_py_cmd_result_t *py_cmd_result = (ten_py_cmd_result_t *)self;

TEN_ASSERT(py_cmd_result &&
ten_py_msg_check_integrity((ten_py_msg_t *)py_cmd_result),
"Invalid argument.");

ten_error_t err;
ten_error_init(&err);

bool is_completed =
ten_cmd_result_is_completed(py_cmd_result->msg.c_msg, &err);

if (!ten_error_is_success(&err)) {
ten_error_deinit(&err);
return ten_py_raise_py_runtime_error_exception("Failed to is_completed.");
}

ten_error_deinit(&err);

return PyBool_FromLong(is_completed);
}

bool ten_py_cmd_result_init_for_module(PyObject *module) {
PyTypeObject *py_type = ten_py_cmd_result_py_type();
if (PyType_Ready(py_type) < 0) {
Expand Down
1 change: 1 addition & 0 deletions core/src/ten_runtime/binding/python/native/msg/type.c
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ PyTypeObject *ten_py_cmd_result_py_type(void) {
NULL},
{"set_final", ten_py_cmd_result_set_final, METH_VARARGS, NULL},
{"is_final", ten_py_cmd_result_is_final, METH_VARARGS, NULL},
{"is_completed", ten_py_cmd_result_is_completed, METH_VARARGS, NULL},
{NULL, NULL, 0, NULL},
};

Expand Down
4 changes: 2 additions & 2 deletions core/src/ten_runtime/engine/msg_interface/start_graph.c
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,8 @@ void ten_engine_handle_cmd_start_graph(ten_engine_t *self,

if (ten_list_size(&new_works) > 1) {
// Create path group for these newly submitted 'start_graph' commands.
ten_paths_create_group(&new_works,
TEN_PATH_GROUP_POLICY_RETURN_LAST_OK_OR_FAIL);
ten_paths_create_group(
&new_works, TEN_RESULT_RETURN_POLICY_FIRST_ERROR_OR_LAST_OK);
}
ten_list_clear(&new_works);

Expand Down
Loading

0 comments on commit 91bf25d

Please sign in to comment.