Skip to content

Commit

Permalink
Fix Kafka connect config name for deletion (#361)
Browse files Browse the repository at this point in the history
Fixes #360
  • Loading branch information
raminqaf authored Sep 19, 2023
1 parent 1aadef2 commit 9aa380b
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 124 deletions.
102 changes: 37 additions & 65 deletions kpops/components/base_components/kafka_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class KafkaConnector(PipelineComponent, ABC):
:param version: Helm chart version, defaults to "1.0.4"
:param resetter_values: Overriding Kafka Connect Resetter Helm values. E.g. to override the Image Tag etc.,
defaults to dict
:param _connector_type: Defines the type of the connector (Source or Sink)
"""

namespace: str = Field(
Expand All @@ -71,6 +72,8 @@ class KafkaConnector(PipelineComponent, ABC):
description=describe_attr("resetter_values", __doc__),
)

_connector_type: KafkaConnectorType = Field(default=..., hidden_from_schema=True)

@validator("app", pre=True)
def connector_config_should_have_component_name(
cls,
Expand Down Expand Up @@ -98,23 +101,22 @@ def helm(self) -> Helm:
)
return helm

def _get_resetter_helm_chart(self) -> str:
"""Get reseter Helm chart
@property
def _resetter_release_name(self) -> str:
suffix = "-clean"
clean_up_release_name = self.full_name + suffix
trimmed_name = trim_release_name(clean_up_release_name, suffix)
return trimmed_name

:return: returns the component resetter's helm chart
"""
@property
def _resetter_helm_chart(self) -> str:
return f"{self.repo_config.repository_name}/kafka-connect-resetter"

@cached_property
def dry_run_handler(self) -> DryRunHandler:
helm_diff = HelmDiff(self.config.helm_diff_config)
return DryRunHandler(self.helm, helm_diff, self.namespace)

@property
def kafka_connect_resetter_chart(self) -> str:
"""Resetter chart for this component"""
return f"{self.repo_config.repository_name}/kafka-connect-resetter"

@property
def helm_flags(self) -> HelmFlags:
"""Return shared flags for Helm commands"""
Expand Down Expand Up @@ -148,7 +150,9 @@ def deploy(self, dry_run: bool) -> None:

@override
def destroy(self, dry_run: bool) -> None:
self.handlers.connector_handler.destroy_connector(self.name, dry_run=dry_run)
self.handlers.connector_handler.destroy_connector(
self.full_name, dry_run=dry_run
)

@override
def clean(self, dry_run: bool) -> None:
Expand All @@ -161,8 +165,6 @@ def clean(self, dry_run: bool) -> None:

def _run_connect_resetter(
self,
connector_name: str,
connector_type: KafkaConnectorType,
dry_run: bool,
retain_clean_jobs: bool,
**kwargs,
Expand All @@ -173,69 +175,48 @@ def _run_connect_resetter(
to make sure that there is no running clean job in the cluster. Then it releases a cleanup job.
If the retain_clean_jobs flag is set to false the cleanup job will be deleted.
:param connector_name: Name of the connector
:param connector_type: Type of the connector (SINK or SOURCE)
:param dry_run: If the cleanup should be run in dry run mode or not
:param retain_clean_jobs: If the cleanup job should be kept
:param kwargs: Other values for the KafkaConnectResetter
"""
trimmed_name = self._get_kafka_resetter_release_name(connector_name)

log.info(
magentaify(
f"Connector Cleanup: uninstalling cleanup job Helm release from previous runs for {connector_name}"
f"Connector Cleanup: uninstalling cleanup job Helm release from previous runs for {self.full_name}"
)
)
self.__uninstall_connect_resetter(trimmed_name, dry_run)
self.__uninstall_connect_resetter(self._resetter_release_name, dry_run)

log.info(
magentaify(
f"Connector Cleanup: deploy Connect {connector_type.value} resetter for {connector_name}"
f"Connector Cleanup: deploy Connect {self._connector_type.value} resetter for {self.full_name}"
)
)

stdout = self.__install_connect_resetter(
trimmed_name, connector_name, connector_type, dry_run, **kwargs
)
stdout = self.__install_connect_resetter(dry_run, **kwargs)

if dry_run:
self.dry_run_handler.print_helm_diff(stdout, trimmed_name, log)
self.dry_run_handler.print_helm_diff(
stdout, self._resetter_release_name, log
)

if not retain_clean_jobs:
log.info(magentaify("Connector Cleanup: uninstall Kafka Resetter."))
self.__uninstall_connect_resetter(trimmed_name, dry_run)

def _get_kafka_resetter_release_name(self, connector_name: str) -> str:
"""Get connector resetter's release name
:param connector_name: Name of the connector to be reset
:return: The name of the resetter to be used
"""
suffix = "-clean"
clean_up_release_name = connector_name + suffix
trimmed_name = trim_release_name(clean_up_release_name, suffix)
return trimmed_name
self.__uninstall_connect_resetter(self._resetter_release_name, dry_run)

def __install_connect_resetter(
self,
release_name: str,
connector_name: str,
connector_type: KafkaConnectorType,
dry_run: bool,
**kwargs,
) -> str:
"""Install connector resetter
:param release_name: Release name for the resetter
:param connector_name: Name of the connector-to-be-reset
:param connector_type: Type of the connector
:param dry_run: Whether to dry run the command
:return: The output of `helm upgrade --install`
"""
return self.helm.upgrade_install(
release_name=release_name,
release_name=self._resetter_release_name,
namespace=self.namespace,
chart=self.kafka_connect_resetter_chart,
chart=self._resetter_helm_chart,
dry_run=dry_run,
flags=HelmUpgradeInstallFlags(
create_namespace=self.config.create_namespace,
Expand All @@ -244,33 +225,27 @@ def __install_connect_resetter(
wait=True,
),
values=self._get_kafka_connect_resetter_values(
connector_name,
connector_type,
**kwargs,
),
)

def _get_kafka_connect_resetter_values(
self,
connector_name: str,
connector_type: KafkaConnectorType,
**kwargs,
) -> dict:
"""Get connector resetter helm chart values
:param connector_name: Name of the connector
:param connector_type: Type of the connector
:return: The Helm chart values of the connector resetter
"""
return {
**KafkaConnectResetterValues(
config=KafkaConnectResetterConfig(
connector=connector_name,
connector=self.full_name,
brokers=self.config.brokers,
**kwargs,
),
connector_type=connector_type.value,
name_override=connector_name,
connector_type=self._connector_type.value,
name_override=self.full_name,
).dict(),
**self.resetter_values,
}
Expand Down Expand Up @@ -301,20 +276,20 @@ class KafkaSourceConnector(KafkaConnector):
description=describe_attr("offset_topic", __doc__),
)

_connector_type = KafkaConnectorType.SOURCE

@override
def apply_from_inputs(self, name: str, topic: FromTopic) -> NoReturn:
raise NotImplementedError("Kafka source connector doesn't support FromSection")

@override
def template(self) -> None:
values = self._get_kafka_connect_resetter_values(
self.name,
KafkaConnectorType.SOURCE,
offset_topic=self.offset_topic,
)
stdout = self.helm.template(
self._get_kafka_resetter_release_name(self.name),
self._get_resetter_helm_chart(),
self._resetter_release_name,
self._resetter_helm_chart,
self.namespace,
values,
self.template_flags,
Expand All @@ -336,8 +311,6 @@ def __run_kafka_connect_resetter(self, dry_run: bool) -> None:
:param dry_run: Whether to do a dry run of the command
"""
self._run_connect_resetter(
connector_name=self.name,
connector_type=KafkaConnectorType.SOURCE,
dry_run=dry_run,
retain_clean_jobs=self.config.retain_clean_jobs,
offset_topic=self.offset_topic,
Expand All @@ -347,6 +320,8 @@ def __run_kafka_connect_resetter(self, dry_run: bool) -> None:
class KafkaSinkConnector(KafkaConnector):
"""Kafka sink connector model"""

_connector_type = KafkaConnectorType.SINK

@override
def add_input_topics(self, topics: list[str]) -> None:
existing_topics: str | None = getattr(self.app, "topics", None)
Expand All @@ -356,12 +331,10 @@ def add_input_topics(self, topics: list[str]) -> None:

@override
def template(self) -> None:
values = self._get_kafka_connect_resetter_values(
self.name, KafkaConnectorType.SINK
)
values = self._get_kafka_connect_resetter_values()
stdout = self.helm.template(
self._get_kafka_resetter_release_name(self.name),
self._get_resetter_helm_chart(),
self._resetter_release_name,
self._resetter_helm_chart,
self.namespace,
values,
self.template_flags,
Expand Down Expand Up @@ -391,10 +364,9 @@ def __run_kafka_connect_resetter(
"""Runs the connector resetter
:param dry_run: Whether to do a dry run of the command
:param delete_consumer_group: Whether the consumer group should be deleted or not
"""
self._run_connect_resetter(
connector_name=self.name,
connector_type=KafkaConnectorType.SINK,
dry_run=dry_run,
retain_clean_jobs=self.config.retain_clean_jobs,
delete_consumer_group=delete_consumer_group,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ def test_should_call_delete_connector_when_destroying_existing_connector_not_dry
handler = self.connector_handler(connector_wrapper)

handler.destroy_connector(CONNECTOR_NAME, dry_run=False)

assert connector_wrapper.mock_calls == [
mock.call.get_connector(CONNECTOR_NAME),
mock.call.delete_connector(CONNECTOR_NAME),
Expand Down
12 changes: 5 additions & 7 deletions tests/components/test_kafka_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,8 @@

DEFAULTS_PATH = Path(__file__).parent / "resources"
CONNECTOR_NAME = "test-connector-with-long-name-0123456789abcdefghijklmnop"
CONNECTOR_NAME_PREFIXED = (
"${pipeline_name}-test-connector-with-long-name-0123456789abcdefghijklmnop"
)
CONNECTOR_CLEAN_NAME = "test-connector-with-long-name-0123456789abcdef-clean"
CONNECTOR_FULL_NAME = "${pipeline_name}-" + CONNECTOR_NAME
CONNECTOR_CLEAN_FULL_NAME = "${pipeline_name}-test-connector-with-long-name-clean"
CONNECTOR_CLASS = "com.bakdata.connect.TestConnector"


Expand Down Expand Up @@ -58,7 +56,7 @@ def connector_config(self) -> KafkaConnectorConfig:
return KafkaConnectorConfig(
**{
"connector.class": CONNECTOR_CLASS,
"name": CONNECTOR_NAME_PREFIXED,
"name": CONNECTOR_FULL_NAME,
}
)

Expand All @@ -75,7 +73,7 @@ def test_connector_config_name_override(
app=connector_config,
namespace="test-namespace",
)
assert connector.app.name == CONNECTOR_NAME_PREFIXED
assert connector.app.name == CONNECTOR_FULL_NAME

connector = KafkaConnector(
name=CONNECTOR_NAME,
Expand All @@ -84,7 +82,7 @@ def test_connector_config_name_override(
app={"connector.class": CONNECTOR_CLASS}, # type: ignore
namespace="test-namespace",
)
assert connector.app.name == CONNECTOR_NAME_PREFIXED
assert connector.app.name == CONNECTOR_FULL_NAME

with pytest.raises(
ValueError, match="Connector name should be the same as component name"
Expand Down
Loading

0 comments on commit 9aa380b

Please sign in to comment.