Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support networkx to represent components as a graph #331

Merged
merged 41 commits into from
Sep 18, 2023
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
ee29743
Add networkx compatibility
irux Aug 22, 2023
0ace374
Fix formatting
irux Aug 22, 2023
bb61f5f
Save progress with ci working
irux Aug 22, 2023
862aba0
Linting
irux Aug 22, 2023
8f46969
Fix pyright
irux Aug 22, 2023
769bd04
Test new dependency
irux Aug 28, 2023
a97148a
Fix lock
irux Aug 28, 2023
d929459
Fix problem when names are not overriten
irux Aug 28, 2023
7408e02
Delete pl show
irux Aug 28, 2023
a1ae585
Deleting ploting graph
irux Aug 28, 2023
36a86b0
Add test and fix some edge cases
irux Aug 29, 2023
c3aba1d
remove unused dependency
irux Aug 29, 2023
7476784
Delete more unused depencendies
irux Aug 29, 2023
f364f00
Delete matploit for testing
irux Aug 29, 2023
c197969
Test and changes
irux Aug 30, 2023
00da489
Implement comments
irux Sep 5, 2023
d84e0ea
Linting
irux Sep 5, 2023
147ff74
Format
irux Sep 5, 2023
e9f8712
Add more graph tests
irux Sep 5, 2023
d35bc41
Remove unused
irux Sep 5, 2023
5262f17
Implement comments
irux Sep 5, 2023
f81d573
Reformat
irux Sep 5, 2023
3ced2f6
Implement changes
irux Sep 5, 2023
35b7d8c
Implement changes
irux Sep 11, 2023
20e0c09
Replace link
irux Sep 11, 2023
510de2f
Fix link to kpops-examples (#357)
sujuka99 Sep 11, 2023
f1dbe1f
Fix docstring
irux Sep 12, 2023
931731b
Join tests
irux Sep 12, 2023
0052cd1
Increase timeout
irux Sep 12, 2023
f9cc88a
Save progress
irux Sep 12, 2023
b4e6a9e
Merge last state
irux Sep 12, 2023
184bc43
Implement changes
irux Sep 12, 2023
bd724e1
Delete unused imports
irux Sep 12, 2023
133005a
Implement changes
irux Sep 13, 2023
b956ebc
Fix test
irux Sep 18, 2023
df16d00
Solve linting
irux Sep 18, 2023
8b87409
Fix pyright
irux Sep 18, 2023
d73f169
Fix linting
irux Sep 18, 2023
a2a3192
Delete from implemented methods
irux Sep 18, 2023
5432c8a
Implement changes
irux Sep 18, 2023
1e89223
Change to id
irux Sep 18, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions docs/docs/resources/examples/defaults.md
disrupted marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,18 @@
--8<--
```

## [Word-count Pipeline](https://github.com/bakdata/kpops-examples/tree/main/word-count/deployment/kpops/defaults){target=_blank}
<!-- dprint-ignore-end -->

## [Word-count Pipeline](https://github.com/bakdata/kpops-examples/tree/main/word-count/deployment/kpops){target=_blank}

<!-- dprint-ignore-start -->

??? example "defaults.yaml"
```yaml
--8<--
https://raw.githubusercontent.com/bakdata/kpops-examples/main/word-count/deployment/kpops/defaults/defaults.yaml
--8<--
--8<--
https://raw.githubusercontent.com/bakdata/kpops-examples/main/word-count/deployment/kpops/defaults.yaml
--8<--
```

<!-- dprint-ignore-end -->

5 changes: 5 additions & 0 deletions kpops/components/base_components/kafka_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,11 @@ class KafkaSinkConnector(KafkaConnector):
exclude=True,
)

@override
def get_input_topics(self) -> list[str]:
topics = getattr(self.app, "topics", None)
return topics.split(",") if topics is not None else []

@override
def add_input_topics(self, topics: list[str]) -> None:
existing_topics: str | None = getattr(self.app, "topics", None)
Expand Down
18 changes: 18 additions & 0 deletions kpops/components/base_components/pipeline_component.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,24 @@ def __init__(self, **kwargs) -> None:
self.set_input_topics()
self.set_output_topics()

def get_input_topics(self) -> list[str] | None:
disrupted marked this conversation as resolved.
Show resolved Hide resolved
disrupted marked this conversation as resolved.
Show resolved Hide resolved
"""
Get all the input topics from config.
"""

def get_extra_output_topics(self) -> dict[str, str] | None:
"""
Get extra output topics list from config.
"""

def get_extra_input_topics(self) -> dict[str, list[str]] | None:
disrupted marked this conversation as resolved.
Show resolved Hide resolved
disrupted marked this conversation as resolved.
Show resolved Hide resolved
"""
Get extra input topics list from config.
"""

def get_output_topic(self) -> str | None:
...

def add_input_topics(self, topics: list[str]) -> None:
"""Add given topics to the list of input topics.

Expand Down
8 changes: 8 additions & 0 deletions kpops/components/streams_bootstrap/producer/producer_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,14 @@ def apply_to_outputs(self, name: str, topic: TopicConfig) -> None:
case _:
super().apply_to_outputs(name, topic)

@override
def get_output_topic(self) -> str | None:
return self.app.streams.output_topic

@override
def get_extra_output_topics(self) -> dict[str, str]:
return self.app.streams.extra_output_topics

@override
def set_output_topic(self, topic_name: str) -> None:
self.app.streams.output_topic = topic_name
Expand Down
12 changes: 12 additions & 0 deletions kpops/components/streams_bootstrap/streams/streams_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,18 @@ class StreamsApp(KafkaApp):
class Config(DescConfig):
extra = Extra.allow

@override
def get_input_topics(self) -> list[str] | None:
return self.app.streams.input_topics

@override
def get_extra_input_topics(self) -> dict[str, list[str]] | None:
return self.app.streams.extra_input_topics

@override
def get_output_topic(self) -> str | None:
return self.app.streams.output_topic

disrupted marked this conversation as resolved.
Show resolved Hide resolved
@override
def add_input_topics(self, topics: list[str]) -> None:
self.app.streams.add_input_topics(topics)
Expand Down
63 changes: 62 additions & 1 deletion kpops/pipeline_generator/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@
from collections import Counter
from collections.abc import Iterator
from contextlib import suppress
from itertools import chain
from pathlib import Path

import networkx as nx
import yaml
from pydantic import BaseModel
from pydantic import BaseModel, Field
from rich.console import Console
from rich.syntax import Syntax

Expand All @@ -35,6 +37,10 @@ class PipelineComponents(BaseModel):
"""Stores the pipeline components"""

components: list[PipelineComponent] = []
graph: nx.DiGraph = Field(default=nx.DiGraph(), exclude=True)

class Config:
arbitrary_types_allowed = True

@property
def last(self) -> PipelineComponent:
Expand All @@ -59,6 +65,10 @@ def __iter__(self) -> Iterator[PipelineComponent]:
def __len__(self) -> int:
return len(self.components)

def validate_graph_components(self) -> None:
disrupted marked this conversation as resolved.
Show resolved Hide resolved
if not nx.is_directed_acyclic_graph(self.graph):
raise ValueError("Pipeline contains cycles.")
disrupted marked this conversation as resolved.
Show resolved Hide resolved

def validate_unique_names(self) -> None:
step_names = [component.name for component in self.components]
duplicates = [name for name, count in Counter(step_names).items() if count > 1]
Expand All @@ -67,6 +77,55 @@ def validate_unique_names(self) -> None:
f"step names should be unique. duplicate step names: {', '.join(duplicates)}"
)

def generate_graph(self) -> None:
for component in self.components:
all_input_topics = self.__get_all_input_topics(component)
all_output_topics = self.__get_all_output_topics(component)

component_node_name = self.__get_vertex_component_name(component)
self.graph.add_node(component_node_name)

self.__add_ingoing_edges(all_input_topics, component_node_name)
self.__add_outgoing_edges(all_output_topics, component_node_name)

def __add_outgoing_edges(
self, all_output_topics: list[str], component_node_name: str
) -> None:
disrupted marked this conversation as resolved.
Show resolved Hide resolved
for output_topic in all_output_topics:
self.graph.add_node(output_topic)
self.graph.add_edge(component_node_name, output_topic)

def __add_ingoing_edges(
self, all_input_topics: list[str], component_node_name: str
) -> None:
for input_topic in all_input_topics:
self.graph.add_node(input_topic)
self.graph.add_edge(input_topic, component_node_name)

@staticmethod
def __get_vertex_component_name(component: PipelineComponent) -> str:
return f"component-{component.name}"
disrupted marked this conversation as resolved.
Show resolved Hide resolved

def __get_all_output_topics(self, component: PipelineComponent) -> list[str]:
disrupted marked this conversation as resolved.
Show resolved Hide resolved
all_output_topics: list[str] = []
output_topic = component.get_output_topic()
extra_output_topics = component.get_extra_output_topics()
if output_topic is not None:
disrupted marked this conversation as resolved.
Show resolved Hide resolved
all_output_topics.append(output_topic)
if extra_output_topics:
all_output_topics.extend(list(extra_output_topics.values()))
return all_output_topics

def __get_all_input_topics(self, component: PipelineComponent) -> list[str]:
disrupted marked this conversation as resolved.
Show resolved Hide resolved
input_topics = component.get_input_topics()
extra_input_topics = component.get_extra_input_topics()
all_input_topics: list[str] = []
if input_topics is not None:
disrupted marked this conversation as resolved.
Show resolved Hide resolved
all_input_topics.extend(input_topics)
if extra_input_topics:
all_input_topics.extend(chain(*extra_input_topics.values()))
return all_input_topics

@staticmethod
def _populate_component_name(component: PipelineComponent) -> None:
component.name = component.prefix + component.name
Expand Down Expand Up @@ -112,6 +171,7 @@ def __init__(
self.registry = registry
self.env_components_index = create_env_components_index(environment_components)
self.parse_components(component_list)
self.components.generate_graph()
self.validate()
disrupted marked this conversation as resolved.
Show resolved Hide resolved

@classmethod
Expand Down Expand Up @@ -327,6 +387,7 @@ def substitute_in_component(self, component_as_dict: dict) -> dict:

def validate(self) -> None:
self.components.validate_unique_names()
self.components.validate_graph_components()

@staticmethod
def pipeline_filename_environment(path: Path, config: PipelineConfig) -> Path:
Expand Down
20 changes: 19 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ cachetools = "^5.2.0"
dictdiffer = "^0.9.0"
python-schema-registry-client = "^2.4.1"
httpx = "^0.24.1"
networkx = "^3.1"


[tool.poetry.group.dev.dependencies]
Expand Down
34 changes: 34 additions & 0 deletions tests/components/test_producer_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,3 +240,37 @@ async def test_should_clean_producer_app_and_deploy_clean_up_job_and_delete_clea
"test-namespace", self.PRODUCER_APP_CLEAN_NAME, False
),
]

def test_get_output_topics(
self, config: PipelineConfig, handlers: ComponentHandlers
):
producer_app = ProducerApp(
name=self.PRODUCER_APP_NAME,
config=config,
handlers=handlers,
**{
"namespace": "test-namespace",
"app": {
"namespace": "test-namespace",
"streams": {"brokers": "fake-broker:9092"},
},
"to": {
"topics": {
"${output_topic_name}": TopicConfig(
type=OutputTopicTypes.OUTPUT, partitions_count=10
),
"extra-topic-1": TopicConfig(
type=OutputTopicTypes.EXTRA,
role="first-extra-topic",
partitions_count=10,
),
}
},
},
)

assert producer_app.get_output_topic() == "${output_topic_name}"
disrupted marked this conversation as resolved.
Show resolved Hide resolved
assert producer_app.get_extra_output_topics() == {
"first-extra-topic": "extra-topic-1"
}
assert producer_app.get_input_topics() is None
59 changes: 59 additions & 0 deletions tests/components/test_streams_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -429,3 +429,62 @@ async def test_should_clean_streams_app_and_deploy_clean_up_job_and_delete_clean
"test-namespace", self.STREAMS_APP_CLEAN_NAME, dry_run
),
]

@pytest.mark.asyncio
async def test_get_topics(
disrupted marked this conversation as resolved.
Show resolved Hide resolved
self, config: PipelineConfig, handlers: ComponentHandlers
):
streams_app = StreamsApp(
name=self.STREAMS_APP_NAME,
config=config,
handlers=handlers,
**{
"namespace": "test-namespace",
"app": {
"streams": {"brokers": "fake-broker:9092"},
},
"from": {
"topics": {
"example-input": {"type": "input"},
"b": {"type": "input"},
"a": {"type": "input"},
"topic-extra2": {"type": "extra", "role": "role2"},
"topic-extra3": {"type": "extra", "role": "role2"},
"topic-extra": {"type": "extra", "role": "role1"},
".*": {"type": "input-pattern"},
"example.*": {
"type": "extra-pattern",
"role": "another-pattern",
},
}
},
},
)
assert streams_app.get_input_topics() == ["example-input", "b", "a"]
assert streams_app.get_extra_input_topics() == {
"role1": ["topic-extra"],
"role2": ["topic-extra2", "topic-extra3"],
}

@pytest.mark.asyncio
async def test_get_output_topic(
self, config: PipelineConfig, handlers: ComponentHandlers
):
streams_app = StreamsApp(
name=self.STREAMS_APP_NAME,
config=config,
handlers=handlers,
**{
"namespace": "test-namespace",
"app": {
"streams": {"brokers": "fake-broker:9092"},
},
"from": {
"topics": {
"example-input": {"type": "input"},
}
},
"to": {"topics": {"example-output": {"type": "output"}}},
},
)
assert streams_app.get_output_topic() == "example-output"
22 changes: 22 additions & 0 deletions tests/pipeline/resources/pipeline-with-loop/defaults.yaml
sujuka99 marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
pipeline-component:
prefix: ""

kubernetes-app:
namespace: example-namespace

kafka-connector:
namespace: example-namespace

kafka-app:
app:
streams:
brokers: 127.0.0.1:9092
schemaRegistryUrl: 127.0.0.1:8081
optimizeLeaveGroupBehavior: false

streams-app:
app:
labels:
pipeline: ${pipeline_name}
streams:
optimizeLeaveGroupBehavior: false
34 changes: 34 additions & 0 deletions tests/pipeline/resources/pipeline-with-loop/pipeline.yaml
sujuka99 marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
- type: producer
name: app1
app:
image: producer-image
to:
topics:
my-output-topic:
type: output

- type: streams-app
name: app2
app:
image: app2-image
from:
topics:
my-output-topic:
type: input
to:
topics:
my-app2-topic:
type: output

- type: streams-app
name: app3
app:
image: app3-image
from:
topics:
my-app2-topic:
type: input
to:
topics:
my-output-topic:
type: output
Loading