Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support networkx to represent components as a graph #331

Merged
merged 41 commits into from
Sep 18, 2023
Merged
Show file tree
Hide file tree
Changes from 40 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
ee29743
Add networkx compatibility
irux Aug 22, 2023
0ace374
Fix formatting
irux Aug 22, 2023
bb61f5f
Save progress with ci working
irux Aug 22, 2023
862aba0
Linting
irux Aug 22, 2023
8f46969
Fix pyright
irux Aug 22, 2023
769bd04
Test new dependency
irux Aug 28, 2023
a97148a
Fix lock
irux Aug 28, 2023
d929459
Fix problem when names are not overriten
irux Aug 28, 2023
7408e02
Delete pl show
irux Aug 28, 2023
a1ae585
Deleting ploting graph
irux Aug 28, 2023
36a86b0
Add test and fix some edge cases
irux Aug 29, 2023
c3aba1d
remove unused dependency
irux Aug 29, 2023
7476784
Delete more unused depencendies
irux Aug 29, 2023
f364f00
Delete matploit for testing
irux Aug 29, 2023
c197969
Test and changes
irux Aug 30, 2023
00da489
Implement comments
irux Sep 5, 2023
d84e0ea
Linting
irux Sep 5, 2023
147ff74
Format
irux Sep 5, 2023
e9f8712
Add more graph tests
irux Sep 5, 2023
d35bc41
Remove unused
irux Sep 5, 2023
5262f17
Implement comments
irux Sep 5, 2023
f81d573
Reformat
irux Sep 5, 2023
3ced2f6
Implement changes
irux Sep 5, 2023
35b7d8c
Implement changes
irux Sep 11, 2023
20e0c09
Replace link
irux Sep 11, 2023
510de2f
Fix link to kpops-examples (#357)
sujuka99 Sep 11, 2023
f1dbe1f
Fix docstring
irux Sep 12, 2023
931731b
Join tests
irux Sep 12, 2023
0052cd1
Increase timeout
irux Sep 12, 2023
f9cc88a
Save progress
irux Sep 12, 2023
b4e6a9e
Merge last state
irux Sep 12, 2023
184bc43
Implement changes
irux Sep 12, 2023
bd724e1
Delete unused imports
irux Sep 12, 2023
133005a
Implement changes
irux Sep 13, 2023
b956ebc
Fix test
irux Sep 18, 2023
df16d00
Solve linting
irux Sep 18, 2023
8b87409
Fix pyright
irux Sep 18, 2023
d73f169
Fix linting
irux Sep 18, 2023
a2a3192
Delete from implemented methods
irux Sep 18, 2023
5432c8a
Implement changes
irux Sep 18, 2023
1e89223
Change to id
irux Sep 18, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions kpops/components/base_components/kafka_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,11 @@ def __run_kafka_connect_resetter(self, dry_run: bool) -> None:
class KafkaSinkConnector(KafkaConnector):
"""Kafka sink connector model"""

@override
def get_input_topics(self) -> list[str]:
    """Return the topics this sink connector reads from.

    The topics are taken from the ``topics`` attribute of ``self.app``,
    which holds a comma-separated string. Surrounding whitespace is
    stripped from each entry and empty entries are dropped, so a missing,
    ``None`` or empty value yields an empty list instead of ``[""]``.

    :return: Topic names in the order they appear in the config
    """
    topics = getattr(self.app, "topics", None)
    if not topics:
        return []
    # "a, b" and "a,b" must produce the same names, otherwise the same
    # topic would show up under two different identifiers downstream.
    return [name for topic in topics.split(",") if (name := topic.strip())]

@override
def add_input_topics(self, topics: list[str]) -> None:
existing_topics: str | None = getattr(self.app, "topics", None)
Expand Down
28 changes: 28 additions & 0 deletions kpops/components/base_components/pipeline_component.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

from abc import ABC
from collections.abc import Iterator

from pydantic import Extra, Field

Expand Down Expand Up @@ -58,6 +59,21 @@ def __init__(self, **kwargs) -> None:
self.set_input_topics()
self.set_output_topics()

def get_input_topics(self) -> list[str]:
    """Get all the input topics from config.

    The base implementation knows no input topics and returns an
    empty list.
    """
    no_topics: list[str] = []
    return no_topics

def get_extra_input_topics(self) -> dict[str, list[str]]:
    """Get extra input topics from config, grouped by role.

    The base implementation knows no extra input topics and returns an
    empty mapping.
    """
    no_topics: dict[str, list[str]] = {}
    return no_topics

def get_output_topic(self) -> str | None:
    """Get output topic from config.

    The base implementation has no output topic.

    :return: Always ``None`` here
    """
    # Explicit return keeps this getter consistent with its siblings,
    # which all return a value explicitly.
    return None

def get_extra_output_topics(self) -> dict[str, str]:
    """Get extra output topics from config, keyed by role.

    The base implementation knows no extra output topics and returns an
    empty mapping.
    """
    no_topics: dict[str, str] = {}
    return no_topics

@property
def full_name(self) -> str:
    """Return the component name with the component prefix prepended."""
    prefix, name = self.prefix, self.name
    return prefix + name
Expand Down Expand Up @@ -116,6 +132,18 @@ def set_input_topics(self) -> None:
for name, topic in self.from_.topics.items():
self.apply_from_inputs(name, topic)

@property
def inputs(self) -> Iterator[str]:
    """Iterate over every input topic of the component.

    Yields the regular input topics first, followed by the extra input
    topics of each role in mapping order.
    """
    for topic in self.get_input_topics():
        yield topic
    extra_topics_by_role = self.get_extra_input_topics()
    for topics in extra_topics_by_role.values():
        for topic in topics:
            yield topic

@property
def outputs(self) -> Iterator[str]:
    """Iterate over every output topic of the component.

    Yields the main output topic when it is set (truthy), followed by
    the extra output topics in mapping order.
    """
    main_topic = self.get_output_topic()
    if main_topic:
        yield main_topic
    for topic in self.get_extra_output_topics().values():
        yield topic

def apply_from_inputs(self, name: str, topic: FromTopic) -> None:
"""Add a `from` section input to the component config

Expand Down
8 changes: 8 additions & 0 deletions kpops/components/streams_bootstrap/producer/producer_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,14 @@ def apply_to_outputs(self, name: str, topic: TopicConfig) -> None:
case _:
super().apply_to_outputs(name, topic)

@override
def get_output_topic(self) -> str | None:
    """Return the output topic stored in the app's streams config."""
    streams_config = self.app.streams
    return streams_config.output_topic

@override
def get_extra_output_topics(self) -> dict[str, str]:
    """Return the extra output topics stored in the app's streams config."""
    streams_config = self.app.streams
    return streams_config.extra_output_topics

@override
def set_output_topic(self, topic_name: str) -> None:
self.app.streams.output_topic = topic_name
Expand Down
16 changes: 16 additions & 0 deletions kpops/components/streams_bootstrap/streams/streams_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,22 @@ class StreamsApp(KafkaApp):
description=describe_attr("app", __doc__),
)

