Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support networkx to represent components as a graph #331

Merged
merged 41 commits into from
Sep 18, 2023
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
ee29743
Add networkx compatibility
irux Aug 22, 2023
0ace374
Fix formatting
irux Aug 22, 2023
bb61f5f
Save progress with ci working
irux Aug 22, 2023
862aba0
Linting
irux Aug 22, 2023
8f46969
Fix pyright
irux Aug 22, 2023
769bd04
Test new dependency
irux Aug 28, 2023
a97148a
Fix lock
irux Aug 28, 2023
d929459
Fix problem when names are not overriten
irux Aug 28, 2023
7408e02
Delete pl show
irux Aug 28, 2023
a1ae585
Deleting ploting graph
irux Aug 28, 2023
36a86b0
Add test and fix some edge cases
irux Aug 29, 2023
c3aba1d
remove unused dependency
irux Aug 29, 2023
7476784
Delete more unused depencendies
irux Aug 29, 2023
f364f00
Delete matploit for testing
irux Aug 29, 2023
c197969
Test and changes
irux Aug 30, 2023
00da489
Implement comments
irux Sep 5, 2023
d84e0ea
Linting
irux Sep 5, 2023
147ff74
Format
irux Sep 5, 2023
e9f8712
Add more graph tests
irux Sep 5, 2023
d35bc41
Remove unused
irux Sep 5, 2023
5262f17
Implement comments
irux Sep 5, 2023
f81d573
Reformat
irux Sep 5, 2023
3ced2f6
Implement changes
irux Sep 5, 2023
35b7d8c
Implement changes
irux Sep 11, 2023
20e0c09
Replace link
irux Sep 11, 2023
510de2f
Fix link to kpops-examples (#357)
sujuka99 Sep 11, 2023
f1dbe1f
Fix docstring
irux Sep 12, 2023
931731b
Join tests
irux Sep 12, 2023
0052cd1
Increase timeout
irux Sep 12, 2023
f9cc88a
Save progress
irux Sep 12, 2023
b4e6a9e
Merge last state
irux Sep 12, 2023
184bc43
Implement changes
irux Sep 12, 2023
bd724e1
Delete unused imports
irux Sep 12, 2023
133005a
Implement changes
irux Sep 13, 2023
b956ebc
Fix test
irux Sep 18, 2023
df16d00
Solve linting
irux Sep 18, 2023
8b87409
Fix pyright
irux Sep 18, 2023
d73f169
Fix linting
irux Sep 18, 2023
a2a3192
Delete from implemented methods
irux Sep 18, 2023
5432c8a
Implement changes
irux Sep 18, 2023
1e89223
Change to id
irux Sep 18, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions kpops/components/base_components/kafka_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,11 @@ class KafkaSinkConnector(KafkaConnector):
exclude=True,
)

@override
def get_input_topics(self) -> list[str]:
topics = getattr(self.app, "topics", None)
return topics.split(",") if topics is not None else []

@override
def add_input_topics(self, topics: list[str]) -> None:
existing_topics: str | None = getattr(self.app, "topics", None)
Expand Down
14 changes: 14 additions & 0 deletions kpops/components/base_components/pipeline_component.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,20 @@ def __init__(self, **kwargs) -> None:
self.set_input_topics()
self.set_output_topics()

def get_input_topics(self) -> list[str] | None:
disrupted marked this conversation as resolved.
Show resolved Hide resolved
disrupted marked this conversation as resolved.
Show resolved Hide resolved
"""
Get all the input topics
"""

def get_extra_output_topics(self) -> dict[str, str] | None:
...

def get_extra_input_topics(self) -> dict[str, list[str]] | None:
disrupted marked this conversation as resolved.
Show resolved Hide resolved
disrupted marked this conversation as resolved.
Show resolved Hide resolved
...

def get_output_topic(self) -> str | None:
...

