Skip to content

Commit

Permalink
Fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
irux committed Oct 30, 2023
1 parent 1f9e14e commit 73e2564
Show file tree
Hide file tree
Showing 14 changed files with 121 additions and 108 deletions.
1 change: 1 addition & 0 deletions kpops/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,7 @@ def clean(
verbose: bool = VERBOSE_OPTION,
):
async def clean_runner(component: PipelineComponent):
await component.destroy(dry_run)
await component.clean(dry_run)

async def async_clean():
Expand Down
21 changes: 19 additions & 2 deletions kpops/component_handlers/helm_wrapper/helm.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import asyncio
import logging
import re
import subprocess
Expand Down Expand Up @@ -102,7 +103,7 @@ async def upgrade_install(
command.extend(flags.to_command())
if dry_run:
command.append("--dry-run")
return self.__execute(command)
return await self.__async_execute(command)

async def uninstall(
self,
Expand All @@ -121,7 +122,7 @@ async def uninstall(
if dry_run:
command.append("--dry-run")
try:
return self.__execute(command)
return await self.__async_execute(command)
except ReleaseNotFoundException:
log.warning(
f"Release with name {release_name} not found. Could not uninstall app."
Expand Down Expand Up @@ -222,6 +223,22 @@ def __execute(self, command: list[str]) -> str:
log.debug(process.stdout)
return process.stdout

async def __async_execute(self, command: list[str]):
command = self.__set_global_flags(command)
log.debug(f"Executing {' '.join(command)}")
print(command)
proc = await asyncio.create_subprocess_shell(
" ".join(command),
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE)

stdout, stderr = await proc.communicate()
Helm.parse_helm_command_stderr_output(stderr.decode())
log.debug(stdout)
return stdout.decode()



def __set_global_flags(self, command: list[str]) -> list[str]:
if self._context:
log.debug(f"Changing the Kubernetes context to {self._context}")
Expand Down
16 changes: 8 additions & 8 deletions kpops/components/base_components/kafka_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ async def deploy(self, dry_run: bool) -> None:
)
await super().deploy(dry_run)

def _run_clean_up_job(
async def _run_clean_up_job(
self,
values: dict,
dry_run: bool,
Expand All @@ -117,11 +117,11 @@ def _run_clean_up_job(
)
log.info(f"Uninstall old cleanup job for {clean_up_release_name}")

self.__uninstall_clean_up_job(clean_up_release_name, dry_run)
await self.__uninstall_clean_up_job(clean_up_release_name, dry_run)

log.info(f"Init cleanup job for {clean_up_release_name}")

stdout = self.__install_clean_up_job(
stdout = await self.__install_clean_up_job(
clean_up_release_name, suffix, values, dry_run
)

Expand All @@ -130,17 +130,17 @@ def _run_clean_up_job(

if not retain_clean_jobs:
log.info(f"Uninstall cleanup job for {clean_up_release_name}")
self.__uninstall_clean_up_job(clean_up_release_name, dry_run)
await self.__uninstall_clean_up_job(clean_up_release_name, dry_run)

def __uninstall_clean_up_job(self, release_name: str, dry_run: bool) -> None:
async def __uninstall_clean_up_job(self, release_name: str, dry_run: bool) -> None:
"""Uninstall clean up job.
:param release_name: Name of the Helm release
:param dry_run: Whether to do a dry run of the command
"""
self.helm.uninstall(self.namespace, release_name, dry_run)
await self.helm.uninstall(self.namespace, release_name, dry_run)

def __install_clean_up_job(
async def __install_clean_up_job(
self,
release_name: str,
suffix: str,
Expand All @@ -156,7 +156,7 @@ def __install_clean_up_job(
:return: Install clean up job with helm, return the output of the installation
"""
clean_up_release_name = trim_release_name(release_name, suffix)
return self.helm.upgrade_install(
return await self.helm.upgrade_install(
clean_up_release_name,
self.clean_up_helm_chart,
dry_run,
Expand Down
32 changes: 16 additions & 16 deletions kpops/components/base_components/kafka_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ async def clean(self, dry_run: bool) -> None:
)
await self.handlers.topic_handler.delete_topics(self.to, dry_run=dry_run)

def _run_connect_resetter(
async def _run_connect_resetter(
self,
dry_run: bool,
retain_clean_jobs: bool,
Expand All @@ -186,15 +186,15 @@ def _run_connect_resetter(
f"Connector Cleanup: uninstalling cleanup job Helm release from previous runs for {self.full_name}"
)
)
self.__uninstall_connect_resetter(self._resetter_release_name, dry_run)
await self.__uninstall_connect_resetter(self._resetter_release_name, dry_run)

log.info(
magentaify(
f"Connector Cleanup: deploy Connect {self._connector_type.value} resetter for {self.full_name}"
)
)

stdout = self.__install_connect_resetter(dry_run, **kwargs)
stdout = await self.__install_connect_resetter(dry_run, **kwargs)

if dry_run:
self.dry_run_handler.print_helm_diff(
Expand All @@ -203,9 +203,9 @@ def _run_connect_resetter(

if not retain_clean_jobs:
log.info(magentaify("Connector Cleanup: uninstall Kafka Resetter."))
self.__uninstall_connect_resetter(self._resetter_release_name, dry_run)
await self.__uninstall_connect_resetter(self._resetter_release_name, dry_run)

def __install_connect_resetter(
async def __install_connect_resetter(
self,
dry_run: bool,
**kwargs,
Expand All @@ -215,7 +215,7 @@ def __install_connect_resetter(
:param dry_run: Whether to dry run the command
:return: The output of `helm upgrade --install`
"""
return self.helm.upgrade_install(
return await self.helm.upgrade_install(
release_name=self._resetter_release_name,
namespace=self.namespace,
chart=self._resetter_helm_chart,
Expand Down Expand Up @@ -252,13 +252,13 @@ def _get_kafka_connect_resetter_values(
**self.resetter_values,
}

def __uninstall_connect_resetter(self, release_name: str, dry_run: bool) -> None:
async def __uninstall_connect_resetter(self, release_name: str, dry_run: bool) -> None:
"""Uninstall connector resetter.
:param release_name: Name of the release to be uninstalled
:param dry_run: Whether to do a dry run of the command
"""
self.helm.uninstall(
await self.helm.uninstall(
namespace=self.namespace,
release_name=release_name,
dry_run=dry_run,
Expand Down Expand Up @@ -301,19 +301,19 @@ def template(self) -> None:

@override
async def reset(self, dry_run: bool) -> None:
self.__run_kafka_connect_resetter(dry_run)
await self.__run_kafka_connect_resetter(dry_run)

@override
async def clean(self, dry_run: bool) -> None:
await super().clean(dry_run)
self.__run_kafka_connect_resetter(dry_run)
await self.__run_kafka_connect_resetter(dry_run)

def __run_kafka_connect_resetter(self, dry_run: bool) -> None:
async def __run_kafka_connect_resetter(self, dry_run: bool) -> None:
"""Run the connector resetter.
:param dry_run: Whether to do a dry run of the command
"""
self._run_connect_resetter(
await self._run_connect_resetter(
dry_run=dry_run,
retain_clean_jobs=self.config.retain_clean_jobs,
offset_topic=self.offset_topic,
Expand Down Expand Up @@ -359,22 +359,22 @@ def set_error_topic(self, topic_name: str) -> None:

@override
async def reset(self, dry_run: bool) -> None:
self.__run_kafka_connect_resetter(dry_run, delete_consumer_group=False)
await self.__run_kafka_connect_resetter(dry_run, delete_consumer_group=False)

@override
async def clean(self, dry_run: bool) -> None:
await super().clean(dry_run)
self.__run_kafka_connect_resetter(dry_run, delete_consumer_group=True)
await self.__run_kafka_connect_resetter(dry_run, delete_consumer_group=True)

def __run_kafka_connect_resetter(
async def __run_kafka_connect_resetter(
self, dry_run: bool, delete_consumer_group: bool
) -> None:
"""Run the connector resetter.
:param dry_run: Whether to do a dry run of the command
:param delete_consumer_group: Whether the consumer group should be deleted or not
"""
self._run_connect_resetter(
await self._run_connect_resetter(
dry_run=dry_run,
retain_clean_jobs=self.config.retain_clean_jobs,
delete_consumer_group=delete_consumer_group,
Expand Down
4 changes: 2 additions & 2 deletions kpops/components/base_components/kubernetes_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ def deploy_flags(self) -> HelmUpgradeInstallFlags:

@override
async def deploy(self, dry_run: bool) -> None:
stdout = self.helm.upgrade_install(
stdout = await self.helm.upgrade_install(
self.helm_release_name,
self.helm_chart,
dry_run,
Expand All @@ -150,7 +150,7 @@ async def deploy(self, dry_run: bool) -> None:

@override
async def destroy(self, dry_run: bool) -> None:
stdout = self.helm.uninstall(
stdout = await self.helm.uninstall(
self.namespace,
self.helm_release_name,
dry_run,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def clean_up_helm_chart(self) -> str:

@override
async def clean(self, dry_run: bool) -> None:
self._run_clean_up_job(
await self._run_clean_up_job(
values=self.to_helm_values(),
dry_run=dry_run,
retain_clean_jobs=self.config.retain_clean_jobs,
Expand Down
8 changes: 4 additions & 4 deletions kpops/components/streams_bootstrap/streams/streams_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,21 +74,21 @@ def clean_up_helm_chart(self) -> str:

@override
async def reset(self, dry_run: bool) -> None:
self.__run_streams_clean_up_job(dry_run, delete_output=False)
await self.__run_streams_clean_up_job(dry_run, delete_output=False)

@override
async def clean(self, dry_run: bool) -> None:
self.__run_streams_clean_up_job(dry_run, delete_output=True)
await self.__run_streams_clean_up_job(dry_run, delete_output=True)

def __run_streams_clean_up_job(self, dry_run: bool, delete_output: bool) -> None:
async def __run_streams_clean_up_job(self, dry_run: bool, delete_output: bool) -> None:
"""Run clean job for this Streams app.
:param dry_run: Whether to do a dry run of the command
:param delete_output: Whether to delete the output of the app that is being cleaned
"""
values = self.to_helm_values()
values["streams"]["deleteOutput"] = delete_output
self._run_clean_up_job(
await self._run_clean_up_job(
values=values,
dry_run=dry_run,
retain_clean_jobs=self.config.retain_clean_jobs,
Expand Down
37 changes: 12 additions & 25 deletions kpops/pipeline_generator/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,59 +86,47 @@ def build_execution_graph_from(
reverse: bool,
runner: Callable[[PipelineComponent], Coroutine],
):
print("This are components for subgraph")
for component in components:
print(component.id)

print("built graph")
print(self.graph.nodes)
print(self.graph.edges)
sub_graph_nodes = []
for component in components:
sub_graph_nodes.append(component.id)
sub_graph_nodes.append(list(component.inputs))
sub_graph_nodes.append(list(component.outputs))

async def run_parallel_tasks(tasks):
asyncio_tasks = []
for coroutine in tasks:
print("this is coroutine")
print(coroutine)
asyncio_tasks.append((asyncio.create_task(coroutine)))

Check failure on line 99 in kpops/pipeline_generator/pipeline.py

View workflow job for this annotation

GitHub Actions / Test (ubuntu-22.04, 3.10)

[*] Avoid extraneous parentheses
await asyncio.gather(*asyncio_tasks)

async def run_graph_tasks(pending_tasks: list[Awaitable]):
for pending_task in pending_tasks:
await pending_task

nodes = [node_component.id for node_component in components]
transformed_graph = self.graph.copy()
print("This is subgraph")
print(transformed_graph.nodes)
print(transformed_graph.edges)

if reverse:
transformed_graph = transformed_graph.reverse()
sub_graph = self.graph.subgraph(sub_graph_nodes)
transformed_graph = sub_graph.copy()

root_node = "root_node_bfs"
transformed_graph.add_node(root_node)

for node in self.graph:
predecessors = list(self.graph.predecessors(node))
for node in sub_graph:
predecessors = list(sub_graph.predecessors(node))
if not predecessors:
print("Here the transformation")
print(node)
transformed_graph.add_edge(root_node, node)

layers_graph = list(nx.bfs_layers(transformed_graph, root_node))

sorted_tasks = []
print(layers_graph)
for layer in layers_graph[1:]:
print(layer)
parallel_tasks = self.__get_parallel_task_from(layer, runner)

if parallel_tasks:
sorted_tasks.append(run_parallel_tasks(parallel_tasks))

print(sorted_tasks)
if reverse:
sorted_tasks.reverse()

execution = run_graph_tasks(sorted_tasks)
print(execution)
return execution

Check failure on line 130 in kpops/pipeline_generator/pipeline.py

View workflow job for this annotation

GitHub Actions / Test (ubuntu-22.04, 3.10)

Unnecessary assignment to `execution` before `return` statement

def __get_parallel_task_from(self, layer, runner):
Expand All @@ -147,7 +135,6 @@ def __get_parallel_task_from(self, layer, runner):
for node_in_layer in layer:
component_node = self._component_index[node_in_layer]
if component_node.component is not None and not component_node.is_topic:
print(component_node.name)
parallel_tasks.append(runner(component_node.component))

return parallel_tasks
Expand Down
21 changes: 0 additions & 21 deletions test.py

This file was deleted.

Loading

0 comments on commit 73e2564

Please sign in to comment.