From e9c9911f3c2ce0a6620bfd0c9607efd3161a7711 Mon Sep 17 00:00:00 2001 From: Ben Pope Date: Wed, 8 May 2024 13:27:12 +0100 Subject: [PATCH] schema_registry: Support `deleted=true` for `POST /subjects/` This allows looking up a schema against a subject even if it has been soft deleted. Signed-off-by: Ben Pope (cherry picked from commit e881a8e5d67284d7bc83c8bc2e5ce40ad57c0246) --- .../api/api-doc/schema_registry.json | 12 ++++ src/v/pandaproxy/schema_registry/handlers.cc | 10 ++- .../schema_registry/sharded_store.cc | 8 +-- .../schema_registry/sharded_store.h | 3 +- tests/rptest/tests/schema_registry_test.py | 61 +++++++++++++++++-- 5 files changed, 81 insertions(+), 13 deletions(-) diff --git a/src/v/pandaproxy/api/api-doc/schema_registry.json b/src/v/pandaproxy/api/api-doc/schema_registry.json index 8e0f788ac6eb0..eff50dcb95f5a 100644 --- a/src/v/pandaproxy/api/api-doc/schema_registry.json +++ b/src/v/pandaproxy/api/api-doc/schema_registry.json @@ -466,6 +466,12 @@ "required": true, "type": "string" }, + { + "name": "deleted", + "in": "query", + "required": false, + "type": "boolean" + }, { "name": "schema_def", "in": "body", @@ -482,6 +488,12 @@ "$ref": "#/definitions/subject_schema" } }, + "404": { + "description": "Not found", + "schema": { + "$ref": "#/definitions/error_body" + } + }, "409": { "description": "Incompatible schema", "schema": { diff --git a/src/v/pandaproxy/schema_registry/handlers.cc b/src/v/pandaproxy/schema_registry/handlers.cc index 335980b1d8d21..2c8874591bb67 100644 --- a/src/v/pandaproxy/schema_registry/handlers.cc +++ b/src/v/pandaproxy/schema_registry/handlers.cc @@ -329,12 +329,15 @@ post_subject(server::request_t rq, server::reply_t rp) { parse_content_type_header(rq); parse_accept_header(rq, rp); auto sub = parse::request_param(*rq.req, "subject"); - vlog(plog.debug, "post_subject subject='{}'", sub); + auto inc_del{ + parse::query_param>(*rq.req, "deleted") + .value_or(include_deleted::no)}; + vlog(plog.debug, "post_subject subject='{}', deleted='{}'", sub, inc_del); // We must sync co_await rq.service().writer().read_sync(); // Force 40401 if no subject - co_await rq.service().schema_store().get_versions(sub, include_deleted::no); + co_await rq.service().schema_store().get_versions(sub, inc_del); canonical_schema schema; try { @@ -353,7 +356,8 @@ post_subject(server::request_t rq, server::reply_t rp) { rq.req.reset(); - auto sub_schema = co_await rq.service().schema_store().has_schema(schema); + auto sub_schema = co_await rq.service().schema_store().has_schema( + schema, inc_del); auto json_rslt{json::rjson_serialize(post_subject_versions_version_response{ .schema{std::move(sub_schema.schema)}, diff --git a/src/v/pandaproxy/schema_registry/sharded_store.cc b/src/v/pandaproxy/schema_registry/sharded_store.cc index 0fc747e06fd01..b257f3a45c89f 100644 --- a/src/v/pandaproxy/schema_registry/sharded_store.cc +++ b/src/v/pandaproxy/schema_registry/sharded_store.cc @@ -254,8 +254,9 @@ ss::future sharded_store::has_schema(schema_id id) { }); } -ss::future sharded_store::has_schema(canonical_schema schema) { - auto versions = co_await get_versions(schema.sub(), include_deleted::no); +ss::future +sharded_store::has_schema(canonical_schema schema, include_deleted inc_del) { + auto versions = co_await get_versions(schema.sub(), inc_del); try { co_await validate_schema(schema); @@ -266,8 +267,7 @@ ss::future sharded_store::has_schema(canonical_schema schema) { std::optional sub_schema; for (auto ver : versions) { try { - auto res = co_await get_subject_schema( - schema.sub(), ver, include_deleted::no); + auto res = co_await get_subject_schema(schema.sub(), ver, inc_del); if (schema.def() == res.schema.def()) { sub_schema.emplace(std::move(res)); break; diff --git a/src/v/pandaproxy/schema_registry/sharded_store.h b/src/v/pandaproxy/schema_registry/sharded_store.h index 798fa7e62c4d5..4c6a43e11c98d 100644 --- a/src/v/pandaproxy/schema_registry/sharded_store.h +++ b/src/v/pandaproxy/schema_registry/sharded_store.h @@ -58,7 +58,8 @@ class sharded_store { is_deleted deleted); ss::future has_schema(schema_id id); - ss::future has_schema(canonical_schema schema); + ss::future has_schema( + canonical_schema schema, include_deleted inc_del = include_deleted::no); ///\brief Return a schema definition by id. ss::future get_schema_definition(schema_id id); diff --git a/tests/rptest/tests/schema_registry_test.py b/tests/rptest/tests/schema_registry_test.py index 4749ccf3d29db..d66feb33241d3 100644 --- a/tests/rptest/tests/schema_registry_test.py +++ b/tests/rptest/tests/schema_registry_test.py @@ -361,13 +361,15 @@ def _get_subjects(self, deleted=False, headers=HTTP_GET_HEADERS, **kwargs): def _post_subjects_subject(self, subject, data, + deleted=False, headers=HTTP_POST_HEADERS, **kwargs): - return self._request("POST", - f"subjects/{subject}", - headers=headers, - data=data, - **kwargs) + return self._request( + "POST", + f"subjects/{subject}{'?deleted=true' if deleted else ''}", + headers=headers, + data=data, + **kwargs) def _post_subjects_subject_versions(self, subject, @@ -833,6 +835,55 @@ def test_post_subjects_subject(self): assert result["error_code"] == 40403 assert result["message"] == f"Schema not found" + self.logger.info("Soft deleting the schema") + result_raw = self._delete_subject_version(subject=subject, + version=1, + permanent=False) + assert result_raw.status_code == requests.codes.ok + + self.logger.info( + "Posting deleted existing schema should be fail (no subject)") + result_raw = self._post_subjects_subject(subject=subject, + data=json.dumps( + {"schema": schema1_def})) + self.logger.info(result_raw) + self.logger.info(result_raw.content) + assert result_raw.status_code == requests.codes.not_found + result = result_raw.json() + assert result["error_code"] == 40401 + assert result["message"] == f"Subject '{subject}' not found." + + self.logger.info("Posting deleted existing schema should be success") + result_raw = self._post_subjects_subject( + subject=subject, + data=json.dumps({"schema": schema1_def}, ), + deleted=True) + self.logger.info(result_raw) + self.logger.info(result_raw.content) + assert result_raw.status_code == requests.codes.ok + result = result_raw.json() + assert result["subject"] == subject + assert result["id"] == 1 + assert result["version"] == 1 + assert result["schema"] + + self.logger.info("Posting compatible schema should be success") + result_raw = self._post_subjects_subject_versions( + subject=subject, data=json.dumps({"schema": schema2_def})) + assert result_raw.status_code == requests.codes.ok + + self.logger.info( + "Posting deleted existing schema should be fail (no schema)") + result_raw = self._post_subjects_subject(subject=subject, + data=json.dumps( + {"schema": schema1_def})) + self.logger.info(result_raw) + self.logger.info(result_raw.content) + assert result_raw.status_code == requests.codes.not_found + result = result_raw.json() + assert result["error_code"] == 40403 + assert result["message"] == f"Schema not found" + @cluster(num_nodes=3) def test_config(self): """