diff --git a/src/karapace/config.py b/src/karapace/config.py index e9488b772..a3e03bcd7 100644 --- a/src/karapace/config.py +++ b/src/karapace/config.py @@ -140,6 +140,9 @@ def get_advertised_port(self) -> int: def get_advertised_hostname(self) -> str: return self.advertised_hostname or self.host + def get_address(self) -> str: + return f"{self.host}:{self.port}" + def get_rest_base_uri(self) -> str: return ( self.rest_base_uri diff --git a/src/schema_registry/controller.py b/src/schema_registry/controller.py index d8924d9a7..7de8bb75d 100644 --- a/src/schema_registry/controller.py +++ b/src/schema_registry/controller.py @@ -863,7 +863,7 @@ async def subject_post( except Exception as xx: raise xx - elif not primary_url: + if not primary_url or self.config.get_address() in primary_url: raise no_primary_url_error() else: return await forward_client.forward_request_remote( diff --git a/src/schema_registry/routers/config.py b/src/schema_registry/routers/config.py index 3d1884af6..a21c33d3b 100644 --- a/src/schema_registry/routers/config.py +++ b/src/schema_registry/routers/config.py @@ -6,6 +6,8 @@ from dependency_injector.wiring import inject, Provide from fastapi import APIRouter, Depends, Request from karapace.auth import AuthenticatorAndAuthorizer, Operation, User +from karapace.config import Config +from karapace.container import KarapaceContainer from karapace.forward_client import ForwardClient from karapace.typing import Subject from schema_registry.container import SchemaRegistryContainer @@ -46,6 +48,7 @@ async def config_put( forward_client: ForwardClient = Depends(Provide[SchemaRegistryContainer.karapace_container.forward_client]), authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]), controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]), + config: Config = Depends(Provide[KarapaceContainer.config]), ) -> CompatibilityResponse: if authorizer and not authorizer.check_authorization(user, Operation.Write, "Config:"): raise unauthorized() @@ -53,7 +56,7 @@ async def config_put( i_am_primary, primary_url = await schema_registry.get_master() if i_am_primary: return await controller.config_set(compatibility_level_request=compatibility_level_request) - if not primary_url: + if not primary_url or config.get_address() in primary_url: raise no_primary_url_error() return await forward_client.forward_request_remote( request=request, primary_url=primary_url, response_type=CompatibilityResponse @@ -86,6 +89,7 @@ async def config_set_subject( forward_client: ForwardClient = Depends(Provide[SchemaRegistryContainer.karapace_container.forward_client]), authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]), controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]), + config: Config = Depends(Provide[KarapaceContainer.config]), ) -> CompatibilityResponse: if authorizer and not authorizer.check_authorization(user, Operation.Write, f"Subject:{subject}"): raise unauthorized() @@ -93,7 +97,7 @@ async def config_set_subject( i_am_primary, primary_url = await schema_registry.get_master() if i_am_primary: return await controller.config_subject_set(subject=subject, compatibility_level_request=compatibility_level_request) - if not primary_url: + if not primary_url or config.get_address() in primary_url: raise no_primary_url_error() return await forward_client.forward_request_remote( request=request, primary_url=primary_url, response_type=CompatibilityResponse @@ -110,6 +114,7 @@ async def config_delete_subject( forward_client: ForwardClient = Depends(Provide[SchemaRegistryContainer.karapace_container.forward_client]), authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]), controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]), + config: Config = Depends(Provide[KarapaceContainer.config]), ) -> CompatibilityResponse: if authorizer and not authorizer.check_authorization(user, Operation.Write, f"Subject:{subject}"): raise unauthorized() @@ -117,7 +122,7 @@ async def config_delete_subject( i_am_primary, primary_url = await schema_registry.get_master() if i_am_primary: return await controller.config_subject_delete(subject=subject) - if not primary_url: + if not primary_url or config.get_address() in primary_url: raise no_primary_url_error() return await forward_client.forward_request_remote( request=request, primary_url=primary_url, response_type=CompatibilityResponse diff --git a/src/schema_registry/routers/subjects.py b/src/schema_registry/routers/subjects.py index cd5352490..3498a4dcc 100644 --- a/src/schema_registry/routers/subjects.py +++ b/src/schema_registry/routers/subjects.py @@ -6,6 +6,8 @@ from dependency_injector.wiring import inject, Provide from fastapi import APIRouter, Depends, Request from karapace.auth import AuthenticatorAndAuthorizer, Operation, User +from karapace.config import Config +from karapace.container import KarapaceContainer from karapace.forward_client import ForwardClient from karapace.typing import Subject from schema_registry.container import SchemaRegistryContainer @@ -76,6 +78,7 @@ async def subjects_subject_delete( authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]), schema_registry: KarapaceSchemaRegistry = Depends(Provide[SchemaRegistryContainer.schema_registry]), controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]), + config: Config = Depends(Provide[KarapaceContainer.config]), ) -> list[int]: if authorizer and not authorizer.check_authorization(user, Operation.Write, f"Subject:{subject}"): raise unauthorized() @@ -83,7 +86,7 @@ async def subjects_subject_delete( i_am_primary, primary_url = await schema_registry.get_master() if i_am_primary: return await controller.subject_delete(subject=subject, permanent=permanent) - if not primary_url: + if not primary_url or config.get_address() in primary_url: raise no_primary_url_error() return await forward_client.forward_request_remote(request=request, primary_url=primary_url, response_type=list[int]) @@ -157,6 +160,7 @@ async def subjects_subject_version_delete( authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]), schema_registry: KarapaceSchemaRegistry = Depends(Provide[SchemaRegistryContainer.schema_registry]), controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]), + config: Config = Depends(Provide[KarapaceContainer.config]), ) -> int: if authorizer and not authorizer.check_authorization(user, Operation.Write, f"Subject:{subject}"): raise unauthorized() @@ -164,7 +168,7 @@ async def subjects_subject_version_delete( i_am_primary, primary_url = await schema_registry.get_master() if i_am_primary: return await controller.subject_version_delete(subject=subject, version=version, permanent=permanent) - if not primary_url: + if not primary_url or config.get_address() in primary_url: raise no_primary_url_error() return await forward_client.forward_request_remote(request=request, primary_url=primary_url, response_type=int) diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 10386e832..0a2f87212 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -284,7 +284,7 @@ async def fixture_rest_async( config.bootstrap_uri = kafka_servers.bootstrap_servers[0] # Use non-default max request size for REST producer. config.producer_max_request_size = REST_PRODUCER_MAX_REQUEST_BYTES - config.waiting_time_before_acting_as_master_ms = 300 + config.waiting_time_before_acting_as_master_ms = 500 rest = KafkaRest(config=config) assert rest.serializer.registry_client @@ -352,7 +352,7 @@ async def fixture_rest_async_novalidation( # Use non-default max request size for REST producer. config.producer_max_request_size = REST_PRODUCER_MAX_REQUEST_BYTES config.name_strategy_validation = False # This should be only difference from rest_async - config.waiting_time_before_acting_as_master_ms = 300 + config.waiting_time_before_acting_as_master_ms = 500 rest = KafkaRest(config=config) assert rest.serializer.registry_client @@ -422,7 +422,7 @@ async def fixture_rest_async_registry_auth( config.registry_port = registry.port config.registry_user = "admin" config.registry_password = "admin" - config.waiting_time_before_acting_as_master_ms = 300 + config.waiting_time_before_acting_as_master_ms = 500 rest = KafkaRest(config=config) try: @@ -477,8 +477,10 @@ async def fixture_registry_async_pair( config1 = Config() config1.bootstrap_uri = kafka_servers.bootstrap_servers[0] + config1.waiting_time_before_acting_as_master_ms = 500 config2 = Config() config2.bootstrap_uri = kafka_servers.bootstrap_servers[0] + config2.waiting_time_before_acting_as_master_ms = 500 async with start_schema_registry_cluster( config_templates=[config1, config2], @@ -507,6 +509,7 @@ async def fixture_registry_cluster( return config = Config() config.bootstrap_uri = kafka_servers.bootstrap_servers[0] + config.waiting_time_before_acting_as_master_ms = 500 user_config = request.param.get("config", {}) if hasattr(request, "param") else {} config.__dict__.update(user_config) @@ -593,6 +596,7 @@ async def fixture_registry_https_endpoint( config = Config() config.bootstrap_uri = kafka_servers.bootstrap_servers[0] + config.waiting_time_before_acting_as_master_ms = 500 config.server_tls_certfile = server_cert config.server_tls_keyfile = server_key @@ -650,6 +654,7 @@ async def fixture_registry_http_auth_endpoint( config = Config() config.bootstrap_uri = kafka_servers.bootstrap_servers[0] + config.waiting_time_before_acting_as_master_ms = 500 config.registry_authfile = "tests/integration/config/karapace.auth.json" async with start_schema_registry_cluster( @@ -703,10 +708,12 @@ async def fixture_registry_async_auth_pair( config1 = Config() config1.bootstrap_uri = kafka_servers.bootstrap_servers[0] + config1.waiting_time_before_acting_as_master_ms = 500 config1.registry_authfile = "tests/integration/config/karapace.auth.json" config2 = Config() config2.bootstrap_uri = kafka_servers.bootstrap_servers[0] + config2.waiting_time_before_acting_as_master_ms = 500 config2.registry_authfile = "tests/integration/config/karapace.auth.json" async with start_schema_registry_cluster( diff --git a/tests/integration/test_master_coordinator.py b/tests/integration/test_master_coordinator.py index f3049be4c..99f69bf2f 100644 --- a/tests/integration/test_master_coordinator.py +++ b/tests/integration/test_master_coordinator.py @@ -10,11 +10,9 @@ from karapace.typing import SchemaReaderStoppper from tests.integration.utils.kafka_server import KafkaServers from tests.integration.utils.network import allocate_port -from tests.integration.utils.rest_client import RetryRestClient from tests.utils import new_random_name import asyncio -import json import pytest @@ -189,104 +187,3 @@ async def test_no_eligible_master(kafka_servers: KafkaServers) -> None: assert mc.schema_coordinator.master_url is None finally: await mc.close() - - -async def test_schema_request_forwarding( - registry_async_pair, - registry_async_retry_client: RetryRestClient, -) -> None: - master_url, slave_url = registry_async_pair - max_tries, counter = 5, 0 - wait_time = 0.5 - subject = new_random_name("subject") - schema = {"type": "string"} - other_schema = {"type": "int"} - # Config updates - for subj_path in [None, subject]: - if subj_path: - path = f"config/{subject}" - else: - path = "config" - for compat in ["FULL", "BACKWARD", "FORWARD", "NONE"]: - resp = await registry_async_retry_client.put(f"{slave_url}/{path}", json={"compatibility": compat}) - assert resp.ok - while True: - assert counter < max_tries, "Compat update not propagated" - resp = await registry_async_retry_client.get(f"{master_url}/{path}") - if not resp.ok: - print(f"Invalid http status code: {resp.status_code}") - continue - data = resp.json() - if "compatibilityLevel" not in data: - print(f"Invalid response: {data}") - counter += 1 - await asyncio.sleep(wait_time) - continue - if data["compatibilityLevel"] != compat: - print(f"Bad compatibility: {data}") - counter += 1 - await asyncio.sleep(wait_time) - continue - break - - # New schema updates, last compatibility is None - for s in [schema, other_schema]: - resp = await registry_async_retry_client.post( - f"{slave_url}/subjects/{subject}/versions", json={"schema": json.dumps(s)} - ) - assert resp.ok - data = resp.json() - assert "id" in data, data - counter = 0 - while True: - assert counter < max_tries, "Subject schema data not propagated yet" - resp = await registry_async_retry_client.get(f"{master_url}/subjects/{subject}/versions") - if not resp.ok: - print(f"Invalid http status code: {resp.status_code}") - counter += 1 - continue - data = resp.json() - if not data: - print(f"No versions registered for subject {subject} yet") - counter += 1 - continue - assert len(data) == 2, data - assert data[0] == 1, data - print("Subject schema data propagated") - break - - # Schema deletions - resp = await registry_async_retry_client.delete(f"{slave_url}/subjects/{subject}/versions/1") - assert resp.ok - counter = 0 - while True: - assert counter < max_tries, "Subject version deletion not propagated yet" - resp = await registry_async_retry_client.get( - f"{master_url}/subjects/{subject}/versions/1", expected_response_code=404 - ) - if resp.ok: - print(f"Subject {subject} still has version 1 on master") - counter += 1 - continue - assert resp.status_code == 404 - print(f"Subject {subject} no longer has version 1") - break - - # Subject deletion - resp = await registry_async_retry_client.get(f"{master_url}/subjects/") - assert resp.ok - data = resp.json() - assert subject in data - resp = await registry_async_retry_client.delete(f"{slave_url}/subjects/{subject}") - assert resp.ok - counter = 0 - while True: - assert counter < max_tries, "Subject deletion not propagated yet" - resp = await registry_async_retry_client.get(f"{master_url}/subjects/") - if not resp.ok: - print("Could not retrieve subject list on master") - counter += 1 - continue - data = resp.json() - assert subject not in data - break diff --git a/tests/integration/test_request_forwarding.py b/tests/integration/test_request_forwarding.py new file mode 100644 index 000000000..26236ddee --- /dev/null +++ b/tests/integration/test_request_forwarding.py @@ -0,0 +1,146 @@ +""" +karapace - test request forwarding + +Copyright (c) 2023 Aiven Ltd +See LICENSE for details +""" + +from _pytest.fixtures import SubRequest +from typing import AsyncGenerator +from karapace.client import Client +from tests.integration.utils.rest_client import RetryRestClient +from tests.utils import new_random_name, repeat_until_master_is_available, repeat_until_successful_request + +import asyncio +import json +import pytest + + +@pytest.fixture(scope="function", name="request_forwarding_retry_client") +async def fixture_registry_async_client( + request: SubRequest, + registry_async_pair: list[str], +) -> AsyncGenerator[RetryRestClient, None]: + registry_async_pair[0] + client = Client( + server_uri=registry_async_pair[0], + server_ca=request.config.getoption("server_ca"), + ) + + try: + # wait until the server is listening, otherwise the tests may fail + await repeat_until_successful_request( + client.get, + "subjects", + json_data=None, + headers=None, + error_msg=f"Registry API {client.server_uri} is unreachable", + timeout=10, + sleep=0.3, + ) + await repeat_until_master_is_available(client) + yield RetryRestClient(client) + finally: + await client.close() + + +async def test_schema_request_forwarding( + registry_async_pair: list[str], + request_forwarding_retry_client: RetryRestClient, +) -> None: + master_url, slave_url = registry_async_pair + + max_tries, counter = 5, 0 + wait_time = 0.5 + subject = new_random_name("subject") + schema = {"type": "string"} + other_schema = {"type": "int"} + # Config updates + for subj_path in [None, subject]: + if subj_path: + path = f"config/{subject}" + else: + path = "config" + for compat in ["FULL", "BACKWARD", "FORWARD", "NONE"]: + resp = await request_forwarding_retry_client.put(f"{slave_url}/{path}", json={"compatibility": compat}) + assert resp.ok + while True: + assert counter < max_tries, "Compat update not propagated" + resp = await request_forwarding_retry_client.get(f"{master_url}/{path}") + if not resp.ok: + print(f"Invalid http status code: {resp.status_code}") + continue + data = resp.json() + if "compatibilityLevel" not in data: + print(f"Invalid response: {data}") + counter += 1 + await asyncio.sleep(wait_time) + continue + if data["compatibilityLevel"] != compat: + print(f"Bad compatibility: {data}") + counter += 1 + await asyncio.sleep(wait_time) + continue + break + + # New schema updates, last compatibility is None + for s in [schema, other_schema]: + resp = await request_forwarding_retry_client.post( + f"{slave_url}/subjects/{subject}/versions", json={"schema": json.dumps(s)} + ) + assert resp.ok + data = resp.json() + assert "id" in data, data + counter = 0 + while True: + assert counter < max_tries, "Subject schema data not propagated yet" + resp = await request_forwarding_retry_client.get(f"{master_url}/subjects/{subject}/versions") + if not resp.ok: + print(f"Invalid http status code: {resp.status_code}") + counter += 1 + continue + data = resp.json() + if not data: + print(f"No versions registered for subject {subject} yet") + counter += 1 + continue + assert len(data) == 2, data + assert data[0] == 1, data + print("Subject schema data propagated") + break + + # Schema deletions + resp = await request_forwarding_retry_client.delete(f"{slave_url}/subjects/{subject}/versions/1") + assert resp.ok + counter = 0 + while True: + assert counter < max_tries, "Subject version deletion not propagated yet" + resp = await request_forwarding_retry_client.get( + f"{master_url}/subjects/{subject}/versions/1", expected_response_code=404 + ) + if resp.ok: + print(f"Subject {subject} still has version 1 on master") + counter += 1 + continue + assert resp.status_code == 404 + print(f"Subject {subject} no longer has version 1") + break + + # Subject deletion + resp = await request_forwarding_retry_client.get(f"{master_url}/subjects/") + assert resp.ok + data = resp.json() + assert subject in data + resp = await request_forwarding_retry_client.delete(f"{slave_url}/subjects/{subject}") + assert resp.ok + counter = 0 + while True: + assert counter < max_tries, "Subject deletion not propagated yet" + resp = await request_forwarding_retry_client.get(f"{master_url}/subjects/") + if not resp.ok: + print("Could not retrieve subject list on master") + counter += 1 + continue + data = resp.json() + assert subject not in data + break diff --git a/tests/integration/test_schema_protobuf.py b/tests/integration/test_schema_protobuf.py index 227cefda1..2c2f9b17b 100644 --- a/tests/integration/test_schema_protobuf.py +++ b/tests/integration/test_schema_protobuf.py @@ -1333,10 +1333,12 @@ async def test_registering_normalized_schema(session_logdir: Path, kafka_servers config1 = Config() config1.bootstrap_uri = kafka_servers.bootstrap_servers[0] + config1.waiting_time_before_acting_as_master_ms = 500 config2 = Config() config2.bootstrap_uri = kafka_servers.bootstrap_servers[0] config2.use_protobuf_formatter = True + config2.waiting_time_before_acting_as_master_ms = 500 async with start_schema_registry_cluster( config_templates=[config1, config2], @@ -1347,7 +1349,7 @@ async def test_registering_normalized_schema(session_logdir: Path, kafka_servers client1 = Client(server_uri=servers[0], server_ca=None) client2 = Client(server_uri=servers[1], server_ca=None) - await asyncio.sleep(10) + await asyncio.sleep(2) body = {"schemaType": "PROTOBUF", "schema": SCHEMA_WITH_OPTION_ORDERED} res = await client1.post(f"subjects/{subject}/versions?normalize=true", json=body) diff --git a/tests/integration/utils/cluster.py b/tests/integration/utils/cluster.py index f2c7de2ff..df5bafb59 100644 --- a/tests/integration/utils/cluster.py +++ b/tests/integration/utils/cluster.py @@ -79,6 +79,8 @@ async def start_schema_registry_cluster( "KARAPACE_SERVER_TLS_CERTFILE": config.server_tls_certfile if config.server_tls_certfile else "", "KARAPACE_SERVER_TLS_KEYFILE": config.server_tls_keyfile if config.server_tls_keyfile else "", "KARAPACE_USE_PROTOBUF_FORMATTER": "true" if config.use_protobuf_formatter else "false", + "KARAPACE_WAITING_TIME_BEFORE_ACTING_AS_MASTER_MS": str(config.waiting_time_before_acting_as_master_ms), + "KARAPACE_MASTER_ELIGIBILITY": str(config.master_eligibility), } process = popen_karapace_all(module="schema_registry", env=env, stdout=logfile, stderr=errfile) stack.callback(stop_process, process) diff --git a/tests/unit/schema_registry/test_forwarding.py b/tests/unit/schema_registry/test_forwarding.py new file mode 100644 index 000000000..ccb152b5d --- /dev/null +++ b/tests/unit/schema_registry/test_forwarding.py @@ -0,0 +1,89 @@ +""" +karapace - test request forwarding in router + +Copyright (c) 2023 Aiven Ltd +See LICENSE for details +""" + +from fastapi import Request +from fastapi.exceptions import HTTPException +from karapace.config import Config +from karapace.forward_client import ForwardClient +from schema_registry.controller import KarapaceSchemaRegistryController +from schema_registry.registry import KarapaceSchemaRegistry +from schema_registry.routers.config import config_put +from unittest.mock import AsyncMock, Mock + +from schema_registry.routers.requests import CompatibilityRequest + +import pytest + + +async def test_forwarding_not_a_primary_and_own_primary_url() -> None: + compatibility_request = CompatibilityRequest(compatibility="FORWARD") + forward_client_mock = AsyncMock(spec=ForwardClient) + config = Config() + config.host = "127.0.0.1" + config.port = 8081 + schema_registry_mock = AsyncMock(spec=KarapaceSchemaRegistry) + schema_registry_mock.get_master.return_value = (False, "http://127.0.0.1:8081") + + with pytest.raises(HTTPException): + await config_put( + request=Mock(spec=Request), + compatibility_level_request=compatibility_request, + schema_registry=schema_registry_mock, + user=None, + forward_client=forward_client_mock, + authorizer=None, + controller=None, + config=config, + ) + + forward_client_mock.forward_request_remote.assert_not_called() + + +async def test_forwarding_to_a_primary() -> None: + compatibility_request = CompatibilityRequest(compatibility="FORWARD") + forward_client_mock = AsyncMock(spec=ForwardClient) + config = Config() + config.host = "127.0.0.1" + schema_registry_mock = AsyncMock(spec=KarapaceSchemaRegistry) + schema_registry_mock.get_master.return_value = (False, "http://127.0.0.1:8082") + + await config_put( + request=Mock(spec=Request), + compatibility_level_request=compatibility_request, + schema_registry=schema_registry_mock, + user=None, + forward_client=forward_client_mock, + authorizer=None, + controller=None, + config=config, + ) + + forward_client_mock.forward_request_remote.assert_called_once() + + +async def test_no_forwarding_as_instance_is_primary() -> None: + compatibility_request = CompatibilityRequest(compatibility="FORWARD") + forward_client_mock = AsyncMock(spec=ForwardClient) + controller_mock = AsyncMock(spec=KarapaceSchemaRegistryController) + config = Config() + config.host = "127.0.0.1" + schema_registry_mock = AsyncMock(spec=KarapaceSchemaRegistry) + schema_registry_mock.get_master.return_value = (True, "http://127.0.0.1:8081") + + await config_put( + request=Mock(spec=Request), + compatibility_level_request=compatibility_request, + schema_registry=schema_registry_mock, + user=None, + forward_client=forward_client_mock, + authorizer=None, + controller=controller_mock, + config=config, + ) + + controller_mock.config_set.assert_called_once() + forward_client_mock.forward_request_remote.assert_not_called() diff --git a/tests/utils.py b/tests/utils.py index 634b77e6f..f578914f9 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -21,7 +21,6 @@ import os import ssl import sys -import time import uuid consumer_valid_payload = { @@ -328,7 +327,7 @@ async def repeat_until_master_is_available(client: Client) -> None: reply = res.json() if reply is not None and "master_available" in reply and reply["master_available"] is True: break - time.sleep(1) + await asyncio.sleep(1) def write_ini(file_path: Path, ini_data: dict) -> None: