Skip to content

Commit

Permalink
Add logic to producer app
Browse files Browse the repository at this point in the history
  • Loading branch information
raminqaf committed Jun 27, 2024
1 parent aeb4bd2 commit c9e58ed
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 2 deletions.
10 changes: 9 additions & 1 deletion kpops/components/streams_bootstrap/producer/producer_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,18 @@ class ProducerApp(KafkaApp, StreamsBootstrap):
@computed_field
@cached_property
def _cleaner(self) -> ProducerAppCleaner:
producer_app_values = self.model_dump(
by_alias=True, exclude={"_cleaner", "from_", "to"}
)
cluster_values = self.helm_values()
if cluster_values is not None:
app_values = producer_app_values.get("app")
if app_values is not None:
app_values["imageTag"] = cluster_values["imageTag"]
return ProducerAppCleaner(
config=self.config,
handlers=self.handlers,
**self.model_dump(by_alias=True, exclude={"_cleaner", "from_", "to"}),
**producer_app_values,
)

@override
Expand Down
68 changes: 67 additions & 1 deletion tests/components/test_producer_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from kpops.component_handlers import ComponentHandlers
from kpops.component_handlers.helm_wrapper.model import HelmUpgradeInstallFlags
from kpops.component_handlers.helm_wrapper.utils import create_helm_release_name
from kpops.components import ProducerApp
from kpops.components import HelmApp, ProducerApp
from kpops.components.base_components.models.topic import (
KafkaTopic,
OutputTopicTypes,
Expand Down Expand Up @@ -352,3 +352,69 @@ def test_get_output_topics(
KafkaTopic(name="producer-app-output-topic"),
KafkaTopic(name="extra-topic-1"),
]

@pytest.mark.asyncio()
async def test_should_deploy_clean_up_job_with_image_tag_in_cluster(
self,
config: KpopsConfig,
handlers: ComponentHandlers,
mocker: MockerFixture,
):
image_tag_in_cluster = "1.1.1"
mocker.patch.object(
HelmApp,
"helm_values",
return_value={
"image": "registry/producer-app",
"imageTag": image_tag_in_cluster,
"nameOverride": PRODUCER_APP_NAME,
"replicaCount": 1,
"streams": {
"brokers": "fake-broker:9092",
"outputTopic": "test-output-topic",
"schemaRegistryUrl": "http://localhost:8081",
},
},
)
producer_app = ProducerApp(
name=PRODUCER_APP_NAME,
config=config,
handlers=handlers,
**{
"namespace": "test-namespace",
"app": {
"imageTag": "2.2.2",
"streams": {"brokers": "fake-broker:9092"},
},
"to": {
"topics": {
"test-output-topic": {"type": "output"},
}
},
},
)
mocker.patch.object(producer_app._cleaner.dry_run_handler, "print_helm_diff")
mocker.patch.object(producer_app._cleaner.helm, "uninstall")

mock_helm_upgrade_install = mocker.patch.object(
producer_app._cleaner.helm, "upgrade_install"
)

dry_run = True
await producer_app.clean(dry_run)

mock_helm_upgrade_install.assert_called_once_with(
PRODUCER_APP_CLEAN_RELEASE_NAME,
"bakdata-streams-bootstrap/producer-app-cleanup-job",
dry_run,
"test-namespace",
{
"nameOverride": PRODUCER_APP_CLEAN_HELM_NAMEOVERRIDE,
"imageTag": image_tag_in_cluster,
"streams": {
"brokers": "fake-broker:9092",
"outputTopic": "test-output-topic",
},
},
HelmUpgradeInstallFlags(version="2.9.0", wait=True, wait_for_jobs=True),
)

0 comments on commit c9e58ed

Please sign in to comment.