Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Kafka connect config name for deletion #361

Merged
merged 15 commits into from
Sep 19, 2023
95 changes: 42 additions & 53 deletions kpops/components/base_components/kafka_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ class KafkaConnector(PipelineComponent, ABC):
description=describe_attr("resetter_values", __doc__),
)

_connector_type: KafkaConnectorType = Field(default=..., hidden_from_schema=True)

@validator("app", pre=True)
def connector_config_should_have_component_name(
cls,
Expand Down Expand Up @@ -98,7 +100,16 @@ def helm(self) -> Helm:
)
return helm

def _get_resetter_helm_chart(self) -> str:
@property
def _connector_resetter_release_name(self) -> str:
raminqaf marked this conversation as resolved.
Show resolved Hide resolved
"""Get connector resetter's release name"""
suffix = "-clean"
clean_up_release_name = self.full_name + suffix
trimmed_name = trim_release_name(clean_up_release_name, suffix)
return trimmed_name

@property
def _resetter_helm_chart(self) -> str:
disrupted marked this conversation as resolved.
Show resolved Hide resolved
"""Get reseter Helm chart
raminqaf marked this conversation as resolved.
Show resolved Hide resolved

:return: returns the component resetter's helm chart
raminqaf marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -111,7 +122,7 @@ def dry_run_handler(self) -> DryRunHandler:
return DryRunHandler(self.helm, helm_diff, self.namespace)

@property
def kafka_connect_resetter_chart(self) -> str:
def _kafka_connect_resetter_chart(self) -> str:
"""Resetter chart for this component"""
return f"{self.repo_config.repository_name}/kafka-connect-resetter"

Expand Down Expand Up @@ -148,7 +159,9 @@ def deploy(self, dry_run: bool) -> None:

@override
def destroy(self, dry_run: bool) -> None:
self.handlers.connector_handler.destroy_connector(self.name, dry_run=dry_run)
self.handlers.connector_handler.destroy_connector(
self.full_name, dry_run=dry_run
)

@override
def clean(self, dry_run: bool) -> None:
Expand All @@ -161,8 +174,6 @@ def clean(self, dry_run: bool) -> None:

def _run_connect_resetter(
self,
connector_name: str,
connector_type: KafkaConnectorType,
dry_run: bool,
retain_clean_jobs: bool,
**kwargs,
Expand All @@ -173,54 +184,40 @@ def _run_connect_resetter(
to make sure that there is no running clean job in the cluster. Then it releases a cleanup job.
If the retain_clean_jobs flag is set to false the cleanup job will be deleted.

:param connector_name: Name of the connector
:param connector_type: Type of the connector (SINK or SOURCE)
:param dry_run: If the cleanup should be run in dry run mode or not
:param retain_clean_jobs: If the cleanup job should be kept
:param kwargs: Other values for the KafkaConnectResetter
"""
trimmed_name = self._get_kafka_resetter_release_name(connector_name)

log.info(
magentaify(
f"Connector Cleanup: uninstalling cleanup job Helm release from previous runs for {connector_name}"
f"Connector Cleanup: uninstalling cleanup job Helm release from previous runs for {self.full_name}"
)
)
self.__uninstall_connect_resetter(trimmed_name, dry_run)
self.__uninstall_connect_resetter(
self._connector_resetter_release_name, dry_run
)

log.info(
magentaify(
f"Connector Cleanup: deploy Connect {connector_type.value} resetter for {connector_name}"
f"Connector Cleanup: deploy Connect {self._connector_type.value} resetter for {self.full_name}"
)
)

stdout = self.__install_connect_resetter(
trimmed_name, connector_name, connector_type, dry_run, **kwargs
)
stdout = self.__install_connect_resetter(dry_run, **kwargs)

if dry_run:
self.dry_run_handler.print_helm_diff(stdout, trimmed_name, log)
self.dry_run_handler.print_helm_diff(
stdout, self._connector_resetter_release_name, log
)

if not retain_clean_jobs:
log.info(magentaify("Connector Cleanup: uninstall Kafka Resetter."))
self.__uninstall_connect_resetter(trimmed_name, dry_run)

def _get_kafka_resetter_release_name(self, connector_name: str) -> str:
"""Get connector resetter's release name

:param connector_name: Name of the connector to be reset
:return: The name of the resetter to be used
"""
suffix = "-clean"
clean_up_release_name = connector_name + suffix
trimmed_name = trim_release_name(clean_up_release_name, suffix)
return trimmed_name
self.__uninstall_connect_resetter(
self._connector_resetter_release_name, dry_run
)