@override
def get_input_topics(self) -> list[str]:
    """Return the input topics stored in the app's streams config."""
    streams_config = self.app.streams
    return streams_config.input_topics

@override
def get_extra_input_topics(self) -> dict[str, list[str]]:
    """Return the extra input topics stored in the app's streams config."""
    streams_config = self.app.streams
    return streams_config.extra_input_topics

@override
def get_output_topic(self) -> str | None:
    """Return the output topic stored in the app's streams config."""
    streams_config = self.app.streams
    return streams_config.output_topic

disrupted marked this conversation as resolved.
Show resolved Hide resolved
@override
def get_extra_output_topics(self) -> dict[str, str]:
    """Return the extra output topics stored in the app's streams config.

    The return type is ``dict[str, str]`` without ``None``, matching the
    method being overridden; a ``None`` result could not be iterated with
    ``.values()`` by callers.

    :return: The ``extra_output_topics`` mapping of ``self.app.streams``
    """
    return self.app.streams.extra_output_topics

@override
def add_input_topics(self, topics: list[str]) -> None:
self.app.streams.add_input_topics(topics)
Expand Down
36 changes: 35 additions & 1 deletion kpops/pipeline_generator/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@
from contextlib import suppress
from pathlib import Path

import networkx as nx
import yaml
from pydantic import BaseModel
from pydantic import BaseModel, Field
from rich.console import Console
from rich.syntax import Syntax

