From c456984921c6656772293000baeddefcedc8006e Mon Sep 17 00:00:00 2001 From: Tommi Vainikainen Date: Wed, 8 Mar 2023 09:36:23 +0200 Subject: [PATCH 1/4] Do not log error when rejecting input due validity --- karapace/rapu.py | 1 - 1 file changed, 1 deletion(-) diff --git a/karapace/rapu.py b/karapace/rapu.py index 4cd81fe59..3b7796ed4 100644 --- a/karapace/rapu.py +++ b/karapace/rapu.py @@ -213,7 +213,6 @@ def check_rest_headers(self, request: HTTPRequest) -> dict: result["requests"] = header_info result["accepts"] = accept_matcher.groupdict() return result - self.log.error("Not acceptable: %r", request.get_header("accept")) http_error( message="HTTP 406 Not Acceptable", content_type=result["content_type"], From 87111372bb109d4ed957f1e689259db714c7a9b4 Mon Sep 17 00:00:00 2001 From: Tommi Vainikainen Date: Wed, 8 Mar 2023 14:16:23 +0200 Subject: [PATCH 2/4] Fix malformed request handling with subject & compatiblity endpoints --- karapace/schema_registry_apis.py | 16 +++++++++++++--- tests/integration/test_schema.py | 12 +++++++++--- 2 files changed, 22 insertions(+), 6 deletions(-) diff --git a/karapace/schema_registry_apis.py b/karapace/schema_registry_apis.py index 98f6fdae6..1532b7838 100644 --- a/karapace/schema_registry_apis.py +++ b/karapace/schema_registry_apis.py @@ -38,6 +38,7 @@ @unique class SchemaErrorCodes(Enum): + HTTP_BAD_REQUEST = HTTPStatus.BAD_REQUEST.value HTTP_NOT_FOUND = HTTPStatus.NOT_FOUND.value HTTP_CONFLICT = HTTPStatus.CONFLICT.value HTTP_UNPROCESSABLE_ENTITY = HTTPStatus.UNPROCESSABLE_ENTITY.value @@ -835,11 +836,11 @@ def _validate_schema_request_body(self, content_type: str, body: Union[dict, Any if not isinstance(body, dict): self.r( body={ - "error_code": SchemaErrorCodes.HTTP_INTERNAL_SERVER_ERROR.value, - "message": "Internal Server Error", + "error_code": SchemaErrorCodes.HTTP_BAD_REQUEST.value, + "message": "Malformed request", }, content_type=content_type, - status=HTTPStatus.INTERNAL_SERVER_ERROR, + status=HTTPStatus.BAD_REQUEST, ) for attr in body: if attr not in {"schema", "schemaType"}: @@ -853,6 +854,15 @@ def _validate_schema_request_body(self, content_type: str, body: Union[dict, Any ) def _validate_schema_type(self, content_type: str, data: JsonData) -> SchemaType: + if not isinstance(data, dict): + self.r( + body={ + "error_code": SchemaErrorCodes.HTTP_BAD_REQUEST.value, + "message": "Malformed request", + }, + content_type=content_type, + status=HTTPStatus.BAD_REQUEST, + ) schema_type_unparsed = data.get("schemaType", SchemaType.AVRO.value) try: schema_type = SchemaType(schema_type_unparsed) diff --git a/tests/integration/test_schema.py b/tests/integration/test_schema.py index 4e38f7196..f39b4c929 100644 --- a/tests/integration/test_schema.py +++ b/tests/integration/test_schema.py @@ -288,6 +288,12 @@ async def test_compatibility_endpoint(registry_async_client: Client, trail: str) subject = create_subject_name_factory(f"test_compatibility_endpoint-{trail}")() schema_name = create_schema_name_factory(f"test_compatibility_endpoint_{trail}")() + res = await registry_async_client.post( + f"subjects/{subject}/versions{trail}", + json=-1, + ) + assert res.status_code == 400 + schema = { "type": "record", "name": schema_name, @@ -2220,9 +2226,9 @@ async def test_schema_body_validation(registry_async_client: Client) -> None: assert res.json()["message"] == "Unrecognized field: invalid_field" # Invalid body type res = await registry_async_client.post(endpoint, json="invalid") - assert res.status_code == 500 - assert res.json()["error_code"] == 500 - assert res.json()["message"] == "Internal Server Error" + assert res.status_code == 400 + assert res.json()["error_code"] == 400 + assert res.json()["message"] == "Malformed request" async def test_version_number_validation(registry_async_client: Client) -> None: From ed90944d3d9889549268549426a7c3a71e42314a Mon Sep 17 00:00:00 2001 From: Tommi Vainikainen Date: Thu, 9 Mar 2023 09:24:44 +0200 Subject: [PATCH 3/4] Handle invalid schema on REST proxy publish If invalid schema is used when publishing into a topic with REST proxy calls, respond with unprocessable entity and explanation gracefully as it is caller error. --- karapace/kafka_rest_apis/__init__.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/karapace/kafka_rest_apis/__init__.py b/karapace/kafka_rest_apis/__init__.py index 82cbef558..2e44065b9 100644 --- a/karapace/kafka_rest_apis/__init__.py +++ b/karapace/kafka_rest_apis/__init__.py @@ -13,6 +13,7 @@ UnknownTopicOrPartitionError, ) from karapace.config import Config, create_client_ssl_context +from karapace.errors import InvalidSchema from karapace.kafka_rest_apis.admin import KafkaRestAdminClient from karapace.kafka_rest_apis.consumer_manager import ConsumerManager from karapace.kafka_rest_apis.error_codes import RESTErrorCodes @@ -779,11 +780,20 @@ async def validate_schema_info(self, data: dict, prefix: str, content_type: str, KafkaRest.r( body={ "error_code": RESTErrorCodes.SCHEMA_RETRIEVAL_ERROR.value, - "message": f"Error when registering schema. format = {schema_type}, subject = {topic}-{prefix}", + "message": f"Error when registering schema. format = {schema_type.value}, subject = {topic}-{prefix}", }, content_type=content_type, status=HTTPStatus.REQUEST_TIMEOUT, ) + except InvalidSchema: + KafkaRest.r( + body={ + "error_code": RESTErrorCodes.INVALID_DATA.value, + "message": f'Invalid schema. format = {schema_type.value}, schema = {data[f"{prefix}_schema"]}', + }, + content_type=content_type, + status=HTTPStatus.UNPROCESSABLE_ENTITY, + ) async def _prepare_records( self, From 90a360a687606d0241311c077d6cdd3b14f0175f Mon Sep 17 00:00:00 2001 From: Tommi Vainikainen Date: Thu, 9 Mar 2023 09:35:13 +0200 Subject: [PATCH 4/4] Handle cancellation due client disconnect Handle client disconnect during the request handling gracefully. E.g. if no primary selected, asyncio.sleep gets cancelled, and aiohttp handles it correctly, and it should not trigger internal server errors. --- karapace/rapu.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/karapace/rapu.py b/karapace/rapu.py index 3b7796ed4..a791e498e 100644 --- a/karapace/rapu.py +++ b/karapace/rapu.py @@ -330,6 +330,9 @@ async def _handle_request( data = ex.body status = ex.status headers = ex.headers + except asyncio.CancelledError: + # Re-raise if aiohttp cancelled the task (e.g. client disconnected) without internal server error + raise except: # pylint: disable=bare-except self.log.exception("Internal server error") headers = {"Content-Type": "application/json"}