
Merge pull request #14597 from VladLazar/reset-to-uploaded-manifest
cloud_storage: reset in-memory partition manifest to uploaded manifest
piyushredpanda authored Nov 3, 2023
2 parents db86bd6 + 4c2fd8d commit 5d3eefa
Showing 8 changed files with 453 additions and 55 deletions.
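
In brief, the new reset path stops the archiver, syncs the archival STM, downloads the manifest currently in cloud storage, refuses the swap if the downloaded manifest's last offset is below the max collectible offset (unless forced), replicates a replace_manifest command, and restarts the archiver, all while holding an archiver reset mutex. A standalone sketch of that sequence, with std::timed_mutex and stand-in types in place of the Seastar/Redpanda APIs used in the diff below:

// Standalone sketch of the reset-from-cloud flow; all types and names here
// are illustrative stand-ins, not the Seastar/Redpanda APIs in the diff.
#include <chrono>
#include <mutex>
#include <stdexcept>

struct manifest {
    long last_offset{0};
};

struct partition_sketch {
    std::timed_mutex archiver_reset_mutex; // stand-in for _archiver_reset_mutex
    bool archiver_running{true};
    long max_collectible{0};

    manifest download_cloud_manifest() { return manifest{42}; } // stub download
    void replicate_replace_manifest(const manifest&) {}         // stub replication

    void unsafe_reset_from_cloud(bool force) {
        // Serialise against concurrent archiver construction/teardown,
        // giving up after a timeout like ssx::with_timeout_abortable does.
        std::unique_lock<std::timed_mutex> guard(
          archiver_reset_mutex, std::defer_lock);
        if (!guard.try_lock_for(std::chrono::seconds(10))) {
            throw std::runtime_error("timed out acquiring archiver reset mutex");
        }

        archiver_running = false; // stop the archiver so uploads cannot race
        try {
            auto m = download_cloud_manifest();
            // Refuse a manifest that would drop locally collectible data.
            if (m.last_offset < max_collectible && !force) {
                throw std::runtime_error("reset would cause data loss");
            }
            replicate_replace_manifest(m);
        } catch (...) {
            archiver_running = true; // the archiver restarts on both paths
            throw;
        }
        archiver_running = true;
    }
};

int main() {
    partition_sketch p;
    p.unsafe_reset_from_cloud(/*force=*/false);
}
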
2 changes: 1 addition & 1 deletion src/v/archival/ntp_archiver_service.h
@@ -625,7 +625,7 @@ class ntp_archiver {
     // Held while the inner segment upload/manifest sync loop is running,
     // to enable code that uses _paused to wait until ongoing activity
     // has stopped.
-    ss::semaphore _uploads_active{1};
+    ssx::named_semaphore<ss::lowres_clock> _uploads_active{1, "uploads_active"};
 
     config::binding<std::chrono::milliseconds> _housekeeping_interval;
     simple_time_jitter<ss::lowres_clock> _housekeeping_jitter;
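
The header change above swaps a bare ss::semaphore for ssx::named_semaphore. A plausible motivation (assumed here, not stated in the diff) is diagnosability: when a named semaphore is broken, e.g. at shutdown, the error identifies which semaphore failed. A minimal stand-in sketch of the idea, not the ssx implementation:

// Hypothetical illustration of why a semaphore name helps diagnostics.
#include <iostream>
#include <semaphore>
#include <stdexcept>
#include <string>

class named_semaphore {
public:
    named_semaphore(int units, std::string name)
      : _sem(units)
      , _name(std::move(name)) {}

    void acquire() {
        if (_broken) {
            // The name makes shutdown-time failures attributable.
            throw std::runtime_error("semaphore '" + _name + "' is broken");
        }
        _sem.acquire();
    }

    void release() { _sem.release(); }
    void broken() { _broken = true; }

private:
    std::counting_semaphore<> _sem;
    std::string _name;
    bool _broken{false};
};

int main() {
    named_semaphore uploads_active{1, "uploads_active"};
    uploads_active.acquire();
    uploads_active.release();
    uploads_active.broken();
    try {
        uploads_active.acquire();
    } catch (const std::exception& e) {
        std::cout << e.what() << '\n';
    }
}
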
274 changes: 224 additions & 50 deletions src/v/cluster/partition.cc
@@ -26,6 +26,7 @@
 #include "raft/state_machine_manager.h"
 #include "raft/types.h"
 
+#include <seastar/coroutine/as_future.hh>
 #include <seastar/util/defer.hh>
 
 #include <exception>
@@ -541,8 +542,6 @@ ss::future<> partition::start(std::optional<topic_configuration> topic_cfg) {
         }
     }
 
-    maybe_construct_archiver();
-
     if (_cloud_storage_manifest_view) {
         co_await _cloud_storage_manifest_view->start();
     }
@@ -551,9 +550,19 @@ ss::future<> partition::start(std::optional<topic_configuration> topic_cfg) {
         co_await _cloud_storage_partition->start();
     }
 
-    if (_archiver) {
-        co_await _archiver->start();
-    }
+    {
+        auto archiver_reset_guard = co_await ssx::with_timeout_abortable(
+          ss::get_units(_archiver_reset_mutex, 1),
+          ss::lowres_clock::now() + archiver_reset_mutex_timeout,
+          _as);
+
+        maybe_construct_archiver();
+
+        if (_archiver) {
+            co_await _archiver->start();
+        }
+    }
 
     co_return co_await _raft->start(std::move(builder));
 }

@@ -562,14 +571,21 @@ ss::future<> partition::stop() {
     vlog(clusterlog.debug, "Stopping partition: {}", partition_ntp);
     _as.request_abort();
 
-    if (_archiver) {
-        _upload_housekeeping.local().deregister_jobs(
-          _archiver->get_housekeeping_jobs());
-        vlog(
-          clusterlog.debug,
-          "Stopping archiver on partition: {}",
-          partition_ntp);
-        co_await _archiver->stop();
+    {
+        // `partition_manager::do_shutdown` (caller of stop) will assert
+        // out on any thrown exceptions. Hence, acquire the units without
+        // a timeout or abort source.
+        auto archiver_reset_guard = co_await ss::get_units(_archiver_reset_mutex, 1);
+
+        if (_archiver) {
+            _upload_housekeeping.local().deregister_jobs(
+              _archiver->get_housekeeping_jobs());
+            vlog(
+              clusterlog.debug,
+              "Stopping archiver on partition: {}",
+              partition_ntp);
+            co_await _archiver->stop();
+        }
     }
 
     if (_cloud_storage_partition) {
@@ -759,16 +775,20 @@ partition::local_timequery(storage::timequery_config cfg) {
     co_return result;
 }
 
-void partition::maybe_construct_archiver() {
+bool partition::should_construct_archiver() {
     // NOTE: construct an archiver even if shadow indexing isn't enabled, e.g.
     // in the case of read replicas -- we still need the archiver to drive
     // manifest updates, etc.
-    auto& ntp_config = _raft->log()->config();
-    if (
-      config::shard_local_cfg().cloud_storage_enabled()
-      && _cloud_storage_api.local_is_initialized()
-      && _raft->ntp().ns == model::kafka_namespace
-      && (ntp_config.is_archival_enabled() || ntp_config.is_read_replica_mode_enabled())) {
+    const auto& ntp_config = _raft->log()->config();
+    return config::shard_local_cfg().cloud_storage_enabled()
+           && _cloud_storage_api.local_is_initialized()
+           && _raft->ntp().ns == model::kafka_namespace
+           && (ntp_config.is_archival_enabled() || ntp_config.is_read_replica_mode_enabled());
+}
+
+void partition::maybe_construct_archiver() {
+    if (should_construct_archiver()) {
+        const auto& ntp_config = _raft->log()->config();
         _archiver = std::make_unique<archival::ntp_archiver>(
           ntp_config,
          _archival_conf,
@@ -868,6 +888,12 @@ ss::future<> partition::update_configuration(topic_properties properties) {
           "update_configuration[{}]: updating archiver for config {}",
           new_ntp_config,
           _raft->ntp());
+
+        auto archiver_reset_guard = co_await ssx::with_timeout_abortable(
+          ss::get_units(_archiver_reset_mutex, 1),
+          ss::lowres_clock::now() + archiver_reset_mutex_timeout,
+          _as);
+
         if (_archiver) {
             _upload_housekeeping.local().deregister_jobs(
               _archiver->get_housekeeping_jobs());
@@ -1050,8 +1076,12 @@ partition::transfer_leadership(transfer_leadership_request req) {
     auto archival_timeout
       = config::shard_local_cfg().cloud_storage_graceful_transfer_timeout_ms();
     if (_archiver && archival_timeout.has_value()) {
-        complete_archiver.emplace(
-          [a = _archiver.get()]() { a->complete_transfer_leadership(); });
+        complete_archiver.emplace([this]() {
+            if (_archiver) {
+                _archiver->complete_transfer_leadership();
+            }
+        });
+
         vlog(
           clusterlog.debug,
           "transfer_leadership[{}]: entering archiver prepare",
@@ -1101,8 +1131,52 @@ partition::get_follower_metrics() const {
     return _raft->get_follower_metrics();
 }
 
-ss::future<> partition::unsafe_reset_remote_partition_manifest(iobuf buf) {
-    vlog(clusterlog.info, "[{}] Unsafe manifest reset requested", ntp());
+ss::future<>
+partition::replicate_unsafe_reset(cloud_storage::partition_manifest manifest) {
+    vlog(
+      clusterlog.info,
+      "Replicating replace manifest command. New manifest details: {{ "
+      "start_offset: {}, last_offset: {}}}",
+      manifest.get_start_offset(),
+      manifest.get_last_offset());
+
+    // Replicate the reset command which contains the downloaded manifest
+    auto sync_timeout = config::shard_local_cfg()
+                          .cloud_storage_metadata_sync_timeout_ms.value();
+    auto replication_deadline = ss::lowres_clock::now() + sync_timeout;
+    std::vector<cluster::command_batch_builder> builders;
+
+    auto reset_builder = _archival_meta_stm->batch_start(
+      replication_deadline, _as);
+    reset_builder.replace_manifest(manifest.to_iobuf());
+
+    auto errc = co_await reset_builder.replicate();
+    if (errc) {
+        if (errc == raft::errc::shutting_down) {
+            // During shutdown, act like we hit an abort source rather
+            // than trying to log+handle this like a write error.
+            throw ss::abort_requested_exception();
+        }
+
+        vlog(
+          clusterlog.warn,
+          "[{}] Unsafe reset failed to update archival STM: "
+          "{}",
+          ntp(),
+          errc.message());
+        throw std::runtime_error(
+          fmt::format("Failed to update archival STM: {}", errc.message()));
+    }
+
+    vlog(
+      clusterlog.info,
+      "[{}] Unsafe reset replicated STM commands successfully",
+      ntp());
+}
+
+ss::future<>
+partition::unsafe_reset_remote_partition_manifest_from_json(iobuf json_buf) {
+    vlog(clusterlog.info, "[{}] Manual unsafe manifest reset requested", ntp());
 
     if (!(config::shard_local_cfg().cloud_storage_enabled()
           && _archival_meta_stm)) {
@@ -1124,45 +1198,145 @@ ss::future<> partition::unsafe_reset_remote_partition_manifest(iobuf buf) {
     // Deserialise provided manifest
     cloud_storage::partition_manifest req_m{
       _raft->ntp(), _raft->log_config().get_initial_revision()};
-    req_m.update_with_json(std::move(buf));
-
-    // A generous timeout of 60 seconds is used as it applies
-    // for the replication multiple batches.
-    auto deadline = ss::lowres_clock::now() + 60s;
-    std::vector<cluster::command_batch_builder> builders;
-
-    auto reset_builder = _archival_meta_stm->batch_start(deadline, _as);
-
-    // Add command to replace manifest. When applied, the current manifest
-    // will be replaced with the one provided.
-    reset_builder.replace_manifest(req_m.to_iobuf());
-
-    vlog(
-      clusterlog.info,
-      "Replicating replace manifest command. New manifest start offset: {}",
-      req_m.get_start_offset());
-
-    auto errc = co_await reset_builder.replicate();
-    if (errc) {
-        if (errc == raft::errc::shutting_down) {
-            // During shutdown, act like we hit an abort source rather
-            // than trying to log+handle this like a write error.
-            throw ss::abort_requested_exception();
-        }
-
-        vlog(
-          clusterlog.warn,
-          "[{}] Unsafe reset failed to update archival STM: {}",
-          ntp(),
-          errc.message());
-        throw std::runtime_error(
-          fmt::format("Failed to update archival STM: {}", errc.message()));
-    }
-
-    vlog(
-      clusterlog.info,
-      "[{}] Unsafe reset replicated STM commands successfully",
-      ntp());
-}
+    req_m.update_with_json(std::move(json_buf));
+
+    co_await replicate_unsafe_reset(std::move(req_m));
+}
+
+ss::future<>
+partition::unsafe_reset_remote_partition_manifest_from_cloud(bool force) {
+    vlog(
+      clusterlog.info,
+      "[{}] Unsafe manifest reset from cloud state requested",
+      ntp());
+
+    _as.check();
+
+    if (!(config::shard_local_cfg().cloud_storage_enabled()
+          && _archival_meta_stm)) {
+        vlog(
+          clusterlog.warn,
+          "[{}] Archival STM not present. Skipping unsafe reset ...",
+          ntp());
+        throw std::runtime_error("Archival STM not present");
+    }
+
+    std::optional<ssx::semaphore_units> archiver_reset_guard;
+    if (should_construct_archiver()) {
+        archiver_reset_guard = co_await ssx::with_timeout_abortable(
+          ss::get_units(_archiver_reset_mutex, 1),
+          ss::lowres_clock::now() + archiver_reset_mutex_timeout,
+          _as);
+    }
+
+    // Stop the archiver and its housekeeping jobs
+    if (_archiver) {
+        _upload_housekeeping.local().deregister_jobs(
+          _archiver->get_housekeeping_jobs());
+        co_await _archiver->stop();
+        _archiver = nullptr;
+    }
+
+    auto start_archiver = [this]() {
+        maybe_construct_archiver();
+        if (_archiver) {
+            // Topic configs may have changed while the archiver was
+            // stopped, so mark them as dirty just in case.
+            _archiver->notify_topic_config();
+            return _archiver->start();
+        }
+
+        return ss::now();
+    };
+
+    // Ensure that all commands on the log have been applied.
+    // No new archival commands should be replicated now.
+    auto sync_timeout = config::shard_local_cfg()
+                          .cloud_storage_metadata_sync_timeout_ms.value();
+    auto sync_result = co_await ss::coroutine::as_future(
+      _archival_meta_stm->sync(sync_timeout));
+    if (sync_result.failed() || sync_result.get() == false) {
+        vlog(
+          clusterlog.warn,
+          "[{}] Could not sync with log. Skipping unsafe reset ...",
+          ntp());
+
+        co_await start_archiver();
+        throw std::runtime_error("Could not sync with log");
+    }
+
+    _as.check();
+
+    // Attempt the reset
+    auto future_result = co_await ss::coroutine::as_future(
+      do_unsafe_reset_remote_partition_manifest_from_cloud(force));
+
+    // Reconstruct the archiver and start it if needed
+    co_await start_archiver();
+
+    // Rethrow the exception if we failed to reset
+    future_result.get();
+}
+
+ss::future<>
+partition::do_unsafe_reset_remote_partition_manifest_from_cloud(bool force) {
+    const auto initial_rev = _raft->log_config().get_initial_revision();
+    const auto bucket = [this]() {
+        if (is_read_replica_mode_enabled()) {
+            return get_read_replica_bucket();
+        }
+
+        const auto& bucket_config
+          = cloud_storage::configuration::get_bucket_config();
+        vassert(
+          bucket_config.value(),
+          "configuration property {} must be set",
+          bucket_config.name());
+
+        return cloud_storage_clients::bucket_name{
+          bucket_config.value().value()};
+    }();
+
+    // Download the current partition manifest from the cloud
+    cloud_storage::partition_manifest new_manifest{ntp(), initial_rev};
+
+    auto timeout
+      = config::shard_local_cfg().cloud_storage_manifest_upload_timeout_ms();
+    auto backoff = config::shard_local_cfg().cloud_storage_initial_backoff_ms();
+
+    retry_chain_node rtc(_as, timeout, backoff);
+    auto [res, res_fmt]
+      = co_await _cloud_storage_api.local().try_download_partition_manifest(
+        bucket, new_manifest, rtc);
+
+    if (res != cloud_storage::download_result::success) {
+        throw std::runtime_error(ssx::sformat(
+          "Failed to download partition manifest with error: {}", res));
+    }
+
+    const auto max_collectible
+      = _raft->log()->stm_manager()->max_collectible_offset();
+    if (new_manifest.get_last_offset() < max_collectible) {
+        auto msg = ssx::sformat(
+          "Applying the cloud manifest would cause data loss since the last "
+          "offset in the downloaded manifest is below the max_collectible "
+          "offset "
+          "{} < {}",
+          new_manifest.get_last_offset(),
+          max_collectible);
+
+        if (!force) {
+            throw std::runtime_error(msg);
+        }
+
+        vlog(
+          clusterlog.warn,
+          "[{}] {}. Proceeding since the force flag was used.",
+          ntp(),
+          msg);
+    }
+
+    co_await replicate_unsafe_reset(std::move(new_manifest));
+}
 
 std::ostream& operator<<(std::ostream& o, const partition& x) {
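
One pattern in unsafe_reset_remote_partition_manifest_from_cloud deserves a callout: the reset is run through ss::coroutine::as_future so the archiver is restarted on both the success and failure paths before any exception is rethrown. The same guarantee sketched with plain C++ exceptions and hypothetical stand-in functions:

// Restart-then-rethrow, sketched without Seastar; names are stand-ins.
#include <exception>
#include <iostream>
#include <stdexcept>

void do_reset() { throw std::runtime_error("download failed"); }
void start_archiver() { std::cout << "archiver restarted\n"; }

void reset_with_restart() {
    std::exception_ptr failure;
    try {
        do_reset();
    } catch (...) {
        failure = std::current_exception(); // capture, like as_future()
    }
    start_archiver(); // always reconstruct the archiver
    if (failure) {
        std::rethrow_exception(failure); // rethrow, like future_result.get()
    }
}

int main() {
    try {
        reset_with_restart();
    } catch (const std::exception& e) {
        std::cout << "reset failed: " << e.what() << '\n';
    }
}
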
