diff --git a/examples/cpp/CMakeLists.txt b/examples/cpp/CMakeLists.txt index fd5782a..6fdd6d8 100755 --- a/examples/cpp/CMakeLists.txt +++ b/examples/cpp/CMakeLists.txt @@ -7,6 +7,12 @@ add_compile_options(-std=c++11) find_package(Threads REQUIRED) find_package(protobuf CONFIG REQUIRED) find_package(gRPC CONFIG REQUIRED) +find_package(Boost) + +IF (Boost_FOUND) + INCLUDE_DIRECTORIES(${Boost_INCLUDE_DIR}) + ADD_DEFINITIONS( "-DHAS_BOOST" ) +ENDIF() include_directories(.) include_directories(protos) @@ -61,6 +67,17 @@ target_link_libraries( protobuf::libprotobuf ) +add_executable( + formant_client_example + formant_client_example.cpp + ${INCLUDES} +) +target_link_libraries( + formant_client_example + gRPC::grpc++ + protobuf::libprotobuf +) + add_executable( get_config_params get_config_params.cpp diff --git a/examples/cpp/formant_client_example.cpp b/examples/cpp/formant_client_example.cpp new file mode 100644 index 0000000..e1022a0 --- /dev/null +++ b/examples/cpp/formant_client_example.cpp @@ -0,0 +1,247 @@ +#include +#include +#include +#include +#include +#include +#include + +#include "protos/agent/v1/agent.grpc.pb.h" + +using grpc::Channel; +using grpc::ClientContext; +using grpc::Status; + +using v1::agent::Agent; + +// For getting config data +using v1::agent::GetApplicationConfigurationRequest; +using v1::agent::GetApplicationConfigurationResponse; + +// For receiving commands +using v1::agent::GetCommandRequestStreamRequest; +using v1::agent::GetCommandRequestStreamResponse; +using v1::agent::SendCommandResponseRequest; +using v1::agent::SendCommandResponseResponse; + +#ifndef GET_CONFIG_H +#define GET_CONFIG_H + +class FormantAgentClient +{ +public: + using FormantAgentCallback = std::function; + + inline FormantAgentClient(std::shared_ptr channel) : stub_(Agent::NewStub(channel)) {} + + inline FormantAgentClient() : FormantAgentClient(grpc::CreateChannel("unix:///var/lib/formant/agent.sock", grpc::InsecureChannelCredentials())) + {} + + inline ~FormantAgentClient() + { + if (command_stream_thread_started) + { + cancel_command_receiver_thread(); + } + } + + /** + * @brief Return the configuration value for the key. Returned as a std::pair where the first element is + * true is the key was found, and returned as false if the key was not found in the config. + * + * @param param + * @return std::pair + */ + inline std::pair get_config_param(const std::string &key) + { + + if (app_config_loaded) + { + + if (app_config.configuration_map().contains(key)) + { + return {true, app_config.configuration_map().find(key)->second}; + } + + return {false, ""}; + } + + GetApplicationConfigurationRequest request; + GetApplicationConfigurationResponse response; + ClientContext context; + + stub_->GetApplicationConfiguration(&context, request, &response); + + app_config = response.configuration(); + app_config_loaded = true; + + return get_config_param(key); + } + + /** + * @brief Register a call-back with the Formant agent for the specified command's + * + * @param callback + * @param command_filters + */ + inline void register_command_callback(FormantAgentCallback callback, const std::vector &command_filters, bool reset_stream = true) + { + for (size_t i = 0; i < command_filters.size(); ++i) + register_command_callback(callback, command_filters[i], i == command_filters.size() - 1 && reset_stream); + } + + /** + * @brief Register a call-back with the Formant agent for the specified command + * + * @param callback + * @param command_filters + */ + inline void register_command_callback(FormantAgentCallback callback, const std::string &command_filter, bool reset_stream = true) + { + + // Step 1: register the register the command filter if needed + if (!command_filters.count(command_filter)) + { + command_filters.insert(command_filter); + command_stream_request.add_command_filter(command_filter); + } + else if (reset_stream) + { + reset_stream = false; // If we didn't add a new command filter, we dont need to reset stream + } + + // Step 2: add the callback to the command filter + { + boost::mutex::scoped_lock lock(callbacks_map_mutex); + callbacks[command_filter].push_back(callback); + } + + // Step 3: if the stream has not started, then start the stream. + if (!command_stream_thread_started) + { + start_command_receiver_thread(); + reset_stream = false; + } + + // Step 4: reset the stream if needed + if (reset_stream) + { + cancel_command_receiver_thread(); + start_command_receiver_thread(); + } + } + +private: + /** + * @brief The command receiver thread is a cancellable thread which waits for registered commands to + * be processed by the thread. + * + */ + inline void command_receiver_thread() + { + + ClientContext stream_context; + register_command_receiver_context(&stream_context); + GetCommandRequestStreamRequest request = command_stream_request; + auto stream = stub_->GetCommandRequestStream(&stream_context, request); + GetCommandRequestStreamResponse message; + + command_stream_thread_started = true; + + for (auto c : command_filters) + { + std::cout << c << " "; + } + + while (stream->Read(&message)) + { + auto command = message.request().command(); + auto payload = message.request().text(); + auto id = message.request().id(); + + SendCommandResponseRequest finishRequest; + SendCommandResponseResponse finishResponse; + + ClientContext response_context; + finishRequest.mutable_response()->set_request_id(id); + finishRequest.mutable_response()->set_success(true); + stub_->SendCommandResponse(&response_context, finishRequest, &finishResponse); + + auto &callback_funcs = callbacks[command]; + + // Pass the message to each callback for processing. + for (auto f : callback_funcs) + f(message); + } + } + + /** + * @brief Stop the command receiver thread. + * + */ + inline void cancel_command_receiver_thread() + { + command_stream_context->TryCancel(); + command_stream_thread.join(); + command_stream_thread_started = false; + } + + inline void start_command_receiver_thread() + { + command_stream_thread = std::thread(&FormantAgentClient::command_receiver_thread, this); + while (!command_stream_thread_started) + sleep(0.1); + } + + inline void register_command_receiver_context(ClientContext *context) + { + command_stream_context = context; + } + + // Agent stub for grpc + std::unique_ptr stub_; + + // variables for getting robot configuration from Formant + v1::model::ApplicationConfiguration app_config; + bool app_config_loaded = false; + + // the command stream variable which listens for specified commands. + std::unique_ptr> command_stream; + // std::unordered_map> callbacks; + std::unordered_map>> callbacks; + + boost::mutex callbacks_map_mutex; + std::set command_filters; + GetCommandRequestStreamRequest command_stream_request; // request used to set-up the command stream with the agent. + + ClientContext *command_stream_context; + + std::thread command_stream_thread; + bool command_stream_thread_started = false; +}; + +void callback1(const GetCommandRequestStreamResponse &msg) +{ + std::cout << "Command received callback 1" << std::endl; +} + +void callback2(const GetCommandRequestStreamResponse &msg) +{ + std::cout << "Command received callback 2" << std::endl; +} + +int main() +{ + + FormantAgentClient client; + std::vector command_filter_1{"test"}; + std::vector command_filter_2{"test", "test2"}; + client.register_command_callback(&callback1, command_filter_1, false); + client.register_command_callback(&callback2, command_filter_2); + + while(true){ + sleep(1); + } +} + +#endif \ No newline at end of file