Skip to content

Commit

Permalink
Merge pull request #24119 from ztlpn/iceberg-delete
Browse files Browse the repository at this point in the history
Iceberg topic lifecycle management
  • Loading branch information
piyushredpanda authored Nov 25, 2024
2 parents 26d696e + 123ccdd commit 531a357
Show file tree
Hide file tree
Showing 80 changed files with 2,204 additions and 470 deletions.
2 changes: 1 addition & 1 deletion src/v/cloud_storage/tests/cloud_storage_e2e_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ TEST_P(EndToEndFixture, TestProduceConsumeFromCloudWithSpillover) {
ASSERT_EQ(req.method, "PUT");
cloud_storage::partition_manifest spm(
partition->get_ntp_config().ntp(),
partition->get_ntp_config().get_initial_revision());
partition->get_ntp_config().get_remote_revision());
iobuf sbuf;
sbuf.append(req.content.data(), req.content_length);
vlog(
Expand Down
4 changes: 2 additions & 2 deletions src/v/cluster/archival/archival_metadata_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -691,7 +691,7 @@ archival_metadata_stm::archival_metadata_stm(
, _logger(logger, ssx::sformat("ntp: {}", raft->ntp()))
, _mem_tracker(ss::make_shared<util::mem_tracker>(raft->ntp().path()))
, _manifest(ss::make_shared<cloud_storage::partition_manifest>(
raft->ntp(), raft->log_config().get_initial_revision(), _mem_tracker))
raft->ntp(), raft->log_config().get_remote_revision(), _mem_tracker))
, _cloud_storage_api(remote)
, _feature_table(ft)
, _remote_path_provider(remote_label, remote_topic_namespace_override) {}
Expand Down Expand Up @@ -1272,7 +1272,7 @@ ss::future<> archival_metadata_stm::apply_local_snapshot(

*_manifest = cloud_storage::partition_manifest(
_raft->ntp(),
_raft->log_config().get_initial_revision(),
_raft->log_config().get_remote_revision(),
_manifest->mem_tracker(),
snap.start_offset,
snap.last_offset,
Expand Down
2 changes: 1 addition & 1 deletion src/v/cluster/archival/archiver_operations_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1088,7 +1088,7 @@ class cluster_partition : public detail::cluster_partition_api {
}

model::initial_revision_id get_initial_revision() const override {
return _part->log()->config().get_initial_revision();
return _part->log()->config().get_remote_revision();
}

// aborted transactions
Expand Down
2 changes: 1 addition & 1 deletion src/v/cluster/archival/ntp_archiver_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ ntp_archiver::ntp_archiver(
ss::shared_ptr<archiver_operations_api> ops,
ss::shared_ptr<archiver_scheduler_api<ss::lowres_clock>> sched)
: _ntp(ntp.ntp())
, _rev(ntp.get_initial_revision())
, _rev(ntp.get_remote_revision())
, _remote(remote)
, _cache(c)
, _parent(parent)
Expand Down
9 changes: 5 additions & 4 deletions src/v/cluster/archival/purger.cc
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,8 @@ ss::future<housekeeping_job::run_result> purger::run(run_quota_t quota) {
"Dropping lifecycle marker {}, is not suitable for remote purge",
marker.config.tp_ns);

co_await _topics_frontend.local().purged_topic(nt_revision, 5s);
co_await _topics_frontend.local().purged_topic(
nt_revision, cluster::topic_purge_domain::cloud_storage, 5s);
continue;
}

Expand Down Expand Up @@ -548,10 +549,10 @@ ss::future<housekeeping_job::run_result> purger::run(run_quota_t quota) {
co_return result;
}

// All topic-specific bucket contents are gone, we may erase
// our controller tombstone.
// All topic-specific bucket contents are gone, we may mark the
// topic as purged in the topic table.
auto purge_result = co_await _topics_frontend.local().purged_topic(
nt_revision, 5s);
nt_revision, cluster::topic_purge_domain::cloud_storage, 5s);
if (purge_result.ec != cluster::errc::success) {
auto errc = cluster::make_error_code(purge_result.ec);
// Just log: this will get retried next time the scrubber runs
Expand Down
8 changes: 4 additions & 4 deletions src/v/cluster/archival/tests/archival_metadata_stm_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ void check_snapshot_size(
FIXTURE_TEST(test_snapshot_loading, archival_metadata_stm_base_fixture) {
create_raft();
auto& ntp_cfg = _raft->log_config();
partition_manifest m(ntp_cfg.ntp(), ntp_cfg.get_initial_revision());
partition_manifest m(ntp_cfg.ntp(), ntp_cfg.get_remote_revision());
m.add(
segment_name("0-1-v1.log"),
segment_meta{
Expand Down Expand Up @@ -392,7 +392,7 @@ FIXTURE_TEST(test_snapshot_loading, archival_metadata_stm_base_fixture) {
FIXTURE_TEST(test_sname_derivation, archival_metadata_stm_base_fixture) {
create_raft();
auto& ntp_cfg = _raft->log_config();
partition_manifest m(ntp_cfg.ntp(), ntp_cfg.get_initial_revision());
partition_manifest m(ntp_cfg.ntp(), ntp_cfg.get_remote_revision());

// original segments
m.add(
Expand Down Expand Up @@ -509,7 +509,7 @@ FIXTURE_TEST(
.committed_offset = model::offset(399),
.archiver_term = model::term_id(1),
.segment_term = model::term_id(1)});
partition_manifest pm(ntp_cfg.ntp(), ntp_cfg.get_initial_revision());
partition_manifest pm(ntp_cfg.ntp(), ntp_cfg.get_remote_revision());
for (const auto& s : m) {
auto name = cloud_storage::generate_local_segment_name(
s.base_offset, model::term_id{1});
Expand Down Expand Up @@ -651,7 +651,7 @@ FIXTURE_TEST(
archival_metadata_stm_base_fixture) {
create_raft();
auto& ntp_cfg = _raft->log_config();
partition_manifest m(ntp_cfg.ntp(), ntp_cfg.get_initial_revision());
partition_manifest m(ntp_cfg.ntp(), ntp_cfg.get_remote_revision());
m.add(
segment_name("0-1-v1.log"),
segment_meta{
Expand Down
83 changes: 42 additions & 41 deletions src/v/cluster/archival/tests/archival_service_fixture.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,46 @@ constexpr int16_t fixture_port_number = 7676;
inline ss::logger arch_fixture_log("archival_service_fixture");
} // namespace

inline auto
get_cloud_storage_configurations(std::string_view hosthame, uint16_t port) {
net::unresolved_address server_addr(ss::sstring(hosthame), port);
cloud_storage_clients::s3_configuration s3_conf;
s3_conf.uri = cloud_storage_clients::access_point_uri(hosthame);
s3_conf.access_key = cloud_roles::public_key_str("access-key");
s3_conf.secret_key = cloud_roles::private_key_str("secret-key");
s3_conf.region = cloud_roles::aws_region_name("us-east-1");
s3_conf.url_style = cloud_storage_clients::s3_url_style::virtual_host;
s3_conf._probe = ss::make_shared<cloud_storage_clients::client_probe>(
net::metrics_disabled::yes,
net::public_metrics_disabled::yes,
cloud_roles::aws_region_name{},
cloud_storage_clients::endpoint_url{});
s3_conf.server_addr = server_addr;

archival::configuration a_conf{
.cloud_storage_initial_backoff = config::mock_binding(100ms),
.segment_upload_timeout = config::mock_binding(1000ms),
.manifest_upload_timeout = config::mock_binding(1000ms),
.garbage_collect_timeout = config::mock_binding(1000ms),
.upload_loop_initial_backoff = config::mock_binding(100ms),
.upload_loop_max_backoff = config::mock_binding(5000ms)};
a_conf.bucket_name = cloud_storage_clients::bucket_name("test-bucket");
a_conf.ntp_metrics_disabled = archival::per_ntp_metrics_disabled::yes;
a_conf.svc_metrics_disabled = archival::service_metrics_disabled::yes;
a_conf.time_limit = std::nullopt;

cloud_storage::configuration c_conf;
c_conf.client_config = s3_conf;
c_conf.bucket_name = cloud_storage_clients::bucket_name("test-bucket");
c_conf.connection_limit = archival::connection_limit(2);
c_conf.cloud_credentials_source
= model::cloud_credentials_source::config_file;
return std::make_tuple(
std::move(s3_conf),
ss::make_lw_shared<archival::configuration>(std::move(a_conf)),
c_conf);
}

class archiver_cluster_fixture
: public cluster_test_fixture
, public http_imposter_fixture {
Expand All @@ -54,51 +94,12 @@ class archiver_cluster_fixture
static constexpr int proxy_port_base = 8082;
static constexpr int schema_reg_port_base = 8081;

auto get_configurations() {
net::unresolved_address server_addr(
ss::sstring(httpd_host_name), httpd_port_number());
cloud_storage_clients::s3_configuration s3_conf;
s3_conf.uri = cloud_storage_clients::access_point_uri(httpd_host_name);
s3_conf.access_key = cloud_roles::public_key_str("access-key");
s3_conf.secret_key = cloud_roles::private_key_str("secret-key");
s3_conf.region = cloud_roles::aws_region_name("us-east-1");
s3_conf.url_style = cloud_storage_clients::s3_url_style::virtual_host;
s3_conf._probe = ss::make_shared<cloud_storage_clients::client_probe>(
net::metrics_disabled::yes,
net::public_metrics_disabled::yes,
cloud_roles::aws_region_name{},
cloud_storage_clients::endpoint_url{});
s3_conf.server_addr = server_addr;

archival::configuration a_conf{
.cloud_storage_initial_backoff = config::mock_binding(100ms),
.segment_upload_timeout = config::mock_binding(1000ms),
.manifest_upload_timeout = config::mock_binding(1000ms),
.garbage_collect_timeout = config::mock_binding(1000ms),
.upload_loop_initial_backoff = config::mock_binding(100ms),
.upload_loop_max_backoff = config::mock_binding(5000ms)};
a_conf.bucket_name = cloud_storage_clients::bucket_name("test-bucket");
a_conf.ntp_metrics_disabled = archival::per_ntp_metrics_disabled::yes;
a_conf.svc_metrics_disabled = archival::service_metrics_disabled::yes;
a_conf.time_limit = std::nullopt;

cloud_storage::configuration c_conf;
c_conf.client_config = s3_conf;
c_conf.bucket_name = cloud_storage_clients::bucket_name("test-bucket");
c_conf.connection_limit = archival::connection_limit(2);
c_conf.cloud_credentials_source
= model::cloud_credentials_source::config_file;
return std::make_tuple(
std::move(s3_conf),
ss::make_lw_shared<archival::configuration>(std::move(a_conf)),
c_conf);
}

public:
model::node_id add_node() {
auto id = model::node_id((int)apps.size());

auto [s3_conf, a_conf, cs_conf] = get_configurations();
auto [s3_conf, a_conf, cs_conf] = get_cloud_storage_configurations(
httpd_host_name, httpd_port_number());
auto node = create_node_application(
id,
kafka_port_base,
Expand Down
56 changes: 33 additions & 23 deletions src/v/cluster/controller_backend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1368,46 +1368,56 @@ ss::future<std::error_code> controller_backend::create_partition(
log_revision,
initial_replicas);

auto cfg = _topics.local().get_topic_cfg(model::topic_namespace_view(ntp));
if (!cfg) {
// partition was already removed, do nothing
co_return errc::success;
}

auto ec = co_await _shard_placement.prepare_create(ntp, log_revision);
if (ec) {
co_return ec;
}

topic_configuration cfg;
model::revision_id topic_rev;
model::initial_revision_id remote_rev;
{
auto topic_md = _topics.local().get_topic_metadata_ref(
model::topic_namespace_view(ntp));
if (!topic_md) {
// topic was already removed, do nothing
co_return errc::success;
}
cfg = topic_md->get().get_configuration();
topic_rev = topic_md->get().get_revision();
// Remote revision is used for cloud storage paths. If the topic was
// recovered, this is the value from the original manifest, and if topic
// is read replica, the value from remote topic manifest is used.
remote_rev = topic_md->get().get_remote_revision().value_or(
model::initial_revision_id{topic_rev});
}

// handle partially created topic
auto partition = _partition_manager.local().get(ntp);

// initial revision of the partition on the moment when it was created
// the value is used by shadow indexing
// if topic is read replica, the value from remote topic manifest is
// used
auto initial_rev = _topics.local().get_initial_revision(ntp);
if (!initial_rev) {
co_return errc::topic_not_exists;
}
// no partition exists, create one
if (likely(!partition)) {
std::vector<model::broker> initial_brokers = create_brokers_set(
initial_replicas, _members_table.local());

std::optional<cloud_storage_clients::bucket_name> read_replica_bucket;
if (cfg->is_read_replica()) {
if (cfg.is_read_replica()) {
read_replica_bucket = cloud_storage_clients::bucket_name(
cfg->properties.read_replica_bucket.value());
cfg.properties.read_replica_bucket.value());
}

std::optional<xshard_transfer_state> xst_state;
if (auto it = _xst_states.find(ntp); it != _xst_states.end()) {
xst_state = it->second;
}
auto ntp_config = cfg->make_ntp_config(
_data_directory, ntp.tp.partition, log_revision, initial_rev.value());
auto rtp = cfg->properties.remote_topic_properties;

auto ntp_config = cfg.make_ntp_config(
_data_directory,
ntp.tp.partition,
log_revision,
topic_rev,
remote_rev);
auto rtp = cfg.properties.remote_topic_properties;
const bool is_cloud_topic = ntp_config.is_archival_enabled()
|| ntp_config.is_remote_fetch_enabled();
const bool is_internal = ntp.ns == model::kafka_internal_namespace;
Expand All @@ -1423,7 +1433,7 @@ ss::future<std::error_code> controller_backend::create_partition(
// topic being cloud enabled implies existence of overrides
ntp_config.get_overrides().recovery_enabled
= storage::topic_recovery_enabled::yes;
rtp.emplace(*initial_rev, cfg->partition_count);
rtp.emplace(remote_rev, cfg.partition_count);
}
// we use offset as an rev as it is always increasing and it
// increases while ntp is being created again
Expand All @@ -1437,8 +1447,8 @@ ss::future<std::error_code> controller_backend::create_partition(
std::move(xst_state),
rtp,
read_replica_bucket,
cfg->properties.remote_label,
cfg->properties.remote_topic_namespace_override);
cfg.properties.remote_label,
cfg.properties.remote_topic_namespace_override);

_xst_states.erase(ntp);

Expand All @@ -1464,7 +1474,7 @@ ss::future<std::error_code> controller_backend::create_partition(
auto partition = _partition_manager.local().get(ntp);
if (partition) {
partition->set_topic_config(
std::make_unique<topic_configuration>(std::move(*cfg)));
std::make_unique<topic_configuration>(std::move(cfg)));
}
}

Expand Down
7 changes: 7 additions & 0 deletions src/v/cluster/controller_snapshot.cc
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ ss::future<> topics_t::serde_async_write(iobuf& out) {
serde::write(out, highest_group_id);
co_await write_map_async(out, std::move(lifecycle_markers));
co_await write_map_async(out, partitions_to_force_recover);
co_await write_map_async(out, std::move(iceberg_tombstones));
}

ss::future<>
Expand All @@ -152,6 +153,12 @@ topics_t::serde_async_read(iobuf_parser& in, const serde::header h) {
in, h._bytes_left_limit);
}

if (h._version >= 2) {
iceberg_tombstones
= co_await read_map_async_nested<decltype(iceberg_tombstones)>(
in, h._bytes_left_limit);
}

if (in.bytes_left() > h._bytes_left_limit) {
in.skip(in.bytes_left() - h._bytes_left_limit);
}
Expand Down
9 changes: 8 additions & 1 deletion src/v/cluster/controller_snapshot.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ struct config_t

struct topics_t
: public serde::
envelope<topics_t, serde::version<1>, serde::compat_version<0>> {
envelope<topics_t, serde::version<2>, serde::compat_version<0>> {
// NOTE: layout here is a bit different than in the topic table because it
// allows more compact storage and more convenient generation of controller
// backend deltas when applying the snapshot.
Expand Down Expand Up @@ -193,6 +193,13 @@ struct topics_t

force_recoverable_partitions_t partitions_to_force_recover;

chunked_hash_map<
model::topic_namespace,
nt_iceberg_tombstone,
model::topic_namespace_hash,
model::topic_namespace_eq>
iceberg_tombstones;

friend bool operator==(const topics_t&, const topics_t&) = default;

ss::future<> serde_async_write(iobuf&);
Expand Down
10 changes: 7 additions & 3 deletions src/v/cluster/partition.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1205,7 +1205,7 @@ partition::unsafe_reset_remote_partition_manifest_from_json(iobuf json_buf) {

// Deserialise provided manifest
cloud_storage::partition_manifest req_m{
_raft->ntp(), _raft->log_config().get_initial_revision()};
_raft->ntp(), _raft->log_config().get_remote_revision()};
req_m.update_with_json(std::move(json_buf));

co_await replicate_unsafe_reset(std::move(req_m));
Expand Down Expand Up @@ -1292,7 +1292,7 @@ partition::fetch_latest_cloud_offset_from_manifest(
co_return errc::invalid_partition_operation;
}

const auto initial_rev = _raft->log_config().get_initial_revision();
const auto initial_rev = _raft->log_config().get_remote_revision();
const auto bucket = [this]() {
if (is_read_replica_mode_enabled()) {
return get_read_replica_bucket();
Expand Down Expand Up @@ -1338,7 +1338,7 @@ partition::fetch_latest_cloud_offset_from_manifest(

ss::future<>
partition::do_unsafe_reset_remote_partition_manifest_from_cloud(bool force) {
const auto initial_rev = _raft->log_config().get_initial_revision();
const auto initial_rev = _raft->log_config().get_remote_revision();
const auto bucket = [this]() {
if (is_read_replica_mode_enabled()) {
return get_read_replica_bucket();
Expand Down Expand Up @@ -1647,6 +1647,10 @@ model::revision_id partition::get_log_revision_id() const {
return _raft->log_config().get_revision();
}

model::revision_id partition::get_topic_revision_id() const {
return _raft->log_config().get_topic_revision();
}

std::optional<model::node_id> partition::get_leader_id() const {
return _raft->get_leader_id();
}
Expand Down
Loading

0 comments on commit 531a357

Please sign in to comment.