def add_input_topics(self, topics: list[str]) -> None:
"""Add given topics to the list of input topics.

Expand Down
8 changes: 8 additions & 0 deletions kpops/components/streams_bootstrap/producer/producer_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,14 @@ def apply_to_outputs(self, name: str, topic: TopicConfig) -> None:
case _:
super().apply_to_outputs(name, topic)

@override
def get_output_topic(self) -> str | None:
return self.app.streams.output_topic

@override
def get_extra_output_topics(self) -> dict[str, str]:
return self.app.streams.extra_output_topics

@override
def set_output_topic(self, topic_name: str) -> None:
self.app.streams.output_topic = topic_name
Expand Down
12 changes: 12 additions & 0 deletions kpops/components/streams_bootstrap/streams/streams_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,18 @@ class StreamsApp(KafkaApp):
class Config(DescConfig):
extra = Extra.allow

@override
def get_input_topics(self) -> list[str] | None:
return self.app.streams.input_topics

@override
def get_extra_input_topics(self) -> dict[str, list[str]] | None:
return self.app.streams.extra_input_topics

@override
def get_output_topic(self) -> str | None:
return self.app.streams.output_topic

disrupted marked this conversation as resolved.
Show resolved Hide resolved
@override
def add_input_topics(self, topics: list[str]) -> None:
self.app.streams.add_input_topics(topics)
Expand Down
66 changes: 65 additions & 1 deletion kpops/pipeline_generator/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@
from contextlib import suppress
from pathlib import Path

import networkx as nx
import yaml
from pydantic import BaseModel
from pydantic import BaseModel, Field
from rich.console import Console
from rich.syntax import Syntax

Expand All @@ -35,6 +36,10 @@ class PipelineComponents(BaseModel):
"""Stores the pipeline components"""

components: list[PipelineComponent] = []
graph_components: nx.DiGraph = Field(default=nx.DiGraph(), exclude=True)
disrupted marked this conversation as resolved.
Show resolved Hide resolved

class Config:
arbitrary_types_allowed = True

@property
def last(self) -> PipelineComponent:
Expand All @@ -59,6 +64,10 @@ def __iter__(self) -> Iterator[PipelineComponent]:
def __len__(self) -> int:
return len(self.components)

def validate_graph_components(self) -> None:
disrupted marked this conversation as resolved.
Show resolved Hide resolved
if not nx.is_directed_acyclic_graph(self.graph_components):
raise ValueError("Component graph contain loops!")
disrupted marked this conversation as resolved.
Show resolved Hide resolved

def validate_unique_names(self) -> None:
step_names = [component.name for component in self.components]
duplicates = [name for name, count in Counter(step_names).items() if count > 1]
Expand Down Expand Up @@ -112,6 +121,7 @@ def __init__(
self.registry = registry
self.env_components_index = create_env_components_index(environment_components)
self.parse_components(component_list)
self.__generate_graph()
self.validate()

@classmethod
Expand Down Expand Up @@ -160,6 +170,59 @@ def load_from_yaml(
pipeline = cls(main_content, env_content, registry, config, handlers)
return pipeline

def __generate_graph(self):
disrupted marked this conversation as resolved.
Show resolved Hide resolved
for component in self.components:
all_input_topics = self.__get_all_input_topics(component)
all_output_topics = self.__get_all_output_topics(component)

component_node_name = self.__get_vertex_component_name(component)
self.components.graph_components.add_node(component_node_name)

self.__add_ingoing_edges(all_input_topics, component_node_name)
self.__add_outgoing_edges(all_output_topics, component_node_name)

def __add_outgoing_edges(
self, all_output_topics: list[str], component_node_name: str
) -> None:
for output_topic in all_output_topics:
self.components.graph_components.add_node(output_topic)
self.components.graph_components.add_edge(component_node_name, output_topic)

def __add_ingoing_edges(
self, all_input_topics: list[str], component_node_name: str
) -> None:
for input_topic in all_input_topics:
self.components.graph_components.add_node(input_topic)
self.components.graph_components.add_edge(input_topic, component_node_name)

def __get_vertex_component_name(self, component: PipelineComponent) -> str:
disrupted marked this conversation as resolved.
Show resolved Hide resolved
component_vertex_name = f"component-{component.name}"
disrupted marked this conversation as resolved.
Show resolved Hide resolved
return component_vertex_name
disrupted marked this conversation as resolved.
Show resolved Hide resolved

def __get_all_output_topics(self, component: PipelineComponent) -> list[str]:
all_output_topics: list[str] = []
output_topics = component.get_output_topic()
disrupted marked this conversation as resolved.
Show resolved Hide resolved
extra_output_topics = component.get_extra_output_topics()
if output_topics is not None:
all_output_topics += [output_topics]
disrupted marked this conversation as resolved.
Show resolved Hide resolved
if extra_output_topics is not None and extra_output_topics:
disrupted marked this conversation as resolved.
Show resolved Hide resolved
all_output_topics += list(extra_output_topics.values())
return all_output_topics

def __get_all_input_topics(self, component: PipelineComponent) -> list[str]:
input_topics = component.get_input_topics()
extra_input_topics = component.get_extra_input_topics()
all_input_topics: list[str] = []
if input_topics is not None:
all_input_topics += input_topics
if extra_input_topics is not None and extra_input_topics:
disrupted marked this conversation as resolved.
Show resolved Hide resolved
all_input_topics += [
topic
for list_topics in extra_input_topics.values()
for topic in list_topics
]
disrupted marked this conversation as resolved.
Show resolved Hide resolved
return all_input_topics

def parse_components(self, component_list: list[dict]) -> None:
"""Instantiate, enrich and inflate a list of components

Expand Down Expand Up @@ -327,6 +390,7 @@ def substitute_in_component(self, component_as_dict: dict) -> dict:

def validate(self) -> None:
self.components.validate_unique_names()
self.components.validate_graph_components()

@staticmethod
def pipeline_filename_environment(path: Path, config: PipelineConfig) -> Path:
Expand Down
20 changes: 19 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ cachetools = "^5.2.0"
dictdiffer = "^0.9.0"
python-schema-registry-client = "^2.4.1"
httpx = "^0.24.1"
networkx = "^3.1"


