Skip to content

Commit

Permalink
Merge pull request #17095 from vbotbuildovich/backport-pr-17091-v23.2…
Browse files Browse the repository at this point in the history
….x-978

[v23.2.x] schema_registry: Override soft-delete checks for consume_to_store
  • Loading branch information
michael-redpanda authored Mar 14, 2024
2 parents dd8cee2 + 534c815 commit ab88ad0
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 11 deletions.
8 changes: 4 additions & 4 deletions src/v/pandaproxy/schema_registry/sharded_store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -477,12 +477,12 @@ sharded_store::get_subject_version_written_at(subject sub, schema_version ver) {
});
}

ss::future<bool>
sharded_store::delete_subject_version(subject sub, schema_version ver) {
ss::future<bool> sharded_store::delete_subject_version(
subject sub, schema_version ver, force force) {
auto sub_shard{shard_for(sub)};
co_return co_await _store.invoke_on(
sub_shard, _smp_opts, [sub{std::move(sub)}, ver](store& s) {
return s.delete_subject_version(sub, ver).value();
sub_shard, _smp_opts, [sub{std::move(sub)}, ver, force](store& s) {
return s.delete_subject_version(sub, ver, force).value();
});
}

Expand Down
5 changes: 3 additions & 2 deletions src/v/pandaproxy/schema_registry/sharded_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,9 @@ class sharded_store {
get_subject_version_written_at(subject sub, schema_version version);

///\brief Delete a subject version
ss::future<bool>
delete_subject_version(subject sub, schema_version version);
/// \param force Override checks for soft-delete first.
ss::future<bool> delete_subject_version(
subject sub, schema_version version, force f = force::no);

///\brief Get the global compatibility level.
ss::future<compatibility_level> get_compatibility();
Expand Down
2 changes: 1 addition & 1 deletion src/v/pandaproxy/schema_registry/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -1243,7 +1243,7 @@ struct consume_to_store {
if (!val) {
try {
co_await _store.delete_subject_version(
key.sub, key.version);
key.sub, key.version, force::yes);
} catch (exception& e) {
// This is allowed to throw not_found errors. When we
// tombstone all the records referring to a particular
Expand Down
11 changes: 7 additions & 4 deletions src/v/pandaproxy/schema_registry/store.h
Original file line number Diff line number Diff line change
Expand Up @@ -428,16 +428,19 @@ class store {
}

///\brief Delete a subject version.
result<bool>
delete_subject_version(const subject& sub, schema_version version) {
result<bool> delete_subject_version(
const subject& sub, schema_version version, force force = force::no) {
auto sub_it = BOOST_OUTCOME_TRYX(
get_subject_iter(sub, include_deleted::yes));
auto& versions = sub_it->second.versions;
auto v_it = BOOST_OUTCOME_TRYX(
get_version_iter(*sub_it, version, include_deleted::yes));

// A hard delete should always be preceded by a soft delete
if (!(v_it->deleted || sub_it->second.deleted)) {
// A hard delete should always be preceded by a soft delete,
// however, due to compaction, it's possible that a soft-delete does not
// appear on the topic. The topic is still correct, so override the
// check if force::yes.
if (!force && !(v_it->deleted || sub_it->second.deleted)) {
return not_deleted(sub, version);
}

Expand Down
62 changes: 62 additions & 0 deletions src/v/pandaproxy/schema_registry/test/consume_to_store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -191,3 +191,65 @@ SEASTAR_THREAD_TEST_CASE(test_consume_to_store) {
auto sub_res = s.get_subjects(pps::include_deleted::no).get();
BOOST_REQUIRE_EQUAL(sub_res.size(), 0);
}

template<typename Key>
model::record_batch as_record_batch(Key key) {
storage::record_batch_builder rb{
model::record_batch_type::raft_data, model::offset{0}};
rb.add_raw_kv(to_json_iobuf(std::move(key)), std::nullopt);
return std::move(rb).build();
}

SEASTAR_THREAD_TEST_CASE(test_consume_to_store_after_compaction) {
pps::sharded_store s;
s.start(ss::default_smp_service_group()).get();
auto stop_store = ss::defer([&s]() { s.stop().get(); });

// This kafka client will not be used by the sequencer
// (which itself is only instantiated to receive consume_to_store's
// offset updates), is just needed for constructor;
ss::sharded<kafka::client::client> dummy_kafka_client;
dummy_kafka_client
.start(
to_yaml(kafka::client::configuration{}, config::redact_secrets::no))
.get();
auto stop_kafka_client = ss::defer(
[&dummy_kafka_client]() { dummy_kafka_client.stop().get(); });

ss::sharded<pps::seq_writer> seq;
seq
.start(
model::node_id{0},
ss::default_smp_service_group(),
std::reference_wrapper(dummy_kafka_client),
std::reference_wrapper(s))
.get();
auto stop_seq = ss::defer([&seq]() { seq.stop().get(); });

auto c = pps::consume_to_store(s, seq.local());

auto sequence = model::offset{0};
const auto node_id = model::node_id{123};

// Insert the schema at seq 0
auto good_schema_1 = pps::as_record_batch(
pps::schema_key{sequence, node_id, subject0, version0, magic1},
pps::canonical_schema_value{{subject0, string_def0}, version0, id0});
BOOST_REQUIRE_NO_THROW(c(good_schema_1.copy()).get());
// Roll the segment
// Soft delete the version (at seq 1)
// Perm delete the version (at seq 1)
// Compact that away, so we have a gap
// Restart
// Delete seq0, version 0, it now appears as not soft-deleted
auto perm_delete_schema_1 = as_record_batch(
pps::schema_key{sequence, node_id, subject0, version0, magic1});
BOOST_REQUIRE_NO_THROW(c(perm_delete_schema_1.copy()).get());

BOOST_REQUIRE_EXCEPTION(
s.get_versions(subject0, pps::include_deleted::yes).get(),
pps::exception,
[](pps::exception e) {
return e.code() == pps::error_code::subject_not_found;
});
}
1 change: 1 addition & 0 deletions src/v/pandaproxy/schema_registry/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ using permanent_delete = ss::bool_class<struct delete_tag>;
using include_deleted = ss::bool_class<struct include_deleted_tag>;
using is_deleted = ss::bool_class<struct is_deleted_tag>;
using default_to_global = ss::bool_class<struct default_to_global_tag>;
using force = ss::bool_class<struct force_tag>;

template<typename E>
std::enable_if_t<std::is_enum_v<E>, std::optional<E>>
Expand Down

0 comments on commit ab88ad0

Please sign in to comment.