Skip to content

Commit

Permalink
Convert all values of Kafka connector and topic config to string (#544)
Browse files Browse the repository at this point in the history
The Kafka Connect API interprets all config values as strings. Currently, kpops
produces an unnecessary diff during `kpops deploy --dry-run` if the user
doesn't specify all connector config values as strings. Converting the values
to strings is something we can do automatically.


![image](https://github.com/user-attachments/assets/1b5ece70-6451-4cbb-9f6c-15a144747618)
  • Loading branch information
disrupted authored Dec 4, 2024
1 parent b811e54 commit feadb78
Show file tree
Hide file tree
Showing 10 changed files with 130 additions and 15 deletions.
17 changes: 16 additions & 1 deletion docs/docs/schema/defaults.json
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,22 @@
"KafkaConnectorConfig": {
"additionalProperties": true,
"additional_properties": {
"type": "string"
"type": {
"anyOf": [
{
"type": "string"
},
{
"type": "boolean"
},
{
"type": "integer"
},
{
"type": "number"
}
]
}
},
"description": "Settings specific to Kafka Connectors.",
"properties": {
Expand Down
17 changes: 16 additions & 1 deletion docs/docs/schema/pipeline.json
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,22 @@
"KafkaConnectorConfig": {
"additionalProperties": true,
"additional_properties": {
"type": "string"
"type": {
"anyOf": [
{
"type": "string"
},
{
"type": "boolean"
},
{
"type": "integer"
},
{
"type": "number"
}
]
}
},
"description": "Settings specific to Kafka Connectors.",
"properties": {
Expand Down
16 changes: 13 additions & 3 deletions kpops/component_handlers/kafka_connect/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
by_alias,
exclude_by_value,
to_dot,
to_str,
)


Expand All @@ -40,7 +41,16 @@ def json_schema_extra(schema: dict[str, Any], model: type[BaseModel]) -> None:
super(KafkaConnectorConfig, KafkaConnectorConfig).json_schema_extra(
schema, model
)
schema["additional_properties"] = {"type": "string"}
schema["additional_properties"] = {
"type": {
"anyOf": [
{"type": "string"},
{"type": "boolean"},
{"type": "integer"},
{"type": "number"},
],
}
}

model_config = ConfigDict(
extra="allow",
Expand Down Expand Up @@ -80,9 +90,9 @@ def serialize_model(
self,
default_serialize_handler: pydantic.SerializerFunctionWrapHandler,
info: pydantic.SerializationInfo,
) -> dict[str, Any]:
) -> dict[str, str]:
result = exclude_by_value(default_serialize_handler(self), None)
return {by_alias(self, name): value for name, value in result.items()}
return {by_alias(self, name): to_str(value) for name, value in result.items()}


class ConnectorTask(BaseModel):
Expand Down
6 changes: 5 additions & 1 deletion kpops/components/common/topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from pydantic import BaseModel, ConfigDict, Field, model_validator

from kpops.utils.docstring import describe_attr
from kpops.utils.pydantic import DescConfigModel
from kpops.utils.pydantic import DescConfigModel, to_str


class OutputTopicTypes(str, Enum):
Expand Down Expand Up @@ -76,6 +76,10 @@ def extra_topic_label(self) -> Any:
raise ValueError(msg)
return self

@pydantic.field_serializer("configs")
def serialize_configs(self, configs: dict[str, str | int]) -> dict[str, str]:
    """Serialize every topic config value to its string form.

    Kafka expects topic configs as strings; non-string values (ints,
    bools) are converted via :func:`to_str` during model dumping.
    """
    serialized: dict[str, str] = {}
    for name, value in configs.items():
        serialized[name] = to_str(value)
    return serialized


class KafkaTopic(BaseModel):
"""Internal representation of a Kafka topic.
Expand Down
7 changes: 7 additions & 0 deletions kpops/utils/pydantic.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import json
import logging
from pathlib import Path
from typing import Any
Expand Down Expand Up @@ -42,6 +43,12 @@ def by_alias(model: BaseModel, field_name: str) -> str:
return model.model_fields.get(field_name, Field()).alias or field_name


def to_str(value: Any) -> str:
    """Coerce *value* to its string representation.

    Strings are returned unchanged; every other value is JSON-encoded,
    so e.g. booleans become lowercase ``true``/``false``.
    """
    return value if isinstance(value, str) else json.dumps(value)


_V = TypeVar("_V")


Expand Down
24 changes: 24 additions & 0 deletions tests/component_handlers/kafka_connect/test_connect_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,30 @@ def connector_config(self) -> KafkaConnectorConfig:
}
)

def test_convert_config_values_to_str(self):
    """All non-string connector config values must be dumped as strings."""
    raw_config = {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "name": "test-connector",
        "batch.size": 50,
        "max.buffered.records": 500,
        "connection.password": "fake-password",
        "store.kafka.keys": True,
        "receive.buffer.bytes": -1,
        "topic.tracking.allow.reset": False,
    }
    expected = {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "name": "test-connector",
        "batch.size": "50",
        "max.buffered.records": "500",
        "connection.password": "fake-password",
        "store.kafka.keys": "true",
        "receive.buffer.bytes": "-1",
        "topic.tracking.allow.reset": "false",
    }
    assert KafkaConnectorConfig.model_validate(raw_config).model_dump() == expected

@pytest.mark.asyncio()
@patch("httpx.AsyncClient.post")
async def test_should_create_post_requests_for_given_connector_configuration(
Expand Down
22 changes: 22 additions & 0 deletions tests/component_handlers/topic/test_topic_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,28 @@ async def get_default_topic_response_mock(self) -> MagicMock:
wrapper.get_broker_config.return_value = BrokerConfigResponse(**broker_response)
return wrapper

def test_convert_config_values_to_str(self):
    """Topic config values of mixed types are serialized as strings."""
    topic_config = TopicConfig(
        partitions_count=1,
        configs={
            "retention.ms": -1,
            "cleanup.policy": "delete",
            "delete.retention.ms": 123456789,
        },
    )
    expected = {
        "configs": {
            "retention.ms": "-1",
            "cleanup.policy": "delete",
            "delete.retention.ms": "123456789",
        },
        "key_schema": None,
        "label": None,
        "partitions_count": 1,
        "replication_factor": None,
        "type": None,
        "value_schema": None,
    }
    assert topic_config.model_dump() == expected

@pytest.mark.asyncio()
async def test_should_call_create_topic_with_dry_run_false(self):
wrapper = AsyncMock()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -405,23 +405,23 @@
connectorType: sink
version: 1.0.4
config:
auto.create: true
connection.ds.pool.size: 5
auto.create: 'true'
connection.ds.pool.size: '5'
connection.password: AppPassword
connection.url: jdbc:postgresql://postgresql-dev.${NAMESPACE}.svc.cluster.local:5432/app_db
connection.user: app1
connector.class: io.confluent.connect.jdbc.JdbcSinkConnector
errors.deadletterqueue.context.headers.enable: true
errors.deadletterqueue.context.headers.enable: 'true'
errors.deadletterqueue.topic.name: postgres-request-sink-dead-letters
errors.deadletterqueue.topic.replication.factor: 1
errors.deadletterqueue.topic.replication.factor: '1'
errors.tolerance: all
insert.mode: insert
insert.mode.databaselevel: true
insert.mode.databaselevel: 'true'
key.converter: org.apache.kafka.connect.storage.StringConverter
name: atm-fraud-postgresql-connector
pk.mode: record_value
table.name.format: fraud_transactions
tasks.max: 1
tasks.max: '1'
topics: atm-fraud-account-linker-topic
transforms: flatten
transforms.flatten.type: org.apache.kafka.connect.transforms.Flatten$Value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,9 @@
connector.class: com.github.jcustenborder.kafka.connect.redis.RedisSinkConnector
key.converter: org.apache.kafka.connect.storage.StringConverter
name: word-count-redis-sink-connector
redis.database: 0
redis.database: '0'
redis.hosts: redis-headless:6379
tasks.max: 1
tasks.max: '1'
topics: word-count-word-counter-topic
value.converter: org.apache.kafka.connect.storage.StringConverter
name: redis-sink-connector
Expand Down
20 changes: 19 additions & 1 deletion tests/utils/test_pydantic.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from typing import Any

import pytest

from kpops.utils.pydantic import to_dash, to_dot, to_snake
from kpops.utils.pydantic import to_dash, to_dot, to_snake, to_str


@pytest.mark.parametrize(
Expand Down Expand Up @@ -46,3 +48,19 @@ def test_to_snake(input: str, expected: str):
)
def test_to_dot(input: str, expected: str):
assert to_dot(input) == expected


@pytest.mark.parametrize(
    ("input", "expected"),
    [
        ("foo", "foo"),
        ("1", "1"),
        (1, "1"),
        (-1, "-1"),
        (1.9, "1.9"),
        (True, "true"),
        (False, "false"),
    ],
)
def test_to_str(input: Any, expected: str):
    """Strings pass through unchanged; other scalars are JSON-encoded (bools lowercase)."""
    assert to_str(input) == expected

0 comments on commit feadb78

Please sign in to comment.