[tool.poetry.group.dev.dependencies]
Expand Down
33 changes: 33 additions & 0 deletions tests/components/test_producer_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,3 +240,36 @@ async def test_should_clean_producer_app_and_deploy_clean_up_job_and_delete_clea
"test-namespace", self.PRODUCER_APP_CLEAN_NAME, False
),
]

def test_get_output_topics(
self, config: PipelineConfig, handlers: ComponentHandlers
):
producer_app = ProducerApp(
name=self.PRODUCER_APP_NAME,
config=config,
handlers=handlers,
**{
"namespace": "test-namespace",
"app": {
"namespace": "test-namespace",
"streams": {"brokers": "fake-broker:9092"},
},
"to": {
"topics": {
"${output_topic_name}": TopicConfig(
type=OutputTopicTypes.OUTPUT, partitions_count=10
),
"extra-topic-1": TopicConfig(
type=OutputTopicTypes.EXTRA,
role="first-extra-topic",
partitions_count=10,
),
}
},
},
)

assert producer_app.get_output_topic() == "${output_topic_name}"
disrupted marked this conversation as resolved.
Show resolved Hide resolved
assert producer_app.get_extra_output_topics() == {
"first-extra-topic": "extra-topic-1"
}
59 changes: 59 additions & 0 deletions tests/components/test_streams_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -429,3 +429,62 @@ async def test_should_clean_streams_app_and_deploy_clean_up_job_and_delete_clean
"test-namespace", self.STREAMS_APP_CLEAN_NAME, dry_run
),
]

@pytest.mark.asyncio
async def test_get_topics(
disrupted marked this conversation as resolved.
Show resolved Hide resolved
self, config: PipelineConfig, handlers: ComponentHandlers
):
streams_app = StreamsApp(
name=self.STREAMS_APP_NAME,
config=config,
handlers=handlers,
**{
"namespace": "test-namespace",
"app": {
"streams": {"brokers": "fake-broker:9092"},
},
"from": {
"topics": {
"example-input": {"type": "input"},
"b": {"type": "input"},
"a": {"type": "input"},
"topic-extra2": {"type": "extra", "role": "role2"},
"topic-extra3": {"type": "extra", "role": "role2"},
"topic-extra": {"type": "extra", "role": "role1"},
".*": {"type": "input-pattern"},
"example.*": {
"type": "extra-pattern",
"role": "another-pattern",
},
}
},
},
)
assert streams_app.get_input_topics() == ["example-input", "b", "a"]
assert streams_app.get_extra_input_topics() == {
"role1": ["topic-extra"],
"role2": ["topic-extra2", "topic-extra3"],
}

@pytest.mark.asyncio
async def test_get_output_topic(
self, config: PipelineConfig, handlers: ComponentHandlers
):
streams_app = StreamsApp(
name=self.STREAMS_APP_NAME,
config=config,
handlers=handlers,
**{
"namespace": "test-namespace",
"app": {
"streams": {"brokers": "fake-broker:9092"},
},
"from": {
"topics": {
"example-input": {"type": "input"},
}
},
"to": {"topics": {"example-output": {"type": "output"}}},
},
)
assert streams_app.get_output_topic() == "example-output"
22 changes: 22 additions & 0 deletions tests/pipeline/resources/pipeline-with-loop/defaults.yaml
sujuka99 marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
pipeline-component:
prefix: ""

kubernetes-app:
namespace: example-namespace

kafka-connector:
namespace: example-namespace

kafka-app:
app:
streams:
brokers: 127.0.0.1:9092
schemaRegistryUrl: 127.0.0.1:8081
optimizeLeaveGroupBehavior: false

streams-app:
app:
labels:
pipeline: ${pipeline_name}
streams:
optimizeLeaveGroupBehavior: false
34 changes: 34 additions & 0 deletions tests/pipeline/resources/pipeline-with-loop/pipeline.yaml
sujuka99 marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
- type: producer
name: app1
app:
image: producer-image
to:
topics:
my-output-topic:
type: output

- type: streams-app
name: app2
app:
image: app2-image
from:
topics:
my-output-topic:
type: input
to:
topics:
my-app2-topic:
type: output

- type: streams-app
name: app3
app:
image: app3-image
from:
topics:
my-app2-topic:
type: input
to:
topics:
my-output-topic:
type: output
15 changes: 15 additions & 0 deletions tests/pipeline/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -480,3 +480,18 @@ def test_validate_unique_step_names(self):
],
catch_exceptions=False,
)

def test_validate_loops_on_pipeline(self):
with pytest.raises(ValueError, match="Component graph contain loops!"):
runner.invoke(
app,
[
"generate",
"--pipeline-base-dir",
str(PIPELINE_BASE_DIR_PATH),
str(RESOURCE_PATH / "pipeline-with-loop/pipeline.yaml"),
"--defaults",
str(RESOURCE_PATH / "pipeline-with-loop"),
],
catch_exceptions=False,
)