diff --git a/src/v/pandaproxy/schema_registry/sharded_store.cc b/src/v/pandaproxy/schema_registry/sharded_store.cc index 67e15dc180a1..0fc747e06fd0 100644 --- a/src/v/pandaproxy/schema_registry/sharded_store.cc +++ b/src/v/pandaproxy/schema_registry/sharded_store.cc @@ -477,12 +477,12 @@ sharded_store::get_subject_version_written_at(subject sub, schema_version ver) { }); } -ss::future -sharded_store::delete_subject_version(subject sub, schema_version ver) { +ss::future sharded_store::delete_subject_version( + subject sub, schema_version ver, force force) { auto sub_shard{shard_for(sub)}; co_return co_await _store.invoke_on( - sub_shard, _smp_opts, [sub{std::move(sub)}, ver](store& s) { - return s.delete_subject_version(sub, ver).value(); + sub_shard, _smp_opts, [sub{std::move(sub)}, ver, force](store& s) { + return s.delete_subject_version(sub, ver, force).value(); }); } diff --git a/src/v/pandaproxy/schema_registry/sharded_store.h b/src/v/pandaproxy/schema_registry/sharded_store.h index c6473f0e6bc6..798fa7e62c4d 100644 --- a/src/v/pandaproxy/schema_registry/sharded_store.h +++ b/src/v/pandaproxy/schema_registry/sharded_store.h @@ -113,8 +113,9 @@ class sharded_store { get_subject_version_written_at(subject sub, schema_version version); ///\brief Delete a subject version - ss::future - delete_subject_version(subject sub, schema_version version); + /// \param force Override checks for soft-delete first. + ss::future delete_subject_version( + subject sub, schema_version version, force f = force::no); ///\brief Get the global compatibility level. ss::future get_compatibility(); diff --git a/src/v/pandaproxy/schema_registry/storage.h b/src/v/pandaproxy/schema_registry/storage.h index 39bfb98532d1..b39ab8a1898a 100644 --- a/src/v/pandaproxy/schema_registry/storage.h +++ b/src/v/pandaproxy/schema_registry/storage.h @@ -1243,7 +1243,7 @@ struct consume_to_store { if (!val) { try { co_await _store.delete_subject_version( - key.sub, key.version); + key.sub, key.version, force::yes); } catch (exception& e) { // This is allowed to throw not_found errors. When we // tombstone all the records referring to a particular diff --git a/src/v/pandaproxy/schema_registry/store.h b/src/v/pandaproxy/schema_registry/store.h index 456424e64651..a785354922a8 100644 --- a/src/v/pandaproxy/schema_registry/store.h +++ b/src/v/pandaproxy/schema_registry/store.h @@ -428,16 +428,19 @@ class store { } ///\brief Delete a subject version. - result - delete_subject_version(const subject& sub, schema_version version) { + result delete_subject_version( + const subject& sub, schema_version version, force force = force::no) { auto sub_it = BOOST_OUTCOME_TRYX( get_subject_iter(sub, include_deleted::yes)); auto& versions = sub_it->second.versions; auto v_it = BOOST_OUTCOME_TRYX( get_version_iter(*sub_it, version, include_deleted::yes)); - // A hard delete should always be preceded by a soft delete - if (!(v_it->deleted || sub_it->second.deleted)) { + // A hard delete should always be preceded by a soft delete, + // however, due to compaction, it's possible that a soft-delete does not + // appear on the topic. The topic is still correct, so override the + // check if force::yes. + if (!force && !(v_it->deleted || sub_it->second.deleted)) { return not_deleted(sub, version); } diff --git a/src/v/pandaproxy/schema_registry/test/consume_to_store.cc b/src/v/pandaproxy/schema_registry/test/consume_to_store.cc index 7fb28f1303f6..3c5e3e4ffa3f 100644 --- a/src/v/pandaproxy/schema_registry/test/consume_to_store.cc +++ b/src/v/pandaproxy/schema_registry/test/consume_to_store.cc @@ -191,3 +191,65 @@ SEASTAR_THREAD_TEST_CASE(test_consume_to_store) { auto sub_res = s.get_subjects(pps::include_deleted::no).get(); BOOST_REQUIRE_EQUAL(sub_res.size(), 0); } + +template +model::record_batch as_record_batch(Key key) { + storage::record_batch_builder rb{ + model::record_batch_type::raft_data, model::offset{0}}; + rb.add_raw_kv(to_json_iobuf(std::move(key)), std::nullopt); + return std::move(rb).build(); +} + +SEASTAR_THREAD_TEST_CASE(test_consume_to_store_after_compaction) { + pps::sharded_store s; + s.start(ss::default_smp_service_group()).get(); + auto stop_store = ss::defer([&s]() { s.stop().get(); }); + + // This kafka client will not be used by the sequencer + // (which itself is only instantiated to receive consume_to_store's + // offset updates), is just needed for constructor; + ss::sharded dummy_kafka_client; + dummy_kafka_client + .start( + to_yaml(kafka::client::configuration{}, config::redact_secrets::no)) + .get(); + auto stop_kafka_client = ss::defer( + [&dummy_kafka_client]() { dummy_kafka_client.stop().get(); }); + + ss::sharded seq; + seq + .start( + model::node_id{0}, + ss::default_smp_service_group(), + std::reference_wrapper(dummy_kafka_client), + std::reference_wrapper(s)) + .get(); + auto stop_seq = ss::defer([&seq]() { seq.stop().get(); }); + + auto c = pps::consume_to_store(s, seq.local()); + + auto sequence = model::offset{0}; + const auto node_id = model::node_id{123}; + + // Insert the schema at seq 0 + auto good_schema_1 = pps::as_record_batch( + pps::schema_key{sequence, node_id, subject0, version0, magic1}, + pps::canonical_schema_value{{subject0, string_def0}, version0, id0}); + BOOST_REQUIRE_NO_THROW(c(good_schema_1.copy()).get()); + // Roll the segment + // Soft delete the version (at seq 1) + // Perm delete the version (at seq 1) + // Compact that away, so we have a gap + // Restart + // Delete seq0, version 0, it now appears as not soft-deleted + auto perm_delete_schema_1 = as_record_batch( + pps::schema_key{sequence, node_id, subject0, version0, magic1}); + BOOST_REQUIRE_NO_THROW(c(perm_delete_schema_1.copy()).get()); + + BOOST_REQUIRE_EXCEPTION( + s.get_versions(subject0, pps::include_deleted::yes).get(), + pps::exception, + [](pps::exception e) { + return e.code() == pps::error_code::subject_not_found; + }); +} diff --git a/src/v/pandaproxy/schema_registry/types.h b/src/v/pandaproxy/schema_registry/types.h index 613382e0a8d9..1fb3d6d24839 100644 --- a/src/v/pandaproxy/schema_registry/types.h +++ b/src/v/pandaproxy/schema_registry/types.h @@ -31,6 +31,7 @@ using permanent_delete = ss::bool_class; using include_deleted = ss::bool_class; using is_deleted = ss::bool_class; using default_to_global = ss::bool_class; +using force = ss::bool_class; template std::enable_if_t, std::optional>