Expand All @@ -35,6 +36,10 @@ class PipelineComponents(BaseModel):
"""Stores the pipeline components"""

components: list[PipelineComponent] = []
graph: nx.DiGraph = Field(default=nx.DiGraph(), exclude=True)

class Config:
arbitrary_types_allowed = True

@property
def last(self) -> PipelineComponent:
Expand All @@ -59,6 +64,10 @@ def __iter__(self) -> Iterator[PipelineComponent]:
def __len__(self) -> int:
    """Return the number of stored components."""
    component_count = len(self.components)
    return component_count

def validate_graph(self) -> None:
    """Ensure the pipeline graph is a directed acyclic graph.

    :raises ValueError: If ``self.graph`` is not a DAG
    """
    is_dag = nx.is_directed_acyclic_graph(self.graph)
    if is_dag:
        return
    raise ValueError("Pipeline is not a valid DAG.")

def validate_unique_names(self) -> None:
step_names = [component.full_name for component in self.components]
duplicates = [name for name, count in Counter(step_names).items() if count > 1]
Expand All @@ -67,6 +76,29 @@ def validate_unique_names(self) -> None:
f"step names should be unique. duplicate step names: {', '.join(duplicates)}"
)

def generate_graph(self) -> None:
    """Populate ``self.graph`` from the stored components.

    Each component becomes a node. Every input topic is linked to the
    component and the component is linked to every output topic.
    """
    for component in self.components:
        node = self.__get_vertex_component_name(component)
        self.graph.add_node(node)

        # Inputs are wired before outputs for each component.
        for topic in component.inputs:
            self.__add_input(topic, node)

        for topic in component.outputs:
            self.__add_output(topic, node)

def __add_output(self, output_topic: str, source: str) -> None:
    """Add an output topic node and an edge from ``source`` to it."""
    graph = self.graph
    graph.add_node(output_topic)
    graph.add_edge(source, output_topic)

def __add_input(self, input_topic: str, component_node_name: str) -> None:
    """Add an input topic node and an edge from it to the component node."""
    graph = self.graph
    graph.add_node(input_topic)
    graph.add_edge(input_topic, component_node_name)

@staticmethod
def __get_vertex_component_name(component: PipelineComponent) -> str:
    """Build the graph node name of a component from its full name."""
    node_prefix = "component-"
    return f"{node_prefix}{component.full_name}"
disrupted marked this conversation as resolved.
Show resolved Hide resolved
disrupted marked this conversation as resolved.
Show resolved Hide resolved

@staticmethod
def _populate_component_name(component: PipelineComponent) -> None: # TODO: remove
with suppress(
Expand Down Expand Up @@ -109,6 +141,7 @@ def __init__(
self.registry = registry
self.env_components_index = create_env_components_index(environment_components)
self.parse_components(component_list)
self.components.generate_graph()
self.validate()
disrupted marked this conversation as resolved.
Show resolved Hide resolved

@classmethod
Expand Down Expand Up @@ -308,6 +341,7 @@ def substitute_in_component(self, component_as_dict: dict) -> dict:

def validate(self) -> None:
    """Run the component checks: unique step names first, then the graph check."""
    components = self.components
    components.validate_unique_names()
    components.validate_graph()

@staticmethod
def pipeline_filename_environment(path: Path, config: PipelineConfig) -> Path:
Expand Down
20 changes: 19 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ cachetools = "^5.2.0"
dictdiffer = "^0.9.0"
python-schema-registry-client = "^2.4.1"
httpx = "^0.24.1"
networkx = "^3.1"


[tool.poetry.group.dev.dependencies]
Expand Down
34 changes: 34 additions & 0 deletions tests/components/test_producer_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,3 +247,37 @@ async def test_should_clean_producer_app_and_deploy_clean_up_job_and_delete_clea
False,
),
]

