From 95e9e7596b3c904e6899e6fc0cc6bdcfa5751959 Mon Sep 17 00:00:00 2001 From: Salomon Popp Date: Thu, 2 May 2024 10:31:38 +0200 Subject: [PATCH] Allow custom timeout for external services (#485) --- README.md | 2 +- .../resources/variables/config_env_vars.env | 13 +++-- .../resources/variables/config_env_vars.md | 40 ++++++++-------- docs/docs/schema/config.json | 48 ++++++++++++++++--- docs/docs/user/migration-guide/v4-v5.md | 22 +++++++++ docs/mkdocs.yml | 1 + .../kafka_connect/connect_wrapper.py | 2 +- .../kafka_connect/kafka_connect_handler.py | 35 +++----------- .../schema_handler/schema_handler.py | 3 +- .../component_handlers/topic/proxy_wrapper.py | 4 +- kpops/config.py | 13 +++-- .../test_init_project/config_include_opt.yaml | 4 +- tests/cli/test_kpops_config.py | 4 +- .../kafka_connect/test_connect_handler.py | 5 +- 14 files changed, 123 insertions(+), 73 deletions(-) create mode 100644 docs/docs/user/migration-guide/v4-v5.md diff --git a/README.md b/README.md index 5241c662a..cc9d8ea32 100644 --- a/README.md +++ b/README.md @@ -25,7 +25,7 @@ the [documentation](https://bakdata.github.io/kpops/latest). KPOps comes as a [PyPI package](https://pypi.org/project/kpops/). You can install it with [pip](https://github.com/pypa/pip): -```shell +```sh pip install kpops ``` diff --git a/docs/docs/resources/variables/config_env_vars.env b/docs/docs/resources/variables/config_env_vars.env index 8c9d13f7b..12195fe9d 100644 --- a/docs/docs/resources/variables/config_env_vars.env +++ b/docs/docs/resources/variables/config_env_vars.env @@ -30,16 +30,21 @@ KPOPS_SCHEMA_REGISTRY__ENABLED=False # schema_registry.url # Address of the Schema Registry. KPOPS_SCHEMA_REGISTRY__URL=http://localhost:8081/ +# schema_registry.timeout +# Operation timeout in seconds. +KPOPS_SCHEMA_REGISTRY__TIMEOUT=30 # kafka_rest.url # Address of the Kafka REST Proxy. KPOPS_KAFKA_REST__URL=http://localhost:8082/ +# kafka_rest.timeout +# Operation timeout in seconds. +KPOPS_KAFKA_REST__TIMEOUT=30 # kafka_connect.url # Address of Kafka Connect. KPOPS_KAFKA_CONNECT__URL=http://localhost:8083/ -# timeout -# The timeout in seconds that specifies when actions like deletion or -# deploy timeout. -KPOPS_TIMEOUT=300 +# kafka_connect.timeout +# Operation timeout in seconds. +KPOPS_KAFKA_CONNECT__TIMEOUT=30 # create_namespace # Flag for `helm upgrade --install`. Create the release namespace if # not present. diff --git a/docs/docs/resources/variables/config_env_vars.md b/docs/docs/resources/variables/config_env_vars.md index 1a0bfaf73..62871c466 100644 --- a/docs/docs/resources/variables/config_env_vars.md +++ b/docs/docs/resources/variables/config_env_vars.md @@ -1,21 +1,23 @@ These variables take precedence over the settings in `config.yaml`. Variables marked as required can instead be set in the global config. -| Name | Default Value |Required| Description | Setting name | -|--------------------------------------------------|----------------------------------------|--------|-----------------------------------------------------------------------------------|-------------------------------------------| -|KPOPS_COMPONENTS_MODULE | |False |Custom Python module defining project-specific KPOps components |components_module | -|KPOPS_PIPELINE_BASE_DIR |. |False |Base directory to the pipelines (default is current working directory) |pipeline_base_dir | -|KPOPS_KAFKA_BROKERS | |True |The comma separated Kafka brokers address. |kafka_brokers | -|KPOPS_DEFAULTS_FILENAME_PREFIX |defaults |False |The name of the defaults file and the prefix of the defaults environment file. |defaults_filename_prefix | -|KPOPS_TOPIC_NAME_CONFIG__DEFAULT_OUTPUT_TOPIC_NAME|${pipeline.name}-${component.name} |False |Configures the value for the variable ${output_topic_name} |topic_name_config.default_output_topic_name| -|KPOPS_TOPIC_NAME_CONFIG__DEFAULT_ERROR_TOPIC_NAME |${pipeline.name}-${component.name}-error|False |Configures the value for the variable ${error_topic_name} |topic_name_config.default_error_topic_name | -|KPOPS_SCHEMA_REGISTRY__ENABLED |False |False |Whether the Schema Registry handler should be initialized. |schema_registry.enabled | -|KPOPS_SCHEMA_REGISTRY__URL |http://localhost:8081/ |False |Address of the Schema Registry. |schema_registry.url | -|KPOPS_KAFKA_REST__URL |http://localhost:8082/ |False |Address of the Kafka REST Proxy. |kafka_rest.url | -|KPOPS_KAFKA_CONNECT__URL |http://localhost:8083/ |False |Address of Kafka Connect. |kafka_connect.url | -|KPOPS_TIMEOUT |300 |False |The timeout in seconds that specifies when actions like deletion or deploy timeout.|timeout | -|KPOPS_CREATE_NAMESPACE |False |False |Flag for `helm upgrade --install`. Create the release namespace if not present. |create_namespace | -|KPOPS_HELM_CONFIG__CONTEXT | |False |Name of kubeconfig context (`--kube-context`) |helm_config.context | -|KPOPS_HELM_CONFIG__DEBUG |False |False |Run Helm in Debug mode |helm_config.debug | -|KPOPS_HELM_CONFIG__API_VERSION | |False |Kubernetes API version used for `Capabilities.APIVersions` |helm_config.api_version | -|KPOPS_HELM_DIFF_CONFIG__IGNORE | |True |Set of keys that should not be checked. |helm_diff_config.ignore | -|KPOPS_RETAIN_CLEAN_JOBS |False |False |Whether to retain clean up jobs in the cluster or uninstall the, after completion. |retain_clean_jobs | +| Name | Default Value |Required| Description | Setting name | +|--------------------------------------------------|----------------------------------------|--------|----------------------------------------------------------------------------------|-------------------------------------------| +|KPOPS_COMPONENTS_MODULE | |False |Custom Python module defining project-specific KPOps components |components_module | +|KPOPS_PIPELINE_BASE_DIR |. |False |Base directory to the pipelines (default is current working directory) |pipeline_base_dir | +|KPOPS_KAFKA_BROKERS | |True |The comma separated Kafka brokers address. |kafka_brokers | +|KPOPS_DEFAULTS_FILENAME_PREFIX |defaults |False |The name of the defaults file and the prefix of the defaults environment file. |defaults_filename_prefix | +|KPOPS_TOPIC_NAME_CONFIG__DEFAULT_OUTPUT_TOPIC_NAME|${pipeline.name}-${component.name} |False |Configures the value for the variable ${output_topic_name} |topic_name_config.default_output_topic_name| +|KPOPS_TOPIC_NAME_CONFIG__DEFAULT_ERROR_TOPIC_NAME |${pipeline.name}-${component.name}-error|False |Configures the value for the variable ${error_topic_name} |topic_name_config.default_error_topic_name | +|KPOPS_SCHEMA_REGISTRY__ENABLED |False |False |Whether the Schema Registry handler should be initialized. |schema_registry.enabled | +|KPOPS_SCHEMA_REGISTRY__URL |http://localhost:8081/ |False |Address of the Schema Registry. |schema_registry.url | +|KPOPS_SCHEMA_REGISTRY__TIMEOUT |30 |False |Operation timeout in seconds. |schema_registry.timeout | +|KPOPS_KAFKA_REST__URL |http://localhost:8082/ |False |Address of the Kafka REST Proxy. |kafka_rest.url | +|KPOPS_KAFKA_REST__TIMEOUT |30 |False |Operation timeout in seconds. |kafka_rest.timeout | +|KPOPS_KAFKA_CONNECT__URL |http://localhost:8083/ |False |Address of Kafka Connect. |kafka_connect.url | +|KPOPS_KAFKA_CONNECT__TIMEOUT |30 |False |Operation timeout in seconds. |kafka_connect.timeout | +|KPOPS_CREATE_NAMESPACE |False |False |Flag for `helm upgrade --install`. Create the release namespace if not present. |create_namespace | +|KPOPS_HELM_CONFIG__CONTEXT | |False |Name of kubeconfig context (`--kube-context`) |helm_config.context | +|KPOPS_HELM_CONFIG__DEBUG |False |False |Run Helm in Debug mode |helm_config.debug | +|KPOPS_HELM_CONFIG__API_VERSION | |False |Kubernetes API version used for `Capabilities.APIVersions` |helm_config.api_version | +|KPOPS_HELM_DIFF_CONFIG__IGNORE | |True |Set of keys that should not be checked. |helm_diff_config.ignore | +|KPOPS_RETAIN_CLEAN_JOBS |False |False |Whether to retain clean up jobs in the cluster or uninstall the, after completion.|retain_clean_jobs | diff --git a/docs/docs/schema/config.json b/docs/docs/schema/config.json index 33871a91c..f2a070842 100644 --- a/docs/docs/schema/config.json +++ b/docs/docs/schema/config.json @@ -64,6 +64,19 @@ "additionalProperties": false, "description": "Configuration for Kafka Connect.", "properties": { + "timeout": { + "anyOf": [ + { + "type": "integer" + }, + { + "type": "number" + } + ], + "default": 30, + "description": "Operation timeout in seconds.", + "title": "Timeout" + }, "url": { "default": "http://localhost:8083/", "description": "Address of Kafka Connect.", @@ -80,6 +93,19 @@ "additionalProperties": false, "description": "Configuration for Kafka REST Proxy.", "properties": { + "timeout": { + "anyOf": [ + { + "type": "integer" + }, + { + "type": "number" + } + ], + "default": 30, + "description": "Operation timeout in seconds.", + "title": "Timeout" + }, "url": { "default": "http://localhost:8082/", "description": "Address of the Kafka REST Proxy.", @@ -102,6 +128,19 @@ "title": "Enabled", "type": "boolean" }, + "timeout": { + "anyOf": [ + { + "type": "integer" + }, + { + "type": "number" + } + ], + "default": 30, + "description": "Operation timeout in seconds.", + "title": "Timeout" + }, "url": { "default": "http://localhost:8081/", "description": "Address of the Schema Registry.", @@ -202,6 +241,7 @@ } ], "default": { + "timeout": 30, "url": "http://localhost:8083/" }, "description": "Configuration for Kafka Connect." @@ -213,6 +253,7 @@ } ], "default": { + "timeout": 30, "url": "http://localhost:8082/" }, "description": "Configuration for Kafka REST Proxy." @@ -238,16 +279,11 @@ ], "default": { "enabled": false, + "timeout": 30, "url": "http://localhost:8081/" }, "description": "Configuration for Schema Registry." }, - "timeout": { - "default": 300, - "description": "The timeout in seconds that specifies when actions like deletion or deploy timeout.", - "title": "Timeout", - "type": "integer" - }, "topic_name_config": { "allOf": [ { diff --git a/docs/docs/user/migration-guide/v4-v5.md b/docs/docs/user/migration-guide/v4-v5.md new file mode 100644 index 000000000..c6850ec64 --- /dev/null +++ b/docs/docs/user/migration-guide/v4-v5.md @@ -0,0 +1,22 @@ +# Migrate from V4 to V5 + +## [Allow custom timeout for external services](https://github.com/bakdata/kpops/pull/485) + +The global `timeout` setting has been removed. Instead, an individual timeout can be set for each external service. The default is 30 seconds. + +#### config.yaml + +```diff +- timeout: 300 + + kafka_rest: + url: "http://my-custom-rest.url:8082" ++ timeout: 30 + kafka_connect: + url: "http://my-custom-connect.url:8083" ++ timeout: 30 + schema_registry: + enabled: true + url: "http://my-custom-sr.url:8081" ++ timeout: 30 +``` diff --git a/docs/mkdocs.yml b/docs/mkdocs.yml index ac677a8e7..4940ede82 100644 --- a/docs/mkdocs.yml +++ b/docs/mkdocs.yml @@ -112,6 +112,7 @@ nav: - Migrate from v1 to v2: user/migration-guide/v1-v2.md - Migrate from v2 to v3: user/migration-guide/v2-v3.md - Migrate from v3 to v4: user/migration-guide/v3-v4.md + - Migrate from v4 to v5: user/migration-guide/v4-v5.md - CLI usage: user/references/cli-commands.md - Editor integration: user/references/editor-integration.md - CI integration: diff --git a/kpops/component_handlers/kafka_connect/connect_wrapper.py b/kpops/component_handlers/kafka_connect/connect_wrapper.py index dba4130d5..5c7914f54 100644 --- a/kpops/component_handlers/kafka_connect/connect_wrapper.py +++ b/kpops/component_handlers/kafka_connect/connect_wrapper.py @@ -31,7 +31,7 @@ class ConnectWrapper: def __init__(self, config: KafkaConnectConfig) -> None: self._config: KafkaConnectConfig = config - self._client = httpx.AsyncClient() + self._client = httpx.AsyncClient(timeout=config.timeout) @property def url(self) -> AnyHttpUrl: diff --git a/kpops/component_handlers/kafka_connect/kafka_connect_handler.py b/kpops/component_handlers/kafka_connect/kafka_connect_handler.py index f3f8c2596..dcfd41b75 100644 --- a/kpops/component_handlers/kafka_connect/kafka_connect_handler.py +++ b/kpops/component_handlers/kafka_connect/kafka_connect_handler.py @@ -8,7 +8,6 @@ ConnectorNotFoundException, ConnectorStateException, ) -from kpops.component_handlers.kafka_connect.timeout import timeout from kpops.utils.colorify import magentaify from kpops.utils.dict_differ import render_diff @@ -25,13 +24,8 @@ class KafkaConnectHandler: - def __init__( - self, - connect_wrapper: ConnectWrapper, - timeout: int, - ): + def __init__(self, connect_wrapper: ConnectWrapper): self._connect_wrapper = connect_wrapper - self._timeout = timeout async def create_connector( self, connector_config: KafkaConnectorConfig, *, dry_run: bool @@ -47,21 +41,11 @@ async def create_connector( await self.__dry_run_connector_creation(connector_config) else: try: - await timeout( - self._connect_wrapper.get_connector(connector_config.name), - secs=self._timeout, - ) - - await timeout( - self._connect_wrapper.update_connector_config(connector_config), - secs=self._timeout, - ) + await self._connect_wrapper.get_connector(connector_config.name) + await self._connect_wrapper.update_connector_config(connector_config) except ConnectorNotFoundException: - await timeout( - self._connect_wrapper.create_connector(connector_config), - secs=self._timeout, - ) + await self._connect_wrapper.create_connector(connector_config) async def destroy_connector(self, connector_name: str, *, dry_run: bool) -> None: """Delete a connector resource from the cluster. @@ -73,15 +57,9 @@ async def destroy_connector(self, connector_name: str, *, dry_run: bool) -> None await self.__dry_run_connector_deletion(connector_name) else: try: - await timeout( - self._connect_wrapper.get_connector(connector_name), - secs=self._timeout, - ) + await self._connect_wrapper.get_connector(connector_name) + await self._connect_wrapper.delete_connector(connector_name) - await timeout( - self._connect_wrapper.delete_connector(connector_name), - secs=self._timeout, - ) except ConnectorNotFoundException: log.warning( f"Connector Destruction: the connector {connector_name} does not exist. Skipping." @@ -138,5 +116,4 @@ async def __dry_run_connector_deletion(self, connector_name: str) -> None: def from_kpops_config(cls, config: KpopsConfig) -> Self: return cls( connect_wrapper=ConnectWrapper(config.kafka_connect), - timeout=config.timeout, ) diff --git a/kpops/component_handlers/schema_handler/schema_handler.py b/kpops/component_handlers/schema_handler/schema_handler.py index 1ad11bb18..936ba0223 100644 --- a/kpops/component_handlers/schema_handler/schema_handler.py +++ b/kpops/component_handlers/schema_handler/schema_handler.py @@ -26,7 +26,8 @@ class SchemaHandler: def __init__(self, kpops_config: KpopsConfig) -> None: self.schema_registry_client = AsyncSchemaRegistryClient( - str(kpops_config.schema_registry.url) + str(kpops_config.schema_registry.url), + timeout=kpops_config.schema_registry.timeout, # pyright: ignore[reportArgumentType] ) self.components_module = kpops_config.components_module diff --git a/kpops/component_handlers/topic/proxy_wrapper.py b/kpops/component_handlers/topic/proxy_wrapper.py index 9451966fc..334523651 100644 --- a/kpops/component_handlers/topic/proxy_wrapper.py +++ b/kpops/component_handlers/topic/proxy_wrapper.py @@ -32,8 +32,8 @@ class ProxyWrapper: def __init__(self, config: KafkaRestConfig) -> None: self._config: KafkaRestConfig = config - self._client = httpx.AsyncClient() - self._sync_client = httpx.Client() + self._client = httpx.AsyncClient(timeout=config.timeout) + self._sync_client = httpx.Client(timeout=config.timeout) @cached_property def cluster_id(self) -> str: diff --git a/kpops/config.py b/kpops/config.py index 28f407ce1..f82068c96 100644 --- a/kpops/config.py +++ b/kpops/config.py @@ -41,6 +41,9 @@ class SchemaRegistryConfig(BaseSettings): default=TypeAdapter(AnyHttpUrl).validate_python("http://localhost:8081"), # pyright: ignore[reportCallIssue] description="Address of the Schema Registry.", ) + timeout: int | float = Field( + default=30, description="Operation timeout in seconds." + ) class KafkaRestConfig(BaseSettings): @@ -50,6 +53,9 @@ class KafkaRestConfig(BaseSettings): default=TypeAdapter(AnyHttpUrl).validate_python("http://localhost:8082"), # pyright: ignore[reportCallIssue] description="Address of the Kafka REST Proxy.", ) + timeout: int | float = Field( + default=30, description="Operation timeout in seconds." + ) class KafkaConnectConfig(BaseSettings): @@ -59,6 +65,9 @@ class KafkaConnectConfig(BaseSettings): default=TypeAdapter(AnyHttpUrl).validate_python("http://localhost:8083"), # pyright: ignore[reportCallIssue] description="Address of Kafka Connect.", ) + timeout: int | float = Field( + default=30, description="Operation timeout in seconds." + ) class KpopsConfig(BaseSettings): @@ -99,10 +108,6 @@ class KpopsConfig(BaseSettings): default=KafkaConnectConfig(), description=describe_object(KafkaConnectConfig.__doc__), ) - timeout: int = Field( - default=300, - description="The timeout in seconds that specifies when actions like deletion or deploy timeout.", - ) create_namespace: bool = Field( default=False, description="Flag for `helm upgrade --install`. Create the release namespace if not present.", diff --git a/tests/cli/snapshots/test_init/test_init_project/config_include_opt.yaml b/tests/cli/snapshots/test_init/test_init_project/config_include_opt.yaml index 337de0d96..ae708fef2 100644 --- a/tests/cli/snapshots/test_init/test_init_project/config_include_opt.yaml +++ b/tests/cli/snapshots/test_init/test_init_project/config_include_opt.yaml @@ -13,15 +13,17 @@ helm_config: debug: false helm_diff_config: {} kafka_connect: + timeout: 30 url: http://localhost:8083/ kafka_rest: + timeout: 30 url: http://localhost:8082/ pipeline_base_dir: . retain_clean_jobs: false schema_registry: enabled: false + timeout: 30 url: http://localhost:8081/ -timeout: 300 topic_name_config: default_error_topic_name: ${pipeline.name}-${component.name}-error default_output_topic_name: ${pipeline.name}-${component.name} diff --git a/tests/cli/test_kpops_config.py b/tests/cli/test_kpops_config.py index 7d1ed7811..1af05c626 100644 --- a/tests/cli/test_kpops_config.py +++ b/tests/cli/test_kpops_config.py @@ -25,7 +25,9 @@ def test_kpops_config_with_default_values(): assert default_config.schema_registry.url == AnyUrl("http://localhost:8081") assert default_config.kafka_rest.url == AnyUrl("http://localhost:8082") assert default_config.kafka_connect.url == AnyUrl("http://localhost:8083") - assert default_config.timeout == 300 + assert default_config.kafka_rest.timeout == 30 + assert default_config.kafka_connect.timeout == 30 + assert default_config.schema_registry.timeout == 30 assert default_config.create_namespace is False assert default_config.helm_config.context is None assert default_config.helm_config.debug is False diff --git a/tests/component_handlers/kafka_connect/test_connect_handler.py b/tests/component_handlers/kafka_connect/test_connect_handler.py index 216ff78fc..83ce7ae35 100644 --- a/tests/component_handlers/kafka_connect/test_connect_handler.py +++ b/tests/component_handlers/kafka_connect/test_connect_handler.py @@ -52,10 +52,7 @@ def renderer_diff_mock(self, mocker: MockerFixture) -> MagicMock: @staticmethod def connector_handler(connect_wrapper: MagicMock) -> KafkaConnectHandler: - return KafkaConnectHandler( - connect_wrapper=connect_wrapper, - timeout=0, - ) + return KafkaConnectHandler(connect_wrapper=connect_wrapper) @pytest.fixture() def connector_config(self) -> KafkaConnectorConfig: