Skip to content

Commit

Permalink
fix: adapt to new AnyHttpUrl and pydantic v2 validation
Browse files Browse the repository at this point in the history
  • Loading branch information
sujuka99 committed Oct 24, 2023
1 parent db7abbf commit 78001a2
Show file tree
Hide file tree
Showing 9 changed files with 94 additions and 73 deletions.
30 changes: 19 additions & 11 deletions docs/docs/resources/variables/config_env_vars.env
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,27 @@ defaults_path=.
# The environment you want to generate and deploy the pipeline to.
# Suffix your environment files with this value (e.g.
# defaults_development.yaml for environment=development).
KPOPS_ENVIRONMENT # No default value, required
environment=PydanticUndefined
# kafka_brokers
# The comma separated Kafka brokers address.
KPOPS_KAFKA_BROKERS # No default value, required
# url
# Address of the Schema Registry.
KPOPS_SCHEMA_REGISTRY_URL=http://localhost:8081
# url
# Address of the Kafka REST Proxy.
KPOPS_KAFKA_REST_URL=http://localhost:8082
# url
# Address of Kafka Connect.
KPOPS_KAFKA_CONNECT_URL=http://localhost:8083
kafka_brokers=PydanticUndefined
# defaults_filename_prefix
# The name of the defaults file and the prefix of the defaults
# environment file.
defaults_filename_prefix=defaults
# topic_name_config
# Configure the topic name variables you can use in the pipeline
# definition.
topic_name_config=default_output_topic_name='${pipeline_name}-${component_name}' default_error_topic_name='${pipeline_name}-${component_name}-error'
# schema_registry
# Configuration for Schema Registry.
schema_registry=enabled=False url=Url('http://localhost:8081/')
# kafka_rest
# Configuration for Kafka REST Proxy.
kafka_rest=url=Url('http://localhost:8082/')
# kafka_connect
# Configuration for Kafka Connect.
kafka_connect=url=Url('http://localhost:8083/')
# timeout
# The timeout in seconds that specifies when actions like deletion or
# deploy timeout.
Expand Down
24 changes: 15 additions & 9 deletions docs/docs/resources/variables/config_env_vars.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
These variables are a lower priority alternative to the settings in `config.yaml`. Variables marked as required can instead be set in the pipeline config.