def __install_connect_resetter(
self,
release_name: str,
connector_name: str,
connector_type: KafkaConnectorType,
dry_run: bool,
**kwargs,
) -> str:
Expand All @@ -233,9 +230,9 @@ def __install_connect_resetter(
:return: The output of `helm upgrade --install`
"""
return self.helm.upgrade_install(
release_name=release_name,
release_name=self._connector_resetter_release_name,
namespace=self.namespace,
chart=self.kafka_connect_resetter_chart,
chart=self._kafka_connect_resetter_chart,
dry_run=dry_run,
flags=HelmUpgradeInstallFlags(
create_namespace=self.config.create_namespace,
Expand All @@ -244,33 +241,28 @@ def __install_connect_resetter(
wait=True,
),
values=self._get_kafka_connect_resetter_values(
connector_name,
connector_type,
**kwargs,
),
)

def _get_kafka_connect_resetter_values(
self,
connector_name: str,
connector_type: KafkaConnectorType,
**kwargs,
) -> dict:
"""Get connector resetter helm chart values

:param connector_name: Name of the connector
:param connector_type: Type of the connector
:return: The Helm chart values of the connector resetter
"""
return {
**KafkaConnectResetterValues(
config=KafkaConnectResetterConfig(
connector=connector_name,
connector=self.full_name,
brokers=self.config.brokers,
**kwargs,
),
connector_type=connector_type.value,
name_override=connector_name,
connector_type=self._connector_type.value,
name_override=self.full_name,
).dict(),
**self.resetter_values,
}
Expand Down Expand Up @@ -301,20 +293,20 @@ class KafkaSourceConnector(KafkaConnector):
description=describe_attr("offset_topic", __doc__),
)

_connector_type = KafkaConnectorType.SOURCE

@override
def apply_from_inputs(self, name: str, topic: FromTopic) -> NoReturn:
raise NotImplementedError("Kafka source connector doesn't support FromSection")

@override
def template(self) -> None:
values = self._get_kafka_connect_resetter_values(
self.name,
KafkaConnectorType.SOURCE,
offset_topic=self.offset_topic,
)
stdout = self.helm.template(
self._get_kafka_resetter_release_name(self.name),
self._get_resetter_helm_chart(),
self._connector_resetter_release_name,
self._resetter_helm_chart,
self.namespace,
values,
self.template_flags,
Expand All @@ -336,8 +328,6 @@ def __run_kafka_connect_resetter(self, dry_run: bool) -> None:
:param dry_run: Whether to do a dry run of the command
"""
self._run_connect_resetter(
connector_name=self.name,
connector_type=KafkaConnectorType.SOURCE,
dry_run=dry_run,
retain_clean_jobs=self.config.retain_clean_jobs,
offset_topic=self.offset_topic,
Expand All @@ -347,6 +337,8 @@ def __run_kafka_connect_resetter(self, dry_run: bool) -> None:
class KafkaSinkConnector(KafkaConnector):
"""Kafka sink connector model"""

_connector_type = KafkaConnectorType.SINK

@override
def add_input_topics(self, topics: list[str]) -> None:
existing_topics: str | None = getattr(self.app, "topics", None)
Expand All @@ -356,12 +348,10 @@ def add_input_topics(self, topics: list[str]) -> None:

@override
def template(self) -> None:
values = self._get_kafka_connect_resetter_values(
self.name, KafkaConnectorType.SINK
)
values = self._get_kafka_connect_resetter_values()
stdout = self.helm.template(
self._get_kafka_resetter_release_name(self.name),
self._get_resetter_helm_chart(),
self._connector_resetter_release_name,
self._resetter_helm_chart,
self.namespace,
values,
self.template_flags,
Expand Down Expand Up @@ -393,7 +383,6 @@ def __run_kafka_connect_resetter(
:param dry_run: Whether to do a dry run of the command
"""
self._run_connect_resetter(
connector_name=self.name,
connector_type=KafkaConnectorType.SINK,
raminqaf marked this conversation as resolved.
Show resolved Hide resolved
dry_run=dry_run,
retain_clean_jobs=self.config.retain_clean_jobs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ def test_should_call_delete_connector_when_destroying_existing_connector_not_dry
handler = self.connector_handler(connector_wrapper)

handler.destroy_connector(CONNECTOR_NAME, dry_run=False)

assert connector_wrapper.mock_calls == [
mock.call.get_connector(CONNECTOR_NAME),
mock.call.delete_connector(CONNECTOR_NAME),
Expand Down
10 changes: 5 additions & 5 deletions tests/components/test_kafka_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@

DEFAULTS_PATH = Path(__file__).parent / "resources"
CONNECTOR_NAME = "test-connector-with-long-name-0123456789abcdefghijklmnop"
CONNECTOR_NAME_PREFIXED = (
CONNECTOR_FULL_NAME = (
"${pipeline_name}-test-connector-with-long-name-0123456789abcdefghijklmnop"
)
raminqaf marked this conversation as resolved.
Show resolved Hide resolved
CONNECTOR_CLEAN_NAME = "test-connector-with-long-name-0123456789abcdef-clean"
CONNECTOR_CLEAN_FULL_NAME = "${pipeline_name}-test-connector-with-long-name-clean"
CONNECTOR_CLASS = "com.bakdata.connect.TestConnector"


Expand Down Expand Up @@ -58,7 +58,7 @@ def connector_config(self) -> KafkaConnectorConfig:
return KafkaConnectorConfig(
**{
"connector.class": CONNECTOR_CLASS,
"name": CONNECTOR_NAME_PREFIXED,
"name": CONNECTOR_FULL_NAME,
}
)

Expand All @@ -75,7 +75,7 @@ def test_connector_config_name_override(
app=connector_config,
namespace="test-namespace",
)
assert connector.app.name == CONNECTOR_NAME_PREFIXED
assert connector.app.name == CONNECTOR_FULL_NAME

connector = KafkaConnector(
name=CONNECTOR_NAME,
Expand All @@ -84,7 +84,7 @@ def test_connector_config_name_override(
app={"connector.class": CONNECTOR_CLASS}, # type: ignore
namespace="test-namespace",
)
assert connector.app.name == CONNECTOR_NAME_PREFIXED
assert connector.app.name == CONNECTOR_FULL_NAME

with pytest.raises(
ValueError, match="Connector name should be the same as component name"
Expand Down
Loading