From 70b29f52c8d6130aaf319a703cc3c1c74ba31a91 Mon Sep 17 00:00:00 2001 From: Vlad Lazar Date: Tue, 31 Oct 2023 11:48:03 +0000 Subject: [PATCH 1/7] archival: use named sempahore for active uploads This commit changes the `_uploads_active` semaphore to a named one. Specifying the clock allows us to use the deadline interface of `ss::get_units`. --- src/v/archival/ntp_archiver_service.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/v/archival/ntp_archiver_service.h b/src/v/archival/ntp_archiver_service.h index 9716aec9db469..d952cfffc1540 100644 --- a/src/v/archival/ntp_archiver_service.h +++ b/src/v/archival/ntp_archiver_service.h @@ -625,7 +625,7 @@ class ntp_archiver { // Held while the inner segment upload/manifest sync loop is running, // to enable code that uses _paused to wait until ongoing activity // has stopped. - ss::semaphore _uploads_active{1}; + ssx::named_semaphore _uploads_active{1, "uploads_active"}; config::binding _housekeeping_interval; simple_time_jitter _housekeeping_jitter; From 196e42cfdfa13329660ce36951945c63b396d8eb Mon Sep 17 00:00:00 2001 From: Vlad Lazar Date: Tue, 31 Oct 2023 11:53:51 +0000 Subject: [PATCH 2/7] cluster: add unsafe reset from cloud for ntp manifest `partition` gains a new function which resets the partition's manifest to the manifest uploaded to cloud storage. If the downloaded manifest has its last offset below the start offset of the local log, we refuse the change since it would lead to data loss. I also refactored the manifest reset methods a bit to share code and added some comments to the method definitions. --- src/v/cluster/partition.cc | 181 +++++++++++++++++++++++++++------ src/v/cluster/partition.h | 22 +++- src/v/redpanda/admin_server.cc | 5 +- 3 files changed, 175 insertions(+), 33 deletions(-) diff --git a/src/v/cluster/partition.cc b/src/v/cluster/partition.cc index 9754d35a9a2be..8c28c86ee744d 100644 --- a/src/v/cluster/partition.cc +++ b/src/v/cluster/partition.cc @@ -26,6 +26,7 @@ #include "raft/state_machine_manager.h" #include "raft/types.h" +#include #include #include @@ -1101,8 +1102,52 @@ partition::get_follower_metrics() const { return _raft->get_follower_metrics(); } -ss::future<> partition::unsafe_reset_remote_partition_manifest(iobuf buf) { - vlog(clusterlog.info, "[{}] Unsafe manifest reset requested", ntp()); +ss::future<> +partition::replicate_unsafe_reset(cloud_storage::partition_manifest manifest) { + vlog( + clusterlog.info, + "Replicating replace manifest command. New manifest details: {{ " + "start_offset: {}, last_offset: {}}}", + manifest.get_start_offset(), + manifest.get_last_offset()); + + // Replicate the reset command which contains the downloaded manifest + auto sync_timeout = config::shard_local_cfg() + .cloud_storage_metadata_sync_timeout_ms.value(); + auto replication_deadline = ss::lowres_clock::now() + sync_timeout; + std::vector builders; + + auto reset_builder = _archival_meta_stm->batch_start( + replication_deadline, _as); + reset_builder.replace_manifest(manifest.to_iobuf()); + + auto errc = co_await reset_builder.replicate(); + if (errc) { + if (errc == raft::errc::shutting_down) { + // During shutdown, act like we hit an abort source rather + // than trying to log+handle this like a write error. 
+ throw ss::abort_requested_exception(); + } + + vlog( + clusterlog.warn, + "[{}] Unsafe reset failed to update archival STM: " + "{}", + ntp(), + errc.message()); + throw std::runtime_error( + fmt::format("Failed to update archival STM: {}", errc.message())); + } + + vlog( + clusterlog.info, + "[{}] Unsafe reset replicated STM commands successfully", + ntp()); +} + +ss::future<> +partition::unsafe_reset_remote_partition_manifest_from_json(iobuf json_buf) { + vlog(clusterlog.info, "[{}] Manual unsafe manifest reset requested", ntp()); if (!(config::shard_local_cfg().cloud_storage_enabled() && _archival_meta_stm)) { @@ -1124,45 +1169,121 @@ ss::future<> partition::unsafe_reset_remote_partition_manifest(iobuf buf) { // Deserialise provided manifest cloud_storage::partition_manifest req_m{ _raft->ntp(), _raft->log_config().get_initial_revision()}; - req_m.update_with_json(std::move(buf)); + req_m.update_with_json(std::move(json_buf)); - // A generous timeout of 60 seconds is used as it applies - // for the replication multiple batches. - auto deadline = ss::lowres_clock::now() + 60s; - std::vector builders; - - auto reset_builder = _archival_meta_stm->batch_start(deadline, _as); - - // Add command to replace manifest. When applied, the current manifest - // will be replaced with the one provided. - reset_builder.replace_manifest(req_m.to_iobuf()); + co_await replicate_unsafe_reset(std::move(req_m)); +} +ss::future<> partition::unsafe_reset_remote_partition_manifest_from_cloud() { vlog( clusterlog.info, - "Replicating replace manifest command. New manifest start offset: {}", - req_m.get_start_offset()); + "[{}] Unsafe manifest reset from cloud state requested", + ntp()); - auto errc = co_await reset_builder.replicate(); - if (errc) { - if (errc == raft::errc::shutting_down) { - // During shutdown, act like we hit an abort source rather - // than trying to log+handle this like a write error. - throw ss::abort_requested_exception(); + if (!(config::shard_local_cfg().cloud_storage_enabled() + && _archival_meta_stm)) { + vlog( + clusterlog.warn, + "[{}] Archival STM not present. Skipping unsafe reset ...", + ntp()); + throw std::runtime_error("Archival STM not present"); + } + + // Stop the archiver and its housekeeping jobs + if (_archiver) { + _upload_housekeeping.local().deregister_jobs( + _archiver->get_housekeeping_jobs()); + co_await _archiver->stop(); + _archiver = nullptr; + } + + auto start_archiver = [this]() { + maybe_construct_archiver(); + if (_archiver) { + // Topic configs may have changed while the archiver was + // stopped, so mark them as dirty just in case. + _archiver->notify_topic_config(); + return _archiver->start(); } + return ss::now(); + }; + + // Ensure that all commands on the log have been applied. + // No new archival commands should be replicated now. + auto sync_timeout = config::shard_local_cfg() + .cloud_storage_metadata_sync_timeout_ms.value(); + auto sync_result = co_await ss::coroutine::as_future( + _archival_meta_stm->sync(sync_timeout)); + if (sync_result.failed() || sync_result.get() == false) { vlog( clusterlog.warn, - "[{}] Unsafe reset failed to update archival STM: {}", - ntp(), - errc.message()); - throw std::runtime_error( - fmt::format("Failed to update archival STM: {}", errc.message())); + "[{}] Could not sync with log. 
Skipping unsafe reset ...", + ntp()); + + co_await start_archiver(); + throw std::runtime_error("Could not sync with log"); } - vlog( - clusterlog.info, - "[{}] Unsafe reset replicated STM commands successfully", - ntp()); + // Attempt the reset + auto future_result = co_await ss::coroutine::as_future( + do_unsafe_reset_remote_partition_manifest_from_cloud()); + + // Reconstruct the archiver and start it if needed + co_await start_archiver(); + + // Rethrow the exception if we failed to reset + future_result.get(); +} + +ss::future<> partition::do_unsafe_reset_remote_partition_manifest_from_cloud() { + const auto initial_rev = _raft->log_config().get_initial_revision(); + const auto bucket = [this]() { + if (is_read_replica_mode_enabled()) { + return get_read_replica_bucket(); + } + + const auto& bucket_config + = cloud_storage::configuration::get_bucket_config(); + vassert( + bucket_config.value(), + "configuration property {} must be set", + bucket_config.name()); + + return cloud_storage_clients::bucket_name{ + bucket_config.value().value()}; + }(); + + // Download the current partition manifest from the cloud + cloud_storage::partition_manifest new_manifest{ntp(), initial_rev}; + + auto timeout + = config::shard_local_cfg().cloud_storage_manifest_upload_timeout_ms(); + auto backoff = config::shard_local_cfg().cloud_storage_initial_backoff_ms(); + + retry_chain_node rtc(_as, timeout, backoff); + auto [res, res_fmt] + = co_await _cloud_storage_api.local().try_download_partition_manifest( + bucket, new_manifest, rtc); + + if (res != cloud_storage::download_result::success) { + throw std::runtime_error(ssx::sformat( + "Failed to download partition manifest with error: {}", res)); + } + + const auto max_collectible + = _raft->log()->stm_manager()->max_collectible_offset(); + if (new_manifest.get_last_offset() < max_collectible) { + throw std::runtime_error(ssx::sformat( + "Applying the cloud manifest would cause data loss since the last " + "offset in the downloaded manifest is below the max_collectible " + "offset " + "{} < {}", + new_manifest.get_last_offset(), + max_collectible)); + } + + co_await replicate_unsafe_reset(std::move(new_manifest)); } std::ostream& operator<<(std::ostream& o, const partition& x) { diff --git a/src/v/cluster/partition.h b/src/v/cluster/partition.h index c72333483f460..647dd962068d5 100644 --- a/src/v/cluster/partition.h +++ b/src/v/cluster/partition.h @@ -452,9 +452,29 @@ class partition { result> get_follower_metrics() const; - ss::future<> unsafe_reset_remote_partition_manifest(iobuf buf); + // Attempt to reset the partition manifest of a cloud storage partition + // from an iobuf containing the JSON representation of the manifest. + // + // Warning: in order to call this safely, one must stop the archiver + // manually whilst ensuring that the max collectible offset reported + // by the archival metadata STM remains stable. Prefer its sibling + // which resets from the cloud state. + // + // Returns a failed future if unsuccessful. + ss::future<> + unsafe_reset_remote_partition_manifest_from_json(iobuf json_buf); + + // Attempt to reset the partition manifest of a cloud storage partition + // to the one last uploaded to cloud storage. + // Returns a failed future if unsuccessful. 
+ ss::future<> unsafe_reset_remote_partition_manifest_from_cloud(); private: + ss::future<> + replicate_unsafe_reset(cloud_storage::partition_manifest manifest); + + ss::future<> do_unsafe_reset_remote_partition_manifest_from_cloud(); + ss::future> cloud_storage_timequery(storage::timequery_config); diff --git a/src/v/redpanda/admin_server.cc b/src/v/redpanda/admin_server.cc index ab2a4c778745b..ccb867fb220d8 100644 --- a/src/v/redpanda/admin_server.cc +++ b/src/v/redpanda/admin_server.cc @@ -5094,8 +5094,9 @@ admin_server::unsafe_reset_metadata( buf.append(content.data(), content.size()); content = {}; - return partition->unsafe_reset_remote_partition_manifest( - std::move(buf)); + return partition + ->unsafe_reset_remote_partition_manifest_from_json( + std::move(buf)); }); } catch (const std::runtime_error& err) { throw ss::httpd::server_error_exception(err.what()); From 7b9e57d6234b516648cc3491756f8205eb63c30f Mon Sep 17 00:00:00 2001 From: Vlad Lazar Date: Tue, 31 Oct 2023 11:56:33 +0000 Subject: [PATCH 3/7] admin: add endpoint for resetting meta from cloud A new endpoint is added: `/v1/cloud_storage/unsafe_reset_metadata_from_cloud/...`. It resets the manifest to the one in cloud storage, updating all replicas with the given manifest. The request is refused if applying the change would cause data loss. --- .../admin/api-doc/shadow_indexing.json | 37 +++++++++++++ src/v/redpanda/admin_server.cc | 52 +++++++++++++++++++ src/v/redpanda/admin_server.h | 3 ++ 3 files changed, 92 insertions(+) diff --git a/src/v/redpanda/admin/api-doc/shadow_indexing.json b/src/v/redpanda/admin/api-doc/shadow_indexing.json index c2a7f66e4abdb..49fc23fe4e19f 100644 --- a/src/v/redpanda/admin/api-doc/shadow_indexing.json +++ b/src/v/redpanda/admin/api-doc/shadow_indexing.json @@ -218,6 +218,43 @@ ] } ] + }, + { + "path": "/v1/cloud_storage/unsafe_reset_metadata_from_cloud/{namespace}/{topic}/{partition}", + "operations": [ + { + "method": "POST", + "summary": "Resets the manifest to the one in cloud storage, updating all replicas with the given manifest. The request is refused if applying the change would cause data loss.", + "operationId": "unsafe_reset_metadata_from_cloud", + "nickname": "unsafe_reset_metadata_from_cloud", + "parameters": [ + { + "name": "namespace", + "in": "path", + "required": true, + "type": "string" + }, + { + "name": "topic", + "in": "path", + "required": true, + "type": "string" + }, + { + "name": "partition", + "in": "path", + "required": true, + "type": "integer" + } + ], + "responseMessages": [ + { + "code": 200, + "message": "Partition metadata is reset" + } + ] + } + ] } ], "models": { diff --git a/src/v/redpanda/admin_server.cc b/src/v/redpanda/admin_server.cc index ccb867fb220d8..22f0f3ab9bfe5 100644 --- a/src/v/redpanda/admin_server.cc +++ b/src/v/redpanda/admin_server.cc @@ -5700,6 +5700,48 @@ admin_server::get_cloud_storage_anomalies( co_return map_anomalies_to_json(ntp, *initial_rev, *status); } +ss::future> +admin_server::unsafe_reset_metadata_from_cloud( + std::unique_ptr request, + std::unique_ptr reply) { + reply->set_content_type("json"); + + auto ntp = parse_ntp_from_request(request->param); + if (need_redirect_to_leader(ntp, _metadata_cache)) { + vlog( + logger.info, + "Need to redirect unsafe reset metadata from cloud request"); + throw co_await redirect_to_leader(*request, ntp); + } + + const auto shard = _shard_table.local().shard_for(ntp); + if (!shard) { + throw ss::httpd::not_found_exception(fmt::format( + "{} could not be found on the node. 
Perhaps it has been moved " + "during the redirect.", + ntp)); + } + + try { + co_await _partition_manager.invoke_on( + *shard, [ntp = std::move(ntp), shard](auto& pm) { + auto partition = pm.get(ntp); + if (!partition) { + throw ss::httpd::not_found_exception( + fmt::format("Could not find {} on shard {}", ntp, *shard)); + } + + return partition + ->unsafe_reset_remote_partition_manifest_from_cloud(); + }); + } catch (const std::runtime_error& err) { + throw ss::httpd::server_error_exception(err.what()); + } + + reply->set_status(ss::http::reply::status_type::ok); + co_return reply; +} + void admin_server::register_shadow_indexing_routes() { register_route( ss::httpd::shadow_indexing_json::sync_local_state, @@ -5753,6 +5795,16 @@ void admin_server::register_shadow_indexing_routes() { register_route( ss::httpd::shadow_indexing_json::get_cloud_storage_anomalies, [this](auto req) { return get_cloud_storage_anomalies(std::move(req)); }); + + request_handler_fn unsafe_reset_metadata_from_cloud_handler = + [this](auto req, auto reply) { + return unsafe_reset_metadata_from_cloud( + std::move(req), std::move(reply)); + }; + + register_route( + ss::httpd::shadow_indexing_json::unsafe_reset_metadata_from_cloud, + std::move(unsafe_reset_metadata_from_cloud_handler)); } constexpr std::string_view to_string_view(service_kind kind) { diff --git a/src/v/redpanda/admin_server.h b/src/v/redpanda/admin_server.h index c5b28e0cbf543..89e9cb3aea463 100644 --- a/src/v/redpanda/admin_server.h +++ b/src/v/redpanda/admin_server.h @@ -487,6 +487,9 @@ class admin_server { sync_local_state_handler(std::unique_ptr); ss::future> unsafe_reset_metadata( std::unique_ptr, std::unique_ptr); + ss::future> + unsafe_reset_metadata_from_cloud( + std::unique_ptr, std::unique_ptr); ss::future> initiate_topic_scan_and_recovery( std::unique_ptr, std::unique_ptr); From b2d7db19c22a8bb7c0ea31c354ca011f9d774b87 Mon Sep 17 00:00:00 2001 From: Vlad Lazar Date: Thu, 2 Nov 2023 10:45:06 +0000 Subject: [PATCH 4/7] cluster: add force flag to reset from cloud When resetting the manifest from cloud we check that the last offset in the manifest is not below the max collectible offset of the partition as a safety measure. In some circumstances, this check may be too strict: read replicas and topics where remote write is turned off, will have their collectible offset pegged at `offset::max`. This commit adds a force flag to the admin request which can be used to deal with such cases. The flag is `false` by default. 
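For illustration only, a minimal sketch of driving the new endpoint with the force flag. This is a hypothetical helper, not part of this series: it assumes the admin API is reachable on the default 9644 port, and it goes through `requests` directly because the rptest wrapper added later in the series does not expose the flag.

    import requests

    # Assumed admin API address of the target broker.
    ADMIN_URL = "http://localhost:9644"

    def unsafe_reset_metadata_from_cloud(namespace, topic, partition, force=False):
        # POST /v1/cloud_storage/unsafe_reset_metadata_from_cloud/{ns}/{topic}/{partition}
        # force=true disables the max_collectible_offset safety check and may
        # cause data loss; it exists for read replicas and topics without
        # remote write, where the check can never pass.
        resp = requests.post(
            f"{ADMIN_URL}/v1/cloud_storage/unsafe_reset_metadata_from_cloud/"
            f"{namespace}/{topic}/{partition}",
            params={"force": str(force).lower()})
        resp.raise_for_status()
        return resp

    # Example: reset a read replica partition whose max collectible offset is
    # pegged at offset::max.
    unsafe_reset_metadata_from_cloud("kafka", "my-topic", 0, force=True)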
--- src/v/cluster/partition.cc | 22 ++++++++++++++----- src/v/cluster/partition.h | 9 ++++++-- .../admin/api-doc/shadow_indexing.json | 8 ++++++- src/v/redpanda/admin_server.cc | 6 +++-- 4 files changed, 35 insertions(+), 10 deletions(-) diff --git a/src/v/cluster/partition.cc b/src/v/cluster/partition.cc index 8c28c86ee744d..23bfb7c1c0113 100644 --- a/src/v/cluster/partition.cc +++ b/src/v/cluster/partition.cc @@ -1174,7 +1174,8 @@ partition::unsafe_reset_remote_partition_manifest_from_json(iobuf json_buf) { co_await replicate_unsafe_reset(std::move(req_m)); } -ss::future<> partition::unsafe_reset_remote_partition_manifest_from_cloud() { +ss::future<> +partition::unsafe_reset_remote_partition_manifest_from_cloud(bool force) { vlog( clusterlog.info, "[{}] Unsafe manifest reset from cloud state requested", @@ -1227,7 +1228,7 @@ ss::future<> partition::unsafe_reset_remote_partition_manifest_from_cloud() { // Attempt the reset auto future_result = co_await ss::coroutine::as_future( - do_unsafe_reset_remote_partition_manifest_from_cloud()); + do_unsafe_reset_remote_partition_manifest_from_cloud(force)); // Reconstruct the archiver and start it if needed co_await start_archiver(); @@ -1236,7 +1237,8 @@ ss::future<> partition::unsafe_reset_remote_partition_manifest_from_cloud() { future_result.get(); } -ss::future<> partition::do_unsafe_reset_remote_partition_manifest_from_cloud() { +ss::future<> +partition::do_unsafe_reset_remote_partition_manifest_from_cloud(bool force) { const auto initial_rev = _raft->log_config().get_initial_revision(); const auto bucket = [this]() { if (is_read_replica_mode_enabled()) { @@ -1274,13 +1276,23 @@ ss::future<> partition::do_unsafe_reset_remote_partition_manifest_from_cloud() { const auto max_collectible = _raft->log()->stm_manager()->max_collectible_offset(); if (new_manifest.get_last_offset() < max_collectible) { - throw std::runtime_error(ssx::sformat( + auto msg = ssx::sformat( "Applying the cloud manifest would cause data loss since the last " "offset in the downloaded manifest is below the max_collectible " "offset " "{} < {}", new_manifest.get_last_offset(), - max_collectible)); + max_collectible); + + if (!force) { + throw std::runtime_error(msg); + } + + vlog( + clusterlog.warn, + "[{}] {}. Proceeding since the force flag was used.", + ntp(), + msg); } co_await replicate_unsafe_reset(std::move(new_manifest)); diff --git a/src/v/cluster/partition.h b/src/v/cluster/partition.h index 647dd962068d5..0b07ad8c2c222 100644 --- a/src/v/cluster/partition.h +++ b/src/v/cluster/partition.h @@ -466,14 +466,19 @@ class partition { // Attempt to reset the partition manifest of a cloud storage partition // to the one last uploaded to cloud storage. + // + // If `force` is true, the safety checks will be disregarded, which + // may lead to data loss. + // // Returns a failed future if unsuccessful. 
- ss::future<> unsafe_reset_remote_partition_manifest_from_cloud(); + ss::future<> unsafe_reset_remote_partition_manifest_from_cloud(bool force); private: ss::future<> replicate_unsafe_reset(cloud_storage::partition_manifest manifest); - ss::future<> do_unsafe_reset_remote_partition_manifest_from_cloud(); + ss::future<> + do_unsafe_reset_remote_partition_manifest_from_cloud(bool force); ss::future> cloud_storage_timequery(storage::timequery_config); diff --git a/src/v/redpanda/admin/api-doc/shadow_indexing.json b/src/v/redpanda/admin/api-doc/shadow_indexing.json index 49fc23fe4e19f..8d510388923db 100644 --- a/src/v/redpanda/admin/api-doc/shadow_indexing.json +++ b/src/v/redpanda/admin/api-doc/shadow_indexing.json @@ -224,7 +224,7 @@ "operations": [ { "method": "POST", - "summary": "Resets the manifest to the one in cloud storage, updating all replicas with the given manifest. The request is refused if applying the change would cause data loss.", + "summary": "Resets the manifest to the one in cloud storage, updating all replicas with the given manifest. The request is refused if applying the change would cause data loss and the force query parameter is unspecified or false.", "operationId": "unsafe_reset_metadata_from_cloud", "nickname": "unsafe_reset_metadata_from_cloud", "parameters": [ @@ -245,6 +245,12 @@ "in": "path", "required": true, "type": "integer" + }, + { + "name": "force", + "in": "query", + "required": false, + "type": "boolean" } ], "responseMessages": [ diff --git a/src/v/redpanda/admin_server.cc b/src/v/redpanda/admin_server.cc index 22f0f3ab9bfe5..2dd968338d869 100644 --- a/src/v/redpanda/admin_server.cc +++ b/src/v/redpanda/admin_server.cc @@ -5722,9 +5722,11 @@ admin_server::unsafe_reset_metadata_from_cloud( ntp)); } + bool force = get_boolean_query_param(*request, "force"); + try { co_await _partition_manager.invoke_on( - *shard, [ntp = std::move(ntp), shard](auto& pm) { + *shard, [ntp = std::move(ntp), shard, force](auto& pm) { auto partition = pm.get(ntp); if (!partition) { throw ss::httpd::not_found_exception( @@ -5732,7 +5734,7 @@ admin_server::unsafe_reset_metadata_from_cloud( } return partition - ->unsafe_reset_remote_partition_manifest_from_cloud(); + ->unsafe_reset_remote_partition_manifest_from_cloud(force); }); } catch (const std::runtime_error& err) { throw ss::httpd::server_error_exception(err.what()); From bad6a57ab325e1bb23be17ac409f0b9f1b502af0 Mon Sep 17 00:00:00 2001 From: Vlad Lazar Date: Thu, 2 Nov 2023 17:30:05 +0000 Subject: [PATCH 5/7] cluster: fix race between leader move and unsafe reset Before transferring leadership, we inform the archiver via `prepare_transfer_leadership`. Similarly, when the leadership transfer is complete we call `complete_transfer_leadership`. Previously, the code assumed that the archiver outlives the leadership transfer. That's not necessarily the case. We destroy the archiver when changing the topic configs and when performing a rest of the manifest from the cloud. The fix is to check if the archiver is alive after the leadership transfer completed and call the respective method only in that case. Why is this correct? `complete_transfer_leadership` unpauses the archiver. If the archiver is not present at the point when we want to unpause, it will be created later in its default unpaused state. 
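To make the window concrete, here is a hypothetical ducktape-style loop that would exercise this race. It is only a sketch: it assumes the existing rptest `Admin.transfer_leadership_to` helper and `RedpandaService.node_id` accessor, plus the reset endpoint added earlier in this series.

    import random
    import time

    from requests.exceptions import HTTPError

    def stress_reset_during_transfers(admin, redpanda, topic, iterations=20):
        # Hypothetical reproduction loop: tear the archiver down via a cloud
        # reset while graceful leadership transfers are in flight.
        for _ in range(iterations):
            target = random.choice(redpanda.nodes)
            # Assumed existing rptest helper; kicks off a leadership transfer,
            # which pauses the archiver via prepare_transfer_leadership.
            admin.transfer_leadership_to(namespace="kafka",
                                         topic=topic,
                                         partition=0,
                                         target_id=redpanda.node_id(target))
            try:
                # Stops and reconstructs the archiver; before this fix the
                # transfer-completion callback could use the old archiver pointer.
                admin.unsafe_reset_metadata_from_cloud(namespace="kafka",
                                                       topic=topic,
                                                       partition=0)
            except HTTPError:
                # The reset may be refused (e.g. by the data-loss check);
                # keep going.
                pass
            time.sleep(1)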
--- src/v/cluster/partition.cc | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/v/cluster/partition.cc b/src/v/cluster/partition.cc index 23bfb7c1c0113..17490f82fc19a 100644 --- a/src/v/cluster/partition.cc +++ b/src/v/cluster/partition.cc @@ -1051,8 +1051,12 @@ partition::transfer_leadership(transfer_leadership_request req) { auto archival_timeout = config::shard_local_cfg().cloud_storage_graceful_transfer_timeout_ms(); if (_archiver && archival_timeout.has_value()) { - complete_archiver.emplace( - [a = _archiver.get()]() { a->complete_transfer_leadership(); }); + complete_archiver.emplace([this]() { + if (_archiver) { + _archiver->complete_transfer_leadership(); + } + }); + vlog( clusterlog.debug, "transfer_leadership[{}]: entering archiver prepare", From da1eb5e03310826aa2a084cbdb3405a7a257e4f0 Mon Sep 17 00:00:00 2001 From: Vlad Lazar Date: Fri, 3 Nov 2023 14:37:16 +0000 Subject: [PATCH 6/7] cluster: introduce archiver mutex to partition The partition may interact with the archiver at various points in its lifetime: * when it gets started/stopped * when leadership moves * when the topic configs are updated * when the manifest needs to be reset from the cloud These operations may race and lead to all sorts of issues like double starting or double stopping the archiver. In order to avoid such cases, a mutex is introduced to protect the archiver in this commit. It should be grabbed every time when the partition needs to start and/or stop the archiver. --- src/v/cluster/partition.cc | 75 ++++++++++++++++++++++++++++---------- src/v/cluster/partition.h | 5 +++ 2 files changed, 61 insertions(+), 19 deletions(-) diff --git a/src/v/cluster/partition.cc b/src/v/cluster/partition.cc index 17490f82fc19a..720d4a2632a61 100644 --- a/src/v/cluster/partition.cc +++ b/src/v/cluster/partition.cc @@ -542,8 +542,6 @@ ss::future<> partition::start(std::optional topic_cfg) { } } - maybe_construct_archiver(); - if (_cloud_storage_manifest_view) { co_await _cloud_storage_manifest_view->start(); } @@ -552,9 +550,19 @@ ss::future<> partition::start(std::optional topic_cfg) { co_await _cloud_storage_partition->start(); } - if (_archiver) { - co_await _archiver->start(); + { + auto archiver_reset_guard = co_await ssx::with_timeout_abortable( + ss::get_units(_archiver_reset_mutex, 1), + ss::lowres_clock::now() + archiver_reset_mutex_timeout, + _as); + + maybe_construct_archiver(); + + if (_archiver) { + co_await _archiver->start(); + } } + co_return co_await _raft->start(std::move(builder)); } @@ -563,14 +571,21 @@ ss::future<> partition::stop() { vlog(clusterlog.debug, "Stopping partition: {}", partition_ntp); _as.request_abort(); - if (_archiver) { - _upload_housekeeping.local().deregister_jobs( - _archiver->get_housekeeping_jobs()); - vlog( - clusterlog.debug, - "Stopping archiver on partition: {}", - partition_ntp); - co_await _archiver->stop(); + { + // `partition_manager::do_shutdown` (caller of stop) will assert + // out on any thrown exceptions. Hence, acquire the units without + // a timeout or abort source. 
+ auto archiver_reset_guard = ss::get_units(_archiver_reset_mutex, 1); + + if (_archiver) { + _upload_housekeeping.local().deregister_jobs( + _archiver->get_housekeeping_jobs()); + vlog( + clusterlog.debug, + "Stopping archiver on partition: {}", + partition_ntp); + co_await _archiver->stop(); + } } if (_cloud_storage_partition) { @@ -760,16 +775,20 @@ partition::local_timequery(storage::timequery_config cfg) { co_return result; } -void partition::maybe_construct_archiver() { +bool partition::should_construct_archiver() { // NOTE: construct and archiver even if shadow indexing isn't enabled, e.g. // in the case of read replicas -- we still need the archiver to drive // manifest updates, etc. - auto& ntp_config = _raft->log()->config(); - if ( - config::shard_local_cfg().cloud_storage_enabled() - && _cloud_storage_api.local_is_initialized() - && _raft->ntp().ns == model::kafka_namespace - && (ntp_config.is_archival_enabled() || ntp_config.is_read_replica_mode_enabled())) { + const auto& ntp_config = _raft->log()->config(); + return config::shard_local_cfg().cloud_storage_enabled() + && _cloud_storage_api.local_is_initialized() + && _raft->ntp().ns == model::kafka_namespace + && (ntp_config.is_archival_enabled() || ntp_config.is_read_replica_mode_enabled()); +} + +void partition::maybe_construct_archiver() { + if (should_construct_archiver()) { + const auto& ntp_config = _raft->log()->config(); _archiver = std::make_unique( ntp_config, _archival_conf, @@ -869,6 +888,12 @@ ss::future<> partition::update_configuration(topic_properties properties) { "update_configuration[{}]: updating archiver for config {}", new_ntp_config, _raft->ntp()); + + auto archiver_reset_guard = co_await ssx::with_timeout_abortable( + ss::get_units(_archiver_reset_mutex, 1), + ss::lowres_clock::now() + archiver_reset_mutex_timeout, + _as); + if (_archiver) { _upload_housekeeping.local().deregister_jobs( _archiver->get_housekeeping_jobs()); @@ -1185,6 +1210,8 @@ partition::unsafe_reset_remote_partition_manifest_from_cloud(bool force) { "[{}] Unsafe manifest reset from cloud state requested", ntp()); + _as.check(); + if (!(config::shard_local_cfg().cloud_storage_enabled() && _archival_meta_stm)) { vlog( @@ -1194,6 +1221,14 @@ partition::unsafe_reset_remote_partition_manifest_from_cloud(bool force) { throw std::runtime_error("Archival STM not present"); } + std::optional archiver_reset_guard; + if (should_construct_archiver()) { + archiver_reset_guard = co_await ssx::with_timeout_abortable( + ss::get_units(_archiver_reset_mutex, 1), + ss::lowres_clock::now() + archiver_reset_mutex_timeout, + _as); + } + // Stop the archiver and its housekeeping jobs if (_archiver) { _upload_housekeeping.local().deregister_jobs( @@ -1230,6 +1265,8 @@ partition::unsafe_reset_remote_partition_manifest_from_cloud(bool force) { throw std::runtime_error("Could not sync with log"); } + _as.check(); + // Attempt the reset auto future_result = co_await ss::coroutine::as_future( do_unsafe_reset_remote_partition_manifest_from_cloud(force)); diff --git a/src/v/cluster/partition.h b/src/v/cluster/partition.h index 0b07ad8c2c222..432a5ab9897be 100644 --- a/src/v/cluster/partition.h +++ b/src/v/cluster/partition.h @@ -71,6 +71,7 @@ class partition { ss::future<> start(std::optional); ss::future<> stop(); + bool should_construct_archiver(); /// Part of constructor that we may sometimes need to do again /// after a configuration change. 
void maybe_construct_archiver(); @@ -511,7 +512,11 @@ class partition { ss::shared_ptr _cloud_storage_manifest_view; ss::shared_ptr _cloud_storage_partition; + + static constexpr auto archiver_reset_mutex_timeout = 10s; + ssx::semaphore _archiver_reset_mutex{1, "archiver_reset"}; std::unique_ptr _archiver; + std::optional _read_replica_bucket{ std::nullopt}; bool _remote_delete_enabled{storage::ntp_config::default_remote_delete}; From 4c2fd8d9bcfd6eb9b2cd0ad90673563675be78e0 Mon Sep 17 00:00:00 2001 From: Vlad Lazar Date: Wed, 1 Nov 2023 11:02:31 +0000 Subject: [PATCH 7/7] rptest: add duck test for manifest reset from cloud Test the unsafe_reset_metadata_from_cloud endpoint by repeatedly calling it to re-set the manifest from the uploaded one while producing to the partition. Once the produce finishes, wait for all uploads to complete and do a full read of the log to bubble up any inconsistencies. --- tests/rptest/services/admin.py | 6 ++ .../rptest/tests/e2e_shadow_indexing_test.py | 89 ++++++++++++++++++- 2 files changed, 94 insertions(+), 1 deletion(-) diff --git a/tests/rptest/services/admin.py b/tests/rptest/services/admin.py index 09954267a8390..5b49bb2d4cbdc 100644 --- a/tests/rptest/services/admin.py +++ b/tests/rptest/services/admin.py @@ -475,6 +475,12 @@ def unsafe_reset_cloud_metadata(self, topic, partition, manifest): f"debug/unsafe_reset_metadata/{topic}/{partition}", json=manifest) + def unsafe_reset_metadata_from_cloud(self, namespace, topic, partition): + return self._request( + 'POST', + f"cloud_storage/unsafe_reset_metadata_from_cloud/{namespace}/{topic}/{partition}" + ) + def put_feature(self, feature_name, body): return self._request("PUT", f"features/{feature_name}", json=body) diff --git a/tests/rptest/tests/e2e_shadow_indexing_test.py b/tests/rptest/tests/e2e_shadow_indexing_test.py index dae84df1ad0f4..e89436c3fb014 100644 --- a/tests/rptest/tests/e2e_shadow_indexing_test.py +++ b/tests/rptest/tests/e2e_shadow_indexing_test.py @@ -11,6 +11,8 @@ import re import time from collections import defaultdict +from datetime import datetime, timedelta +from requests.exceptions import HTTPError from ducktape.mark import matrix from ducktape.tests.test import TestContext @@ -37,7 +39,7 @@ wait_for_local_storage_truncate, ) from rptest.utils.mode_checks import skip_debug_mode -from rptest.utils.si_utils import nodes_report_cloud_segments, BucketView, NTP +from rptest.utils.si_utils import nodes_report_cloud_segments, BucketView, NTP, quiesce_uploads class EndToEndShadowIndexingBase(EndToEndTest): @@ -426,6 +428,91 @@ def has_data(self) -> bool: # validate that messages from the manifests that were removed from the manifest are not readable assert consumer.consumer_status.validator.valid_reads == messages_to_read + @cluster( + num_nodes=4, + log_allow_list=["Applying the cloud manifest would cause data loss"]) + @matrix(cloud_storage_type=get_cloud_storage_type()) + def test_reset_from_cloud(self, cloud_storage_type): + """ + Test the unsafe_reset_metadata_from_cloud endpoint by repeatedly + calling it to re-set the manifest from the uploaded one while + producing to the partition. Once the produce finishes, wait for + all uploads to complete and do a full read of the log to bubble + up any inconsistencies. 
+ """ + msg_size = 2056 + self.redpanda.set_cluster_config({ + "cloud_storage_housekeeping_interval_ms": + 10000, + "cloud_storage_spillover_manifest_max_segments": + 10 + }) + + # Set a very low local retetion to race manifest resets wih retention + self.rpk.alter_topic_config(self.topic, 'retention.local.target.bytes', + self.segment_size * 1) + + msg_per_segment = self.segment_size // msg_size + total_messages = 250 * msg_per_segment + producer = KgoVerifierProducer(self.test_context, + self.redpanda, + self.topic, + msg_size=msg_size, + msg_count=total_messages, + rate_limit_bps=1024 * 1024 * 5) + + producer.start() + + resets_done = 0 + resets_refused = 0 + resets_failed = 0 + + seconds_between_reset = 10 + next_reset = datetime.now() + timedelta(seconds=seconds_between_reset) + + # Repeatedly reset the manifest while producing to the partition + while not producer.is_complete(): + now = datetime.now() + if now >= next_reset: + try: + self.redpanda._admin.unsafe_reset_metadata_from_cloud( + namespace="kafka", topic=self.topic, partition=0) + resets_done += 1 + except HTTPError as ex: + if "would cause data loss" in ex.response.text: + resets_refused += 1 + else: + resets_failed += 1 + self.logger.info(f"Reset from cloud failed: {ex}") + next_reset = now + timedelta(seconds=seconds_between_reset) + + time.sleep(2) + + producer.wait(timeout_sec=120) + producer.free() + + self.logger.info( + f"Producer workload complete: {resets_done=}, {resets_refused=}, {resets_failed=}" + ) + + assert resets_done + resets_refused > 0, "No resets done during the test" + assert resets_failed == 0, f"{resets_failed} resets failed during the test" + + # Wait for all uploads to complete and read the log in full. + # This should highlight any data consistency issues. + # Note that we are re-using the node where the producer ran, + # which allows for validation of the consumed offests. + quiesce_uploads(self.redpanda, [self.topic], timeout_sec=120) + + consumer = KgoVerifierSeqConsumer(self.test_context, + self.redpanda, + self.topic, + debug_logs=True, + trace_logs=True) + + consumer.start() + consumer.wait(timeout_sec=120) + @cluster(num_nodes=5) @matrix(cloud_storage_type=get_cloud_storage_type()) def test_write(self, cloud_storage_type):