diff --git a/src/v/pandaproxy/schema_registry/handlers.cc b/src/v/pandaproxy/schema_registry/handlers.cc index 10c811bb81a6b..146cac174df81 100644 --- a/src/v/pandaproxy/schema_registry/handlers.cc +++ b/src/v/pandaproxy/schema_registry/handlers.cc @@ -352,8 +352,17 @@ post_subject_versions(server::request_t rq, server::reply_t rp) { unparsed.id.value_or(invalid_schema_id), is_deleted::no}; - auto schema_id = co_await rq.service().writer().write_subject_version( - std::move(schema)); + auto ids = co_await rq.service().schema_store().get_schema_version(schema); + + schema_id schema_id{ids.id.value_or(invalid_schema_id)}; + if (!ids.version.has_value()) { + schema.id = ids.id.value_or(invalid_schema_id); + schema.version = schema.version == invalid_schema_version + ? ids.version.value_or(invalid_schema_version) + : schema.version; + schema_id = co_await rq.service().writer().write_subject_version( + std::move(schema)); + } auto json_rslt{ json::rjson_serialize(post_subject_versions_response{.id{schema_id}})}; diff --git a/src/v/pandaproxy/schema_registry/seq_writer.cc b/src/v/pandaproxy/schema_registry/seq_writer.cc index 9679d2cdd98e6..365697493a775 100644 --- a/src/v/pandaproxy/schema_registry/seq_writer.cc +++ b/src/v/pandaproxy/schema_registry/seq_writer.cc @@ -62,7 +62,7 @@ ss::future<> seq_writer::wait_for(model::offset offset) { .consume( consume_to_store{seq._store, seq}, model::no_timeout); } else { - vlog(plog.debug, "wait_for clean (offset {})", offset); + vlog(plog.trace, "wait_for clean (offset {})", offset); return ss::make_ready_future<>(); } }); diff --git a/src/v/pandaproxy/schema_registry/sharded_store.cc b/src/v/pandaproxy/schema_registry/sharded_store.cc index e1becfae68d97..bc08c87eb4f13 100644 --- a/src/v/pandaproxy/schema_registry/sharded_store.cc +++ b/src/v/pandaproxy/schema_registry/sharded_store.cc @@ -123,8 +123,8 @@ sharded_store::make_valid_schema(canonical_schema schema) { throw as_exception(invalid_schema_type(schema.type())); } -ss::future -sharded_store::project_ids(subject_schema schema) { +ss::future +sharded_store::get_schema_version(subject_schema schema) { // Validate the schema (may throw) co_await validate_schema(schema.schema); @@ -179,14 +179,18 @@ sharded_store::project_ids(subject_schema schema) { return res.value(); }); - auto has_version = s_id.has_value() - && absl::c_any_of( - versions, [id = *s_id](auto const& s_id_v) { - return s_id_v.id == id; - }); + std::optional v_id; + if (s_id.has_value()) { + auto v_it = absl::c_find_if(versions, [id = *s_id](auto const& s_id_v) { + return s_id_v.id == id; + }); + if (v_it != versions.end()) { + v_id.emplace(v_it->version); + } + } // Check compatibility of the schema - if (!has_version && !versions.empty()) { + if (!v_id.has_value() && !versions.empty()) { auto compat = co_await is_compatible( versions.back().version, schema.schema); if (!compat) { @@ -198,17 +202,23 @@ sharded_store::project_ids(subject_schema schema) { sub)); } } + co_return has_schema_result{s_id, v_id}; +} - if (!s_id) { +ss::future +sharded_store::project_ids(subject_schema schema) { + auto const& sub = schema.schema.sub(); + auto s_id = schema.id; + if (s_id == invalid_schema_id) { // New schema, project an ID for it. s_id = co_await project_schema_id(); - vlog(plog.debug, "project_ids: projected new ID {}", s_id.value()); + vlog(plog.debug, "project_ids: projected new ID {}", s_id); } auto sub_shard{shard_for(sub)}; auto v_id = co_await _store.invoke_on( - sub_shard, _smp_opts, [sub, id{s_id.value()}](store& s) { - return s.project_version(sub, id); + sub_shard, _smp_opts, [sub, s_id](store& s) { + return s.project_version(sub, s_id); }); const bool is_new = v_id.has_value(); @@ -217,7 +227,7 @@ sharded_store::project_ids(subject_schema schema) { } co_return insert_result{ - v_id.value_or(invalid_schema_version), s_id.value(), is_new}; + v_id.value_or(invalid_schema_version), s_id, is_new}; } ss::future sharded_store::upsert( diff --git a/src/v/pandaproxy/schema_registry/sharded_store.h b/src/v/pandaproxy/schema_registry/sharded_store.h index f9b317154123d..eb42c1a3e0c87 100644 --- a/src/v/pandaproxy/schema_registry/sharded_store.h +++ b/src/v/pandaproxy/schema_registry/sharded_store.h @@ -35,12 +35,17 @@ class sharded_store { ///\brief Construct a schema in the native format ss::future make_valid_schema(canonical_schema schema); + struct has_schema_result { + std::optional id; + std::optional version; + }; + ss::future get_schema_version(subject_schema schema); + struct insert_result { schema_version version; schema_id id; bool inserted; }; - ss::future project_ids(subject_schema schema); ss::future upsert(