Skip to content

Commit

Permalink
Merge pull request #18097 from nvartolomei/nv/fix-timequery
Browse files Browse the repository at this point in the history
Fix timequery not returning results when racing with archival retention and gc
  • Loading branch information
nvartolomei authored May 17, 2024
2 parents 3ce9d32 + 1d7a1e3 commit 0886164
Show file tree
Hide file tree
Showing 16 changed files with 680 additions and 201 deletions.
16 changes: 8 additions & 8 deletions src/v/archival/ntp_archiver_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,7 @@ ss::future<> ntp_archiver::upload_topic_manifest() {

try {
retry_chain_node fib(
_conf->manifest_upload_timeout,
_conf->manifest_upload_timeout(),
_conf->cloud_storage_initial_backoff,
&_rtcnode);
retry_chain_logger ctxlog(archival_log, fib);
Expand Down Expand Up @@ -912,7 +912,7 @@ ss::future<
ntp_archiver::download_manifest() {
auto guard = _gate.hold();
retry_chain_node fib(
_conf->manifest_upload_timeout,
_conf->manifest_upload_timeout(),
_conf->cloud_storage_initial_backoff,
&_rtcnode);
cloud_storage::partition_manifest tmp(_ntp, _rev);
Expand Down Expand Up @@ -1066,7 +1066,7 @@ ss::future<cloud_storage::upload_result> ntp_archiver::upload_manifest(
auto guard = _gate.hold();
auto rtc = source_rtc.value_or(std::ref(_rtcnode));
retry_chain_node fib(
_conf->manifest_upload_timeout,
_conf->manifest_upload_timeout(),
_conf->cloud_storage_initial_backoff,
&rtc.get());
retry_chain_logger ctxlog(archival_log, fib, _ntp.path());
Expand Down Expand Up @@ -1907,7 +1907,7 @@ ss::future<ntp_archiver::upload_group_result> ntp_archiver::wait_uploads(
_ntp.path());

auto deadline = ss::lowres_clock::now()
+ _conf->manifest_upload_timeout;
+ _conf->manifest_upload_timeout();

std::optional<model::offset> manifest_clean_offset;
if (
Expand Down Expand Up @@ -2096,7 +2096,7 @@ ntp_archiver::maybe_truncate_manifest() {
const auto& m = manifest();
for (const auto& meta : m) {
retry_chain_node fib(
_conf->manifest_upload_timeout,
_conf->manifest_upload_timeout(),
_conf->upload_loop_initial_backoff,
&rtc);
auto sname = cloud_storage::generate_local_segment_name(
Expand Down Expand Up @@ -2125,12 +2125,12 @@ ntp_archiver::maybe_truncate_manifest() {
"manifest, start offset before cleanup: {}",
manifest().get_start_offset());
retry_chain_node rc_node(
_conf->manifest_upload_timeout,
_conf->manifest_upload_timeout(),
_conf->upload_loop_initial_backoff,
&rtc);
auto error = co_await _parent.archival_meta_stm()->truncate(
adjusted_start_offset,
ss::lowres_clock::now() + _conf->manifest_upload_timeout,
ss::lowres_clock::now() + _conf->manifest_upload_timeout(),
_as);
if (error != cluster::errc::success) {
vlog(
Expand Down Expand Up @@ -3030,7 +3030,7 @@ ss::future<bool> ntp_archiver::do_upload_local(
features::feature::cloud_metadata_cluster_recovery)
? _parent.highest_producer_id()
: model::producer_id{};
auto deadline = ss::lowres_clock::now() + _conf->manifest_upload_timeout;
auto deadline = ss::lowres_clock::now() + _conf->manifest_upload_timeout();
auto error = co_await _parent.archival_meta_stm()->add_segments(
{meta},
std::nullopt,
Expand Down
6 changes: 4 additions & 2 deletions src/v/archival/tests/archival_service_fixture.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "cluster/tests/utils.h"
#include "cluster/types.h"
#include "config/configuration.h"
#include "config/property.h"
#include "container/fragmented_vector.h"
#include "http/tests/http_imposter.h"
#include "model/fundamental.h"
Expand Down Expand Up @@ -68,13 +69,14 @@ class archiver_cluster_fixture
cloud_storage_clients::endpoint_url{});
s3_conf.server_addr = server_addr;

archival::configuration a_conf;
archival::configuration a_conf{
.manifest_upload_timeout = config::mock_binding(1000ms),
};
a_conf.bucket_name = cloud_storage_clients::bucket_name("test-bucket");
a_conf.ntp_metrics_disabled = archival::per_ntp_metrics_disabled::yes;
a_conf.svc_metrics_disabled = archival::service_metrics_disabled::yes;
a_conf.cloud_storage_initial_backoff = 100ms;
a_conf.segment_upload_timeout = 1s;
a_conf.manifest_upload_timeout = 1s;
a_conf.garbage_collect_timeout = 1s;
a_conf.upload_loop_initial_backoff = 100ms;
a_conf.upload_loop_max_backoff = 5s;
Expand Down
5 changes: 3 additions & 2 deletions src/v/archival/tests/service_fixture.cc
Original file line number Diff line number Diff line change
Expand Up @@ -193,13 +193,14 @@ archiver_fixture::get_configurations() {
cloud_storage_clients::endpoint_url{});
s3conf.server_addr = server_addr;

archival::configuration aconf;
archival::configuration aconf{
.manifest_upload_timeout = config::mock_binding(1000ms),
};
aconf.bucket_name = cloud_storage_clients::bucket_name("test-bucket");
aconf.ntp_metrics_disabled = archival::per_ntp_metrics_disabled::yes;
aconf.svc_metrics_disabled = archival::service_metrics_disabled::yes;
aconf.cloud_storage_initial_backoff = 100ms;
aconf.segment_upload_timeout = 1s;
aconf.manifest_upload_timeout = 1s;
aconf.garbage_collect_timeout = 1s;
aconf.upload_loop_initial_backoff = 100ms;
aconf.upload_loop_max_backoff = 5s;
Expand Down
4 changes: 2 additions & 2 deletions src/v/archival/types.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ std::ostream& operator<<(std::ostream& o, const configuration& cfg) {
std::chrono::duration_cast<std::chrono::milliseconds>(
cfg.segment_upload_timeout),
std::chrono::duration_cast<std::chrono::milliseconds>(
cfg.manifest_upload_timeout),
cfg.manifest_upload_timeout()),
cfg.time_limit);
return o;
}
Expand Down Expand Up @@ -103,7 +103,7 @@ get_archival_service_config(ss::scheduling_group sg, ss::io_priority_class p) {
.cloud_storage_segment_upload_timeout_ms.value(),
.manifest_upload_timeout
= config::shard_local_cfg()
.cloud_storage_manifest_upload_timeout_ms.value(),
.cloud_storage_manifest_upload_timeout_ms.bind(),
.garbage_collect_timeout
= config::shard_local_cfg()
.cloud_storage_garbage_collect_timeout_ms.value(),
Expand Down
2 changes: 1 addition & 1 deletion src/v/archival/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ struct configuration {
/// Long upload timeout
ss::lowres_clock::duration segment_upload_timeout;
/// Shor upload timeout
ss::lowres_clock::duration manifest_upload_timeout;
config::binding<std::chrono::milliseconds> manifest_upload_timeout;
/// Timeout for running delete operations during the GC phase
ss::lowres_clock::duration garbage_collect_timeout;
/// Initial backoff for upload loop in case there is nothing to upload
Expand Down
Loading

0 comments on commit 0886164

Please sign in to comment.