From 73f249bead4e465545012589a61308a719c3e518 Mon Sep 17 00:00:00 2001 From: Michael Demoret Date: Mon, 21 Oct 2024 11:23:52 -0400 Subject: [PATCH 01/10] Adding implementation of Router Nodes --- .../morpheus/_lib/cmake/libmorpheus.cmake | 1 + .../_lib/include/morpheus/stages/router.hpp | 109 ++++++++ .../_lib/src/modules/data_loader_module.cpp | 4 +- .../morpheus/_lib/src/stages/router.cpp | 66 +++++ .../morpheus/_lib/stages/__init__.pyi | 238 ++++++++++++++++-- .../morpheus/morpheus/_lib/stages/module.cpp | 43 +++- .../morpheus/stages/general/router_stage.py | 119 +++++++++ .../morpheus/stages/test_router_stage_pipe.py | 51 ++++ 8 files changed, 604 insertions(+), 27 deletions(-) create mode 100644 python/morpheus/morpheus/_lib/include/morpheus/stages/router.hpp create mode 100644 python/morpheus/morpheus/_lib/src/stages/router.cpp create mode 100644 python/morpheus/morpheus/stages/general/router_stage.py create mode 100644 tests/morpheus/stages/test_router_stage_pipe.py diff --git a/python/morpheus/morpheus/_lib/cmake/libmorpheus.cmake b/python/morpheus/morpheus/_lib/cmake/libmorpheus.cmake index 7d07b41bbd..878835188e 100644 --- a/python/morpheus/morpheus/_lib/cmake/libmorpheus.cmake +++ b/python/morpheus/morpheus/_lib/cmake/libmorpheus.cmake @@ -61,6 +61,7 @@ add_library(morpheus src/stages/kafka_source.cpp src/stages/preprocess_fil.cpp src/stages/preprocess_nlp.cpp + src/stages/router.cpp src/stages/serialize.cpp src/stages/triton_inference.cpp src/stages/write_to_file.cpp diff --git a/python/morpheus/morpheus/_lib/include/morpheus/stages/router.hpp b/python/morpheus/morpheus/_lib/include/morpheus/stages/router.hpp new file mode 100644 index 0000000000..6df93fc821 --- /dev/null +++ b/python/morpheus/morpheus/_lib/include/morpheus/stages/router.hpp @@ -0,0 +1,109 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2022-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "pymrc/utilities/function_wrappers.hpp" + +#include "morpheus/export.h" // for MORPHEUS_EXPORT +#include "morpheus/messages/control.hpp" // for ControlMessage + +#include // for operator<< +#include +#include // for Builder +#include // for Object +#include // for basic_json, json +#include // for object +#include // for PythonNode +#include // for decay_t, trace_activity, from, observable_member + +#include // for shared_ptr, unique_ptr +#include // for string + +namespace morpheus { +/****** Component public implementations *******************/ + +/** + * @addtogroup stages + * @{ + * @file + */ + +/****** RouterStage********************************/ +class MORPHEUS_EXPORT RouterControlMessageComponentStage + : public mrc::node::LambdaStaticRouterComponent>, + public mrc::pymrc::AutoRegSourceAdapter>, + public mrc::pymrc::AutoRegSinkAdapter>, + public mrc::pymrc::AutoRegIngressPort>, + public mrc::pymrc::AutoRegEgressPort> +{ + public: + using mrc::node::LambdaStaticRouterComponent>::LambdaStaticRouterComponent; +}; + +class MORPHEUS_EXPORT RouterControlMessageRunnableStage + : public mrc::node::LambdaStaticRouterRunnable>, + public mrc::pymrc::AutoRegSourceAdapter>, + public mrc::pymrc::AutoRegSinkAdapter>, + public mrc::pymrc::AutoRegIngressPort>, + public mrc::pymrc::AutoRegEgressPort> +{ + public: + using mrc::node::LambdaStaticRouterRunnable>::LambdaStaticRouterRunnable; +}; + +/****** DeserializationStageInterfaceProxy******************/ +/** + * @brief Interface proxy, used to insulate python bindings. + */ +struct MORPHEUS_EXPORT RouterStageInterfaceProxy +{ + /** + * @brief Create and initialize a DeserializationStage that emits + * ControlMessage's, and return the result. If `task_type` is not None, + * `task_payload` must also be not None, and vice versa. + * + * @param builder : Pipeline context object reference + * @param name : Name of a stage reference + * @return std::shared_ptr> + */ + static std::shared_ptr> init_cm_component( + mrc::segment::Builder& builder, + const std::string& name, + std::vector keys, + mrc::pymrc::PyFuncHolder)> key_fn); + + /** + * @brief Create and initialize a DeserializationStage that emits + * ControlMessage's, and return the result. If `task_type` is not None, + * `task_payload` must also be not None, and vice versa. + * + * @param builder : Pipeline context object reference + * @param name : Name of a stage reference + * @return std::shared_ptr> + */ + static std::shared_ptr> init_cm_runnable( + mrc::segment::Builder& builder, + const std::string& name, + std::vector keys, + mrc::pymrc::PyFuncHolder)> key_fn); +}; + +/** @} */ // end of group +} // namespace morpheus diff --git a/python/morpheus/morpheus/_lib/src/modules/data_loader_module.cpp b/python/morpheus/morpheus/_lib/src/modules/data_loader_module.cpp index 2abf1edda8..351a9ea181 100644 --- a/python/morpheus/morpheus/_lib/src/modules/data_loader_module.cpp +++ b/python/morpheus/morpheus/_lib/src/modules/data_loader_module.cpp @@ -102,8 +102,8 @@ void DataLoaderModule::initialize(mrc::segment::IBuilder& builder) return m_data_loader.load(control_message); })); - register_input_port("input", loader_node); - register_output_port("output", loader_node); + builder.register_module_input("input", loader_node); + builder.register_module_output("output", loader_node); } std::string DataLoaderModule::module_type_name() const diff --git a/python/morpheus/morpheus/_lib/src/stages/router.cpp b/python/morpheus/morpheus/_lib/src/stages/router.cpp new file mode 100644 index 0000000000..c6b98d09c6 --- /dev/null +++ b/python/morpheus/morpheus/_lib/src/stages/router.cpp @@ -0,0 +1,66 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2021-2024, NVIDIA CORPORATION & + * AFFILIATES. All rights reserved. SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "morpheus/stages/router.hpp" + +#include "pymrc/utilities/function_wrappers.hpp" + +#include // for cast + +namespace morpheus { + +namespace py = pybind11; + +std::shared_ptr> RouterStageInterfaceProxy::init_cm_component( + mrc::segment::Builder& builder, + const std::string& name, + std::vector keys, + mrc::pymrc::PyFuncHolder)> key_fn) +{ + auto stage = builder.construct_object( + name, keys, [key_fn = std::move(key_fn)](const std::shared_ptr& data) { + py::gil_scoped_acquire gil; + + auto ret_key = key_fn(data); + auto ret_key_str = py::str(ret_key); + + return std::string(ret_key_str); + }); + + return stage; +} + +std::shared_ptr> RouterStageInterfaceProxy::init_cm_runnable( + mrc::segment::Builder& builder, + const std::string& name, + std::vector keys, + mrc::pymrc::PyFuncHolder)> key_fn) +{ + auto stage = builder.construct_object( + name, keys, [key_fn = std::move(key_fn)](const std::shared_ptr& data) { + py::gil_scoped_acquire gil; + + auto ret_key = key_fn(data); + auto ret_key_str = py::str(ret_key); + + return std::string(ret_key_str); + }); + + return stage; +} + +} // namespace morpheus diff --git a/python/morpheus/morpheus/_lib/stages/__init__.pyi b/python/morpheus/morpheus/_lib/stages/__init__.pyi index bb40f3916b..a050f194bc 100644 --- a/python/morpheus/morpheus/_lib/stages/__init__.pyi +++ b/python/morpheus/morpheus/_lib/stages/__init__.pyi @@ -10,6 +10,7 @@ import morpheus._lib.stages import typing from morpheus._lib.common import FilterSource import morpheus._lib.common +import morpheus._lib.messages import mrc.core.coro import mrc.core.segment import os @@ -28,57 +29,262 @@ __all__ = [ "PreallocateMessageMetaStage", "PreprocessFILStage", "PreprocessNLPStage", + "RouterControlMessageComponentStage", + "RouterControlMessageRunnableStage", "SerializeStage", "WriteToFileStage" ] class AddClassificationsStage(mrc.core.segment.SegmentObject): - def __init__(self, builder: mrc.core.segment.Builder, name: str, idx2label: typing.Dict[int, str], threshold: float) -> None: ... + + def __init__(self, builder: mrc.core.segment.Builder, name: str, idx2label: typing.Dict[int, str], + threshold: float) -> None: + ... + pass + + class AddScoresStage(mrc.core.segment.SegmentObject): - def __init__(self, builder: mrc.core.segment.Builder, name: str, idx2label: typing.Dict[int, str]) -> None: ... + + def __init__(self, builder: mrc.core.segment.Builder, name: str, idx2label: typing.Dict[int, str]) -> None: + ... + pass + + class DeserializeStage(mrc.core.segment.SegmentObject): - def __init__(self, builder: mrc.core.segment.Builder, name: str, batch_size: int, ensure_sliceable_index: bool = True, task_type: object = None, task_payload: object = None) -> None: ... + + def __init__(self, + builder: mrc.core.segment.Builder, + name: str, + batch_size: int, + ensure_sliceable_index: bool = True, + task_type: object = None, + task_payload: object = None) -> None: + ... + pass + + class FileSourceStage(mrc.core.segment.SegmentObject): + @typing.overload - def __init__(self, builder: mrc.core.segment.Builder, name: str, filename: os.PathLike, repeat: int, filter_null: bool, filter_null_columns: typing.List[str], parser_kwargs: dict) -> None: ... + def __init__(self, + builder: mrc.core.segment.Builder, + name: str, + filename: os.PathLike, + repeat: int, + filter_null: bool, + filter_null_columns: typing.List[str], + parser_kwargs: dict) -> None: + ... + @typing.overload - def __init__(self, builder: mrc.core.segment.Builder, name: str, filename: str, repeat: int, filter_null: bool, filter_null_columns: typing.List[str], parser_kwargs: dict) -> None: ... + def __init__(self, + builder: mrc.core.segment.Builder, + name: str, + filename: str, + repeat: int, + filter_null: bool, + filter_null_columns: typing.List[str], + parser_kwargs: dict) -> None: + ... + pass + + class FilterDetectionsStage(mrc.core.segment.SegmentObject): - def __init__(self, builder: mrc.core.segment.Builder, name: str, threshold: float, copy: bool, filter_source: morpheus._lib.common.FilterSource, field_name: str = 'probs') -> None: ... + + def __init__(self, + builder: mrc.core.segment.Builder, + name: str, + threshold: float, + copy: bool, + filter_source: morpheus._lib.common.FilterSource, + field_name: str = 'probs') -> None: + ... + pass + + class HttpServerSourceStage(mrc.core.segment.SegmentObject): - def __init__(self, builder: mrc.core.segment.Builder, name: str, bind_address: str = '127.0.0.1', port: int = 8080, endpoint: str = '/message', live_endpoint: str = '/live', ready_endpoint: str = '/ready', method: str = 'POST', live_method: str = 'GET', ready_method: str = 'GET', accept_status: int = 201, sleep_time: float = 0.10000000149011612, queue_timeout: int = 5, max_queue_size: int = 1024, num_server_threads: int = 1, max_payload_size: int = 10485760, request_timeout: int = 30, lines: bool = False, stop_after: int = 0) -> None: ... + + def __init__(self, + builder: mrc.core.segment.Builder, + name: str, + bind_address: str = '127.0.0.1', + port: int = 8080, + endpoint: str = '/message', + live_endpoint: str = '/live', + ready_endpoint: str = '/ready', + method: str = 'POST', + live_method: str = 'GET', + ready_method: str = 'GET', + accept_status: int = 201, + sleep_time: float = 0.10000000149011612, + queue_timeout: int = 5, + max_queue_size: int = 1024, + num_server_threads: int = 1, + max_payload_size: int = 10485760, + request_timeout: int = 30, + lines: bool = False, + stop_after: int = 0) -> None: + ... + pass + + class InferenceClientStage(mrc.core.segment.SegmentObject): - def __init__(self, builder: mrc.core.segment.Builder, name: str, server_url: str, model_name: str, needs_logits: bool, force_convert_inputs: bool, input_mapping: typing.Dict[str, str] = {}, output_mapping: typing.Dict[str, str] = {}) -> None: ... + + def __init__(self, + builder: mrc.core.segment.Builder, + name: str, + server_url: str, + model_name: str, + needs_logits: bool, + force_convert_inputs: bool, + input_mapping: typing.Dict[str, str] = {}, + output_mapping: typing.Dict[str, str] = {}) -> None: + ... + pass + + class KafkaSourceStage(mrc.core.segment.SegmentObject): + @typing.overload - def __init__(self, builder: mrc.core.segment.Builder, name: str, max_batch_size: int, topic: str, batch_timeout_ms: int, config: typing.Dict[str, str], disable_commits: bool = False, disable_pre_filtering: bool = False, stop_after: int = 0, async_commits: bool = True, oauth_callback: typing.Optional[function] = None) -> None: ... + def __init__(self, + builder: mrc.core.segment.Builder, + name: str, + max_batch_size: int, + topic: str, + batch_timeout_ms: int, + config: typing.Dict[str, str], + disable_commits: bool = False, + disable_pre_filtering: bool = False, + stop_after: int = 0, + async_commits: bool = True, + oauth_callback: typing.Optional[function] = None) -> None: + ... + @typing.overload - def __init__(self, builder: mrc.core.segment.Builder, name: str, max_batch_size: int, topics: typing.List[str], batch_timeout_ms: int, config: typing.Dict[str, str], disable_commits: bool = False, disable_pre_filtering: bool = False, stop_after: int = 0, async_commits: bool = True, oauth_callback: typing.Optional[function] = None) -> None: ... + def __init__(self, + builder: mrc.core.segment.Builder, + name: str, + max_batch_size: int, + topics: typing.List[str], + batch_timeout_ms: int, + config: typing.Dict[str, str], + disable_commits: bool = False, + disable_pre_filtering: bool = False, + stop_after: int = 0, + async_commits: bool = True, + oauth_callback: typing.Optional[function] = None) -> None: + ... + pass + + class PreallocateControlMessageStage(mrc.core.segment.SegmentObject): - def __init__(self, builder: mrc.core.segment.Builder, name: str, needed_columns: typing.List[typing.Tuple[str, morpheus._lib.common.TypeId]]) -> None: ... + + def __init__(self, + builder: mrc.core.segment.Builder, + name: str, + needed_columns: typing.List[typing.Tuple[str, morpheus._lib.common.TypeId]]) -> None: + ... + pass + + class PreallocateMessageMetaStage(mrc.core.segment.SegmentObject): - def __init__(self, builder: mrc.core.segment.Builder, name: str, needed_columns: typing.List[typing.Tuple[str, morpheus._lib.common.TypeId]]) -> None: ... + + def __init__(self, + builder: mrc.core.segment.Builder, + name: str, + needed_columns: typing.List[typing.Tuple[str, morpheus._lib.common.TypeId]]) -> None: + ... + pass + + class PreprocessFILStage(mrc.core.segment.SegmentObject): - def __init__(self, builder: mrc.core.segment.Builder, name: str, features: typing.List[str]) -> None: ... + + def __init__(self, builder: mrc.core.segment.Builder, name: str, features: typing.List[str]) -> None: + ... + pass + + class PreprocessNLPStage(mrc.core.segment.SegmentObject): - def __init__(self, builder: mrc.core.segment.Builder, name: str, vocab_hash_file: str, sequence_length: int, truncation: bool, do_lower_case: bool, add_special_token: bool, stride: int, column: str) -> None: ... + + def __init__(self, + builder: mrc.core.segment.Builder, + name: str, + vocab_hash_file: str, + sequence_length: int, + truncation: bool, + do_lower_case: bool, + add_special_token: bool, + stride: int, + column: str) -> None: + ... + + pass + + +class RouterControlMessageComponentStage(mrc.core.segment.SegmentObject): + + def __init__(self, + builder: mrc.core.segment.Builder, + name: str, + *, + router_keys: typing.List[str], + key_fn: typing.Callable[[morpheus._lib.messages.ControlMessage], str]) -> None: + ... + pass + + +class RouterControlMessageRunnableStage(mrc.core.segment.SegmentObject): + + def __init__(self, + builder: mrc.core.segment.Builder, + name: str, + *, + router_keys: typing.List[str], + key_fn: typing.Callable[[morpheus._lib.messages.ControlMessage], str]) -> None: + ... + + pass + + class SerializeStage(mrc.core.segment.SegmentObject): - def __init__(self, builder: mrc.core.segment.Builder, name: str, include: typing.List[str], exclude: typing.List[str], fixed_columns: bool = True) -> None: ... + + def __init__(self, + builder: mrc.core.segment.Builder, + name: str, + include: typing.List[str], + exclude: typing.List[str], + fixed_columns: bool = True) -> None: + ... + pass + + class WriteToFileStage(mrc.core.segment.SegmentObject): - def __init__(self, builder: mrc.core.segment.Builder, name: str, filename: str, mode: str = 'w', file_type: morpheus._lib.common.FileTypes = FileTypes.Auto, include_index_col: bool = True, flush: bool = False) -> None: ... + + def __init__(self, + builder: mrc.core.segment.Builder, + name: str, + filename: str, + mode: str = 'w', + file_type: morpheus._lib.common.FileTypes = FileTypes.Auto, + include_index_col: bool = True, + flush: bool = False) -> None: + ... + pass + + __version__ = '24.10.0' diff --git a/python/morpheus/morpheus/_lib/stages/module.cpp b/python/morpheus/morpheus/_lib/stages/module.cpp index 266455177e..7c8b89d0df 100644 --- a/python/morpheus/morpheus/_lib/stages/module.cpp +++ b/python/morpheus/morpheus/_lib/stages/module.cpp @@ -29,20 +29,23 @@ #include "morpheus/stages/preallocate.hpp" // for PreallocateStage, PreallocateStageInterfaceProxy #include "morpheus/stages/preprocess_fil.hpp" // for PreprocessFILStage, PreprocessFILStageInterfaceProxy #include "morpheus/stages/preprocess_nlp.hpp" // for PreprocessNLPStage, PreprocessNLPStageInterfaceProxy -#include "morpheus/stages/serialize.hpp" // for SerializeStage, SerializeStageInterfaceProxy -#include "morpheus/stages/write_to_file.hpp" // for WriteToFileStage, WriteToFileStageInterfaceProxy -#include "morpheus/utilities/http_server.hpp" // for DefaultMaxPayloadSize -#include "morpheus/version.hpp" // for morpheus_VERSION_MAJOR, morpheus_VERSION_MINOR, morp... +#include "morpheus/stages/router.hpp" +#include "morpheus/stages/serialize.hpp" // for SerializeStage, SerializeStageInterfaceProxy +#include "morpheus/stages/write_to_file.hpp" // for WriteToFileStage, WriteToFileStageInterfaceProxy +#include "morpheus/utilities/http_server.hpp" // for DefaultMaxPayloadSize +#include "morpheus/version.hpp" // for morpheus_VERSION_MAJOR, morpheus_VERSION_MINOR, morp... #include // for Builder #include // for Object, ObjectProperties #include // for MRC_CONCAT_STR #include // for multiple_inheritance -#include // for arg, init, class_, module_, overload_cast, overload_... -#include // for none, dict, str_attr -#include // IWYU pragma: keep -#include // for from_import, import -#include // for trace_activity, decay_t +#include +#include // for arg, init, class_, module_, overload_cast, overload_... +#include // for none, dict, str_attr +#include +#include // IWYU pragma: keep +#include // for from_import, import +#include // for trace_activity, decay_t #include // for path #include // for shared_ptr, allocator @@ -279,6 +282,28 @@ PYBIND11_MODULE(stages, _module) py::arg("include_index_col") = true, py::arg("flush") = false); + py::class_, + mrc::segment::ObjectProperties, + std::shared_ptr>>( + _module, "RouterControlMessageComponentStage", py::multiple_inheritance()) + .def(py::init<>(&RouterStageInterfaceProxy::init_cm_component), + py::arg("builder"), + py::arg("name"), + py::kw_only(), + py::arg("router_keys"), + py::arg("key_fn")); + + py::class_, + mrc::segment::ObjectProperties, + std::shared_ptr>>( + _module, "RouterControlMessageRunnableStage", py::multiple_inheritance()) + .def(py::init<>(&RouterStageInterfaceProxy::init_cm_runnable), + py::arg("builder"), + py::arg("name"), + py::kw_only(), + py::arg("router_keys"), + py::arg("key_fn")); + _module.attr("__version__") = MRC_CONCAT_STR(morpheus_VERSION_MAJOR << "." << morpheus_VERSION_MINOR << "." << morpheus_VERSION_PATCH); } diff --git a/python/morpheus/morpheus/stages/general/router_stage.py b/python/morpheus/morpheus/stages/general/router_stage.py new file mode 100644 index 0000000000..4876f85a5c --- /dev/null +++ b/python/morpheus/morpheus/stages/general/router_stage.py @@ -0,0 +1,119 @@ +# Copyright (c) 2021-2024, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import typing + +import mrc +import typing_utils + +import morpheus.pipeline as _pipeline # pylint: disable=cyclic-import +from morpheus.cli.register_stage import register_stage +from morpheus.config import Config +from morpheus.messages import ControlMessage +from morpheus.pipeline.execution_mode_mixins import GpuAndCpuMixin +from morpheus.pipeline.pass_thru_type_mixin import PassThruTypeMixin + +logger = logging.getLogger(__name__) + + +@register_stage("router") +class RouterStage(GpuAndCpuMixin, PassThruTypeMixin, _pipeline.Stage): + """ + Buffer results. + + The input messages are buffered by this stage class for faster access to downstream stages. Allows + upstream stages to run faster than downstream stages. + + Parameters + ---------- + c : `morpheus.config.Config` + Pipeline configuration instance. + + """ + + def __init__(self, + c: Config, + *, + keys: list[str], + key_fn: typing.Callable[[ControlMessage], str], + is_runnable: bool = False) -> None: + super().__init__(c) + + self._keys = keys + self._key_fn = key_fn + self._is_runnable = is_runnable + + self._router = None + + self._create_ports(1, len(keys)) + + @property + def name(self) -> str: + return "router" + + def supports_cpp_node(self): + return True + + def _pre_compute_schema(self, schema: _pipeline.StageSchema): + # Pre-flight check to verify that the input type is one of the accepted types + super()._pre_compute_schema(schema) + input_type = schema.input_type + if (not typing_utils.issubtype(input_type, ControlMessage)): + raise RuntimeError((f"The {self.name} stage cannot handle input of {input_type}. " + f"Accepted input types: {(ControlMessage,)}")) + + def compute_schema(self, schema: _pipeline.StageSchema): + + # Get the input type + input_type = schema.input_type + + for port_idx in range(len(self._keys)): + schema.output_schemas[port_idx].set_type(input_type) + + def _build(self, builder: mrc.Builder, input_nodes: list[mrc.SegmentObject]) -> list[mrc.SegmentObject]: + + def _key_fn_wrapper(msg) -> str: + return self._key_fn(msg) + + assert len(input_nodes) == 1, "Router stage should have exactly one input node" + + if (self._build_cpp_node()): + import morpheus._lib.stages as _stages + + if (self._is_runnable): + self._router = _stages.RouterControlMessageRunnableStage(builder, + self.unique_name, + router_keys=self._keys, + key_fn=_key_fn_wrapper) + else: + self._router = _stages.RouterControlMessageComponentStage(builder, + self.unique_name, + router_keys=self._keys, + key_fn=_key_fn_wrapper) + else: + from mrc.core.node import Router + from mrc.core.node import RouterComponent + + if (self._is_runnable): + self._router = Router(builder, self.unique_name, router_keys=self._keys, key_fn=_key_fn_wrapper) + else: + self._router = RouterComponent(builder, + self.unique_name, + router_keys=self._keys, + key_fn=_key_fn_wrapper) + + builder.make_edge(input_nodes[0], self._router) + + return [self._router.get_child(k) for k in self._keys] diff --git a/tests/morpheus/stages/test_router_stage_pipe.py b/tests/morpheus/stages/test_router_stage_pipe.py new file mode 100644 index 0000000000..651e508043 --- /dev/null +++ b/tests/morpheus/stages/test_router_stage_pipe.py @@ -0,0 +1,51 @@ +# SPDX-FileCopyrightText: Copyright (c) 2023-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from morpheus.messages import ControlMessage +from morpheus.pipeline import Pipeline +from morpheus.stages.general.router_stage import RouterStage +from morpheus.stages.input.in_memory_source_stage import InMemorySourceStage +from morpheus.stages.output.in_memory_sink_stage import InMemorySinkStage +from morpheus.stages.preprocess.deserialize_stage import DeserializeStage + + +def test_router_stage_pipe(config, filter_probs_df): + + keys = ["odd", "even"] + + count = 0 + + def determine_route_fn(x: ControlMessage): + nonlocal count + count += 1 + return keys[count % len(keys)] + + pipe = Pipeline(config) + source = pipe.add_stage(InMemorySourceStage(config, dataframes=[filter_probs_df], repeat=5)) + deserialize = pipe.add_stage(DeserializeStage(config)) + router_stage = pipe.add_stage(RouterStage(config, keys=keys, key_fn=determine_route_fn)) + sink1 = pipe.add_stage(InMemorySinkStage(config)) + sink2 = pipe.add_stage(InMemorySinkStage(config)) + + # Connect the stages + pipe.add_edge(source, deserialize) + pipe.add_edge(deserialize, router_stage) + pipe.add_edge(router_stage.output_ports[0], sink1) + pipe.add_edge(router_stage.output_ports[1], sink2) + + pipe.run() + + assert len(sink1.get_messages()) == 2, "Expected 2 messages in sink1" + assert len(sink2.get_messages()) == 3, "Expected 3 messages in sink2" From b5db36ed99c3f26382e633059d75c88e2182b365 Mon Sep 17 00:00:00 2001 From: Michael Demoret Date: Wed, 23 Oct 2024 15:26:32 -0400 Subject: [PATCH 02/10] Adding concurrency support to prevent backpressure --- .../morpheus/_lib/stages/__init__.pyi | 233 ++---------------- .../morpheus/stages/general/router_stage.py | 5 +- 2 files changed, 22 insertions(+), 216 deletions(-) diff --git a/python/morpheus/morpheus/_lib/stages/__init__.pyi b/python/morpheus/morpheus/_lib/stages/__init__.pyi index a050f194bc..5f19f3869b 100644 --- a/python/morpheus/morpheus/_lib/stages/__init__.pyi +++ b/python/morpheus/morpheus/_lib/stages/__init__.pyi @@ -37,254 +37,57 @@ __all__ = [ class AddClassificationsStage(mrc.core.segment.SegmentObject): - - def __init__(self, builder: mrc.core.segment.Builder, name: str, idx2label: typing.Dict[int, str], - threshold: float) -> None: - ... - + def __init__(self, builder: mrc.core.segment.Builder, name: str, idx2label: typing.Dict[int, str], threshold: float) -> None: ... pass - - class AddScoresStage(mrc.core.segment.SegmentObject): - - def __init__(self, builder: mrc.core.segment.Builder, name: str, idx2label: typing.Dict[int, str]) -> None: - ... - + def __init__(self, builder: mrc.core.segment.Builder, name: str, idx2label: typing.Dict[int, str]) -> None: ... pass - - class DeserializeStage(mrc.core.segment.SegmentObject): - - def __init__(self, - builder: mrc.core.segment.Builder, - name: str, - batch_size: int, - ensure_sliceable_index: bool = True, - task_type: object = None, - task_payload: object = None) -> None: - ... - + def __init__(self, builder: mrc.core.segment.Builder, name: str, batch_size: int, ensure_sliceable_index: bool = True, task_type: object = None, task_payload: object = None) -> None: ... pass - - class FileSourceStage(mrc.core.segment.SegmentObject): - @typing.overload - def __init__(self, - builder: mrc.core.segment.Builder, - name: str, - filename: os.PathLike, - repeat: int, - filter_null: bool, - filter_null_columns: typing.List[str], - parser_kwargs: dict) -> None: - ... - + def __init__(self, builder: mrc.core.segment.Builder, name: str, filename: os.PathLike, repeat: int, filter_null: bool, filter_null_columns: typing.List[str], parser_kwargs: dict) -> None: ... @typing.overload - def __init__(self, - builder: mrc.core.segment.Builder, - name: str, - filename: str, - repeat: int, - filter_null: bool, - filter_null_columns: typing.List[str], - parser_kwargs: dict) -> None: - ... - + def __init__(self, builder: mrc.core.segment.Builder, name: str, filename: str, repeat: int, filter_null: bool, filter_null_columns: typing.List[str], parser_kwargs: dict) -> None: ... pass - - class FilterDetectionsStage(mrc.core.segment.SegmentObject): - - def __init__(self, - builder: mrc.core.segment.Builder, - name: str, - threshold: float, - copy: bool, - filter_source: morpheus._lib.common.FilterSource, - field_name: str = 'probs') -> None: - ... - + def __init__(self, builder: mrc.core.segment.Builder, name: str, threshold: float, copy: bool, filter_source: morpheus._lib.common.FilterSource, field_name: str = 'probs') -> None: ... pass - - class HttpServerSourceStage(mrc.core.segment.SegmentObject): - - def __init__(self, - builder: mrc.core.segment.Builder, - name: str, - bind_address: str = '127.0.0.1', - port: int = 8080, - endpoint: str = '/message', - live_endpoint: str = '/live', - ready_endpoint: str = '/ready', - method: str = 'POST', - live_method: str = 'GET', - ready_method: str = 'GET', - accept_status: int = 201, - sleep_time: float = 0.10000000149011612, - queue_timeout: int = 5, - max_queue_size: int = 1024, - num_server_threads: int = 1, - max_payload_size: int = 10485760, - request_timeout: int = 30, - lines: bool = False, - stop_after: int = 0) -> None: - ... - + def __init__(self, builder: mrc.core.segment.Builder, name: str, bind_address: str = '127.0.0.1', port: int = 8080, endpoint: str = '/message', live_endpoint: str = '/live', ready_endpoint: str = '/ready', method: str = 'POST', live_method: str = 'GET', ready_method: str = 'GET', accept_status: int = 201, sleep_time: float = 0.10000000149011612, queue_timeout: int = 5, max_queue_size: int = 1024, num_server_threads: int = 1, max_payload_size: int = 10485760, request_timeout: int = 30, lines: bool = False, stop_after: int = 0) -> None: ... pass - - class InferenceClientStage(mrc.core.segment.SegmentObject): - - def __init__(self, - builder: mrc.core.segment.Builder, - name: str, - server_url: str, - model_name: str, - needs_logits: bool, - force_convert_inputs: bool, - input_mapping: typing.Dict[str, str] = {}, - output_mapping: typing.Dict[str, str] = {}) -> None: - ... - + def __init__(self, builder: mrc.core.segment.Builder, name: str, server_url: str, model_name: str, needs_logits: bool, force_convert_inputs: bool, input_mapping: typing.Dict[str, str] = {}, output_mapping: typing.Dict[str, str] = {}) -> None: ... pass - - class KafkaSourceStage(mrc.core.segment.SegmentObject): - @typing.overload - def __init__(self, - builder: mrc.core.segment.Builder, - name: str, - max_batch_size: int, - topic: str, - batch_timeout_ms: int, - config: typing.Dict[str, str], - disable_commits: bool = False, - disable_pre_filtering: bool = False, - stop_after: int = 0, - async_commits: bool = True, - oauth_callback: typing.Optional[function] = None) -> None: - ... - + def __init__(self, builder: mrc.core.segment.Builder, name: str, max_batch_size: int, topic: str, batch_timeout_ms: int, config: typing.Dict[str, str], disable_commits: bool = False, disable_pre_filtering: bool = False, stop_after: int = 0, async_commits: bool = True, oauth_callback: typing.Optional[function] = None) -> None: ... @typing.overload - def __init__(self, - builder: mrc.core.segment.Builder, - name: str, - max_batch_size: int, - topics: typing.List[str], - batch_timeout_ms: int, - config: typing.Dict[str, str], - disable_commits: bool = False, - disable_pre_filtering: bool = False, - stop_after: int = 0, - async_commits: bool = True, - oauth_callback: typing.Optional[function] = None) -> None: - ... - + def __init__(self, builder: mrc.core.segment.Builder, name: str, max_batch_size: int, topics: typing.List[str], batch_timeout_ms: int, config: typing.Dict[str, str], disable_commits: bool = False, disable_pre_filtering: bool = False, stop_after: int = 0, async_commits: bool = True, oauth_callback: typing.Optional[function] = None) -> None: ... pass - - class PreallocateControlMessageStage(mrc.core.segment.SegmentObject): - - def __init__(self, - builder: mrc.core.segment.Builder, - name: str, - needed_columns: typing.List[typing.Tuple[str, morpheus._lib.common.TypeId]]) -> None: - ... - + def __init__(self, builder: mrc.core.segment.Builder, name: str, needed_columns: typing.List[typing.Tuple[str, morpheus._lib.common.TypeId]]) -> None: ... pass - - class PreallocateMessageMetaStage(mrc.core.segment.SegmentObject): - - def __init__(self, - builder: mrc.core.segment.Builder, - name: str, - needed_columns: typing.List[typing.Tuple[str, morpheus._lib.common.TypeId]]) -> None: - ... - + def __init__(self, builder: mrc.core.segment.Builder, name: str, needed_columns: typing.List[typing.Tuple[str, morpheus._lib.common.TypeId]]) -> None: ... pass - - class PreprocessFILStage(mrc.core.segment.SegmentObject): - - def __init__(self, builder: mrc.core.segment.Builder, name: str, features: typing.List[str]) -> None: - ... - + def __init__(self, builder: mrc.core.segment.Builder, name: str, features: typing.List[str]) -> None: ... pass - - class PreprocessNLPStage(mrc.core.segment.SegmentObject): - - def __init__(self, - builder: mrc.core.segment.Builder, - name: str, - vocab_hash_file: str, - sequence_length: int, - truncation: bool, - do_lower_case: bool, - add_special_token: bool, - stride: int, - column: str) -> None: - ... - + def __init__(self, builder: mrc.core.segment.Builder, name: str, vocab_hash_file: str, sequence_length: int, truncation: bool, do_lower_case: bool, add_special_token: bool, stride: int, column: str) -> None: ... pass - - class RouterControlMessageComponentStage(mrc.core.segment.SegmentObject): - - def __init__(self, - builder: mrc.core.segment.Builder, - name: str, - *, - router_keys: typing.List[str], - key_fn: typing.Callable[[morpheus._lib.messages.ControlMessage], str]) -> None: - ... - + def __init__(self, builder: mrc.core.segment.Builder, name: str, *, router_keys: typing.List[str], key_fn: typing.Callable[[morpheus._lib.messages.ControlMessage], str]) -> None: ... pass - - class RouterControlMessageRunnableStage(mrc.core.segment.SegmentObject): - - def __init__(self, - builder: mrc.core.segment.Builder, - name: str, - *, - router_keys: typing.List[str], - key_fn: typing.Callable[[morpheus._lib.messages.ControlMessage], str]) -> None: - ... - + def __init__(self, builder: mrc.core.segment.Builder, name: str, *, router_keys: typing.List[str], key_fn: typing.Callable[[morpheus._lib.messages.ControlMessage], str]) -> None: ... pass - - class SerializeStage(mrc.core.segment.SegmentObject): - - def __init__(self, - builder: mrc.core.segment.Builder, - name: str, - include: typing.List[str], - exclude: typing.List[str], - fixed_columns: bool = True) -> None: - ... - + def __init__(self, builder: mrc.core.segment.Builder, name: str, include: typing.List[str], exclude: typing.List[str], fixed_columns: bool = True) -> None: ... pass - - class WriteToFileStage(mrc.core.segment.SegmentObject): - - def __init__(self, - builder: mrc.core.segment.Builder, - name: str, - filename: str, - mode: str = 'w', - file_type: morpheus._lib.common.FileTypes = FileTypes.Auto, - include_index_col: bool = True, - flush: bool = False) -> None: - ... - + def __init__(self, builder: mrc.core.segment.Builder, name: str, filename: str, mode: str = 'w', file_type: morpheus._lib.common.FileTypes = FileTypes.Auto, include_index_col: bool = True, flush: bool = False) -> None: ... pass - - __version__ = '24.10.0' diff --git a/python/morpheus/morpheus/stages/general/router_stage.py b/python/morpheus/morpheus/stages/general/router_stage.py index 4876f85a5c..d64100a98b 100644 --- a/python/morpheus/morpheus/stages/general/router_stage.py +++ b/python/morpheus/morpheus/stages/general/router_stage.py @@ -55,7 +55,7 @@ def __init__(self, self._key_fn = key_fn self._is_runnable = is_runnable - self._router = None + self._router: mrc.core.SegmentObject | None = None self._create_ports(1, len(keys)) @@ -114,6 +114,9 @@ def _key_fn_wrapper(msg) -> str: router_keys=self._keys, key_fn=_key_fn_wrapper) + if (self._is_runnable): + self._router.launch_options.engines_per_pe = 10 + builder.make_edge(input_nodes[0], self._router) return [self._router.get_child(k) for k in self._keys] From 55b0a1df37f56b6b1abd9f9233ba45a8deb4641f Mon Sep 17 00:00:00 2001 From: Michael Demoret Date: Wed, 23 Oct 2024 15:27:50 -0400 Subject: [PATCH 03/10] Adding backpressure test --- .../morpheus/stages/test_router_stage_pipe.py | 79 ++++++++++++++++++- 1 file changed, 77 insertions(+), 2 deletions(-) diff --git a/tests/morpheus/stages/test_router_stage_pipe.py b/tests/morpheus/stages/test_router_stage_pipe.py index 651e508043..a807453566 100644 --- a/tests/morpheus/stages/test_router_stage_pipe.py +++ b/tests/morpheus/stages/test_router_stage_pipe.py @@ -13,15 +13,24 @@ # See the License for the specific language governing permissions and # limitations under the License. +import threading + +import pytest + +from morpheus.config import ExecutionMode from morpheus.messages import ControlMessage +from morpheus.messages import MessageMeta from morpheus.pipeline import Pipeline +from morpheus.pipeline.stage_decorator import stage from morpheus.stages.general.router_stage import RouterStage +from morpheus.stages.input.in_memory_data_generation_stage import InMemoryDataGenStage from morpheus.stages.input.in_memory_source_stage import InMemorySourceStage from morpheus.stages.output.in_memory_sink_stage import InMemorySinkStage from morpheus.stages.preprocess.deserialize_stage import DeserializeStage -def test_router_stage_pipe(config, filter_probs_df): +@pytest.mark.parametrize("is_runnable", [True, False]) +def test_router_stage_pipe(config, filter_probs_df, is_runnable: bool): keys = ["odd", "even"] @@ -35,7 +44,7 @@ def determine_route_fn(x: ControlMessage): pipe = Pipeline(config) source = pipe.add_stage(InMemorySourceStage(config, dataframes=[filter_probs_df], repeat=5)) deserialize = pipe.add_stage(DeserializeStage(config)) - router_stage = pipe.add_stage(RouterStage(config, keys=keys, key_fn=determine_route_fn)) + router_stage = pipe.add_stage(RouterStage(config, keys=keys, key_fn=determine_route_fn, is_runnable=is_runnable)) sink1 = pipe.add_stage(InMemorySinkStage(config)) sink2 = pipe.add_stage(InMemorySinkStage(config)) @@ -49,3 +58,69 @@ def determine_route_fn(x: ControlMessage): assert len(sink1.get_messages()) == 2, "Expected 2 messages in sink1" assert len(sink2.get_messages()) == 3, "Expected 3 messages in sink2" + + +def test_router_stage_backpressure_pipe(config, filter_probs_df): + + # This test simulates a slow single consumer by blocking the second output port of the router stage The router stage + # will buffer the messages and block the source stage from sending more data When run as a component, less threads + # will be used but this system will eventually block. With a runnable, this should be able to run to completion + + # Set the edge buffer size to trigger blocking + config.edge_buffer_size = 4 + + keys = ["odd", "even"] + + count = 0 + + release_event = threading.Event() + + def source_fn(): + + for i in range(20): + cm = ControlMessage() + cm.set_metadata("index", i) + cm.payload(MessageMeta(filter_probs_df)) + yield cm + + # Release the event to allow the pipeline to continue + release_event.set() + + # Send more data + for i in range(20, 30): + cm = ControlMessage() + cm.set_metadata("index", i) + cm.payload(MessageMeta(filter_probs_df)) + yield cm + + def determine_route_fn(x: ControlMessage): + nonlocal count + count += 1 + return keys[count % len(keys)] + + pipe = Pipeline(config) + + source = pipe.add_stage(InMemoryDataGenStage(config, data_source=source_fn, output_data_type=ControlMessage)) + router_stage = pipe.add_stage(RouterStage(config, keys=keys, key_fn=determine_route_fn, is_runnable=True)) + sink1 = pipe.add_stage(InMemorySinkStage(config)) + sink2 = pipe.add_stage(InMemorySinkStage(config)) + + @stage(execution_modes=[ExecutionMode.CPU, ExecutionMode.GPU]) + def blocking_stage(data: ControlMessage) -> ControlMessage: + + release_event.wait() + + return data + + blocking = pipe.add_stage(blocking_stage(config)) + + # Connect the stages + pipe.add_edge(source, router_stage) + pipe.add_edge(router_stage.output_ports[0], sink1) + pipe.add_edge(router_stage.output_ports[1], blocking) + pipe.add_edge(blocking, sink2) + + pipe.run() + + assert len(sink1.get_messages()) == 15, "Expected 15 messages in sink1" + assert len(sink2.get_messages()) == 15, "Expected 15 messages in sink2" From 5c7813fa67e6b427802a695b221541610c32d875 Mon Sep 17 00:00:00 2001 From: Michael Demoret Date: Wed, 23 Oct 2024 20:38:38 -0400 Subject: [PATCH 04/10] Style cleanup --- external/utilities | 2 +- .../morpheus/morpheus/_lib/stages/module.cpp | 23 ++++++++++--------- .../morpheus/stages/test_router_stage_pipe.py | 4 ++-- 3 files changed, 15 insertions(+), 14 deletions(-) diff --git a/external/utilities b/external/utilities index 85f8f7af2e..87b33dd0b7 160000 --- a/external/utilities +++ b/external/utilities @@ -1 +1 @@ -Subproject commit 85f8f7af2e8d9bc7bde978cd40c40297b1116957 +Subproject commit 87b33dd0b7fd3d7460742bc5ad13d77e0d722c3c diff --git a/python/morpheus/morpheus/_lib/stages/module.cpp b/python/morpheus/morpheus/_lib/stages/module.cpp index 7c8b89d0df..c0dd03c328 100644 --- a/python/morpheus/morpheus/_lib/stages/module.cpp +++ b/python/morpheus/morpheus/_lib/stages/module.cpp @@ -39,19 +39,20 @@ #include // for Object, ObjectProperties #include // for MRC_CONCAT_STR #include // for multiple_inheritance -#include -#include // for arg, init, class_, module_, overload_cast, overload_... -#include // for none, dict, str_attr -#include -#include // IWYU pragma: keep -#include // for from_import, import -#include // for trace_activity, decay_t +#include // IWYU pragma: keep +#include // for arg, init, class_, module_, overload_cast, overload_... +#include // for none, dict, str_attr +#include // IWYU pragma: keep +#include // IWYU pragma: keep +#include // for from_import, import +#include // for trace_activity, decay_t #include // for path -#include // for shared_ptr, allocator -#include // for operator<<, basic_ostringstream -#include // for string -#include // for vector +#include +#include // for shared_ptr, allocator +#include // for operator<<, basic_ostringstream +#include // for string +#include // for vector namespace morpheus { namespace py = pybind11; diff --git a/tests/morpheus/stages/test_router_stage_pipe.py b/tests/morpheus/stages/test_router_stage_pipe.py index a807453566..90705a2859 100644 --- a/tests/morpheus/stages/test_router_stage_pipe.py +++ b/tests/morpheus/stages/test_router_stage_pipe.py @@ -36,7 +36,7 @@ def test_router_stage_pipe(config, filter_probs_df, is_runnable: bool): count = 0 - def determine_route_fn(x: ControlMessage): + def determine_route_fn(_: ControlMessage): nonlocal count count += 1 return keys[count % len(keys)] @@ -93,7 +93,7 @@ def source_fn(): cm.payload(MessageMeta(filter_probs_df)) yield cm - def determine_route_fn(x: ControlMessage): + def determine_route_fn(_: ControlMessage): nonlocal count count += 1 return keys[count % len(keys)] From 62cef9d88636fad5715cac1c6d935226b0374c03 Mon Sep 17 00:00:00 2001 From: Michael Demoret Date: Thu, 24 Oct 2024 17:55:40 -0400 Subject: [PATCH 05/10] Style cleanup --- python/morpheus/morpheus/stages/general/router_stage.py | 5 +++-- tests/morpheus/stages/test_router_stage_pipe.py | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/python/morpheus/morpheus/stages/general/router_stage.py b/python/morpheus/morpheus/stages/general/router_stage.py index d64100a98b..f7ae7f3d36 100644 --- a/python/morpheus/morpheus/stages/general/router_stage.py +++ b/python/morpheus/morpheus/stages/general/router_stage.py @@ -1,4 +1,4 @@ -# Copyright (c) 2021-2024, NVIDIA CORPORATION. +# Copyright (c) 2024, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -16,6 +16,7 @@ import typing import mrc +import mrc.core.segment import typing_utils import morpheus.pipeline as _pipeline # pylint: disable=cyclic-import @@ -55,7 +56,7 @@ def __init__(self, self._key_fn = key_fn self._is_runnable = is_runnable - self._router: mrc.core.SegmentObject | None = None + self._router: mrc.core.segment.SegmentObject | None = None self._create_ports(1, len(keys)) diff --git a/tests/morpheus/stages/test_router_stage_pipe.py b/tests/morpheus/stages/test_router_stage_pipe.py index 90705a2859..8d840f203e 100644 --- a/tests/morpheus/stages/test_router_stage_pipe.py +++ b/tests/morpheus/stages/test_router_stage_pipe.py @@ -1,4 +1,4 @@ -# SPDX-FileCopyrightText: Copyright (c) 2023-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-License-Identifier: Apache-2.0 # # Licensed under the Apache License, Version 2.0 (the "License"); From cb3d95f2f5ed995b0ab5823ef00b95e8702e4885 Mon Sep 17 00:00:00 2001 From: Michael Demoret Date: Fri, 25 Oct 2024 12:20:40 -0400 Subject: [PATCH 06/10] IWYU cleanup --- ci/iwyu/mappings.imp | 2 +- .../_lib/include/morpheus/stages/router.hpp | 13 ++++++------- python/morpheus/morpheus/_lib/messages/module.cpp | 3 +-- .../morpheus/_lib/src/stages/file_source.cpp | 5 ++--- .../morpheus/_lib/src/stages/kafka_source.cpp | 5 ++--- python/morpheus/morpheus/_lib/src/stages/router.cpp | 8 ++++++-- .../morpheus/_lib/src/stages/write_to_file.cpp | 8 ++++---- .../_lib/llm/src/py_llm_lambda_node.cpp | 2 +- 8 files changed, 23 insertions(+), 23 deletions(-) diff --git a/ci/iwyu/mappings.imp b/ci/iwyu/mappings.imp index 72ec52aa20..2d4875e21b 100644 --- a/ci/iwyu/mappings.imp +++ b/ci/iwyu/mappings.imp @@ -48,7 +48,7 @@ { "include": [ "", private, "", "public" ] }, # pybind11 -{ "include": [ "", "private", "", "public" ] }, +{ "include": [ "@", "private", "", "public" ] }, { "include": [ "", "private", "", "public" ] }, # rxcpp diff --git a/python/morpheus/morpheus/_lib/include/morpheus/stages/router.hpp b/python/morpheus/morpheus/_lib/include/morpheus/stages/router.hpp index 6df93fc821..d450c9235a 100644 --- a/python/morpheus/morpheus/_lib/include/morpheus/stages/router.hpp +++ b/python/morpheus/morpheus/_lib/include/morpheus/stages/router.hpp @@ -17,22 +17,21 @@ #pragma once -#include "pymrc/utilities/function_wrappers.hpp" - #include "morpheus/export.h" // for MORPHEUS_EXPORT #include "morpheus/messages/control.hpp" // for ControlMessage -#include // for operator<< #include #include // for Builder #include // for Object -#include // for basic_json, json -#include // for object -#include // for PythonNode -#include // for decay_t, trace_activity, from, observable_member +#include // for AutoRegSinkAdapter, AutoRegSourceAdapter +#include // IWYU pragma: keep +#include // for AutoRegEgressPort, AutoRegIngressPort +#include +#include // for decay_t, trace_activity, from, observable_member #include // for shared_ptr, unique_ptr #include // for string +#include // for vector namespace morpheus { /****** Component public implementations *******************/ diff --git a/python/morpheus/morpheus/_lib/messages/module.cpp b/python/morpheus/morpheus/_lib/messages/module.cpp index 961c3187e9..fed31a6d11 100644 --- a/python/morpheus/morpheus/_lib/messages/module.cpp +++ b/python/morpheus/morpheus/_lib/messages/module.cpp @@ -15,8 +15,6 @@ * limitations under the License. */ -#include "pymrc/utilities/object_wrappers.hpp" - #include "morpheus/io/data_loader_registry.hpp" #include "morpheus/messages/control.hpp" #include "morpheus/messages/memory/inference_memory.hpp" @@ -43,6 +41,7 @@ #include // IWYU pragma: keep #include // IWYU pragma: keep #include +#include #include // for pymrc::import #include diff --git a/python/morpheus/morpheus/_lib/src/stages/file_source.cpp b/python/morpheus/morpheus/_lib/src/stages/file_source.cpp index c3dce33693..82532b9afe 100644 --- a/python/morpheus/morpheus/_lib/src/stages/file_source.cpp +++ b/python/morpheus/morpheus/_lib/src/stages/file_source.cpp @@ -17,9 +17,6 @@ #include "morpheus/stages/file_source.hpp" -#include "mrc/segment/object.hpp" -#include "pymrc/node.hpp" - #include "morpheus/io/deserializers.hpp" #include "morpheus/objects/file_types.hpp" #include "morpheus/objects/table_info.hpp" @@ -29,10 +26,12 @@ #include #include #include +#include #include // IWYU pragma: keep #include #include // for str_attr_accessor #include // for pybind11::int_ +#include #include #include diff --git a/python/morpheus/morpheus/_lib/src/stages/kafka_source.cpp b/python/morpheus/morpheus/_lib/src/stages/kafka_source.cpp index 1bb6ea369d..2aa02a2598 100644 --- a/python/morpheus/morpheus/_lib/src/stages/kafka_source.cpp +++ b/python/morpheus/morpheus/_lib/src/stages/kafka_source.cpp @@ -17,9 +17,6 @@ #include "morpheus/stages/kafka_source.hpp" -#include "mrc/segment/object.hpp" -#include "pymrc/utilities/function_wrappers.hpp" // for PyFuncWrapper - #include "morpheus/messages/meta.hpp" #include "morpheus/utilities/stage_util.hpp" #include "morpheus/utilities/string_util.hpp" @@ -31,11 +28,13 @@ #include #include #include +#include #include // for SharedFuture #include #include #include #include +#include // for PyFuncWrapper #include // for find, min, transform #include diff --git a/python/morpheus/morpheus/_lib/src/stages/router.cpp b/python/morpheus/morpheus/_lib/src/stages/router.cpp index c6b98d09c6..c9f62235b5 100644 --- a/python/morpheus/morpheus/_lib/src/stages/router.cpp +++ b/python/morpheus/morpheus/_lib/src/stages/router.cpp @@ -17,9 +17,13 @@ #include "morpheus/stages/router.hpp" -#include "pymrc/utilities/function_wrappers.hpp" - +#include // for gil_scoped_acquire #include // for cast +#include // for str, pybind11 +#include + +#include // for remove_reference +#include // for move namespace morpheus { diff --git a/python/morpheus/morpheus/_lib/src/stages/write_to_file.cpp b/python/morpheus/morpheus/_lib/src/stages/write_to_file.cpp index 327c09df8b..75157cf174 100644 --- a/python/morpheus/morpheus/_lib/src/stages/write_to_file.cpp +++ b/python/morpheus/morpheus/_lib/src/stages/write_to_file.cpp @@ -17,13 +17,13 @@ #include "morpheus/stages/write_to_file.hpp" // IWYU pragma: associated -#include "mrc/segment/builder.hpp" -#include "mrc/segment/object.hpp" -#include "pymrc/node.hpp" - #include "morpheus/io/serializers.hpp" #include "morpheus/utilities/string_util.hpp" +#include +#include +#include + #include #include #include diff --git a/python/morpheus_llm/morpheus_llm/_lib/llm/src/py_llm_lambda_node.cpp b/python/morpheus_llm/morpheus_llm/_lib/llm/src/py_llm_lambda_node.cpp index 2f4ce4b10f..a06d14b005 100644 --- a/python/morpheus_llm/morpheus_llm/_lib/llm/src/py_llm_lambda_node.cpp +++ b/python/morpheus_llm/morpheus_llm/_lib/llm/src/py_llm_lambda_node.cpp @@ -19,7 +19,6 @@ #include "morpheus_llm/llm/llm_context.hpp" // for LLMContext #include "morpheus_llm/llm/llm_node_base.hpp" -#include "pymrc/coro.hpp" #include "morpheus/utilities/json_types.hpp" // for cast_from_json, cast_from_pyobject #include "morpheus/utilities/string_util.hpp" @@ -29,6 +28,7 @@ #include // for PyGILState_Check, gil_scoped_acquire, gil_scoped_release #include #include +#include #include // IWYU pragma: keep #include From d7e5eb5149270df4facb30caf025b4ed7d5d5614 Mon Sep 17 00:00:00 2001 From: Michael Demoret Date: Fri, 25 Oct 2024 14:12:34 -0400 Subject: [PATCH 07/10] Removing the C++ implementation since its not needed. --- .../morpheus/_lib/cmake/libmorpheus.cmake | 1 - .../_lib/include/morpheus/stages/router.hpp | 108 ------------------ .../morpheus/_lib/src/stages/router.cpp | 70 ------------ .../morpheus/morpheus/_lib/stages/module.cpp | 31 +---- .../morpheus/stages/general/router_stage.py | 77 +++++-------- 5 files changed, 35 insertions(+), 252 deletions(-) delete mode 100644 python/morpheus/morpheus/_lib/include/morpheus/stages/router.hpp delete mode 100644 python/morpheus/morpheus/_lib/src/stages/router.cpp diff --git a/python/morpheus/morpheus/_lib/cmake/libmorpheus.cmake b/python/morpheus/morpheus/_lib/cmake/libmorpheus.cmake index 878835188e..7d07b41bbd 100644 --- a/python/morpheus/morpheus/_lib/cmake/libmorpheus.cmake +++ b/python/morpheus/morpheus/_lib/cmake/libmorpheus.cmake @@ -61,7 +61,6 @@ add_library(morpheus src/stages/kafka_source.cpp src/stages/preprocess_fil.cpp src/stages/preprocess_nlp.cpp - src/stages/router.cpp src/stages/serialize.cpp src/stages/triton_inference.cpp src/stages/write_to_file.cpp diff --git a/python/morpheus/morpheus/_lib/include/morpheus/stages/router.hpp b/python/morpheus/morpheus/_lib/include/morpheus/stages/router.hpp deleted file mode 100644 index d450c9235a..0000000000 --- a/python/morpheus/morpheus/_lib/include/morpheus/stages/router.hpp +++ /dev/null @@ -1,108 +0,0 @@ -/* - * SPDX-FileCopyrightText: Copyright (c) 2022-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. - * SPDX-License-Identifier: Apache-2.0 - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include "morpheus/export.h" // for MORPHEUS_EXPORT -#include "morpheus/messages/control.hpp" // for ControlMessage - -#include -#include // for Builder -#include // for Object -#include // for AutoRegSinkAdapter, AutoRegSourceAdapter -#include // IWYU pragma: keep -#include // for AutoRegEgressPort, AutoRegIngressPort -#include -#include // for decay_t, trace_activity, from, observable_member - -#include // for shared_ptr, unique_ptr -#include // for string -#include // for vector - -namespace morpheus { -/****** Component public implementations *******************/ - -/** - * @addtogroup stages - * @{ - * @file - */ - -/****** RouterStage********************************/ -class MORPHEUS_EXPORT RouterControlMessageComponentStage - : public mrc::node::LambdaStaticRouterComponent>, - public mrc::pymrc::AutoRegSourceAdapter>, - public mrc::pymrc::AutoRegSinkAdapter>, - public mrc::pymrc::AutoRegIngressPort>, - public mrc::pymrc::AutoRegEgressPort> -{ - public: - using mrc::node::LambdaStaticRouterComponent>::LambdaStaticRouterComponent; -}; - -class MORPHEUS_EXPORT RouterControlMessageRunnableStage - : public mrc::node::LambdaStaticRouterRunnable>, - public mrc::pymrc::AutoRegSourceAdapter>, - public mrc::pymrc::AutoRegSinkAdapter>, - public mrc::pymrc::AutoRegIngressPort>, - public mrc::pymrc::AutoRegEgressPort> -{ - public: - using mrc::node::LambdaStaticRouterRunnable>::LambdaStaticRouterRunnable; -}; - -/****** DeserializationStageInterfaceProxy******************/ -/** - * @brief Interface proxy, used to insulate python bindings. - */ -struct MORPHEUS_EXPORT RouterStageInterfaceProxy -{ - /** - * @brief Create and initialize a DeserializationStage that emits - * ControlMessage's, and return the result. If `task_type` is not None, - * `task_payload` must also be not None, and vice versa. - * - * @param builder : Pipeline context object reference - * @param name : Name of a stage reference - * @return std::shared_ptr> - */ - static std::shared_ptr> init_cm_component( - mrc::segment::Builder& builder, - const std::string& name, - std::vector keys, - mrc::pymrc::PyFuncHolder)> key_fn); - - /** - * @brief Create and initialize a DeserializationStage that emits - * ControlMessage's, and return the result. If `task_type` is not None, - * `task_payload` must also be not None, and vice versa. - * - * @param builder : Pipeline context object reference - * @param name : Name of a stage reference - * @return std::shared_ptr> - */ - static std::shared_ptr> init_cm_runnable( - mrc::segment::Builder& builder, - const std::string& name, - std::vector keys, - mrc::pymrc::PyFuncHolder)> key_fn); -}; - -/** @} */ // end of group -} // namespace morpheus diff --git a/python/morpheus/morpheus/_lib/src/stages/router.cpp b/python/morpheus/morpheus/_lib/src/stages/router.cpp deleted file mode 100644 index c9f62235b5..0000000000 --- a/python/morpheus/morpheus/_lib/src/stages/router.cpp +++ /dev/null @@ -1,70 +0,0 @@ -/* - * SPDX-FileCopyrightText: Copyright (c) 2021-2024, NVIDIA CORPORATION & - * AFFILIATES. All rights reserved. SPDX-License-Identifier: Apache-2.0 - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "morpheus/stages/router.hpp" - -#include // for gil_scoped_acquire -#include // for cast -#include // for str, pybind11 -#include - -#include // for remove_reference -#include // for move - -namespace morpheus { - -namespace py = pybind11; - -std::shared_ptr> RouterStageInterfaceProxy::init_cm_component( - mrc::segment::Builder& builder, - const std::string& name, - std::vector keys, - mrc::pymrc::PyFuncHolder)> key_fn) -{ - auto stage = builder.construct_object( - name, keys, [key_fn = std::move(key_fn)](const std::shared_ptr& data) { - py::gil_scoped_acquire gil; - - auto ret_key = key_fn(data); - auto ret_key_str = py::str(ret_key); - - return std::string(ret_key_str); - }); - - return stage; -} - -std::shared_ptr> RouterStageInterfaceProxy::init_cm_runnable( - mrc::segment::Builder& builder, - const std::string& name, - std::vector keys, - mrc::pymrc::PyFuncHolder)> key_fn) -{ - auto stage = builder.construct_object( - name, keys, [key_fn = std::move(key_fn)](const std::shared_ptr& data) { - py::gil_scoped_acquire gil; - - auto ret_key = key_fn(data); - auto ret_key_str = py::str(ret_key); - - return std::string(ret_key_str); - }); - - return stage; -} - -} // namespace morpheus diff --git a/python/morpheus/morpheus/_lib/stages/module.cpp b/python/morpheus/morpheus/_lib/stages/module.cpp index 028c21bef2..35ddc19d8a 100644 --- a/python/morpheus/morpheus/_lib/stages/module.cpp +++ b/python/morpheus/morpheus/_lib/stages/module.cpp @@ -29,11 +29,10 @@ #include "morpheus/stages/preallocate.hpp" // for PreallocateStage, PreallocateStageInterfaceProxy #include "morpheus/stages/preprocess_fil.hpp" // for PreprocessFILStage, PreprocessFILStageInterfaceProxy #include "morpheus/stages/preprocess_nlp.hpp" // for PreprocessNLPStage, PreprocessNLPStageInterfaceProxy -#include "morpheus/stages/router.hpp" -#include "morpheus/stages/serialize.hpp" // for SerializeStage, SerializeStageInterfaceProxy -#include "morpheus/stages/write_to_file.hpp" // for WriteToFileStage, WriteToFileStageInterfaceProxy -#include "morpheus/utilities/http_server.hpp" // for DefaultMaxPayloadSize -#include "morpheus/version.hpp" // for morpheus_VERSION_MAJOR, morpheus_VERSION_MINOR, morp... +#include "morpheus/stages/serialize.hpp" // for SerializeStage, SerializeStageInterfaceProxy +#include "morpheus/stages/write_to_file.hpp" // for WriteToFileStage, WriteToFileStageInterfaceProxy +#include "morpheus/utilities/http_server.hpp" // for DefaultMaxPayloadSize +#include "morpheus/version.hpp" // for morpheus_VERSION_MAJOR, morpheus_VERSION_MINOR, morp... #include // for Builder #include // for Object, ObjectProperties @@ -310,28 +309,6 @@ PYBIND11_MODULE(stages, _module) py::arg("include_index_col") = true, py::arg("flush") = false); - py::class_, - mrc::segment::ObjectProperties, - std::shared_ptr>>( - _module, "RouterControlMessageComponentStage", py::multiple_inheritance()) - .def(py::init<>(&RouterStageInterfaceProxy::init_cm_component), - py::arg("builder"), - py::arg("name"), - py::kw_only(), - py::arg("router_keys"), - py::arg("key_fn")); - - py::class_, - mrc::segment::ObjectProperties, - std::shared_ptr>>( - _module, "RouterControlMessageRunnableStage", py::multiple_inheritance()) - .def(py::init<>(&RouterStageInterfaceProxy::init_cm_runnable), - py::arg("builder"), - py::arg("name"), - py::kw_only(), - py::arg("router_keys"), - py::arg("key_fn")); - _module.attr("__version__") = MRC_CONCAT_STR(morpheus_VERSION_MAJOR << "." << morpheus_VERSION_MINOR << "." << morpheus_VERSION_PATCH); } diff --git a/python/morpheus/morpheus/stages/general/router_stage.py b/python/morpheus/morpheus/stages/general/router_stage.py index f7ae7f3d36..8fff244151 100644 --- a/python/morpheus/morpheus/stages/general/router_stage.py +++ b/python/morpheus/morpheus/stages/general/router_stage.py @@ -17,30 +17,38 @@ import mrc import mrc.core.segment -import typing_utils import morpheus.pipeline as _pipeline # pylint: disable=cyclic-import from morpheus.cli.register_stage import register_stage from morpheus.config import Config -from morpheus.messages import ControlMessage from morpheus.pipeline.execution_mode_mixins import GpuAndCpuMixin -from morpheus.pipeline.pass_thru_type_mixin import PassThruTypeMixin logger = logging.getLogger(__name__) @register_stage("router") -class RouterStage(GpuAndCpuMixin, PassThruTypeMixin, _pipeline.Stage): +class RouterStage(GpuAndCpuMixin, _pipeline.Stage): """ Buffer results. - The input messages are buffered by this stage class for faster access to downstream stages. Allows - upstream stages to run faster than downstream stages. + The input messages are buffered by this stage class for faster access to downstream stages. Allows upstream stages + to run faster than downstream stages. Parameters ---------- c : `morpheus.config.Config` Pipeline configuration instance. + keys : `list[str]` + List of keys to route the messages. + key_fn : `typing.Callable[[object], str]` + Function to determine the key for the message. The function should take a message as input and return a key. The + key should be one of the keys in the `keys` list. + processing_engines : `int` + Number of processing engines to use for the router. If set to 0, the router will use the thread from the + upstream node for processing. In this situation, slow downstream nodes can block which can prevent routing to + other, non-blocked downstream nodes. To resolve this, set the `processing_engines` parameter to a value greater + than 0. This will create separate engines (similar to a thread) which can continue routing even if one gets + blocked. Higher values of `processing_engines` can prevent blocking at the expense of additional threads. """ @@ -48,13 +56,19 @@ def __init__(self, c: Config, *, keys: list[str], - key_fn: typing.Callable[[ControlMessage], str], - is_runnable: bool = False) -> None: + key_fn: typing.Callable[[object], str], + processing_engines=0) -> None: super().__init__(c) self._keys = keys self._key_fn = key_fn - self._is_runnable = is_runnable + self._processing_engines = processing_engines + + if (self._processing_engines < 0): + raise ValueError("Invalid number of processing engines. Must be greater than or equal to 0.") + + if (len(keys) == 0): + raise ValueError("Router stage must have at least one key.") self._router: mrc.core.segment.SegmentObject | None = None @@ -67,14 +81,6 @@ def name(self) -> str: def supports_cpp_node(self): return True - def _pre_compute_schema(self, schema: _pipeline.StageSchema): - # Pre-flight check to verify that the input type is one of the accepted types - super()._pre_compute_schema(schema) - input_type = schema.input_type - if (not typing_utils.issubtype(input_type, ControlMessage)): - raise RuntimeError((f"The {self.name} stage cannot handle input of {input_type}. " - f"Accepted input types: {(ControlMessage,)}")) - def compute_schema(self, schema: _pipeline.StageSchema): # Get the input type @@ -85,38 +91,17 @@ def compute_schema(self, schema: _pipeline.StageSchema): def _build(self, builder: mrc.Builder, input_nodes: list[mrc.SegmentObject]) -> list[mrc.SegmentObject]: - def _key_fn_wrapper(msg) -> str: - return self._key_fn(msg) - assert len(input_nodes) == 1, "Router stage should have exactly one input node" - if (self._build_cpp_node()): - import morpheus._lib.stages as _stages - - if (self._is_runnable): - self._router = _stages.RouterControlMessageRunnableStage(builder, - self.unique_name, - router_keys=self._keys, - key_fn=_key_fn_wrapper) - else: - self._router = _stages.RouterControlMessageComponentStage(builder, - self.unique_name, - router_keys=self._keys, - key_fn=_key_fn_wrapper) + from mrc.core.node import Router + from mrc.core.node import RouterComponent + + if (self._processing_engines > 0): + self._router = Router(builder, self.unique_name, router_keys=self._keys, key_fn=self._key_fn) + + self._router.launch_options.engines_per_pe = self._processing_engines else: - from mrc.core.node import Router - from mrc.core.node import RouterComponent - - if (self._is_runnable): - self._router = Router(builder, self.unique_name, router_keys=self._keys, key_fn=_key_fn_wrapper) - else: - self._router = RouterComponent(builder, - self.unique_name, - router_keys=self._keys, - key_fn=_key_fn_wrapper) - - if (self._is_runnable): - self._router.launch_options.engines_per_pe = 10 + self._router = RouterComponent(builder, self.unique_name, router_keys=self._keys, key_fn=self._key_fn) builder.make_edge(input_nodes[0], self._router) From 83b3aff21ac44d3f7da47221d225b393d86334d2 Mon Sep 17 00:00:00 2001 From: Michael Demoret Date: Fri, 25 Oct 2024 14:13:00 -0400 Subject: [PATCH 08/10] Updating pyi --- python/morpheus/morpheus/_lib/stages/__init__.pyi | 9 --------- 1 file changed, 9 deletions(-) diff --git a/python/morpheus/morpheus/_lib/stages/__init__.pyi b/python/morpheus/morpheus/_lib/stages/__init__.pyi index 69eecb3cbd..922c194deb 100644 --- a/python/morpheus/morpheus/_lib/stages/__init__.pyi +++ b/python/morpheus/morpheus/_lib/stages/__init__.pyi @@ -10,7 +10,6 @@ import morpheus._lib.stages import typing from morpheus._lib.common import FilterSource import morpheus._lib.common -import morpheus._lib.messages import mrc.core.coro import mrc.core.segment import os @@ -30,8 +29,6 @@ __all__ = [ "PreallocateMessageMetaStage", "PreprocessFILStage", "PreprocessNLPStage", - "RouterControlMessageComponentStage", - "RouterControlMessageRunnableStage", "SerializeStage", "WriteToFileStage" ] @@ -82,12 +79,6 @@ class PreprocessFILStage(mrc.core.segment.SegmentObject): class PreprocessNLPStage(mrc.core.segment.SegmentObject): def __init__(self, builder: mrc.core.segment.Builder, name: str, vocab_hash_file: str, sequence_length: int, truncation: bool, do_lower_case: bool, add_special_token: bool, stride: int, column: str) -> None: ... pass -class RouterControlMessageComponentStage(mrc.core.segment.SegmentObject): - def __init__(self, builder: mrc.core.segment.Builder, name: str, *, router_keys: typing.List[str], key_fn: typing.Callable[[morpheus._lib.messages.ControlMessage], str]) -> None: ... - pass -class RouterControlMessageRunnableStage(mrc.core.segment.SegmentObject): - def __init__(self, builder: mrc.core.segment.Builder, name: str, *, router_keys: typing.List[str], key_fn: typing.Callable[[morpheus._lib.messages.ControlMessage], str]) -> None: ... - pass class SerializeStage(mrc.core.segment.SegmentObject): def __init__(self, builder: mrc.core.segment.Builder, name: str, include: typing.List[str], exclude: typing.List[str], fixed_columns: bool = True) -> None: ... pass From cd9081dad12155a96b4a0c2a630b3fdeebff3ba4 Mon Sep 17 00:00:00 2001 From: Michael Demoret Date: Fri, 25 Oct 2024 14:19:47 -0400 Subject: [PATCH 09/10] Fixing up the router tests --- python/morpheus/morpheus/_lib/stages/module.cpp | 9 ++++----- tests/morpheus/stages/test_router_stage_pipe.py | 9 +++++---- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/python/morpheus/morpheus/_lib/stages/module.cpp b/python/morpheus/morpheus/_lib/stages/module.cpp index 35ddc19d8a..6fb855e0c5 100644 --- a/python/morpheus/morpheus/_lib/stages/module.cpp +++ b/python/morpheus/morpheus/_lib/stages/module.cpp @@ -47,11 +47,10 @@ #include // for trace_activity, decay_t #include // for path -#include -#include // for shared_ptr, allocator -#include // for operator<<, basic_ostringstream -#include // for string -#include // for vector +#include // for shared_ptr, allocator +#include // for operator<<, basic_ostringstream +#include // for string +#include // for vector namespace morpheus { namespace py = pybind11; diff --git a/tests/morpheus/stages/test_router_stage_pipe.py b/tests/morpheus/stages/test_router_stage_pipe.py index 8d840f203e..861bb44b6d 100644 --- a/tests/morpheus/stages/test_router_stage_pipe.py +++ b/tests/morpheus/stages/test_router_stage_pipe.py @@ -29,8 +29,8 @@ from morpheus.stages.preprocess.deserialize_stage import DeserializeStage -@pytest.mark.parametrize("is_runnable", [True, False]) -def test_router_stage_pipe(config, filter_probs_df, is_runnable: bool): +@pytest.mark.parametrize("processing_engines", [0, 4]) +def test_router_stage_pipe(config, filter_probs_df, processing_engines: bool): keys = ["odd", "even"] @@ -44,7 +44,8 @@ def determine_route_fn(_: ControlMessage): pipe = Pipeline(config) source = pipe.add_stage(InMemorySourceStage(config, dataframes=[filter_probs_df], repeat=5)) deserialize = pipe.add_stage(DeserializeStage(config)) - router_stage = pipe.add_stage(RouterStage(config, keys=keys, key_fn=determine_route_fn, is_runnable=is_runnable)) + router_stage = pipe.add_stage( + RouterStage(config, keys=keys, key_fn=determine_route_fn, processing_engines=processing_engines)) sink1 = pipe.add_stage(InMemorySinkStage(config)) sink2 = pipe.add_stage(InMemorySinkStage(config)) @@ -101,7 +102,7 @@ def determine_route_fn(_: ControlMessage): pipe = Pipeline(config) source = pipe.add_stage(InMemoryDataGenStage(config, data_source=source_fn, output_data_type=ControlMessage)) - router_stage = pipe.add_stage(RouterStage(config, keys=keys, key_fn=determine_route_fn, is_runnable=True)) + router_stage = pipe.add_stage(RouterStage(config, keys=keys, key_fn=determine_route_fn, processing_engines=10)) sink1 = pipe.add_stage(InMemorySinkStage(config)) sink2 = pipe.add_stage(InMemorySinkStage(config)) From 6683ef8d523bc4dcef53810a0848d541b7add27b Mon Sep 17 00:00:00 2001 From: Michael Demoret Date: Mon, 28 Oct 2024 17:57:12 -0400 Subject: [PATCH 10/10] Fixing failing CLI test --- python/morpheus/morpheus/stages/general/router_stage.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/morpheus/morpheus/stages/general/router_stage.py b/python/morpheus/morpheus/stages/general/router_stage.py index 8fff244151..d6be2a93ee 100644 --- a/python/morpheus/morpheus/stages/general/router_stage.py +++ b/python/morpheus/morpheus/stages/general/router_stage.py @@ -57,7 +57,7 @@ def __init__(self, *, keys: list[str], key_fn: typing.Callable[[object], str], - processing_engines=0) -> None: + processing_engines: int = 0) -> None: super().__init__(c) self._keys = keys