Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding implementation of Router Nodes #1963

Merged
merged 15 commits into from
Oct 31, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ci/iwyu/mappings.imp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
{ "include": [ "<google/protobuf/repeated_ptr_field.h>", private, "<google/protobuf/repeated_field.h>", "public" ] },

# pybind11
{ "include": [ "<pybind11/detail/common.h>", "private", "<pybind11/pytypes.h>", "public" ] },
{ "include": [ "@<pybind11/detail/.*>", "private", "<pybind11/pybind11.h>", "public" ] },
{ "include": [ "<pybind11/cast.h>", "private", "<pybind11/pybind11.h>", "public" ] },

# rxcpp
Expand Down
1 change: 1 addition & 0 deletions python/morpheus/morpheus/_lib/cmake/libmorpheus.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ add_library(morpheus
src/stages/kafka_source.cpp
src/stages/preprocess_fil.cpp
src/stages/preprocess_nlp.cpp
src/stages/router.cpp
src/stages/serialize.cpp
src/stages/triton_inference.cpp
src/stages/write_to_file.cpp
Expand Down
108 changes: 108 additions & 0 deletions python/morpheus/morpheus/_lib/include/morpheus/stages/router.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* SPDX-FileCopyrightText: Copyright (c) 2022-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include "morpheus/export.h" // for MORPHEUS_EXPORT
#include "morpheus/messages/control.hpp" // for ControlMessage

#include <mrc/node/operators/router.hpp>
#include <mrc/segment/builder.hpp> // for Builder
#include <mrc/segment/object.hpp> // for Object
#include <pymrc/edge_adapter.hpp> // for AutoRegSinkAdapter, AutoRegSourceAdapter
#include <pymrc/node.hpp> // IWYU pragma: keep
#include <pymrc/port_builders.hpp> // for AutoRegEgressPort, AutoRegIngressPort
#include <pymrc/utilities/function_wrappers.hpp>
#include <rxcpp/rx.hpp> // for decay_t, trace_activity, from, observable_member

#include <memory> // for shared_ptr, unique_ptr
#include <string> // for string
#include <vector> // for vector

namespace morpheus {
/****** Component public implementations *******************/

/**
* @addtogroup stages
* @{
* @file
*/

/****** RouterStage********************************/
class MORPHEUS_EXPORT RouterControlMessageComponentStage
: public mrc::node::LambdaStaticRouterComponent<std::string, std::shared_ptr<ControlMessage>>,
public mrc::pymrc::AutoRegSourceAdapter<std::shared_ptr<ControlMessage>>,
public mrc::pymrc::AutoRegSinkAdapter<std::shared_ptr<ControlMessage>>,
public mrc::pymrc::AutoRegIngressPort<std::shared_ptr<ControlMessage>>,
public mrc::pymrc::AutoRegEgressPort<std::shared_ptr<ControlMessage>>
{
public:
using mrc::node::LambdaStaticRouterComponent<std::string,
std::shared_ptr<ControlMessage>>::LambdaStaticRouterComponent;
};

class MORPHEUS_EXPORT RouterControlMessageRunnableStage
: public mrc::node::LambdaStaticRouterRunnable<std::string, std::shared_ptr<ControlMessage>>,
public mrc::pymrc::AutoRegSourceAdapter<std::shared_ptr<ControlMessage>>,
public mrc::pymrc::AutoRegSinkAdapter<std::shared_ptr<ControlMessage>>,
public mrc::pymrc::AutoRegIngressPort<std::shared_ptr<ControlMessage>>,
public mrc::pymrc::AutoRegEgressPort<std::shared_ptr<ControlMessage>>
{
public:
using mrc::node::LambdaStaticRouterRunnable<std::string,
std::shared_ptr<ControlMessage>>::LambdaStaticRouterRunnable;
};

/****** DeserializationStageInterfaceProxy******************/
/**
* @brief Interface proxy, used to insulate python bindings.
*/
struct MORPHEUS_EXPORT RouterStageInterfaceProxy
{
/**
* @brief Create and initialize a DeserializationStage that emits
* ControlMessage's, and return the result. If `task_type` is not None,
* `task_payload` must also be not None, and vice versa.
*
* @param builder : Pipeline context object reference
* @param name : Name of a stage reference
* @return std::shared_ptr<mrc::segment::Object<DeserializeStage>>
*/
static std::shared_ptr<mrc::segment::Object<RouterControlMessageComponentStage>> init_cm_component(
mrc::segment::Builder& builder,
const std::string& name,
std::vector<std::string> keys,
mrc::pymrc::PyFuncHolder<std::string(std::shared_ptr<ControlMessage>)> key_fn);

/**
* @brief Create and initialize a DeserializationStage that emits
* ControlMessage's, and return the result. If `task_type` is not None,
* `task_payload` must also be not None, and vice versa.
*
* @param builder : Pipeline context object reference
* @param name : Name of a stage reference
* @return std::shared_ptr<mrc::segment::Object<DeserializeStage>>
*/
static std::shared_ptr<mrc::segment::Object<RouterControlMessageRunnableStage>> init_cm_runnable(
mrc::segment::Builder& builder,
const std::string& name,
std::vector<std::string> keys,
mrc::pymrc::PyFuncHolder<std::string(std::shared_ptr<ControlMessage>)> key_fn);
};

/** @} */ // end of group
} // namespace morpheus
3 changes: 1 addition & 2 deletions python/morpheus/morpheus/_lib/messages/module.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
* limitations under the License.
*/

#include "pymrc/utilities/object_wrappers.hpp"

#include "morpheus/io/data_loader_registry.hpp"
#include "morpheus/messages/control.hpp"
#include "morpheus/messages/memory/inference_memory.hpp"
Expand All @@ -43,6 +41,7 @@
#include <pybind11/stl.h> // IWYU pragma: keep
#include <pymrc/node.hpp> // IWYU pragma: keep
#include <pymrc/port_builders.hpp>
#include <pymrc/utilities/object_wrappers.hpp>
#include <pymrc/utils.hpp> // for pymrc::import
#include <rxcpp/rx.hpp>

Expand Down
5 changes: 2 additions & 3 deletions python/morpheus/morpheus/_lib/src/stages/file_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@

#include "morpheus/stages/file_source.hpp"

#include "mrc/segment/object.hpp"
#include "pymrc/node.hpp"

#include "morpheus/io/deserializers.hpp"
#include "morpheus/objects/file_types.hpp"
#include "morpheus/objects/table_info.hpp"
Expand All @@ -29,10 +26,12 @@
#include <cudf/types.hpp>
#include <glog/logging.h>
#include <mrc/segment/builder.hpp>
#include <mrc/segment/object.hpp>
#include <pybind11/cast.h> // IWYU pragma: keep
#include <pybind11/gil.h>
#include <pybind11/pybind11.h> // for str_attr_accessor
#include <pybind11/pytypes.h> // for pybind11::int_
#include <pymrc/node.hpp>

#include <filesystem>
#include <memory>
Expand Down
5 changes: 2 additions & 3 deletions python/morpheus/morpheus/_lib/src/stages/kafka_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@

#include "morpheus/stages/kafka_source.hpp"

#include "mrc/segment/object.hpp"
#include "pymrc/utilities/function_wrappers.hpp" // for PyFuncWrapper

#include "morpheus/messages/meta.hpp"
#include "morpheus/utilities/stage_util.hpp"
#include "morpheus/utilities/string_util.hpp"
Expand All @@ -31,11 +28,13 @@
#include <librdkafka/rdkafkacpp.h>
#include <mrc/runnable/context.hpp>
#include <mrc/segment/builder.hpp>
#include <mrc/segment/object.hpp>
#include <mrc/types.hpp> // for SharedFuture
#include <nlohmann/json.hpp>
#include <pybind11/pybind11.h>
#include <pybind11/pytypes.h>
#include <pymrc/node.hpp>
#include <pymrc/utilities/function_wrappers.hpp> // for PyFuncWrapper

#include <algorithm> // for find, min, transform
#include <chrono>
Expand Down
70 changes: 70 additions & 0 deletions python/morpheus/morpheus/_lib/src/stages/router.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* SPDX-FileCopyrightText: Copyright (c) 2021-2024, NVIDIA CORPORATION &
* AFFILIATES. All rights reserved. SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "morpheus/stages/router.hpp"

#include <pybind11/gil.h> // for gil_scoped_acquire
#include <pybind11/pybind11.h> // for cast
#include <pybind11/pytypes.h> // for str, pybind11
#include <pymrc/utilities/function_wrappers.hpp>

#include <type_traits> // for remove_reference
#include <utility> // for move

namespace morpheus {

namespace py = pybind11;

std::shared_ptr<mrc::segment::Object<RouterControlMessageComponentStage>> RouterStageInterfaceProxy::init_cm_component(
mrc::segment::Builder& builder,
const std::string& name,
std::vector<std::string> keys,
mrc::pymrc::PyFuncHolder<std::string(std::shared_ptr<ControlMessage>)> key_fn)
{
auto stage = builder.construct_object<RouterControlMessageComponentStage>(
name, keys, [key_fn = std::move(key_fn)](const std::shared_ptr<ControlMessage>& data) {
py::gil_scoped_acquire gil;

auto ret_key = key_fn(data);
auto ret_key_str = py::str(ret_key);

return std::string(ret_key_str);
});

return stage;
}

std::shared_ptr<mrc::segment::Object<RouterControlMessageRunnableStage>> RouterStageInterfaceProxy::init_cm_runnable(
mrc::segment::Builder& builder,
const std::string& name,
std::vector<std::string> keys,
mrc::pymrc::PyFuncHolder<std::string(std::shared_ptr<ControlMessage>)> key_fn)
{
auto stage = builder.construct_object<RouterControlMessageRunnableStage>(
name, keys, [key_fn = std::move(key_fn)](const std::shared_ptr<ControlMessage>& data) {
py::gil_scoped_acquire gil;

auto ret_key = key_fn(data);
auto ret_key_str = py::str(ret_key);

return std::string(ret_key_str);
});

return stage;
}

} // namespace morpheus
8 changes: 4 additions & 4 deletions python/morpheus/morpheus/_lib/src/stages/write_to_file.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@

#include "morpheus/stages/write_to_file.hpp" // IWYU pragma: associated

#include "mrc/segment/builder.hpp"
#include "mrc/segment/object.hpp"
#include "pymrc/node.hpp"

#include "morpheus/io/serializers.hpp"
#include "morpheus/utilities/string_util.hpp"

#include <mrc/segment/builder.hpp>
#include <mrc/segment/object.hpp>
#include <pymrc/node.hpp>

#include <exception>
#include <memory>
#include <sstream>
Expand Down
9 changes: 9 additions & 0 deletions python/morpheus/morpheus/_lib/stages/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import morpheus._lib.stages
import typing
from morpheus._lib.common import FilterSource
import morpheus._lib.common
import morpheus._lib.messages
import mrc.core.coro
import mrc.core.segment
import os
Expand All @@ -29,6 +30,8 @@ __all__ = [
"PreallocateMessageMetaStage",
"PreprocessFILStage",
"PreprocessNLPStage",
"RouterControlMessageComponentStage",
"RouterControlMessageRunnableStage",
"SerializeStage",
"WriteToFileStage"
]
Expand Down Expand Up @@ -79,6 +82,12 @@ class PreprocessFILStage(mrc.core.segment.SegmentObject):
class PreprocessNLPStage(mrc.core.segment.SegmentObject):
def __init__(self, builder: mrc.core.segment.Builder, name: str, vocab_hash_file: str, sequence_length: int, truncation: bool, do_lower_case: bool, add_special_token: bool, stride: int, column: str) -> None: ...
pass
class RouterControlMessageComponentStage(mrc.core.segment.SegmentObject):
def __init__(self, builder: mrc.core.segment.Builder, name: str, *, router_keys: typing.List[str], key_fn: typing.Callable[[morpheus._lib.messages.ControlMessage], str]) -> None: ...
pass
class RouterControlMessageRunnableStage(mrc.core.segment.SegmentObject):
def __init__(self, builder: mrc.core.segment.Builder, name: str, *, router_keys: typing.List[str], key_fn: typing.Callable[[morpheus._lib.messages.ControlMessage], str]) -> None: ...
pass
class SerializeStage(mrc.core.segment.SegmentObject):
def __init__(self, builder: mrc.core.segment.Builder, name: str, include: typing.List[str], exclude: typing.List[str], fixed_columns: bool = True) -> None: ...
pass
Expand Down
42 changes: 34 additions & 8 deletions python/morpheus/morpheus/_lib/stages/module.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,26 +29,30 @@
#include "morpheus/stages/preallocate.hpp" // for PreallocateStage, PreallocateStageInterfaceProxy
#include "morpheus/stages/preprocess_fil.hpp" // for PreprocessFILStage, PreprocessFILStageInterfaceProxy
#include "morpheus/stages/preprocess_nlp.hpp" // for PreprocessNLPStage, PreprocessNLPStageInterfaceProxy
#include "morpheus/stages/serialize.hpp" // for SerializeStage, SerializeStageInterfaceProxy
#include "morpheus/stages/write_to_file.hpp" // for WriteToFileStage, WriteToFileStageInterfaceProxy
#include "morpheus/utilities/http_server.hpp" // for DefaultMaxPayloadSize
#include "morpheus/version.hpp" // for morpheus_VERSION_MAJOR, morpheus_VERSION_MINOR, morp...
#include "morpheus/stages/router.hpp"
#include "morpheus/stages/serialize.hpp" // for SerializeStage, SerializeStageInterfaceProxy
#include "morpheus/stages/write_to_file.hpp" // for WriteToFileStage, WriteToFileStageInterfaceProxy
#include "morpheus/utilities/http_server.hpp" // for DefaultMaxPayloadSize
#include "morpheus/version.hpp" // for morpheus_VERSION_MAJOR, morpheus_VERSION_MINOR, morp...

#include <mrc/segment/builder.hpp> // for Builder
#include <mrc/segment/object.hpp> // for Object, ObjectProperties
#include <mrc/utils/string_utils.hpp> // for MRC_CONCAT_STR
#include <pybind11/attr.h> // for multiple_inheritance
#include <pybind11/functional.h> // IWYU pragma: keep
#include <pybind11/pybind11.h> // for arg, init, class_, module_, overload_cast, overload_...
#include <pybind11/pytypes.h> // for none, dict, str_attr
#include <pybind11/stl.h> // IWYU pragma: keep
#include <pybind11/stl/filesystem.h> // IWYU pragma: keep
#include <pymrc/utils.hpp> // for from_import, import
#include <rxcpp/rx.hpp> // for trace_activity, decay_t

#include <filesystem> // for path
#include <memory> // for shared_ptr, allocator
#include <sstream> // for operator<<, basic_ostringstream
#include <string> // for string
#include <vector> // for vector
#include <map>
#include <memory> // for shared_ptr, allocator
#include <sstream> // for operator<<, basic_ostringstream
#include <string> // for string
#include <vector> // for vector

namespace morpheus {
namespace py = pybind11;
Expand Down Expand Up @@ -306,6 +310,28 @@ PYBIND11_MODULE(stages, _module)
py::arg("include_index_col") = true,
py::arg("flush") = false);

py::class_<mrc::segment::Object<RouterControlMessageComponentStage>,
mrc::segment::ObjectProperties,
std::shared_ptr<mrc::segment::Object<RouterControlMessageComponentStage>>>(
_module, "RouterControlMessageComponentStage", py::multiple_inheritance())
.def(py::init<>(&RouterStageInterfaceProxy::init_cm_component),
py::arg("builder"),
py::arg("name"),
py::kw_only(),
py::arg("router_keys"),
py::arg("key_fn"));

py::class_<mrc::segment::Object<RouterControlMessageRunnableStage>,
mrc::segment::ObjectProperties,
std::shared_ptr<mrc::segment::Object<RouterControlMessageRunnableStage>>>(
_module, "RouterControlMessageRunnableStage", py::multiple_inheritance())
.def(py::init<>(&RouterStageInterfaceProxy::init_cm_runnable),
py::arg("builder"),
py::arg("name"),
py::kw_only(),
py::arg("router_keys"),
py::arg("key_fn"));

_module.attr("__version__") =
MRC_CONCAT_STR(morpheus_VERSION_MAJOR << "." << morpheus_VERSION_MINOR << "." << morpheus_VERSION_PATCH);
}
Expand Down
Loading
Loading