Skip to content

Commit

Permalink
Check
Browse files Browse the repository at this point in the history
  • Loading branch information
atobiszei committed Jan 21, 2025
1 parent 4cb0217 commit dddaf1b
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 0 deletions.
1 change: 1 addition & 0 deletions src/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,7 @@ cc_library(
"@mediapipe//mediapipe/calculators/tensor:image_to_tensor_calculator",
"@mediapipe//mediapipe/modules/holistic_landmark:holistic_landmark_cpu",
"libovmsmediapipe_utils",
"@mediapipe//mediapipe/framework:thread_pool_executor",
"@mediapipe//mediapipe/calculators/geti/inference:inference_calculators",
"@mediapipe//mediapipe/calculators/geti/utils:utils",
"@mediapipe//mediapipe/calculators/geti/utils:emptylabel_calculators",
Expand Down
5 changes: 5 additions & 0 deletions src/mediapipe_internal/mediapipegraphdefinition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,10 @@ MediapipeGraphDefinition::MediapipeGraphDefinition(const std::string name,
reporter(std::make_unique<MediapipeServableMetricReporter>(metricConfig, registry, name)) {
mgconfig = config;
passKfsRequestFlag = false;
if (!sharedThreadPool) {
SPDLOG_ERROR("Created shared Thread Pool XXX");
sharedThreadPool = std::make_shared<mediapipe::ThreadPoolExecutor>(std::thread::hardware_concurrency()); // TODO FIXME should be in MP factory
}
}

Status MediapipeGraphDefinition::createInputsInfo() {
Expand Down Expand Up @@ -477,4 +481,5 @@ Status MediapipeGraphDefinition::initializeNodes() {
}
return StatusCode::OK;
}
std::shared_ptr<mediapipe::ThreadPoolExecutor> sharedThreadPool; // TODO FIXME should be in MP factory
} // namespace ovms
2 changes: 2 additions & 0 deletions src/mediapipe_internal/mediapipegraphdefinition.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "mediapipe/framework/calculator_graph.h"
#include "mediapipe/framework/port/parse_text_proto.h"
#include "mediapipe/framework/port/status.h"
#include "mediapipe/framework/thread_pool_executor.h"
#pragma GCC diagnostic pop

#include "mediapipegraphconfig.hpp"
Expand All @@ -53,6 +54,7 @@ struct LLMNodeResources;
using PythonNodeResourcesMap = std::unordered_map<std::string, std::shared_ptr<PythonNodeResources>>;
using LLMNodeResourcesMap = std::unordered_map<std::string, std::shared_ptr<LLMNodeResources>>;

extern std::shared_ptr<mediapipe::ThreadPoolExecutor> sharedThreadPool;
class MediapipeGraphDefinition {
friend MediapipeGraphDefinitionUnloadGuard;

Expand Down
9 changes: 9 additions & 0 deletions src/mediapipe_internal/mediapipegraphexecutor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <vector>

#include "../execution_context.hpp"
#include "../logging.hpp"
#include "../model_metric_reporter.hpp"
#include "../profiler.hpp"
#include "../status.hpp"
Expand Down Expand Up @@ -103,7 +104,11 @@ class MediapipeGraphExecutor {
MetricCounterGuard failedRequestsGuard(this->mediapipeServableMetricReporter->getRequestsMetric(executionContext, false));
MetricGaugeGuard currentGraphsGuard(this->mediapipeServableMetricReporter->currentGraphs.get());
::mediapipe::CalculatorGraph graph;
SPDLOG_ERROR("SetExecutor XXX");
std::ignore = graph.SetExecutor("", sharedThreadPool); // TODO FIXME
SPDLOG_ERROR("Start unary KServe request mediapipe graph: {} initializationXXXbegin", this->name);
MP_RETURN_ON_FAIL(graph.Initialize(this->config), std::string("failed initialization of MediaPipe graph: ") + this->name, StatusCode::MEDIAPIPE_GRAPH_INITIALIZATION_ERROR);
SPDLOG_ERROR("Start unary KServe request mediapipe graph: {} initializationXXXend", this->name);
std::unordered_map<std::string, ::mediapipe::OutputStreamPoller> outputPollers;
for (auto& name : this->outputNames) {
if (name.empty()) {
Expand All @@ -124,7 +129,9 @@ class MediapipeGraphExecutor {
inputSidePackets[PYTHON_SESSION_SIDE_PACKET_TAG] = mediapipe::MakePacket<PythonNodeResourcesMap>(this->pythonNodeResourcesMap).At(STARTING_TIMESTAMP);
inputSidePackets[LLM_SESSION_SIDE_PACKET_TAG] = mediapipe::MakePacket<LLMNodeResourcesMap>(this->llmNodeResourcesMap).At(STARTING_TIMESTAMP);
#endif
SPDLOG_ERROR("Start unary KServe request mediapipe graph: {} startRunXXXbegin", this->name);
MP_RETURN_ON_FAIL(graph.StartRun(inputSidePackets), std::string("start MediaPipe graph: ") + this->name, StatusCode::MEDIAPIPE_GRAPH_START_ERROR);
SPDLOG_ERROR("Start unary KServe request mediapipe graph: {} startRunXXXend", this->name);

::mediapipe::Packet packet;
std::set<std::string> outputPollersWithReceivedPacket;
Expand Down Expand Up @@ -216,7 +223,9 @@ class MediapipeGraphExecutor {
{
OVMS_PROFILE_SCOPE("Mediapipe graph initialization");
// Init
SPDLOG_DEBUG("Start unary KServe request mediapipe graph: {} initializationXXX", this->name);
MP_RETURN_ON_FAIL(graph.Initialize(this->config), "graph initialization", StatusCode::MEDIAPIPE_GRAPH_INITIALIZATION_ERROR);
SPDLOG_DEBUG("Start unary KServe request mediapipe graph: {} initializationXXX ended", this->name);
}
{
OVMS_PROFILE_SCOPE("Mediapipe graph installing packet observers");
Expand Down

0 comments on commit dddaf1b

Please sign in to comment.