Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Kafka connect config name for deletion #361

Merged
merged 15 commits into from
Sep 19, 2023
56 changes: 24 additions & 32 deletions kpops/components/base_components/kafka_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,18 @@ def helm(self) -> Helm:
)
return helm

@property
def connector_name(self) -> str:
return self.app.name

@property
def connector_resseter_release_name(self) -> str:
raminqaf marked this conversation as resolved.
Show resolved Hide resolved
"""Get connector resetter's release name"""
suffix = "-clean"
clean_up_release_name = self.connector_name + suffix
trimmed_name = trim_release_name(clean_up_release_name, suffix)
return trimmed_name

def _get_resetter_helm_chart(self) -> str:
"""Get reseter Helm chart
raminqaf marked this conversation as resolved.
Show resolved Hide resolved

Expand Down Expand Up @@ -148,7 +160,9 @@ def deploy(self, dry_run: bool) -> None:

@override
def destroy(self, dry_run: bool) -> None:
self.handlers.connector_handler.destroy_connector(self.name, dry_run=dry_run)
self.handlers.connector_handler.destroy_connector(
self.connector_name, dry_run=dry_run
disrupted marked this conversation as resolved.
Show resolved Hide resolved
)

@override
def clean(self, dry_run: bool) -> None:
Expand All @@ -161,7 +175,6 @@ def clean(self, dry_run: bool) -> None:

def _run_connect_resetter(
self,
connector_name: str,
connector_type: KafkaConnectorType,
dry_run: bool,
retain_clean_jobs: bool,
Expand All @@ -173,29 +186,28 @@ def _run_connect_resetter(
to make sure that there is no running clean job in the cluster. Then it releases a cleanup job.
If the retain_clean_jobs flag is set to false the cleanup job will be deleted.

:param connector_name: Name of the connector
:param connector_type: Type of the connector (SINK or SOURCE)
:param dry_run: If the cleanup should be run in dry run mode or not
:param retain_clean_jobs: If the cleanup job should be kept
:param kwargs: Other values for the KafkaConnectResetter
"""
trimmed_name = self._get_kafka_resetter_release_name(connector_name)
trimmed_name = self.connector_resseter_release_name

log.info(
magentaify(
f"Connector Cleanup: uninstalling cleanup job Helm release from previous runs for {connector_name}"
f"Connector Cleanup: uninstalling cleanup job Helm release from previous runs for {self.connector_name}"
)
)
self.__uninstall_connect_resetter(trimmed_name, dry_run)

log.info(
magentaify(
f"Connector Cleanup: deploy Connect {connector_type.value} resetter for {connector_name}"
f"Connector Cleanup: deploy Connect {connector_type.value} resetter for {self.connector_name}"
)
)

stdout = self.__install_connect_resetter(
trimmed_name, connector_name, connector_type, dry_run, **kwargs
trimmed_name, connector_type, dry_run, **kwargs
)

