Skip to content

Commit

Permalink
Merge pull request #14051 from BenPope/schema_registry_split_project_ids
Browse files Browse the repository at this point in the history
schema_registry: Split project_ids
  • Loading branch information
piyushredpanda authored Nov 2, 2023
2 parents 676e375 + d6871dd commit 1a2d39b
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 17 deletions.
13 changes: 11 additions & 2 deletions src/v/pandaproxy/schema_registry/handlers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -383,8 +383,17 @@ post_subject_versions(server::request_t rq, server::reply_t rp) {
unparsed.id.value_or(invalid_schema_id),
is_deleted::no};

auto schema_id = co_await rq.service().writer().write_subject_version(
std::move(schema));
auto ids = co_await rq.service().schema_store().get_schema_version(schema);

schema_id schema_id{ids.id.value_or(invalid_schema_id)};
if (!ids.version.has_value()) {
schema.id = ids.id.value_or(invalid_schema_id);
schema.version = schema.version == invalid_schema_version
? ids.version.value_or(invalid_schema_version)
: schema.version;
schema_id = co_await rq.service().writer().write_subject_version(
std::move(schema));
}

auto json_rslt{
json::rjson_serialize(post_subject_versions_response{.id{schema_id}})};
Expand Down
2 changes: 1 addition & 1 deletion src/v/pandaproxy/schema_registry/seq_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ ss::future<> seq_writer::wait_for(model::offset offset) {
.consume(
consume_to_store{seq._store, seq}, model::no_timeout);
} else {
vlog(plog.debug, "wait_for clean (offset {})", offset);
vlog(plog.trace, "wait_for clean (offset {})", offset);
return ss::make_ready_future<>();
}
});
Expand Down
36 changes: 23 additions & 13 deletions src/v/pandaproxy/schema_registry/sharded_store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,8 @@ sharded_store::make_valid_schema(canonical_schema schema) {
throw as_exception(invalid_schema_type(schema.type()));
}

ss::future<sharded_store::insert_result>
sharded_store::project_ids(subject_schema schema) {
ss::future<sharded_store::has_schema_result>
sharded_store::get_schema_version(subject_schema schema) {
// Validate the schema (may throw)
co_await validate_schema(schema.schema);

Expand Down Expand Up @@ -179,14 +179,18 @@ sharded_store::project_ids(subject_schema schema) {
return res.value();
});

auto has_version = s_id.has_value()
&& absl::c_any_of(
versions, [id = *s_id](auto const& s_id_v) {
return s_id_v.id == id;
});
std::optional<schema_version> v_id;
if (s_id.has_value()) {
auto v_it = absl::c_find_if(versions, [id = *s_id](auto const& s_id_v) {
return s_id_v.id == id;
});
if (v_it != versions.end()) {
v_id.emplace(v_it->version);
}
}

// Check compatibility of the schema
if (!has_version && !versions.empty()) {
if (!v_id.has_value() && !versions.empty()) {
auto compat = co_await is_compatible(
versions.back().version, schema.schema);
if (!compat) {
Expand All @@ -198,17 +202,23 @@ sharded_store::project_ids(subject_schema schema) {
sub));
}
}
co_return has_schema_result{s_id, v_id};
}

if (!s_id) {
ss::future<sharded_store::insert_result>
sharded_store::project_ids(subject_schema schema) {
auto const& sub = schema.schema.sub();
auto s_id = schema.id;
if (s_id == invalid_schema_id) {
// New schema, project an ID for it.
s_id = co_await project_schema_id();
vlog(plog.debug, "project_ids: projected new ID {}", s_id.value());
vlog(plog.debug, "project_ids: projected new ID {}", s_id);
}

auto sub_shard{shard_for(sub)};
auto v_id = co_await _store.invoke_on(
sub_shard, _smp_opts, [sub, id{s_id.value()}](store& s) {
return s.project_version(sub, id);
sub_shard, _smp_opts, [sub, s_id](store& s) {
return s.project_version(sub, s_id);
});

const bool is_new = v_id.has_value();
Expand All @@ -217,7 +227,7 @@ sharded_store::project_ids(subject_schema schema) {
}

co_return insert_result{
v_id.value_or(invalid_schema_version), s_id.value(), is_new};
v_id.value_or(invalid_schema_version), s_id, is_new};
}

ss::future<bool> sharded_store::upsert(
Expand Down
7 changes: 6 additions & 1 deletion src/v/pandaproxy/schema_registry/sharded_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,17 @@ class sharded_store {
///\brief Construct a schema in the native format
ss::future<valid_schema> make_valid_schema(canonical_schema schema);

struct has_schema_result {
std::optional<schema_id> id;
std::optional<schema_version> version;
};
ss::future<has_schema_result> get_schema_version(subject_schema schema);

struct insert_result {
schema_version version;
schema_id id;
bool inserted;
};

ss::future<insert_result> project_ids(subject_schema schema);

ss::future<bool> upsert(
Expand Down

0 comments on commit 1a2d39b

Please sign in to comment.