Skip to content

Commit

Permalink
Allow custom timeout for external services (#485)
Browse files Browse the repository at this point in the history
  • Loading branch information
disrupted authored May 2, 2024
1 parent 3f15ff6 commit 95e9e75
Show file tree
Hide file tree
Showing 14 changed files with 123 additions and 73 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ the [documentation](https://bakdata.github.io/kpops/latest).
KPOps comes as a [PyPI package](https://pypi.org/project/kpops/).
You can install it with [pip](https://github.com/pypa/pip):

```shell
```sh
pip install kpops
```

Expand Down
13 changes: 9 additions & 4 deletions docs/docs/resources/variables/config_env_vars.env
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,21 @@ KPOPS_SCHEMA_REGISTRY__ENABLED=False
# schema_registry.url
# Address of the Schema Registry.
KPOPS_SCHEMA_REGISTRY__URL=http://localhost:8081/
# schema_registry.timeout
# Operation timeout in seconds.
KPOPS_SCHEMA_REGISTRY__TIMEOUT=30
# kafka_rest.url
# Address of the Kafka REST Proxy.
KPOPS_KAFKA_REST__URL=http://localhost:8082/
# kafka_rest.timeout
# Operation timeout in seconds.
KPOPS_KAFKA_REST__TIMEOUT=30
# kafka_connect.url
# Address of Kafka Connect.
KPOPS_KAFKA_CONNECT__URL=http://localhost:8083/
# timeout
# The timeout in seconds that specifies when actions like deletion or
# deploy timeout.
KPOPS_TIMEOUT=300
# kafka_connect.timeout
# Operation timeout in seconds.
KPOPS_KAFKA_CONNECT__TIMEOUT=30
# create_namespace
# Flag for `helm upgrade --install`. Create the release namespace if
# not present.
Expand Down
40 changes: 21 additions & 19 deletions docs/docs/resources/variables/config_env_vars.md
Original file line number Diff line number Diff line change
@@ -1,21 +1,23 @@
These variables take precedence over the settings in `config.yaml`. Variables marked as required can instead be set in the global config.

| Name | Default Value |Required| Description | Setting name |
|--------------------------------------------------|----------------------------------------|--------|-----------------------------------------------------------------------------------|-------------------------------------------|
|KPOPS_COMPONENTS_MODULE | |False |Custom Python module defining project-specific KPOps components |components_module |
|KPOPS_PIPELINE_BASE_DIR |. |False |Base directory to the pipelines (default is current working directory) |pipeline_base_dir |
|KPOPS_KAFKA_BROKERS | |True |The comma separated Kafka brokers address. |kafka_brokers |
|KPOPS_DEFAULTS_FILENAME_PREFIX |defaults |False |The name of the defaults file and the prefix of the defaults environment file. |defaults_filename_prefix |
|KPOPS_TOPIC_NAME_CONFIG__DEFAULT_OUTPUT_TOPIC_NAME|${pipeline.name}-${component.name} |False |Configures the value for the variable ${output_topic_name} |topic_name_config.default_output_topic_name|
|KPOPS_TOPIC_NAME_CONFIG__DEFAULT_ERROR_TOPIC_NAME |${pipeline.name}-${component.name}-error|False |Configures the value for the variable ${error_topic_name} |topic_name_config.default_error_topic_name |
|KPOPS_SCHEMA_REGISTRY__ENABLED |False |False |Whether the Schema Registry handler should be initialized. |schema_registry.enabled |
|KPOPS_SCHEMA_REGISTRY__URL |http://localhost:8081/ |False |Address of the Schema Registry. |schema_registry.url |
|KPOPS_KAFKA_REST__URL |http://localhost:8082/ |False |Address of the Kafka REST Proxy. |kafka_rest.url |
|KPOPS_KAFKA_CONNECT__URL |http://localhost:8083/ |False |Address of Kafka Connect. |kafka_connect.url |
|KPOPS_TIMEOUT |300 |False |The timeout in seconds that specifies when actions like deletion or deploy timeout.|timeout |
|KPOPS_CREATE_NAMESPACE |False |False |Flag for `helm upgrade --install`. Create the release namespace if not present. |create_namespace |
|KPOPS_HELM_CONFIG__CONTEXT | |False |Name of kubeconfig context (`--kube-context`) |helm_config.context |
|KPOPS_HELM_CONFIG__DEBUG |False |False |Run Helm in Debug mode |helm_config.debug |
|KPOPS_HELM_CONFIG__API_VERSION | |False |Kubernetes API version used for `Capabilities.APIVersions` |helm_config.api_version |
|KPOPS_HELM_DIFF_CONFIG__IGNORE | |True |Set of keys that should not be checked. |helm_diff_config.ignore |
|KPOPS_RETAIN_CLEAN_JOBS |False |False |Whether to retain clean up jobs in the cluster or uninstall the, after completion. |retain_clean_jobs |
| Name | Default Value |Required| Description | Setting name |
|--------------------------------------------------|----------------------------------------|--------|----------------------------------------------------------------------------------|-------------------------------------------|
|KPOPS_COMPONENTS_MODULE | |False |Custom Python module defining project-specific KPOps components |components_module |
|KPOPS_PIPELINE_BASE_DIR |. |False |Base directory to the pipelines (default is current working directory) |pipeline_base_dir |
|KPOPS_KAFKA_BROKERS | |True |The comma separated Kafka brokers address. |kafka_brokers |
|KPOPS_DEFAULTS_FILENAME_PREFIX |defaults |False |The name of the defaults file and the prefix of the defaults environment file. |defaults_filename_prefix |
|KPOPS_TOPIC_NAME_CONFIG__DEFAULT_OUTPUT_TOPIC_NAME|${pipeline.name}-${component.name} |False |Configures the value for the variable ${output_topic_name} |topic_name_config.default_output_topic_name|
|KPOPS_TOPIC_NAME_CONFIG__DEFAULT_ERROR_TOPIC_NAME |${pipeline.name}-${component.name}-error|False |Configures the value for the variable ${error_topic_name} |topic_name_config.default_error_topic_name |
|KPOPS_SCHEMA_REGISTRY__ENABLED |False |False |Whether the Schema Registry handler should be initialized. |schema_registry.enabled |
|KPOPS_SCHEMA_REGISTRY__URL |http://localhost:8081/ |False |Address of the Schema Registry. |schema_registry.url |
|KPOPS_SCHEMA_REGISTRY__TIMEOUT |30 |False |Operation timeout in seconds. |schema_registry.timeout |
|KPOPS_KAFKA_REST__URL |http://localhost:8082/ |False |Address of the Kafka REST Proxy. |kafka_rest.url |
|KPOPS_KAFKA_REST__TIMEOUT |30 |False |Operation timeout in seconds. |kafka_rest.timeout |
|KPOPS_KAFKA_CONNECT__URL |http://localhost:8083/ |False |Address of Kafka Connect. |kafka_connect.url |
|KPOPS_KAFKA_CONNECT__TIMEOUT |30 |False |Operation timeout in seconds. |kafka_connect.timeout |
|KPOPS_CREATE_NAMESPACE |False |False |Flag for `helm upgrade --install`. Create the release namespace if not present. |create_namespace |
|KPOPS_HELM_CONFIG__CONTEXT | |False |Name of kubeconfig context (`--kube-context`) |helm_config.context |
|KPOPS_HELM_CONFIG__DEBUG |False |False |Run Helm in Debug mode |helm_config.debug |
|KPOPS_HELM_CONFIG__API_VERSION | |False |Kubernetes API version used for `Capabilities.APIVersions` |helm_config.api_version |
|KPOPS_HELM_DIFF_CONFIG__IGNORE | |True |Set of keys that should not be checked. |helm_diff_config.ignore |
|KPOPS_RETAIN_CLEAN_JOBS |False |False |Whether to retain clean up jobs in the cluster or uninstall the, after completion.|retain_clean_jobs |
48 changes: 42 additions & 6 deletions docs/docs/schema/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,19 @@
"additionalProperties": false,
"description": "Configuration for Kafka Connect.",
"properties": {
"timeout": {
"anyOf": [
{
"type": "integer"
},
{
"type": "number"
}
],
"default": 30,
"description": "Operation timeout in seconds.",
"title": "Timeout"
},
"url": {
"default": "http://localhost:8083/",
"description": "Address of Kafka Connect.",
Expand All @@ -80,6 +93,19 @@
"additionalProperties": false,
"description": "Configuration for Kafka REST Proxy.",
"properties": {
"timeout": {
"anyOf": [
{
"type": "integer"
},
{
"type": "number"
}
],
"default": 30,
"description": "Operation timeout in seconds.",
"title": "Timeout"
},
"url": {
"default": "http://localhost:8082/",
"description": "Address of the Kafka REST Proxy.",
Expand All @@ -102,6 +128,19 @@
"title": "Enabled",
"type": "boolean"
},
"timeout": {
"anyOf": [
{
"type": "integer"
},
{
"type": "number"
}
],
"default": 30,
"description": "Operation timeout in seconds.",
"title": "Timeout"
},
"url": {
"default": "http://localhost:8081/",
"description": "Address of the Schema Registry.",
Expand Down Expand Up @@ -202,6 +241,7 @@
}
],
"default": {
"timeout": 30,
"url": "http://localhost:8083/"
},
"description": "Configuration for Kafka Connect."
Expand All @@ -213,6 +253,7 @@
}
],
"default": {
"timeout": 30,
"url": "http://localhost:8082/"
},
"description": "Configuration for Kafka REST Proxy."
Expand All @@ -238,16 +279,11 @@
],
"default": {
"enabled": false,
"timeout": 30,
"url": "http://localhost:8081/"
},
"description": "Configuration for Schema Registry."
},
"timeout": {
"default": 300,
"description": "The timeout in seconds that specifies when actions like deletion or deploy timeout.",
"title": "Timeout",
"type": "integer"
},
"topic_name_config": {
"allOf": [
{
Expand Down
22 changes: 22 additions & 0 deletions docs/docs/user/migration-guide/v4-v5.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Migrate from V4 to V5

## [Allow custom timeout for external services](https://github.com/bakdata/kpops/pull/485)

The global `timeout` setting has been removed. Instead, an individual timeout can be set for each external service. The default is 30 seconds.

#### config.yaml

```diff
- timeout: 300

kafka_rest:
url: "http://my-custom-rest.url:8082"
+ timeout: 30
kafka_connect:
url: "http://my-custom-connect.url:8083"
+ timeout: 30
schema_registry:
enabled: true
url: "http://my-custom-sr.url:8081"
+ timeout: 30
```
1 change: 1 addition & 0 deletions docs/mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ nav:
- Migrate from v1 to v2: user/migration-guide/v1-v2.md
- Migrate from v2 to v3: user/migration-guide/v2-v3.md
- Migrate from v3 to v4: user/migration-guide/v3-v4.md
- Migrate from v4 to v5: user/migration-guide/v4-v5.md
- CLI usage: user/references/cli-commands.md
- Editor integration: user/references/editor-integration.md
- CI integration:
Expand Down
2 changes: 1 addition & 1 deletion kpops/component_handlers/kafka_connect/connect_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class ConnectWrapper:

def __init__(self, config: KafkaConnectConfig) -> None:
self._config: KafkaConnectConfig = config
self._client = httpx.AsyncClient()
self._client = httpx.AsyncClient(timeout=config.timeout)

@property
def url(self) -> AnyHttpUrl:
Expand Down
35 changes: 6 additions & 29 deletions kpops/component_handlers/kafka_connect/kafka_connect_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
ConnectorNotFoundException,
ConnectorStateException,
)
from kpops.component_handlers.kafka_connect.timeout import timeout
from kpops.utils.colorify import magentaify
from kpops.utils.dict_differ import render_diff

Expand All @@ -25,13 +24,8 @@


class KafkaConnectHandler:
def __init__(
self,
connect_wrapper: ConnectWrapper,
timeout: int,
):
def __init__(self, connect_wrapper: ConnectWrapper):
self._connect_wrapper = connect_wrapper
self._timeout = timeout

async def create_connector(
self, connector_config: KafkaConnectorConfig, *, dry_run: bool
Expand All @@ -47,21 +41,11 @@ async def create_connector(
await self.__dry_run_connector_creation(connector_config)
else:
try:
await timeout(
self._connect_wrapper.get_connector(connector_config.name),
secs=self._timeout,
)

await timeout(
self._connect_wrapper.update_connector_config(connector_config),
secs=self._timeout,
)
await self._connect_wrapper.get_connector(connector_config.name)
await self._connect_wrapper.update_connector_config(connector_config)

except ConnectorNotFoundException:
await timeout(
self._connect_wrapper.create_connector(connector_config),
secs=self._timeout,
)
await self._connect_wrapper.create_connector(connector_config)

async def destroy_connector(self, connector_name: str, *, dry_run: bool) -> None:
"""Delete a connector resource from the cluster.
Expand All @@ -73,15 +57,9 @@ async def destroy_connector(self, connector_name: str, *, dry_run: bool) -> None
await self.__dry_run_connector_deletion(connector_name)
else:
try:
await timeout(
self._connect_wrapper.get_connector(connector_name),
secs=self._timeout,
)
await self._connect_wrapper.get_connector(connector_name)
await self._connect_wrapper.delete_connector(connector_name)

await timeout(
self._connect_wrapper.delete_connector(connector_name),
secs=self._timeout,
)
except ConnectorNotFoundException:
log.warning(
f"Connector Destruction: the connector {connector_name} does not exist. Skipping."
Expand Down Expand Up @@ -138,5 +116,4 @@ async def __dry_run_connector_deletion(self, connector_name: str) -> None:
def from_kpops_config(cls, config: KpopsConfig) -> Self:
return cls(
connect_wrapper=ConnectWrapper(config.kafka_connect),
timeout=config.timeout,
)
3 changes: 2 additions & 1 deletion kpops/component_handlers/schema_handler/schema_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@
class SchemaHandler:
def __init__(self, kpops_config: KpopsConfig) -> None:
self.schema_registry_client = AsyncSchemaRegistryClient(
str(kpops_config.schema_registry.url)
str(kpops_config.schema_registry.url),
timeout=kpops_config.schema_registry.timeout, # pyright: ignore[reportArgumentType]
)
self.components_module = kpops_config.components_module

Expand Down
4 changes: 2 additions & 2 deletions kpops/component_handlers/topic/proxy_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ class ProxyWrapper:

def __init__(self, config: KafkaRestConfig) -> None:
self._config: KafkaRestConfig = config
self._client = httpx.AsyncClient()
self._sync_client = httpx.Client()
self._client = httpx.AsyncClient(timeout=config.timeout)
self._sync_client = httpx.Client(timeout=config.timeout)

@cached_property
def cluster_id(self) -> str:
Expand Down
Loading

0 comments on commit 95e9e75

Please sign in to comment.