diff --git a/testsuite/openshift/objects/api_key.py b/testsuite/openshift/objects/api_key.py index 15b20cfe..7655d14c 100644 --- a/testsuite/openshift/objects/api_key.py +++ b/testsuite/openshift/objects/api_key.py @@ -2,7 +2,7 @@ import base64 from testsuite.openshift.client import OpenShiftClient -from testsuite.openshift.objects import OpenShiftObject +from testsuite.openshift.objects import OpenShiftObject, modify class APIKey(OpenShiftObject): @@ -32,3 +32,8 @@ def create_instance(cls, openshift: OpenShiftClient, name, label, api_key): } return cls(model, context=openshift.context) + + @modify + def update_api_key(self, api_key): + """Updates API key Secret with new API key""" + self.model.data["api_key"] = base64.b64encode(api_key.encode("utf-8")).decode('ascii') diff --git a/testsuite/tests/kuadrant/authorino/identity/api_key/test_reconciliation.py b/testsuite/tests/kuadrant/authorino/identity/api_key/test_reconciliation.py new file mode 100644 index 00000000..1ad10e0d --- /dev/null +++ b/testsuite/tests/kuadrant/authorino/identity/api_key/test_reconciliation.py @@ -0,0 +1,56 @@ +"""Tests Secret reconciliation for API key identity verification & authentication""" +import pytest + +from testsuite.httpx.auth import HeaderApiKeyAuth + + +@pytest.fixture(scope="function") +def api_key(create_api_key, module_label): + """Creates API key Secret""" + api_key = "api_key_value" + return create_api_key("api-key", module_label, api_key) + + +@pytest.fixture(scope="function") +def auth(api_key): + """Valid API Key Auth""" + return HeaderApiKeyAuth(api_key) + + +@pytest.fixture(scope="module") +def authorization(authorization, module_label): + """Creates AuthConfig with API key identity""" + authorization.add_api_key_identity("api_key", match_label=module_label) + return authorization + + +def test_create_new_api_key(client, create_api_key, module_label): + """Test reconciliation when API key Secret is freshly created with valid label""" + api_key = create_api_key("api-key", module_label, "new_api_key") + auth = HeaderApiKeyAuth(api_key) + response = client.get("/get", auth=auth) + assert response.status_code == 200 + + +def test_delete_api_key(client, auth, api_key): + """Test reconciliation when API key Secret is deleted""" + response = client.get("/get", auth=auth) + assert response.status_code == 200 + + api_key.delete() + response = client.get("/get", auth=auth) + assert response.status_code == 401 + + +def test_update_api_key(client, auth, api_key): + """Test reconciliation when API key Secret is updated""" + response = client.get("/get", auth=auth) + assert response.status_code == 200 + + api_key.update_api_key("update_api_key") + response = client.get("/get", auth=auth) + assert response.status_code == 401 + + auth = HeaderApiKeyAuth(api_key) + response = client.get("/get", auth=auth) + assert response.status_code == 200