Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support networkx to represent components as a graph #331

Merged
merged 41 commits into from
Sep 18, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
ee29743
Add networkx compatibility
irux Aug 22, 2023
0ace374
Fix formatting
irux Aug 22, 2023
bb61f5f
Save progress with ci working
irux Aug 22, 2023
862aba0
Linting
irux Aug 22, 2023
8f46969
Fix pyright
irux Aug 22, 2023
769bd04
Test new dependency
irux Aug 28, 2023
a97148a
Fix lock
irux Aug 28, 2023
d929459
Fix problem when names are not overriten
irux Aug 28, 2023
7408e02
Delete pl show
irux Aug 28, 2023
a1ae585
Deleting ploting graph
irux Aug 28, 2023
36a86b0
Add test and fix some edge cases
irux Aug 29, 2023
c3aba1d
remove unused dependency
irux Aug 29, 2023
7476784
Delete more unused depencendies
irux Aug 29, 2023
f364f00
Delete matploit for testing
irux Aug 29, 2023
c197969
Test and changes
irux Aug 30, 2023
00da489
Implement comments
irux Sep 5, 2023
d84e0ea
Linting
irux Sep 5, 2023
147ff74
Format
irux Sep 5, 2023
e9f8712
Add more graph tests
irux Sep 5, 2023
d35bc41
Remove unused
irux Sep 5, 2023
5262f17
Implement comments
irux Sep 5, 2023
f81d573
Reformat
irux Sep 5, 2023
3ced2f6
Implement changes
irux Sep 5, 2023
35b7d8c
Implement changes
irux Sep 11, 2023
20e0c09
Replace link
irux Sep 11, 2023
510de2f
Fix link to kpops-examples (#357)
sujuka99 Sep 11, 2023
f1dbe1f
Fix docstring
irux Sep 12, 2023
931731b
Join tests
irux Sep 12, 2023
0052cd1
Increase timeout
irux Sep 12, 2023
f9cc88a
Save progress
irux Sep 12, 2023
b4e6a9e
Merge last state
irux Sep 12, 2023
184bc43
Implement changes
irux Sep 12, 2023
bd724e1
Delete unused imports
irux Sep 12, 2023
133005a
Implement changes
irux Sep 13, 2023
b956ebc
Fix test
irux Sep 18, 2023
df16d00
Solve linting
irux Sep 18, 2023
8b87409
Fix pyright
irux Sep 18, 2023
d73f169
Fix linting
irux Sep 18, 2023
a2a3192
Delete from implemented methods
irux Sep 18, 2023
5432c8a
Implement changes
irux Sep 18, 2023
1e89223
Change to id
irux Sep 18, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 13 additions & 15 deletions kpops/components/base_components/pipeline_component.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,18 @@ def __init__(self, **kwargs) -> None:

def get_input_topics(self) -> list[str] | None:
disrupted marked this conversation as resolved.
Show resolved Hide resolved
disrupted marked this conversation as resolved.
Show resolved Hide resolved
"""Get all the input topics from config."""
disrupted marked this conversation as resolved.
Show resolved Hide resolved
return []

def get_extra_input_topics(self) -> dict[str, list[str]] | None:
disrupted marked this conversation as resolved.
Show resolved Hide resolved
disrupted marked this conversation as resolved.
Show resolved Hide resolved
"""Get extra input topics list from config."""
return {}

def get_output_topic(self) -> str | None:
"""Get output topic from config."""

def get_extra_output_topics(self) -> dict[str, str] | None:
disrupted marked this conversation as resolved.
Show resolved Hide resolved
"""Get extra output topics list from config."""
return {}

@property
def full_name(self) -> str:
Expand Down Expand Up @@ -130,19 +133,18 @@ def set_input_topics(self) -> None:
for name, topic in self.from_.topics.items():
self.apply_from_inputs(name, topic)

def __get_all_input_topics(self) -> list[str]:
input_topics = self.get_input_topics()
extra_input_topics = self.get_extra_input_topics()
all_input_topics: list[str] = []
if input_topics is not None:
all_input_topics.extend(input_topics)
if extra_input_topics:
all_input_topics.extend(chain(*extra_input_topics.values()))
return all_input_topics
@property
def inputs(self) -> Iterator[str]:
yield from self.get_input_topics()
for role_topics in self.get_extra_input_topics().values():
yield from role_topics

@property
def input_topics(self) -> Iterator[str]:
yield from self.__get_all_input_topics()
def outputs(self) -> Iterator[str]:
if self.get_output_topic() is not None:
yield self.get_output_topic()
yield from self.get_extra_output_topics().values()


def __get_all_output_topics(self) -> list[str]:
all_output_topics: list[str] = []
Expand All @@ -154,10 +156,6 @@ def __get_all_output_topics(self) -> list[str]:
all_output_topics.extend(list(extra_output_topics.values()))
return all_output_topics

@property
def output_topics(self):
yield from self.__get_all_output_topics()

def apply_from_inputs(self, name: str, topic: FromTopic) -> None:
"""Add a `from` section input to the component config

Expand Down
4 changes: 4 additions & 0 deletions kpops/components/streams_bootstrap/streams/streams_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ def get_extra_input_topics(self) -> dict[str, list[str]] | None:
def get_output_topic(self) -> str | None:
return self.app.streams.output_topic

disrupted marked this conversation as resolved.
Show resolved Hide resolved
@override
def get_extra_output_topics(self) -> dict[str, str] | None:
disrupted marked this conversation as resolved.
Show resolved Hide resolved
return self.app.streams.extra_output_topics

@override
def add_input_topics(self, topics: list[str]) -> None:
self.app.streams.add_input_topics(topics)
Expand Down
8 changes: 4 additions & 4 deletions kpops/pipeline_generator/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,15 +81,15 @@ def generate_graph(self) -> None:
component_node_name = self.__get_vertex_component_name(component)
self.graph.add_node(component_node_name)

self.__add_ingoing_edges(list(component.input_topics), component_node_name)
self.__add_output(list(component.output_topics), component_node_name)
self.__add_inputs(list(component.inputs), component_node_name)
disrupted marked this conversation as resolved.
Show resolved Hide resolved
self.__add_outputs(list(component.outputs), component_node_name)

def __add_output(self, all_output_topics: list[str], source: str) -> None:
def __add_outputs(self, all_output_topics: list[str], source: str) -> None:
for output_topic in all_output_topics:
self.graph.add_node(output_topic)
self.graph.add_edge(source, output_topic)

def __add_ingoing_edges(
def __add_inputs(
self, all_input_topics: list[str], component_node_name: str
) -> None:
for input_topic in all_input_topics:
Expand Down
2 changes: 1 addition & 1 deletion tests/components/test_producer_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,4 +280,4 @@ def test_get_output_topics(
assert producer_app.get_extra_output_topics() == {
"first-extra-topic": "extra-topic-1"
}
assert producer_app.get_input_topics() is None
assert producer_app.get_input_topics() == []
12 changes: 11 additions & 1 deletion tests/components/test_streams_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,14 @@ async def test_get_input_output_topics(
},
}
},
"to": {"topics": {"example-output": {"type": "output"}}},
"to": {"topics": {
"example-output": {
"type": "output"
},
"extra-topic":{
"role": "fake-role"
}
}},
},
)

Expand All @@ -482,3 +489,6 @@ async def test_get_input_output_topics(
"role2": ["topic-extra2", "topic-extra3"],
}
assert streams_app.get_output_topic() == "example-output"
assert streams_app.get_extra_output_topics() == {
"fake-role": "extra-topic"
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,13 @@ kafka-app:
schema_registry_url: "${schema_registry_url}"
version: "2.4.2"

producer-app:
to:
topics:
${output_topic_name}:
partitions_count: 3


streams-app: # inherits from kafka-app
app:
streams:
Expand All @@ -19,7 +26,7 @@ streams-app: # inherits from kafka-app
type:
error-topic:
type: error
extra-topic:
extra-topic-output:
role: role
from:
topics:
Expand Down
4 changes: 2 additions & 2 deletions tests/pipeline/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -528,10 +528,10 @@ def test_short_topic_definition(self):
input_components = enriched_pipeline["components"][4]["from"]["components"]
assert "type" not in output_topics["output-topic"]
assert output_topics["error-topic"]["type"] == "error"
assert "type" not in output_topics["extra-topic"]
assert "type" not in output_topics["extra-topic-output"]
assert "role" not in output_topics["output-topic"]
assert "role" not in output_topics["error-topic"]
assert output_topics["extra-topic"]["role"] == "role"
assert output_topics["extra-topic-output"]["role"] == "role"

assert "type" not in ["input-topic"]
assert "type" not in input_topics["extra-topic"]
Expand Down
Loading