Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make Kafka REST Proxy & Kafka Connect hosts default and improve Schema Registry config #354

Merged
merged 26 commits into from
Oct 5, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 1 addition & 7 deletions docs/docs/resources/variables/config_env_vars.env
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,7 @@
KPOPS_ENVIRONMENT # No default value, required
# brokers
# The comma separated Kafka brokers address.
KPOPS_KAFKA_BROKERS # No default value, required
# kafka_rest_url
# Address of the Kafka REST Proxy.
KPOPS_REST_PROXY_URL=http://localhost:8082
# kafka_connect_url
# Address of Kafka Connect.
KPOPS_CONNECT_URL=http://localhost:8083
KPOPS_BROKERS # No default value, required
# timeout
# The timeout in seconds that specifies when actions like deletion or
# deploy timeout.
Expand Down
14 changes: 6 additions & 8 deletions docs/docs/resources/variables/config_env_vars.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@

These variables are a lower priority alternative to the settings in `config.yaml`. Variables marked as required can instead be set in the pipeline config.

| Name | Default Value |Required| Description | Setting name |
|-----------------------|---------------------|--------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-----------------|
|KPOPS_ENVIRONMENT | |True |The environment you want to generate and deploy the pipeline to. Suffix your environment files with this value (e.g. defaults_development.yaml for environment=development).|environment |
|KPOPS_KAFKA_BROKERS | |True |The comma separated Kafka brokers address. |brokers |
|KPOPS_REST_PROXY_URL |http://localhost:8082|False |Address of the Kafka REST Proxy. |kafka_rest_url |
|KPOPS_CONNECT_URL |http://localhost:8083|False |Address of Kafka Connect. |kafka_connect_url|
|KPOPS_TIMEOUT | 300|False |The timeout in seconds that specifies when actions like deletion or deploy timeout. |timeout |
|KPOPS_RETAIN_CLEAN_JOBS|False |False |Whether to retain clean up jobs in the cluster or uninstall the, after completion. |retain_clean_jobs|
| Name |Default Value|Required| Description | Setting name |
|-----------------------|-------------|--------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-----------------|
|KPOPS_ENVIRONMENT | |True |The environment you want to generate and deploy the pipeline to. Suffix your environment files with this value (e.g. defaults_development.yaml for environment=development).|environment |
|KPOPS_BROKERS | |True |The comma separated Kafka brokers address. |brokers |
|KPOPS_TIMEOUT | 300|False |The timeout in seconds that specifies when actions like deletion or deploy timeout. |timeout |
|KPOPS_RETAIN_CLEAN_JOBS|False |False |Whether to retain clean up jobs in the cluster or uninstall the, after completion. |retain_clean_jobs|
raminqaf marked this conversation as resolved.
Show resolved Hide resolved
98 changes: 72 additions & 26 deletions docs/docs/schema/config.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"$ref": "#/definitions/PipelineConfig",
"$ref": "#/definitions/KpopsConfig",
"definitions": {
"HelmConfig": {
"description": "Global Helm configuration",
Expand Down Expand Up @@ -41,15 +41,57 @@
"title": "HelmDiffConfig",
"type": "object"
},
"PipelineConfig": {
"KafkaConnectConfig": {
"additionalProperties": false,
"description": "Configuration for Kafka Connect.",
"properties": {
"url": {
"default": "http://localhost:8083",
"description": "Address of Kafka Connect.",
"env": "KPOPS__KAFKA_CONNECT_URL",
"env_names": [
"kpops__kafka_connect_url"
],
"format": "uri",
"maxLength": 65536,
"minLength": 1,
"title": "Url",
"type": "string"
}
},
"title": "KafkaConnectConfig",
"type": "object"
},
"KafkaRestConfig": {
"additionalProperties": false,
"description": "Configuration for Kafka REST Proxy.",
"properties": {
"url": {
"default": "http://localhost:8082",
"description": "Address of the Kafka REST Proxy.",
"env": "KPOPS__KAFKA_REST_URL",
raminqaf marked this conversation as resolved.
Show resolved Hide resolved
"env_names": [
"kpops__kafka_rest_url"
],
"format": "uri",
"maxLength": 65536,
"minLength": 1,
"title": "Url",
"type": "string"
}
},
"title": "KafkaRestConfig",
"type": "object"
},
"KpopsConfig": {
"additionalProperties": false,
"description": "Pipeline configuration unrelated to the components.",
"properties": {
"brokers": {
"description": "The comma separated Kafka brokers address.",
"env": "KPOPS_KAFKA_BROKERS",
"env": "KPOPS_BROKERS",
"env_names": [
"kpops_kafka_brokers"
"kpops_brokers"
],
"example": "broker1:9092,broker2:9092,broker3:9092",
"title": "Brokers",
Expand Down Expand Up @@ -126,31 +168,35 @@
],
"title": "Helm Diff Config"
},
"kafka_connect_url": {
"default": "http://localhost:8083",
"description": "Address of Kafka Connect.",
"env": "KPOPS_CONNECT_URL",
"kafka_connect": {
"allOf": [
{
"$ref": "#/definitions/KafkaConnectConfig"
}
],
"default": {
"url": "http://localhost:8083"
},
"description": "Configuration for Kafka Connect.",
"env_names": [
"kpops_connect_url"
"kpops_kafka_connect"
],
"format": "uri",
"maxLength": 65536,
"minLength": 1,
"title": "Kafka Connect Url",
"type": "string"
"title": "Kafka Connect"
},
"kafka_rest_url": {
"default": "http://localhost:8082",
"description": "Address of the Kafka REST Proxy.",
"env": "KPOPS_REST_PROXY_URL",
"kafka_rest": {
"allOf": [
{
"$ref": "#/definitions/KafkaRestConfig"
}
],
"default": {
"url": "http://localhost:8082"
},
"description": "Configuration for Kafka REST Proxy.",
"env_names": [
"kpops_rest_proxy_url"
"kpops_kafka_rest"
],
"format": "uri",
"maxLength": 65536,
"minLength": 1,
"title": "Kafka Rest Url",
"type": "string"
"title": "Kafka Rest"
},
"retain_clean_jobs": {
"default": false,
Expand All @@ -172,7 +218,7 @@
"enabled": false,
"url": "http://localhost:8081"
},
"description": "Configure the Schema Registry.",
"description": "Configuration for Schema Registry.",
"env_names": [
"kpops_schema_registry"
],
Expand Down Expand Up @@ -209,7 +255,7 @@
"environment",
"brokers"
],
"title": "PipelineConfig",
"title": "KpopsConfig",
"type": "object"
},
"SchemaRegistryConfig": {
Expand Down
13 changes: 8 additions & 5 deletions docs/docs/user/migration-guide/v2-v3.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ The breaking changes target the `config.yaml` file:

- The `schema_registry_url` is replaced with `schema_registry.enabled` (default `false`) and `schema_registry.url` (default `http://localhost:8081`).
raminqaf marked this conversation as resolved.
Show resolved Hide resolved

- `kafka_connect_host` is renamed to `kafka_connect_url` (default `http://localhost:8083`).
- `kafka_rest_host` is renamed to `kafka_rest_url` (default `http://localhost:8082`).
- `kafka_rest_host` is renamed to `kafka_rest.url` (default `http://localhost:8082`).

The environment variable name of these config fields changed respectively. Please refer to the [environment variables documentation page](../core-concepts/variables/environment_variables.md).
- `kafka_connect_host` is replaced with `kafka_connect.url` (default `http://localhost:8083`).

The environment variable name of these config fields changed respectively. The environment variable `KPOPS_KAFKA_BROKERS` changed to `KPOPS_BROKERS`. Please refer to the [environment variables documentation page](../core-concepts/variables/environment_variables.md) to see the newest changes.

Your `config.yaml` will change to:

Expand All @@ -19,8 +20,10 @@ Your `config.yaml` will change to:
- kafka_rest_host: "http://my-custom-rest.url:8082"
- kafka_connect_host: "http://my-custom-connect.url:8083"
- schema_registry_url: "http://my-custom-sr.url:8081"
+ kafka_rest_url: "http://my-custom-rest.url:8082"
+ Kafka_connect_url: "http://my-custom-connect.url:8083"
+ kafka_rest:
+ url: "http://my-custom-rest.url:8082"
+ Kafka_connect:
raminqaf marked this conversation as resolved.
Show resolved Hide resolved
+ url: "http://my-custom-connect.url:8083"
+ schema_registry:
+ enabled: true
+ url: "http://my-custom-sr.url:8081"
Expand Down
6 changes: 4 additions & 2 deletions examples/bakdata/atm-fraud-detection/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ schema_registry:
enabled: true
url: "http://k8kafka-cp-schema-registry.kpops.svc.cluster.local:8081"

kafka_rest_url: "http://localhost:8082"
kafka_rest:
url: "http://localhost:8082"

kafka_connect_url: "http://localhost:8083"
kafka_connect:
url: "http://localhost:8083"

defaults_path: .
6 changes: 3 additions & 3 deletions hooks/gen_docs/gen_docs_env_vars.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from hooks import PATH_ROOT
from hooks.gen_docs import SuperEnum
from kpops.cli import main
from kpops.cli.pipeline_config import PipelineConfig
from kpops.cli.config import KpopsConfig
raminqaf marked this conversation as resolved.
Show resolved Hide resolved

PATH_DOCS_RESOURCES = PATH_ROOT / "docs/docs/resources"
PATH_DOCS_VARIABLES = PATH_DOCS_RESOURCES / "variables"
Expand Down Expand Up @@ -250,9 +250,9 @@ def __fill_csv_pipeline_config(target: Path) -> None:
"""
# NOTE: This does not see nested fields, hence if there are env vars in a class like
# TopicConfig(), they wil not be listed. Possible fix with recursion.
config_fields = PipelineConfig.__fields__
config_fields = KpopsConfig.__fields__
for config_field in config_fields.values():
config_field_info = PipelineConfig.Config.get_field_info(config_field.name)
config_field_info = KpopsConfig.Config.get_field_info(config_field.name)
config_field_description: str = (
config_field.field_info.description
or "No description available, please refer to the pipeline config documentation."
Expand Down
48 changes: 33 additions & 15 deletions kpops/cli/pipeline_config.py → kpops/cli/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,29 @@ class SchemaRegistryConfig(BaseSettings):
)


class PipelineConfig(BaseSettings):
class KafkaRestConfig(BaseSettings):
"""Configuration for Kafka REST Proxy."""

url: AnyHttpUrl = Field(
# For validating URLs use parse_obj_as
# https://github.com/pydantic/pydantic/issues/1106
raminqaf marked this conversation as resolved.
Show resolved Hide resolved
default=parse_obj_as(AnyHttpUrl, "http://localhost:8082"),
env=f"{ENV_PREFIX}_KAFKA_REST_URL",
description="Address of the Kafka REST Proxy.",
)


class KafkaConnectConfig(BaseSettings):
"""Configuration for Kafka Connect."""

url: AnyHttpUrl = Field(
default=parse_obj_as(AnyHttpUrl, "http://localhost:8083"),
env=f"{ENV_PREFIX}_KAFKA_CONNECT_URL",
description="Address of Kafka Connect.",
)


class KpopsConfig(BaseSettings):
"""Pipeline configuration unrelated to the components."""

defaults_path: Path = Field(
Expand All @@ -61,7 +83,7 @@ class PipelineConfig(BaseSettings):
)
brokers: str = Field(
default=...,
env=f"{ENV_PREFIX}KAFKA_BROKERS",
env=f"{ENV_PREFIX}BROKERS",
description="The comma separated Kafka brokers address.",
example="broker1:9092,broker2:9092,broker3:9092",
)
Expand All @@ -75,19 +97,15 @@ class PipelineConfig(BaseSettings):
)
schema_registry: SchemaRegistryConfig = Field(
default=SchemaRegistryConfig(),
description="Configure the Schema Registry.",
description="Configuration for Schema Registry.",
raminqaf marked this conversation as resolved.
Show resolved Hide resolved
)
kafka_rest_url: AnyHttpUrl = Field(
# For validating URLs use parse_obj_as
# https://github.com/pydantic/pydantic/issues/1106
default=parse_obj_as(AnyHttpUrl, "http://localhost:8082"),
env=f"{ENV_PREFIX}REST_PROXY_URL",
description="Address of the Kafka REST Proxy.",
kafka_rest: KafkaRestConfig = Field(
default=KafkaRestConfig(),
description="Configuration for Kafka REST Proxy.",
)
kafka_connect_url: AnyHttpUrl = Field(
default=parse_obj_as(AnyHttpUrl, "http://localhost:8083"),
env=f"{ENV_PREFIX}CONNECT_URL",
description="Address of Kafka Connect.",
kafka_connect: KafkaConnectConfig = Field(
default=KafkaConnectConfig(),
description="Configuration for Kafka Connect.",
)
timeout: int = Field(
default=300,
Expand Down Expand Up @@ -125,7 +143,7 @@ def customise_sources(
env_settings: SettingsSourceCallable,
file_secret_settings: SettingsSourceCallable,
) -> tuple[
SettingsSourceCallable | Callable[[PipelineConfig], dict[str, Any]], ...
SettingsSourceCallable | Callable[[KpopsConfig], dict[str, Any]], ...
]:
return (
env_settings,
Expand All @@ -135,7 +153,7 @@ def customise_sources(
)


def yaml_config_settings_source(settings: PipelineConfig) -> dict[str, Any]:
def yaml_config_settings_source(settings: KpopsConfig) -> dict[str, Any]:
path_to_config = settings.Config.config_path
if path_to_config.exists():
if isinstance(source := load_yaml_file(path_to_config), dict):
Expand Down
14 changes: 7 additions & 7 deletions kpops/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
import typer

from kpops import __version__
from kpops.cli.config import ENV_PREFIX, KpopsConfig
from kpops.cli.custom_formatter import CustomFormatter
from kpops.cli.pipeline_config import ENV_PREFIX, PipelineConfig
from kpops.cli.registry import Registry
from kpops.component_handlers import ComponentHandlers
from kpops.component_handlers.kafka_connect.kafka_connect_handler import (
Expand Down Expand Up @@ -111,7 +111,7 @@ def setup_pipeline(
pipeline_base_dir: Path,
pipeline_path: Path,
components_module: str | None,
pipeline_config: PipelineConfig,
pipeline_config: KpopsConfig,
) -> Pipeline:
registry = Registry()
if components_module:
Expand All @@ -125,7 +125,7 @@ def setup_pipeline(


def setup_handlers(
components_module: str | None, config: PipelineConfig
components_module: str | None, config: KpopsConfig
) -> ComponentHandlers:
schema_handler = SchemaHandler.load_schema_handler(components_module, config)
connector_handler = KafkaConnectHandler.from_pipeline_config(config)
Expand Down Expand Up @@ -193,13 +193,13 @@ def log_action(action: str, pipeline_component: PipelineComponent):

def create_pipeline_config(
config: Path, defaults: Optional[Path], verbose: bool
) -> PipelineConfig:
) -> KpopsConfig:
setup_logging_level(verbose)
PipelineConfig.Config.config_path = config
KpopsConfig.Config.config_path = config
if defaults:
pipeline_config = PipelineConfig(defaults_path=defaults)
pipeline_config = KpopsConfig(defaults_path=defaults)
raminqaf marked this conversation as resolved.
Show resolved Hide resolved
else:
pipeline_config = PipelineConfig()
pipeline_config = KpopsConfig()
pipeline_config.defaults_path = config.parent / pipeline_config.defaults_path
return pipeline_config

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from typing_extensions import Self

if TYPE_CHECKING:
from kpops.cli.pipeline_config import PipelineConfig
from kpops.cli.config import KpopsConfig

log = logging.getLogger("KafkaConnectHandler")

Expand Down Expand Up @@ -136,8 +136,8 @@ def __dry_run_connector_deletion(self, connector_name: str) -> None:
)

@classmethod
def from_pipeline_config(cls, pipeline_config: PipelineConfig) -> Self:
def from_pipeline_config(cls, config: KpopsConfig) -> Self:
return cls(
connect_wrapper=ConnectWrapper(url=pipeline_config.kafka_connect_url),
timeout=pipeline_config.timeout,
connect_wrapper=ConnectWrapper(url=config.kafka_connect.url),
timeout=config.timeout,
)
Loading