From 499fbcc59f31d224083cc9b8f956101c317c2a35 Mon Sep 17 00:00:00 2001 From: Amna Mubashar Date: Tue, 16 Jul 2024 15:39:40 +0200 Subject: [PATCH] Remove Multiplexer and related tests (#8020) --- haystack/components/others/__init__.py | 7 - haystack/components/others/multiplexer.py | 151 ------------------ haystack/components/validators/json_schema.py | 10 +- ...precated-multiplexer-1f948f1f9b811195.yaml | 4 + test/components/others/test_multiplexer.py | 35 ---- test/core/pipeline/features/test_run.py | 137 ++++++++-------- test/core/pipeline/test_pipeline.py | 7 +- 7 files changed, 84 insertions(+), 267 deletions(-) delete mode 100644 haystack/components/others/__init__.py delete mode 100644 haystack/components/others/multiplexer.py create mode 100644 releasenotes/notes/remove-deprecated-multiplexer-1f948f1f9b811195.yaml delete mode 100644 test/components/others/test_multiplexer.py diff --git a/haystack/components/others/__init__.py b/haystack/components/others/__init__.py deleted file mode 100644 index 921d9de3e8..0000000000 --- a/haystack/components/others/__init__.py +++ /dev/null @@ -1,7 +0,0 @@ -# SPDX-FileCopyrightText: 2022-present deepset GmbH -# -# SPDX-License-Identifier: Apache-2.0 - -from haystack.components.others.multiplexer import Multiplexer - -__all__ = ["Multiplexer"] diff --git a/haystack/components/others/multiplexer.py b/haystack/components/others/multiplexer.py deleted file mode 100644 index c1f5d172f9..0000000000 --- a/haystack/components/others/multiplexer.py +++ /dev/null @@ -1,151 +0,0 @@ -# SPDX-FileCopyrightText: 2022-present deepset GmbH -# -# SPDX-License-Identifier: Apache-2.0 - -import sys -import warnings -from typing import Any, Dict - -from haystack import component, default_from_dict, default_to_dict, logging -from haystack.core.component.types import Variadic -from haystack.utils import deserialize_type, serialize_type - -if sys.version_info < (3, 10): - from typing_extensions import TypeAlias -else: - from typing import TypeAlias - - -logger = logging.getLogger(__name__) - - -@component(is_greedy=True) -class Multiplexer: - """ - A component which receives data connections from multiple components and distributes them to multiple components. - - `Multiplexer` offers the ability to both receive data connections from multiple other - components and to distribute it to various other components, enhancing the functionality of complex data - processing pipelines. - - `Multiplexer` is important for spreading outputs from a single source like a Large Language Model (LLM) across - different branches of a pipeline. It is especially valuable in error correction loops by rerouting data for - reevaluation if errors are detected. For instance, in an example pipeline below, `Multiplexer` helps create - a schema valid JSON object (given a person's data) with the help of an `OpenAIChatGenerator` and a - `JsonSchemaValidator`. - In case the generated JSON object fails schema validation, `JsonSchemaValidator` starts a correction loop, sending - the data back through the `Multiplexer` to the `OpenAIChatGenerator` until it passes schema validation. If we didn't - have `Multiplexer`, we wouldn't be able to loop back the data to `OpenAIChatGenerator` for re-generation, as - components accept only one input connection for the declared run method parameters. - - Usage example: - - ```python - import json - from typing import List - - from haystack import Pipeline - from haystack.components.converters import OutputAdapter - from haystack.components.generators.chat import OpenAIChatGenerator - from haystack.components.others import Multiplexer - from haystack.components.validators import JsonSchemaValidator - from haystack.dataclasses import ChatMessage - - person_schema = { - "type": "object", - "properties": { - "first_name": {"type": "string", "pattern": "^[A-Z][a-z]+$"}, - "last_name": {"type": "string", "pattern": "^[A-Z][a-z]+$"}, - "nationality": {"type": "string", "enum": ["Italian", "Portuguese", "American"]}, - }, - "required": ["first_name", "last_name", "nationality"] - } - - # Initialize a pipeline - pipe = Pipeline() - - # Add components to the pipeline - pipe.add_component('mx', Multiplexer(List[ChatMessage])) - pipe.add_component('fc_llm', OpenAIChatGenerator(model="gpt-3.5-turbo-0125")) - pipe.add_component('validator', JsonSchemaValidator(json_schema=person_schema)) - pipe.add_component('adapter', OutputAdapter("{{chat_message}}", List[ChatMessage])), - # And connect them - pipe.connect("adapter", "mx") - pipe.connect("mx", "fc_llm") - pipe.connect("fc_llm.replies", "validator.messages") - pipe.connect("validator.validation_error", "mx") - - result = pipe.run(data={"fc_llm": {"generation_kwargs": {"response_format": {"type": "json_object"}}}, - "adapter": {"chat_message": [ChatMessage.from_user("Create json from Peter Parker")]}}) - - print(json.loads(result["validator"]["validated"][0].content)) - - - >> {'first_name': 'Peter', 'last_name': 'Parker', 'nationality': 'American', 'name': 'Spider-Man', 'occupation': - >> 'Superhero', 'age': 23, 'location': 'New York City'} - ``` - - Note that `Multiplexer` is created with a single type parameter. This determines the - type of data that `Multiplexer` will receive from the upstream connected components and also the - type of data that `Multiplexer` will distribute to the downstream connected components. In the example - above, the `Multiplexer` is created with the type `List[ChatMessage]`. This means `Multiplexer` will receive - a list of `ChatMessage` objects from the upstream connected components and also distribute a list of `ChatMessage` - objects to the downstream connected components. - - In the code example, `Multiplexer` receives a looped back `List[ChatMessage]` from the `JsonSchemaValidator` and - sends it down to the `OpenAIChatGenerator` for re-generation. We can have multiple loop back connections in the - pipeline. In this instance, the downstream component is only one – the `OpenAIChatGenerator` – but the pipeline can - have more than one downstream component. - """ - - def __init__(self, type_: TypeAlias): - """ - Create a `Multiplexer` component. - - :param type_: The type of data that the `Multiplexer` will receive from the upstream connected components and - distribute to the downstream connected components. - """ - warnings.warn( - "`Multiplexer` is deprecated and will be removed in Haystack 2.4.0. Use `joiners.BranchJoiner` instead.", - DeprecationWarning, - ) - self.type_ = type_ - component.set_input_types(self, value=Variadic[type_]) - component.set_output_types(self, value=type_) - - def to_dict(self): - """ - Serializes the component to a dictionary. - - :returns: - Dictionary with serialized data. - """ - return default_to_dict(self, type_=serialize_type(self.type_)) - - @classmethod - def from_dict(cls, data: Dict[str, Any]) -> "Multiplexer": - """ - Deserializes the component from a dictionary. - - :param data: - Dictionary to deserialize from. - :returns: - Deserialized component. - """ - data["init_parameters"]["type_"] = deserialize_type(data["init_parameters"]["type_"]) - return default_from_dict(cls, data) - - def run(self, **kwargs): - """ - The run method of the `Multiplexer` component. - - Multiplexes the input data from the upstream connected components and distributes it to the downstream connected - components. - - :param **kwargs: The input data. Must be of the type declared in `__init__`. - :return: A dictionary with the following keys: - - `value`: The input data. - """ - if (inputs_count := len(kwargs["value"])) != 1: - raise ValueError(f"Multiplexer expects only one input, but {inputs_count} were received.") - return {"value": kwargs["value"][0]} diff --git a/haystack/components/validators/json_schema.py b/haystack/components/validators/json_schema.py index fb2c11b41c..13f35be87e 100644 --- a/haystack/components/validators/json_schema.py +++ b/haystack/components/validators/json_schema.py @@ -44,7 +44,7 @@ class JsonSchemaValidator: from haystack import Pipeline from haystack.components.generators.chat import OpenAIChatGenerator - from haystack.components.others import Multiplexer + from haystack.components.joiners import BranchJoiner from haystack.components.validators import JsonSchemaValidator from haystack import component from haystack.dataclasses import ChatMessage @@ -62,13 +62,13 @@ def run(self, messages: List[ChatMessage]) -> dict: p.add_component("llm", OpenAIChatGenerator(model="gpt-4-1106-preview", generation_kwargs={"response_format": {"type": "json_object"}})) p.add_component("schema_validator", JsonSchemaValidator()) - p.add_component("mx_for_llm", Multiplexer(List[ChatMessage])) + p.add_component("joiner_for_llm", BranchJoiner(List[ChatMessage])) p.add_component("message_producer", MessageProducer()) - p.connect("message_producer.messages", "mx_for_llm") - p.connect("mx_for_llm", "llm") + p.connect("message_producer.messages", "joiner_for_llm") + p.connect("joiner_for_llm", "llm") p.connect("llm.replies", "schema_validator.messages") - p.connect("schema_validator.validation_error", "mx_for_llm") + p.connect("schema_validator.validation_error", "joiner_for_llm") result = p.run(data={ "message_producer": { diff --git a/releasenotes/notes/remove-deprecated-multiplexer-1f948f1f9b811195.yaml b/releasenotes/notes/remove-deprecated-multiplexer-1f948f1f9b811195.yaml new file mode 100644 index 0000000000..87f92d21c0 --- /dev/null +++ b/releasenotes/notes/remove-deprecated-multiplexer-1f948f1f9b811195.yaml @@ -0,0 +1,4 @@ +--- +upgrade: + - | + `Multiplexer` is removed and users should switch to `BranchJoiner` instead. diff --git a/test/components/others/test_multiplexer.py b/test/components/others/test_multiplexer.py deleted file mode 100644 index 759b3a2b17..0000000000 --- a/test/components/others/test_multiplexer.py +++ /dev/null @@ -1,35 +0,0 @@ -# SPDX-FileCopyrightText: 2022-present deepset GmbH -# -# SPDX-License-Identifier: Apache-2.0 -import pytest - -from haystack.components.others import Multiplexer - - -class TestMultiplexer: - def test_one_value(self): - multiplexer = Multiplexer(int) - output = multiplexer.run(value=[2]) - assert output == {"value": 2} - - def test_one_value_of_wrong_type(self): - # Multiplexer does not type check the input - multiplexer = Multiplexer(int) - output = multiplexer.run(value=["hello"]) - assert output == {"value": "hello"} - - def test_one_value_of_none_type(self): - # Multiplexer does not type check the input - multiplexer = Multiplexer(int) - output = multiplexer.run(value=[None]) - assert output == {"value": None} - - def test_more_values_of_expected_type(self): - multiplexer = Multiplexer(int) - with pytest.raises(ValueError, match="Multiplexer expects only one input, but 3 were received."): - multiplexer.run(value=[2, 3, 4]) - - def test_no_values(self): - multiplexer = Multiplexer(int) - with pytest.raises(ValueError, match="Multiplexer expects only one input, but 0 were received."): - multiplexer.run(value=[]) diff --git a/test/core/pipeline/features/test_run.py b/test/core/pipeline/features/test_run.py index baf157baa0..8524f20065 100644 --- a/test/core/pipeline/features/test_run.py +++ b/test/core/pipeline/features/test_run.py @@ -9,7 +9,7 @@ from haystack.components.builders import PromptBuilder, AnswerBuilder from haystack.components.retrievers.in_memory import InMemoryBM25Retriever from haystack.document_stores.in_memory import InMemoryDocumentStore -from haystack.components.others import Multiplexer +from haystack.components.joiners import BranchJoiner from haystack.testing.sample_components import ( Accumulate, AddFixedValue, @@ -91,7 +91,7 @@ def pipeline_complex(): pipeline.add_component("add_one", AddFixedValue(add=1)) pipeline.add_component("accumulate_2", Accumulate()) - pipeline.add_component("multiplexer", Multiplexer(type_=int)) + pipeline.add_component("branch_joiner", BranchJoiner(type_=int)) pipeline.add_component("below_10", Threshold(threshold=10)) pipeline.add_component("double", Double()) @@ -123,11 +123,11 @@ def pipeline_complex(): pipeline.connect("add_four", "accumulate_3") pipeline.connect("parity.odd", "add_one.value") - pipeline.connect("add_one", "multiplexer.value") - pipeline.connect("multiplexer", "below_10") + pipeline.connect("add_one", "branch_joiner.value") + pipeline.connect("branch_joiner", "below_10") pipeline.connect("below_10.below", "double") - pipeline.connect("double", "multiplexer.value") + pipeline.connect("double", "branch_joiner.value") pipeline.connect("below_10.above", "accumulate_2") pipeline.connect("accumulate_2", "diff.second_value") @@ -150,13 +150,13 @@ def pipeline_complex(): "add_two", "parity", "add_one", - "multiplexer", + "branch_joiner", "below_10", "double", - "multiplexer", + "branch_joiner", "below_10", "double", - "multiplexer", + "branch_joiner", "below_10", "accumulate_2", "greet_enumerator", @@ -206,43 +206,43 @@ def run(self, a: int, b: int = 2): @given("a pipeline that has two loops of identical lengths", target_fixture="pipeline_data") def pipeline_that_has_two_loops_of_identical_lengths(): pipeline = Pipeline(max_loops_allowed=10) - pipeline.add_component("multiplexer", Multiplexer(type_=int)) + pipeline.add_component("branch_joiner", BranchJoiner(type_=int)) pipeline.add_component("remainder", Remainder(divisor=3)) pipeline.add_component("add_one", AddFixedValue(add=1)) pipeline.add_component("add_two", AddFixedValue(add=2)) - pipeline.connect("multiplexer.value", "remainder.value") + pipeline.connect("branch_joiner.value", "remainder.value") pipeline.connect("remainder.remainder_is_1", "add_two.value") pipeline.connect("remainder.remainder_is_2", "add_one.value") - pipeline.connect("add_two", "multiplexer.value") - pipeline.connect("add_one", "multiplexer.value") + pipeline.connect("add_two", "branch_joiner.value") + pipeline.connect("add_one", "branch_joiner.value") return ( pipeline, [ PipelineRunData( - inputs={"multiplexer": {"value": 0}}, + inputs={"branch_joiner": {"value": 0}}, expected_outputs={"remainder": {"remainder_is_0": 0}}, - expected_run_order=["multiplexer", "remainder"], + expected_run_order=["branch_joiner", "remainder"], ), PipelineRunData( - inputs={"multiplexer": {"value": 3}}, + inputs={"branch_joiner": {"value": 3}}, expected_outputs={"remainder": {"remainder_is_0": 3}}, - expected_run_order=["multiplexer", "remainder"], + expected_run_order=["branch_joiner", "remainder"], ), PipelineRunData( - inputs={"multiplexer": {"value": 4}}, + inputs={"branch_joiner": {"value": 4}}, expected_outputs={"remainder": {"remainder_is_0": 6}}, - expected_run_order=["multiplexer", "remainder", "add_two", "multiplexer", "remainder"], + expected_run_order=["branch_joiner", "remainder", "add_two", "branch_joiner", "remainder"], ), PipelineRunData( - inputs={"multiplexer": {"value": 5}}, + inputs={"branch_joiner": {"value": 5}}, expected_outputs={"remainder": {"remainder_is_0": 6}}, - expected_run_order=["multiplexer", "remainder", "add_one", "multiplexer", "remainder"], + expected_run_order=["branch_joiner", "remainder", "add_one", "branch_joiner", "remainder"], ), PipelineRunData( - inputs={"multiplexer": {"value": 6}}, + inputs={"branch_joiner": {"value": 6}}, expected_outputs={"remainder": {"remainder_is_0": 6}}, - expected_run_order=["multiplexer", "remainder"], + expected_run_order=["branch_joiner", "remainder"], ), ], ) @@ -251,46 +251,53 @@ def pipeline_that_has_two_loops_of_identical_lengths(): @given("a pipeline that has two loops of different lengths", target_fixture="pipeline_data") def pipeline_that_has_two_loops_of_different_lengths(): pipeline = Pipeline(max_loops_allowed=10) - pipeline.add_component("multiplexer", Multiplexer(type_=int)) + pipeline.add_component("branch_joiner", BranchJoiner(type_=int)) pipeline.add_component("remainder", Remainder(divisor=3)) pipeline.add_component("add_one", AddFixedValue(add=1)) pipeline.add_component("add_two_1", AddFixedValue(add=1)) pipeline.add_component("add_two_2", AddFixedValue(add=1)) - pipeline.connect("multiplexer.value", "remainder.value") + pipeline.connect("branch_joiner.value", "remainder.value") pipeline.connect("remainder.remainder_is_1", "add_two_1.value") pipeline.connect("add_two_1", "add_two_2.value") - pipeline.connect("add_two_2", "multiplexer") + pipeline.connect("add_two_2", "branch_joiner") pipeline.connect("remainder.remainder_is_2", "add_one.value") - pipeline.connect("add_one", "multiplexer") + pipeline.connect("add_one", "branch_joiner") return ( pipeline, [ PipelineRunData( - inputs={"multiplexer": {"value": 0}}, + inputs={"branch_joiner": {"value": 0}}, expected_outputs={"remainder": {"remainder_is_0": 0}}, - expected_run_order=["multiplexer", "remainder"], + expected_run_order=["branch_joiner", "remainder"], ), PipelineRunData( - inputs={"multiplexer": {"value": 3}}, + inputs={"branch_joiner": {"value": 3}}, expected_outputs={"remainder": {"remainder_is_0": 3}}, - expected_run_order=["multiplexer", "remainder"], + expected_run_order=["branch_joiner", "remainder"], ), PipelineRunData( - inputs={"multiplexer": {"value": 4}}, + inputs={"branch_joiner": {"value": 4}}, expected_outputs={"remainder": {"remainder_is_0": 6}}, - expected_run_order=["multiplexer", "remainder", "add_two_1", "add_two_2", "multiplexer", "remainder"], + expected_run_order=[ + "branch_joiner", + "remainder", + "add_two_1", + "add_two_2", + "branch_joiner", + "remainder", + ], ), PipelineRunData( - inputs={"multiplexer": {"value": 5}}, + inputs={"branch_joiner": {"value": 5}}, expected_outputs={"remainder": {"remainder_is_0": 6}}, - expected_run_order=["multiplexer", "remainder", "add_one", "multiplexer", "remainder"], + expected_run_order=["branch_joiner", "remainder", "add_one", "branch_joiner", "remainder"], ), PipelineRunData( - inputs={"multiplexer": {"value": 6}}, + inputs={"branch_joiner": {"value": 6}}, expected_outputs={"remainder": {"remainder_is_0": 6}}, - expected_run_order=["multiplexer", "remainder"], + expected_run_order=["branch_joiner", "remainder"], ), ], ) @@ -302,20 +309,20 @@ def pipeline_that_has_a_single_loop_with_two_conditional_branches(): pipeline = Pipeline(max_loops_allowed=10) pipeline.add_component("add_one", AddFixedValue(add=1)) - pipeline.add_component("multiplexer", Multiplexer(type_=int)) + pipeline.add_component("branch_joiner", BranchJoiner(type_=int)) pipeline.add_component("below_10", Threshold(threshold=10)) pipeline.add_component("below_5", Threshold(threshold=5)) pipeline.add_component("add_three", AddFixedValue(add=3)) pipeline.add_component("accumulator", accumulator) pipeline.add_component("add_two", AddFixedValue(add=2)) - pipeline.connect("add_one.result", "multiplexer") - pipeline.connect("multiplexer.value", "below_10.value") + pipeline.connect("add_one.result", "branch_joiner") + pipeline.connect("branch_joiner.value", "below_10.value") pipeline.connect("below_10.below", "accumulator.value") pipeline.connect("accumulator.value", "below_5.value") pipeline.connect("below_5.above", "add_three.value") - pipeline.connect("below_5.below", "multiplexer") - pipeline.connect("add_three.result", "multiplexer") + pipeline.connect("below_5.below", "branch_joiner") + pipeline.connect("add_three.result", "branch_joiner") pipeline.connect("below_10.above", "add_two.value") return ( @@ -326,16 +333,16 @@ def pipeline_that_has_a_single_loop_with_two_conditional_branches(): expected_outputs={"add_two": {"result": 13}}, expected_run_order=[ "add_one", - "multiplexer", + "branch_joiner", "below_10", "accumulator", "below_5", - "multiplexer", + "branch_joiner", "below_10", "accumulator", "below_5", "add_three", - "multiplexer", + "branch_joiner", "below_10", "add_two", ], @@ -498,18 +505,18 @@ def pipeline_that_has_different_combinations_of_branches_that_merge_and_do_not_m def pipeline_that_has_two_branches_one_of_which_loops_back(): pipeline = Pipeline(max_loops_allowed=10) pipeline.add_component("add_zero", AddFixedValue(add=0)) - pipeline.add_component("multiplexer", Multiplexer(type_=int)) + pipeline.add_component("branch_joiner", BranchJoiner(type_=int)) pipeline.add_component("sum", Sum()) pipeline.add_component("below_10", Threshold(threshold=10)) pipeline.add_component("add_one", AddFixedValue(add=1)) pipeline.add_component("counter", Accumulate()) pipeline.add_component("add_two", AddFixedValue(add=2)) - pipeline.connect("add_zero", "multiplexer.value") - pipeline.connect("multiplexer", "below_10.value") + pipeline.connect("add_zero", "branch_joiner.value") + pipeline.connect("branch_joiner", "below_10.value") pipeline.connect("below_10.below", "add_one.value") pipeline.connect("add_one.result", "counter.value") - pipeline.connect("counter.value", "multiplexer.value") + pipeline.connect("counter.value", "branch_joiner.value") pipeline.connect("below_10.above", "add_two.value") pipeline.connect("add_two.result", "sum.values") @@ -521,15 +528,15 @@ def pipeline_that_has_two_branches_one_of_which_loops_back(): expected_outputs={"sum": {"total": 23}}, expected_run_order=[ "add_zero", - "multiplexer", + "branch_joiner", "below_10", "add_one", "counter", - "multiplexer", + "branch_joiner", "below_10", "add_one", "counter", - "multiplexer", + "branch_joiner", "below_10", "add_two", "sum", @@ -660,10 +667,10 @@ def pipeline_that_has_a_greedy_and_variadic_component_after_a_component_with_def template = "Given this documents: {{ documents|join(', ', attribute='content') }} Answer this question: {{ query }}" pipeline.add_component("retriever", InMemoryBM25Retriever(document_store=document_store)) pipeline.add_component("prompt_builder", PromptBuilder(template=template)) - pipeline.add_component("multiplexer", Multiplexer(List[Document])) + pipeline.add_component("branch_joiner", BranchJoiner(List[Document])) - pipeline.connect("retriever", "multiplexer") - pipeline.connect("multiplexer", "prompt_builder.documents") + pipeline.connect("retriever", "branch_joiner") + pipeline.connect("branch_joiner", "prompt_builder.documents") return ( pipeline, [ @@ -682,7 +689,7 @@ def pipeline_that_has_a_greedy_and_variadic_component_after_a_component_with_def "question" } }, - expected_run_order=["retriever", "multiplexer", "prompt_builder"], + expected_run_order=["retriever", "branch_joiner", "prompt_builder"], ) ], ) @@ -1063,20 +1070,20 @@ def pipeline_that_is_linear_and_returns_intermediate_outputs(): def pipeline_that_has_a_loop_and_returns_intermediate_outputs_from_it(): pipeline = Pipeline(max_loops_allowed=10) pipeline.add_component("add_one", AddFixedValue(add=1)) - pipeline.add_component("multiplexer", Multiplexer(type_=int)) + pipeline.add_component("branch_joiner", BranchJoiner(type_=int)) pipeline.add_component("below_10", Threshold(threshold=10)) pipeline.add_component("below_5", Threshold(threshold=5)) pipeline.add_component("add_three", AddFixedValue(add=3)) pipeline.add_component("accumulator", Accumulate()) pipeline.add_component("add_two", AddFixedValue(add=2)) - pipeline.connect("add_one.result", "multiplexer") - pipeline.connect("multiplexer.value", "below_10.value") + pipeline.connect("add_one.result", "branch_joiner") + pipeline.connect("branch_joiner.value", "below_10.value") pipeline.connect("below_10.below", "accumulator.value") pipeline.connect("accumulator.value", "below_5.value") pipeline.connect("below_5.above", "add_three.value") - pipeline.connect("below_5.below", "multiplexer") - pipeline.connect("add_three.result", "multiplexer") + pipeline.connect("below_5.below", "branch_joiner") + pipeline.connect("add_three.result", "branch_joiner") pipeline.connect("below_10.above", "add_two.value") return ( @@ -1087,7 +1094,7 @@ def pipeline_that_has_a_loop_and_returns_intermediate_outputs_from_it(): include_outputs_from={ "add_two", "add_one", - "multiplexer", + "branch_joiner", "below_10", "accumulator", "below_5", @@ -1096,7 +1103,7 @@ def pipeline_that_has_a_loop_and_returns_intermediate_outputs_from_it(): expected_outputs={ "add_two": {"result": 13}, "add_one": {"result": 4}, - "multiplexer": {"value": 11}, + "branch_joiner": {"value": 11}, "below_10": {"above": 11}, "accumulator": {"value": 8}, "below_5": {"above": 8}, @@ -1104,16 +1111,16 @@ def pipeline_that_has_a_loop_and_returns_intermediate_outputs_from_it(): }, expected_run_order=[ "add_one", - "multiplexer", + "branch_joiner", "below_10", "accumulator", "below_5", - "multiplexer", + "branch_joiner", "below_10", "accumulator", "below_5", "add_three", - "multiplexer", + "branch_joiner", "below_10", "add_two", ], diff --git a/test/core/pipeline/test_pipeline.py b/test/core/pipeline/test_pipeline.py index f87dfa6e84..106533603d 100644 --- a/test/core/pipeline/test_pipeline.py +++ b/test/core/pipeline/test_pipeline.py @@ -10,7 +10,6 @@ from haystack import Document from haystack.components.builders import PromptBuilder, AnswerBuilder from haystack.components.joiners import BranchJoiner -from haystack.components.others import Multiplexer from haystack.core.component import component from haystack.core.component.types import InputSocket, OutputSocket, Variadic from haystack.core.errors import DeserializationError, PipelineConnectError, PipelineDrawingError, PipelineError @@ -855,17 +854,17 @@ def test__init_inputs_state(self): {{ questions | join("\n") }} """ pipe.add_component("prompt_builder", PromptBuilder(template=template)) - pipe.add_component("multiplexer", Multiplexer(type_=int)) + pipe.add_component("branch_joiner", BranchJoiner(type_=int)) questions = ["What is the capital of Italy?", "What is the capital of France?"] data = { "prompt_builder": {"questions": questions}, - "multiplexer": {"value": 1}, + "branch_joiner": {"value": 1}, "not_a_component": "some input data", } res = pipe._init_inputs_state(data) assert res == { "prompt_builder": {"questions": ["What is the capital of Italy?", "What is the capital of France?"]}, - "multiplexer": {"value": [1]}, + "branch_joiner": {"value": [1]}, "not_a_component": "some input data", } assert id(questions) != id(res["prompt_builder"]["questions"])