| Name | Default Value | Required | Description | Setting name |
| ------------------------- | --------------------- | -------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ----------------- |
| KPOPS_ENVIRONMENT | | True | The environment you want to generate and deploy the pipeline to. Suffix your environment files with this value (e.g. defaults_development.yaml for environment=development). | environment |
| KPOPS_KAFKA_BROKERS | | True | The comma separated Kafka brokers address. | kafka_brokers |
| KPOPS_SCHEMA_REGISTRY_URL | http://localhost:8081 | False | Address of the Schema Registry. | url |
| KPOPS_KAFKA_REST_URL | http://localhost:8082 | False | Address of the Kafka REST Proxy. | url |
| KPOPS_KAFKA_CONNECT_URL | http://localhost:8083 | False | Address of Kafka Connect. | url |
| KPOPS_TIMEOUT | 300 | False | The timeout in seconds that specifies when actions like deletion or deploy timeout. | timeout |
| KPOPS_RETAIN_CLEAN_JOBS | False | False | Whether to retain clean up jobs in the cluster or uninstall the, after completion. | retain_clean_jobs |
| Name | Default Value |Required| Description | Setting name |
|------------------------|----------------------------------------------------------------------------------------------------------------------------------|--------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------|------------------------|
|defaults_path |. |False |The path to the folder containing the defaults.yaml file and the environment defaults files. Paths can either be absolute or relative to `config.yaml` |defaults_path |
|environment |PydanticUndefined |False |The environment you want to generate and deploy the pipeline to. Suffix your environment files with this value (e.g. defaults_development.yaml for environment=development).|environment |
|kafka_brokers |PydanticUndefined |False |The comma separated Kafka brokers address. |kafka_brokers |
|defaults_filename_prefix|defaults |False |The name of the defaults file and the prefix of the defaults environment file. |defaults_filename_prefix|
|topic_name_config |default_output_topic_name='${pipeline_name}-${component_name}' default_error_topic_name='${pipeline_name}-${component_name}-error'|False |Configure the topic name variables you can use in the pipeline definition. |topic_name_config |
|schema_registry |enabled=False url=Url('http://localhost:8081/') |False |Configuration for Schema Registry. |schema_registry |
|kafka_rest |url=Url('http://localhost:8082/') |False |Configuration for Kafka REST Proxy. |kafka_rest |
|kafka_connect |url=Url('http://localhost:8083/') |False |Configuration for Kafka Connect. |kafka_connect |
|timeout |300 |False |The timeout in seconds that specifies when actions like deletion or deploy timeout. |timeout |
|create_namespace |False |False |Flag for `helm upgrade --install`. Create the release namespace if not present. |create_namespace |
|helm_config |context=None debug=False api_version=None |False |Global flags for Helm. |helm_config |
|helm_diff_config |ignore=set() |False |Configure Helm Diff. |helm_diff_config |
|retain_clean_jobs |False |False |Whether to retain clean up jobs in the cluster or uninstall the, after completion. |retain_clean_jobs |
52 changes: 31 additions & 21 deletions docs/docs/schema/pipeline.json
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
"type": "object"
},
"HelmApp": {
"additionalProperties": true,
"description": "Kubernetes app managed through Helm with an associated Helm chart.",
"properties": {
"app": {
Expand All @@ -68,15 +69,18 @@
"$ref": "#/definitions/KubernetesAppConfig"
}
],
"description": "Application-specific settings",
"title": "App"
"description": "Application-specific settings"
},
"from": {
"allOf": [
"anyOf": [
{
"$ref": "#/definitions/FromSection"
},
{
"type": "null"
}
],
"default": null,
"description": "Topic(s) and/or components from which the component will read input",
"title": "From"
},
Expand All @@ -97,36 +101,41 @@
"type": "string"
},
"repo_config": {
"allOf": [
"anyOf": [
{
"$ref": "#/definitions/HelmRepoConfig"
},
{
"type": "null"
}
],
"description": "Configuration of the Helm chart repo to be used for deploying the component",
"title": "Repo Config"
"default": null,
"description": "Configuration of the Helm chart repo to be used for deploying the component"
},
"to": {
"allOf": [
"anyOf": [
{
"$ref": "#/definitions/ToSection"
},
{
"type": "null"
}
],
"description": "Topic(s) into which the component will write output",
"title": "To"
},
"type": {
"default": "helm-app",
"description": "Kubernetes app managed through Helm with an associated Helm chart.",
"enum": [
"helm-app"
],
"title": "Component type",
"type": "string"
"default": null,
"description": "Topic(s) into which the component will write output"
},
"version": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Helm chart version",
"title": "Version",
"type": "string"
"title": "Version"
}
},
"required": [
Expand Down Expand Up @@ -424,7 +433,8 @@
"type": "object"
},
"KubernetesAppConfig": {
"description": "Settings specific to Kubernetes apps.",
"additionalProperties": true,
"description": "Settings specific to Kubernetes Apps.",
"properties": {},
"title": "KubernetesAppConfig",
"type": "object"
Expand Down
10 changes: 5 additions & 5 deletions kpops/component_handlers/kafka_connect/connect_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def create_connector(
config_json = connector_config.model_dump()
connect_data = {"name": connector_config.name, "config": config_json}
response = httpx.post(
url=f"{self.url}/connectors", headers=HEADERS, json=connect_data
url=f"{self.url}connectors", headers=HEADERS, json=connect_data
)
if response.status_code == httpx.codes.CREATED:
log.info(f"Connector {connector_config.name} created.")
Expand All @@ -74,7 +74,7 @@ def get_connector(self, connector_name: str | None) -> KafkaConnectResponse:
msg = "Connector name not set"
raise Exception(msg)
response = httpx.get(
url=f"{self.url}/connectors/{connector_name}", headers=HEADERS
url=f"{self.url}connectors/{connector_name}", headers=HEADERS
)
if response.status_code == httpx.codes.OK:
log.info(f"Connector {connector_name} exists.")
Expand Down Expand Up @@ -105,7 +105,7 @@ def update_connector_config(
connector_name = connector_config.name
config_json = connector_config.model_dump()
response = httpx.put(
url=f"{self.url}/connectors/{connector_name}/config",
url=f"{self.url}connectors/{connector_name}/config",
headers=HEADERS,
json=config_json,
)
Expand Down Expand Up @@ -136,7 +136,7 @@ def validate_connector_config(
:return: List of all found errors
"""
response = httpx.put(
url=f"{self.url}/connector-plugins/{connector_config.class_name}/config/validate",
url=f"{self.url}connector-plugins/{connector_config.class_name}/config/validate",
headers=HEADERS,
json=connector_config.model_dump(),
)
Expand Down Expand Up @@ -166,7 +166,7 @@ def delete_connector(self, connector_name: str) -> None:
:raises ConnectorNotFoundException: Connector not found
"""
response = httpx.delete(
url=f"{self.url}/connectors/{connector_name}", headers=HEADERS
url=f"{self.url}connectors/{connector_name}", headers=HEADERS
)
if response.status_code == httpx.codes.NO_CONTENT:
log.info(f"Connector {connector_name} deleted.")
Expand Down
2 changes: 1 addition & 1 deletion kpops/component_handlers/schema_handler/schema_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def __init__(
components_module: str | None,
) -> None:
self.schema_registry_client = SchemaRegistryClient(
kpops_config.schema_registry.url
str(kpops_config.schema_registry.url)
)
self.components_module = components_module

Expand Down
14 changes: 7 additions & 7 deletions kpops/component_handlers/topic/proxy_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def cluster_id(self) -> str:
:raises KafkaRestProxyError: Kafka REST proxy error
:return: The Kafka cluster ID.
"""
response = httpx.get(url=f"{self._config.url}/v3/clusters")
response = httpx.get(url=f"{self._config.url!s}v3/clusters")
if response.status_code == httpx.codes.OK:
cluster_information = response.json()
return cluster_information["data"][0]["cluster_id"]
Expand All @@ -67,7 +67,7 @@ def create_topic(self, topic_spec: TopicSpec) -> None:
:raises KafkaRestProxyError: Kafka REST proxy error
"""
response = httpx.post(
url=f"{self.url}/v3/clusters/{self.cluster_id}/topics",
url=f"{self.url!s}v3/clusters/{self.cluster_id}/topics",
headers=HEADERS,
json=topic_spec.model_dump(exclude_none=True),
)
Expand All @@ -88,7 +88,7 @@ def delete_topic(self, topic_name: str) -> None:
:raises KafkaRestProxyError: Kafka REST proxy error
"""
response = httpx.delete(
url=f"{self.url}/v3/clusters/{self.cluster_id}/topics/{topic_name}",
url=f"{self.url!s}v3/clusters/{self.cluster_id}/topics/{topic_name}",
headers=HEADERS,
)
if response.status_code == httpx.codes.NO_CONTENT:
Expand All @@ -109,7 +109,7 @@ def get_topic(self, topic_name: str) -> TopicResponse:
:return: Response of the get topic API.
"""
response = httpx.get(
url=f"{self.url}/v3/clusters/{self.cluster_id}/topics/{topic_name}",
url=f"{self.url!s}v3/clusters/{self.cluster_id}/topics/{topic_name}",
headers=HEADERS,
)
if response.status_code == httpx.codes.OK:
Expand Down Expand Up @@ -139,7 +139,7 @@ def get_topic_config(self, topic_name: str) -> TopicConfigResponse:
:return: The topic configuration.
"""
response = httpx.get(
url=f"{self.url}/v3/clusters/{self.cluster_id}/topics/{topic_name}/configs",
url=f"{self.url!s}v3/clusters/{self.cluster_id}/topics/{topic_name}/configs",
headers=HEADERS,
)

Expand Down Expand Up @@ -169,7 +169,7 @@ def batch_alter_topic_config(self, topic_name: str, json_body: list[dict]) -> No
:raises KafkaRestProxyError: Kafka REST proxy error
"""
response = httpx.post(
url=f"{self.url}/v3/clusters/{self.cluster_id}/topics/{topic_name}/configs:alter",
url=f"{self.url!s}v3/clusters/{self.cluster_id}/topics/{topic_name}/configs:alter",
headers=HEADERS,
json={"data": json_body},
)
Expand All @@ -189,7 +189,7 @@ def get_broker_config(self) -> BrokerConfigResponse:
:return: The broker configuration.
"""
response = httpx.get(
url=f"{self.url}/v3/clusters/{self.cluster_id}/brokers/-/configs",
url=f"{self.url!s}v3/clusters/{self.cluster_id}/brokers/-/configs",
headers=HEADERS,
)

Expand Down
Loading

0 comments on commit 78001a2

Please sign in to comment.