Skip to content

Commit

Permalink
change connector type
Browse files Browse the repository at this point in the history
  • Loading branch information
raminqaf committed Sep 18, 2023
1 parent 599f46c commit fafc12d
Showing 1 changed file with 10 additions and 11 deletions.
21 changes: 10 additions & 11 deletions kpops/components/base_components/kafka_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ class KafkaConnector(PipelineComponent, ABC):
description=describe_attr("resetter_values", __doc__),
)

_connector_type: KafkaConnectorType = Field(default=..., hidden_from_schema=True)

@validator("app", pre=True)
def connector_config_should_have_component_name(
cls,
Expand Down Expand Up @@ -172,7 +174,6 @@ def clean(self, dry_run: bool) -> None:

def _run_connect_resetter(
self,
connector_type: KafkaConnectorType,
dry_run: bool,
retain_clean_jobs: bool,
**kwargs,
Expand All @@ -183,7 +184,6 @@ def _run_connect_resetter(
to make sure that there is no running clean job in the cluster. Then it releases a cleanup job.
If the retain_clean_jobs flag is set to false the cleanup job will be deleted.
:param connector_type: Type of the connector (SINK or SOURCE)
:param dry_run: If the cleanup should be run in dry run mode or not
:param retain_clean_jobs: If the cleanup job should be kept
:param kwargs: Other values for the KafkaConnectResetter
Expand All @@ -199,11 +199,11 @@ def _run_connect_resetter(

log.info(
magentaify(
f"Connector Cleanup: deploy Connect {connector_type.value} resetter for {self.full_name}"
f"Connector Cleanup: deploy Connect {self._connector_type.value} resetter for {self.full_name}"
)
)

stdout = self.__install_connect_resetter(connector_type, dry_run, **kwargs)
stdout = self.__install_connect_resetter(dry_run, **kwargs)

if dry_run:
self.dry_run_handler.print_helm_diff(
Expand All @@ -218,7 +218,6 @@ def _run_connect_resetter(

def __install_connect_resetter(
self,
connector_type: KafkaConnectorType,
dry_run: bool,
**kwargs,
) -> str:
Expand All @@ -242,14 +241,12 @@ def __install_connect_resetter(
wait=True,
),
values=self._get_kafka_connect_resetter_values(
connector_type,
**kwargs,
),
)

def _get_kafka_connect_resetter_values(
self,
connector_type: KafkaConnectorType,
**kwargs,
) -> dict:
"""Get connector resetter helm chart values
Expand All @@ -264,7 +261,7 @@ def _get_kafka_connect_resetter_values(
brokers=self.config.brokers,
**kwargs,
),
connector_type=connector_type.value,
connector_type=self._connector_type.value,
name_override=self.full_name,
).dict(),
**self.resetter_values,
Expand Down Expand Up @@ -296,14 +293,15 @@ class KafkaSourceConnector(KafkaConnector):
description=describe_attr("offset_topic", __doc__),
)

_connector_type = KafkaConnectorType.SOURCE

@override
def apply_from_inputs(self, name: str, topic: FromTopic) -> NoReturn:
raise NotImplementedError("Kafka source connector doesn't support FromSection")

@override
def template(self) -> None:
values = self._get_kafka_connect_resetter_values(
KafkaConnectorType.SOURCE,
offset_topic=self.offset_topic,
)
stdout = self.helm.template(
Expand All @@ -330,7 +328,6 @@ def __run_kafka_connect_resetter(self, dry_run: bool) -> None:
:param dry_run: Whether to do a dry run of the command
"""
self._run_connect_resetter(
connector_type=KafkaConnectorType.SOURCE,
dry_run=dry_run,
retain_clean_jobs=self.config.retain_clean_jobs,
offset_topic=self.offset_topic,
Expand All @@ -340,6 +337,8 @@ def __run_kafka_connect_resetter(self, dry_run: bool) -> None:
class KafkaSinkConnector(KafkaConnector):
"""Kafka sink connector model"""

_connector_type = KafkaConnectorType.SINK

@override
def add_input_topics(self, topics: list[str]) -> None:
existing_topics: str | None = getattr(self.app, "topics", None)
Expand All @@ -349,7 +348,7 @@ def add_input_topics(self, topics: list[str]) -> None:

@override
def template(self) -> None:
values = self._get_kafka_connect_resetter_values(KafkaConnectorType.SINK)
values = self._get_kafka_connect_resetter_values()
stdout = self.helm.template(
self._connector_resetter_release_name,
self._resetter_helm_chart,
Expand Down

0 comments on commit fafc12d

Please sign in to comment.