if dry_run:
Expand All @@ -205,21 +217,9 @@ def _run_connect_resetter(
log.info(magentaify("Connector Cleanup: uninstall Kafka Resetter."))
self.__uninstall_connect_resetter(trimmed_name, dry_run)

def _get_kafka_resetter_release_name(self, connector_name: str) -> str:
"""Get connector resetter's release name

:param connector_name: Name of the connector to be reset
:return: The name of the resetter to be used
"""
suffix = "-clean"
clean_up_release_name = connector_name + suffix
trimmed_name = trim_release_name(clean_up_release_name, suffix)
return trimmed_name

def __install_connect_resetter(
self,
release_name: str,
connector_name: str,
connector_type: KafkaConnectorType,
dry_run: bool,
**kwargs,
Expand All @@ -244,33 +244,30 @@ def __install_connect_resetter(
wait=True,
),
values=self._get_kafka_connect_resetter_values(
connector_name,
connector_type,
**kwargs,
),
)

def _get_kafka_connect_resetter_values(
self,
connector_name: str,
connector_type: KafkaConnectorType,
**kwargs,
) -> dict:
"""Get connector resetter helm chart values

:param connector_name: Name of the connector
:param connector_type: Type of the connector
:return: The Helm chart values of the connector resetter
"""
return {
**KafkaConnectResetterValues(
config=KafkaConnectResetterConfig(
connector=connector_name,
connector=self.connector_name,
brokers=self.config.brokers,
**kwargs,
),
connector_type=connector_type.value,
name_override=connector_name,
name_override=self.connector_name,
).dict(),
**self.resetter_values,
}
Expand Down Expand Up @@ -308,12 +305,11 @@ def apply_from_inputs(self, name: str, topic: FromTopic) -> NoReturn:
@override
def template(self) -> None:
values = self._get_kafka_connect_resetter_values(
self.name,
KafkaConnectorType.SOURCE,
offset_topic=self.offset_topic,
)
stdout = self.helm.template(
self._get_kafka_resetter_release_name(self.name),
self.connector_resseter_release_name,
self._get_resetter_helm_chart(),
self.namespace,
values,
Expand All @@ -336,7 +332,6 @@ def __run_kafka_connect_resetter(self, dry_run: bool) -> None:
:param dry_run: Whether to do a dry run of the command
"""
self._run_connect_resetter(
connector_name=self.name,
connector_type=KafkaConnectorType.SOURCE,
dry_run=dry_run,
retain_clean_jobs=self.config.retain_clean_jobs,
Expand All @@ -356,11 +351,9 @@ def add_input_topics(self, topics: list[str]) -> None:

@override
def template(self) -> None:
values = self._get_kafka_connect_resetter_values(
self.name, KafkaConnectorType.SINK
)
values = self._get_kafka_connect_resetter_values(KafkaConnectorType.SINK)
stdout = self.helm.template(
self._get_kafka_resetter_release_name(self.name),
self.connector_resseter_release_name,
self._get_resetter_helm_chart(),
self.namespace,
values,
Expand Down Expand Up @@ -393,7 +386,6 @@ def __run_kafka_connect_resetter(
:param dry_run: Whether to do a dry run of the command
"""
self._run_connect_resetter(
connector_name=self.name,
connector_type=KafkaConnectorType.SINK,
raminqaf marked this conversation as resolved.
Show resolved Hide resolved
dry_run=dry_run,
retain_clean_jobs=self.config.retain_clean_jobs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ def test_should_call_delete_connector_when_destroying_existing_connector_not_dry
handler = self.connector_handler(connector_wrapper)

handler.destroy_connector(CONNECTOR_NAME, dry_run=False)

assert connector_wrapper.mock_calls == [
mock.call.get_connector(CONNECTOR_NAME),
mock.call.delete_connector(CONNECTOR_NAME),
Expand Down
2 changes: 1 addition & 1 deletion tests/components/test_kafka_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
CONNECTOR_NAME_PREFIXED = (
"${pipeline_name}-test-connector-with-long-name-0123456789abcdefghijklmnop"
)
CONNECTOR_CLEAN_NAME = "test-connector-with-long-name-0123456789abcdef-clean"
CONNECTOR_CLEAN_NAME_PREFIXED = "${pipeline_name}-test-connector-with-long-name-clean"
raminqaf marked this conversation as resolved.
Show resolved Hide resolved
CONNECTOR_CLASS = "com.bakdata.connect.TestConnector"


Expand Down
42 changes: 23 additions & 19 deletions tests/components/test_kafka_sink_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@
)
from kpops.utils.colorify import magentaify
from tests.components.test_kafka_connector import (
CONNECTOR_CLEAN_NAME,
CONNECTOR_CLEAN_NAME_PREFIXED,
CONNECTOR_NAME,
CONNECTOR_NAME_PREFIXED,
TestKafkaConnector,
)

Expand Down Expand Up @@ -161,14 +162,17 @@ def test_destroy(
self,
connector: KafkaSinkConnector,
mocker: MockerFixture,
connector_config: KafkaConnectorConfig,
):
mock_destroy_connector = mocker.patch.object(
connector.handlers.connector_handler, "destroy_connector"
)

connector.destroy(dry_run=True)

mock_destroy_connector.assert_called_once_with(CONNECTOR_NAME, dry_run=True)
mock_destroy_connector.assert_called_once_with(
CONNECTOR_NAME_PREFIXED, dry_run=True
)

def test_reset_when_dry_run_is_true(
self,
Expand Down Expand Up @@ -208,11 +212,11 @@ def test_reset_when_dry_run_is_false(
),
mocker.call.helm.uninstall(
namespace="test-namespace",
release_name=CONNECTOR_CLEAN_NAME,
release_name=CONNECTOR_CLEAN_NAME_PREFIXED,
dry_run=dry_run,
),
mocker.call.helm.upgrade_install(
release_name=CONNECTOR_CLEAN_NAME,
release_name=CONNECTOR_CLEAN_NAME_PREFIXED,
namespace="test-namespace",
chart="bakdata-kafka-connect-resetter/kafka-connect-resetter",
dry_run=dry_run,
Expand All @@ -225,15 +229,15 @@ def test_reset_when_dry_run_is_false(
"connectorType": "sink",
"config": {
"brokers": "broker:9092",
"connector": CONNECTOR_NAME,
"connector": CONNECTOR_NAME_PREFIXED,
"deleteConsumerGroup": False,
},
"nameOverride": CONNECTOR_NAME,
"nameOverride": CONNECTOR_NAME_PREFIXED,
},
),
mocker.call.helm.uninstall(
namespace="test-namespace",
release_name=CONNECTOR_CLEAN_NAME,
release_name=CONNECTOR_CLEAN_NAME_PREFIXED,
dry_run=dry_run,
),
]
Expand Down Expand Up @@ -294,12 +298,12 @@ def test_clean_when_dry_run_is_false(
assert log_info_mock.mock_calls == [
call.log_info(
magentaify(
f"Connector Cleanup: uninstalling cleanup job Helm release from previous runs for {CONNECTOR_NAME}"
f"Connector Cleanup: uninstalling cleanup job Helm release from previous runs for {connector_config.name}"
)
),
call.log_info(
magentaify(
f"Connector Cleanup: deploy Connect {KafkaConnectorType.SINK.value} resetter for {CONNECTOR_NAME}"
f"Connector Cleanup: deploy Connect {KafkaConnectorType.SINK.value} resetter for {connector_config.name}"
)
),
call.log_info(magentaify("Connector Cleanup: uninstall Kafka Resetter.")),
Expand All @@ -314,11 +318,11 @@ def test_clean_when_dry_run_is_false(
),
mocker.call.helm.uninstall(
namespace="test-namespace",
release_name=CONNECTOR_CLEAN_NAME,
release_name=CONNECTOR_CLEAN_NAME_PREFIXED,
dry_run=dry_run,
),
mocker.call.helm.upgrade_install(
release_name=CONNECTOR_CLEAN_NAME,
release_name=CONNECTOR_CLEAN_NAME_PREFIXED,
namespace="test-namespace",
chart="bakdata-kafka-connect-resetter/kafka-connect-resetter",
dry_run=dry_run,
Expand All @@ -331,15 +335,15 @@ def test_clean_when_dry_run_is_false(
"connectorType": "sink",
"config": {
"brokers": "broker:9092",
"connector": CONNECTOR_NAME,
"connector": connector_config.name,
"deleteConsumerGroup": True,
},
"nameOverride": CONNECTOR_NAME,
"nameOverride": connector_config.name,
},
),
mocker.call.helm.uninstall(
namespace="test-namespace",
release_name=CONNECTOR_CLEAN_NAME,
release_name=CONNECTOR_CLEAN_NAME_PREFIXED,
dry_run=dry_run,
),
]
Expand Down Expand Up @@ -408,11 +412,11 @@ def test_clean_without_to_when_dry_run_is_false(
),
mocker.call.helm.uninstall(
namespace="test-namespace",
release_name=CONNECTOR_CLEAN_NAME,
release_name=CONNECTOR_CLEAN_NAME_PREFIXED,
dry_run=dry_run,
),
mocker.call.helm.upgrade_install(
release_name=CONNECTOR_CLEAN_NAME,
release_name=CONNECTOR_CLEAN_NAME_PREFIXED,
namespace="test-namespace",
chart="bakdata-kafka-connect-resetter/kafka-connect-resetter",
dry_run=dry_run,
Expand All @@ -425,15 +429,15 @@ def test_clean_without_to_when_dry_run_is_false(
"connectorType": "sink",
"config": {
"brokers": "broker:9092",
"connector": CONNECTOR_NAME,
"connector": CONNECTOR_NAME_PREFIXED,
"deleteConsumerGroup": True,
},
"nameOverride": CONNECTOR_NAME,
"nameOverride": CONNECTOR_NAME_PREFIXED,
},
),
mocker.call.helm.uninstall(
namespace="test-namespace",
release_name=CONNECTOR_CLEAN_NAME,
release_name=CONNECTOR_CLEAN_NAME_PREFIXED,
dry_run=dry_run,
),
]
Expand Down
Loading