def test_get_output_topics(
    self,
    config: PipelineConfig,
    handlers: ComponentHandlers,
):
    """Check the topic getters of a producer app built from a ``to`` section."""
    output_topic_config = TopicConfig(
        type=OutputTopicTypes.OUTPUT, partitions_count=10
    )
    extra_topic_config = TopicConfig(
        role="first-extra-topic",
        partitions_count=10,
    )
    component_config = {
        "namespace": "test-namespace",
        "app": {
            "namespace": "test-namespace",
            "streams": {"brokers": "fake-broker:9092"},
        },
        "to": {
            "topics": {
                "${output_topic_name}": output_topic_config,
                "extra-topic-1": extra_topic_config,
            }
        },
    }

    producer_app = ProducerApp(
        name=self.PRODUCER_APP_NAME,
        config=config,
        handlers=handlers,
        **component_config,
    )

    assert producer_app.get_output_topic() == "${output_topic_name}"
    assert producer_app.get_extra_output_topics() == {
        "first-extra-topic": "extra-topic-1"
    }
    # No input topics are expected from the producer app.
    assert producer_app.get_input_topics() == []
45 changes: 45 additions & 0 deletions tests/components/test_streams_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -443,3 +443,48 @@ async def test_should_clean_streams_app_and_deploy_clean_up_job_and_delete_clean
dry_run,
),
]

@pytest.mark.asyncio
async def test_get_input_output_topics(
    self, config: PipelineConfig, handlers: ComponentHandlers
):
    """Check the topic getters of a streams app built from ``from``/``to`` sections."""
    # Key order is significant: the assertions below compare ordered lists.
    from_topics = {
        "example-input": {"type": "input"},
        "b": {"type": "input"},
        "a": {"type": "input"},
        "topic-extra2": {"role": "role2"},
        "topic-extra3": {"role": "role2"},
        "topic-extra": {"role": "role1"},
        ".*": {"type": "pattern"},
        "example.*": {
            "type": "pattern",
            "role": "another-pattern",
        },
    }
    to_topics = {
        "example-output": {"type": "output"},
        "extra-topic": {"role": "fake-role"},
    }
    component_config = {
        "namespace": "test-namespace",
        "app": {
            "streams": {"brokers": "fake-broker:9092"},
        },
        "from": {"topics": from_topics},
        "to": {"topics": to_topics},
    }

    streams_app = StreamsApp(
        name=self.STREAMS_APP_NAME,
        config=config,
        handlers=handlers,
        **component_config,
    )

    # Pattern topics are expected in neither getter result.
    assert streams_app.get_input_topics() == ["example-input", "b", "a"]
    assert streams_app.get_extra_input_topics() == {
        "role1": ["topic-extra"],
        "role2": ["topic-extra2", "topic-extra3"],
    }
    assert streams_app.get_output_topic() == "example-output"
    assert streams_app.get_extra_output_topics() == {"fake-role": "extra-topic"}
19 changes: 19 additions & 0 deletions tests/pipeline/resources/pipeline-with-loop/defaults.yaml
sujuka99 marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
pipeline-component:
  prefix: ""

kubernetes-app:
  namespace: example-namespace

kafka-connector:
  namespace: example-namespace

kafka-app:
  app:
    streams:
      brokers: 127.0.0.1:9092
      schemaRegistryUrl: 127.0.0.1:8081

streams-app:
  app:
    labels:
      pipeline: ${pipeline_name}
34 changes: 34 additions & 0 deletions tests/pipeline/resources/pipeline-with-loop/pipeline.yaml
sujuka99 marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
- type: producer-app
  name: app1
  app:
    image: producer-image
  to:
    topics:
      my-output-topic:
        type: output

- type: streams-app
  name: app2
  app:
    image: app2-image
  from:
    topics:
      my-output-topic:
        type: input
  to:
    topics:
      my-app2-topic:
        type: output

- type: streams-app
  name: app3
  app:
    image: app3-image
  from:
    topics:
      my-app2-topic:
        type: input
  to:
    topics:
      my-output-topic:
        type: output
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,13 @@ kafka-app:
schema_registry_url: "${schema_registry_url}"
version: "2.4.2"

producer-app:
to:
topics:
${output_topic_name}:
partitions_count: 3


streams-app: # inherits from kafka-app
app:
streams:
Expand All @@ -19,7 +26,7 @@ streams-app: # inherits from kafka-app
type:
error-topic:
type: error
extra-topic:
extra-topic-output:
role: role
from:
topics:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
environment: development
defaults_path: ..
brokers: "broker:9092"
helm_diff_config:
enable: false
kafka_connect_host: "kafka_connect_host:8083"
kafka_rest_host: "kafka_rest_host:8082"

Loading