From 4c706fb144441e1dfd1d030fda64cfd13cc8a9f5 Mon Sep 17 00:00:00 2001 From: Nicolae Vartolomei Date: Fri, 26 Apr 2024 18:16:45 +0100 Subject: [PATCH 01/13] rptest: kill existing connections in firewall_blocked The intention of firewall_blocked is to always prevent communication. However, if a connection already exists it might take a long time until linux gives up retrying sending and the problem is reported. To avoid this, we attempt to kill the connections instantly. This was necessary for a variation of a timequery test I was writing. Although is not strictly necessary anymore, I consider it to be a nice addition. --- tests/rptest/util.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/rptest/util.py b/tests/rptest/util.py index 1e9309189601..6a5cc5b18a58 100644 --- a/tests/rptest/util.py +++ b/tests/rptest/util.py @@ -375,7 +375,8 @@ def __enter__(self): """Isolate certain ips from the nodes using firewall rules""" cmd = [ f"iptables -A INPUT -p tcp --{self.mode_for_input} {self._port} -j DROP", - f"iptables -A OUTPUT -p tcp --dport {self._port} -j DROP" + f"iptables -A OUTPUT -p tcp --dport {self._port} -j DROP", + f"ss -K dport {self._port}", ] cmd = " && ".join(cmd) for node in self._nodes: From aab5fe7db7c96d1623a1e56e5263f887a5016fb9 Mon Sep 17 00:00:00 2001 From: Nicolae Vartolomei Date: Tue, 30 Apr 2024 10:38:49 +0100 Subject: [PATCH 02/13] archival: live settable manifest upload timeout This is needed for a ducktape test where we want to change the manifest upload timeout at runtime. E.g. set it to 0 to prevent manifest uploading from on point in time onward. It is declared in configuration.cc as not requiring restart but in fact it did require one prior to this commit. Other properties of the archiver configuration should be fixed too in a separate commit. --- src/v/archival/ntp_archiver_service.cc | 16 ++++----- .../archival/tests/archival_service_fixture.h | 6 ++-- src/v/archival/tests/service_fixture.cc | 5 +-- src/v/archival/types.cc | 4 +-- src/v/archival/types.h | 2 +- src/v/redpanda/tests/fixture.h | 33 ++++++++----------- 6 files changed, 31 insertions(+), 35 deletions(-) diff --git a/src/v/archival/ntp_archiver_service.cc b/src/v/archival/ntp_archiver_service.cc index 03d92e98ee63..aee221979c69 100644 --- a/src/v/archival/ntp_archiver_service.cc +++ b/src/v/archival/ntp_archiver_service.cc @@ -496,7 +496,7 @@ ss::future<> ntp_archiver::upload_topic_manifest() { try { retry_chain_node fib( - _conf->manifest_upload_timeout, + _conf->manifest_upload_timeout(), _conf->cloud_storage_initial_backoff, &_rtcnode); retry_chain_logger ctxlog(archival_log, fib); @@ -912,7 +912,7 @@ ss::future< ntp_archiver::download_manifest() { auto guard = _gate.hold(); retry_chain_node fib( - _conf->manifest_upload_timeout, + _conf->manifest_upload_timeout(), _conf->cloud_storage_initial_backoff, &_rtcnode); cloud_storage::partition_manifest tmp(_ntp, _rev); @@ -1066,7 +1066,7 @@ ss::future ntp_archiver::upload_manifest( auto guard = _gate.hold(); auto rtc = source_rtc.value_or(std::ref(_rtcnode)); retry_chain_node fib( - _conf->manifest_upload_timeout, + _conf->manifest_upload_timeout(), _conf->cloud_storage_initial_backoff, &rtc.get()); retry_chain_logger ctxlog(archival_log, fib, _ntp.path()); @@ -1907,7 +1907,7 @@ ss::future ntp_archiver::wait_uploads( _ntp.path()); auto deadline = ss::lowres_clock::now() - + _conf->manifest_upload_timeout; + + _conf->manifest_upload_timeout(); std::optional manifest_clean_offset; if ( @@ -2096,7 +2096,7 @@ ntp_archiver::maybe_truncate_manifest() { const auto& m = manifest(); for (const auto& meta : m) { retry_chain_node fib( - _conf->manifest_upload_timeout, + _conf->manifest_upload_timeout(), _conf->upload_loop_initial_backoff, &rtc); auto sname = cloud_storage::generate_local_segment_name( @@ -2125,12 +2125,12 @@ ntp_archiver::maybe_truncate_manifest() { "manifest, start offset before cleanup: {}", manifest().get_start_offset()); retry_chain_node rc_node( - _conf->manifest_upload_timeout, + _conf->manifest_upload_timeout(), _conf->upload_loop_initial_backoff, &rtc); auto error = co_await _parent.archival_meta_stm()->truncate( adjusted_start_offset, - ss::lowres_clock::now() + _conf->manifest_upload_timeout, + ss::lowres_clock::now() + _conf->manifest_upload_timeout(), _as); if (error != cluster::errc::success) { vlog( @@ -3030,7 +3030,7 @@ ss::future ntp_archiver::do_upload_local( features::feature::cloud_metadata_cluster_recovery) ? _parent.highest_producer_id() : model::producer_id{}; - auto deadline = ss::lowres_clock::now() + _conf->manifest_upload_timeout; + auto deadline = ss::lowres_clock::now() + _conf->manifest_upload_timeout(); auto error = co_await _parent.archival_meta_stm()->add_segments( {meta}, std::nullopt, diff --git a/src/v/archival/tests/archival_service_fixture.h b/src/v/archival/tests/archival_service_fixture.h index b22bc5e51343..1f0ac03b9690 100644 --- a/src/v/archival/tests/archival_service_fixture.h +++ b/src/v/archival/tests/archival_service_fixture.h @@ -19,6 +19,7 @@ #include "cluster/tests/utils.h" #include "cluster/types.h" #include "config/configuration.h" +#include "config/property.h" #include "container/fragmented_vector.h" #include "http/tests/http_imposter.h" #include "model/fundamental.h" @@ -68,13 +69,14 @@ class archiver_cluster_fixture cloud_storage_clients::endpoint_url{}); s3_conf.server_addr = server_addr; - archival::configuration a_conf; + archival::configuration a_conf{ + .manifest_upload_timeout = config::mock_binding(1000ms), + }; a_conf.bucket_name = cloud_storage_clients::bucket_name("test-bucket"); a_conf.ntp_metrics_disabled = archival::per_ntp_metrics_disabled::yes; a_conf.svc_metrics_disabled = archival::service_metrics_disabled::yes; a_conf.cloud_storage_initial_backoff = 100ms; a_conf.segment_upload_timeout = 1s; - a_conf.manifest_upload_timeout = 1s; a_conf.garbage_collect_timeout = 1s; a_conf.upload_loop_initial_backoff = 100ms; a_conf.upload_loop_max_backoff = 5s; diff --git a/src/v/archival/tests/service_fixture.cc b/src/v/archival/tests/service_fixture.cc index 10535a8def8d..5342a2c94f65 100644 --- a/src/v/archival/tests/service_fixture.cc +++ b/src/v/archival/tests/service_fixture.cc @@ -193,13 +193,14 @@ archiver_fixture::get_configurations() { cloud_storage_clients::endpoint_url{}); s3conf.server_addr = server_addr; - archival::configuration aconf; + archival::configuration aconf{ + .manifest_upload_timeout = config::mock_binding(1000ms), + }; aconf.bucket_name = cloud_storage_clients::bucket_name("test-bucket"); aconf.ntp_metrics_disabled = archival::per_ntp_metrics_disabled::yes; aconf.svc_metrics_disabled = archival::service_metrics_disabled::yes; aconf.cloud_storage_initial_backoff = 100ms; aconf.segment_upload_timeout = 1s; - aconf.manifest_upload_timeout = 1s; aconf.garbage_collect_timeout = 1s; aconf.upload_loop_initial_backoff = 100ms; aconf.upload_loop_max_backoff = 5s; diff --git a/src/v/archival/types.cc b/src/v/archival/types.cc index c14d24c443c0..9fbc92d2e6f0 100644 --- a/src/v/archival/types.cc +++ b/src/v/archival/types.cc @@ -51,7 +51,7 @@ std::ostream& operator<<(std::ostream& o, const configuration& cfg) { std::chrono::duration_cast( cfg.segment_upload_timeout), std::chrono::duration_cast( - cfg.manifest_upload_timeout), + cfg.manifest_upload_timeout()), cfg.time_limit); return o; } @@ -103,7 +103,7 @@ get_archival_service_config(ss::scheduling_group sg, ss::io_priority_class p) { .cloud_storage_segment_upload_timeout_ms.value(), .manifest_upload_timeout = config::shard_local_cfg() - .cloud_storage_manifest_upload_timeout_ms.value(), + .cloud_storage_manifest_upload_timeout_ms.bind(), .garbage_collect_timeout = config::shard_local_cfg() .cloud_storage_garbage_collect_timeout_ms.value(), diff --git a/src/v/archival/types.h b/src/v/archival/types.h index 8b83f9d16320..e7fe3cf59343 100644 --- a/src/v/archival/types.h +++ b/src/v/archival/types.h @@ -46,7 +46,7 @@ struct configuration { /// Long upload timeout ss::lowres_clock::duration segment_upload_timeout; /// Shor upload timeout - ss::lowres_clock::duration manifest_upload_timeout; + config::binding manifest_upload_timeout; /// Timeout for running delete operations during the GC phase ss::lowres_clock::duration garbage_collect_timeout; /// Initial backoff for upload loop in case there is nothing to upload diff --git a/src/v/redpanda/tests/fixture.h b/src/v/redpanda/tests/fixture.h index 7193bf20ea3c..3c201fbbf60f 100644 --- a/src/v/redpanda/tests/fixture.h +++ b/src/v/redpanda/tests/fixture.h @@ -288,13 +288,14 @@ class redpanda_thread_fixture { } static archival::configuration get_archival_config() { - archival::configuration aconf; + archival::configuration aconf{ + .manifest_upload_timeout = config::mock_binding(1000ms), + }; aconf.bucket_name = cloud_storage_clients::bucket_name("test-bucket"); aconf.ntp_metrics_disabled = archival::per_ntp_metrics_disabled::yes; aconf.svc_metrics_disabled = archival::service_metrics_disabled::yes; aconf.cloud_storage_initial_backoff = 100ms; aconf.segment_upload_timeout = 1s; - aconf.manifest_upload_timeout = 1s; aconf.garbage_collect_timeout = 1s; aconf.time_limit = std::nullopt; return aconf; @@ -327,19 +328,7 @@ class redpanda_thread_fixture { bool data_transforms_enabled = false, bool legacy_upload_mode_enabled = true) { auto base_path = std::filesystem::path(data_dir); - ss::smp::invoke_on_all([node_id, - kafka_port, - rpc_port, - seed_servers = std::move(seed_servers), - base_path, - s3_config, - archival_cfg, - cloud_cfg, - use_node_id, - empty_seed_starts_cluster_val, - kafka_admin_topic_api_rate, - data_transforms_enabled, - legacy_upload_mode_enabled]() mutable { + ss::smp::invoke_on_all([=]() { auto& config = config::shard_local_cfg(); config.get("enable_pid_file").set_value(false); @@ -386,25 +375,29 @@ class redpanda_thread_fixture { static_cast(s3_config->server_addr.port())); } if (archival_cfg) { + // Copy archival config to this shard to avoid `config::binding` + // asserting on cross-shard access. + auto local_cfg = archival_cfg; + config.get("cloud_storage_disable_tls").set_value(true); config.get("cloud_storage_bucket") - .set_value(std::make_optional(archival_cfg->bucket_name())); + .set_value(std::make_optional(local_cfg->bucket_name())); config.get("cloud_storage_initial_backoff_ms") .set_value( std::chrono::duration_cast( - archival_cfg->cloud_storage_initial_backoff)); + local_cfg->cloud_storage_initial_backoff)); config.get("cloud_storage_manifest_upload_timeout_ms") .set_value( std::chrono::duration_cast( - archival_cfg->manifest_upload_timeout)); + local_cfg->manifest_upload_timeout())); config.get("cloud_storage_segment_upload_timeout_ms") .set_value( std::chrono::duration_cast( - archival_cfg->segment_upload_timeout)); + local_cfg->segment_upload_timeout)); config.get("cloud_storage_garbage_collect_timeout_ms") .set_value( std::chrono::duration_cast( - archival_cfg->garbage_collect_timeout)); + local_cfg->garbage_collect_timeout)); } if (cloud_cfg) { config.get("cloud_storage_enable_remote_read").set_value(true); From b53deac17c7f6bb2d3f1163965e4ef1397dafcac Mon Sep 17 00:00:00 2001 From: Nicolae Vartolomei Date: Fri, 26 Apr 2024 10:35:16 +0100 Subject: [PATCH 03/13] cloud_storage: throw if can not create manifest cursor It is not reasonable to continue work after this point. The absence of a cursor cursor is interpreted as EOF in other parts of the system. Not throwing makes it impossible to differentiate between "no more data available" vs. "an error occurred". This is required for an upcoming commit that fixes a timequery bug where cloud storage returns "no offset found" instead of propagating an internal error. --- src/v/cloud_storage/remote_partition.cc | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/v/cloud_storage/remote_partition.cc b/src/v/cloud_storage/remote_partition.cc index 840ab82357f0..b6d3886a8220 100644 --- a/src/v/cloud_storage/remote_partition.cc +++ b/src/v/cloud_storage/remote_partition.cc @@ -619,12 +619,10 @@ class partition_record_batch_reader_impl final co_return; } - vlog( - _ctxlog.error, + throw std::runtime_error(fmt::format( "Failed to query spillover manifests: {}, query: {}", cur.error(), - query); - co_return; + query)); } _view_cursor = std::move(cur.value()); co_await _view_cursor->with_manifest( From d1543eeb8c929eccb414da678445fa0981fca075 Mon Sep 17 00:00:00 2001 From: Nicolae Vartolomei Date: Fri, 26 Apr 2024 13:13:07 +0100 Subject: [PATCH 04/13] cloud_storage: do not ignore out of range errors This code tried to be clever and ignore exception in some of the cases where it was assumed it is safe to do so. I.e. if the start offset stored in the cloud moved forward. I don't believe covering these edge cases is necessary. - Reasoning about correctness becomes harder as it masks the case where by mistake read an out of range offset. - It hide from the client the fact that the offset they just tried to read does not exist anymore. As a user, if the log is prefix truncated, then I expect the reply to Fetch to be out of range and not an empty response. --- src/v/cloud_storage/remote_partition.cc | 25 +++++-- .../tests/remote_partition_test.cc | 67 +++++++++++++++++++ 2 files changed, 86 insertions(+), 6 deletions(-) diff --git a/src/v/cloud_storage/remote_partition.cc b/src/v/cloud_storage/remote_partition.cc index b6d3886a8220..a3c0cc7683ba 100644 --- a/src/v/cloud_storage/remote_partition.cc +++ b/src/v/cloud_storage/remote_partition.cc @@ -565,27 +565,40 @@ class partition_record_batch_reader_impl final co_return; } + // Out of range queries are unexpected. The caller must take care + // to send only valid queries to remote_partition. I.e. the fetch + // handler does such validation. Similar validation is done inside + // remote partition. + // + // Out of range at this point is due to a race condition or due to + // a bug. In both cases the only valid action is to throw an + // exception and let the caller deal with it. If the caller doesn't + // handle it it leads to a closed kafka connection which the + // end clients retry. if ( cur.error() == error_outcome::out_of_range && ss::visit( query, - [&](model::offset) { return false; }, + [&](model::offset) { + vassert( + false, + "Unreachable code. Remote partition doesn't know how to " + "handle model::offset queries."); + return false; + }, [&](kafka::offset query_offset) { - // Special case queries below the start offset of the log. - // The start offset may have advanced while the request was - // in progress. This is expected, so log at debug level. + // Bug or retention racing with the query. const auto log_start_offset = _partition->_manifest_view->stm_manifest() .full_log_start_kafka_offset(); if (log_start_offset && query_offset < *log_start_offset) { vlog( - _ctxlog.debug, + _ctxlog.warn, "Manifest query below the log's start Kafka offset: " "{} < {}", query_offset(), log_start_offset.value()()); - return true; } return false; }, diff --git a/src/v/cloud_storage/tests/remote_partition_test.cc b/src/v/cloud_storage/tests/remote_partition_test.cc index a7389baaa339..b127612f752f 100644 --- a/src/v/cloud_storage/tests/remote_partition_test.cc +++ b/src/v/cloud_storage/tests/remote_partition_test.cc @@ -58,6 +58,7 @@ #include #include #include +#include #include using namespace std::chrono_literals; @@ -2215,3 +2216,69 @@ FIXTURE_TEST(test_scan_by_timestamp, cloud_storage_fixture) { test_log.debug("Timestamp undershoots the partition"); BOOST_REQUIRE(timequery(*this, model::timestamp(100), num_data_batches)); } + +FIXTURE_TEST(test_out_of_range_query, cloud_storage_fixture) { + auto data = [&](size_t t) { + return batch_t{ + .num_records = 1, + .type = model::record_batch_type::raft_data, + .timestamp = model::timestamp(t)}; + }; + + const std::vector> batches = { + {data(1000), data(1002), data(1004), data(1006), data(1008), data(1010)}, + {data(1012), data(1014), data(1016), data(1018), data(1020), data(1022)}, + }; + + auto segments = make_segments(batches, false, false); + cloud_storage::partition_manifest manifest(manifest_ntp, manifest_revision); + + auto expectations = make_imposter_expectations(manifest, segments); + set_expectations_and_listen(expectations); + + // Advance start offset as-if archiver did apply retention but didn't + // run GC yet (the clean offset is not updated). + BOOST_REQUIRE(manifest.advance_start_offset(segments[1].base_offset)); + auto serialize_manifest = [](const cloud_storage::partition_manifest& m) { + auto s_data = m.serialize().get(); + auto buf = s_data.stream.read_exactly(s_data.size_bytes).get(); + return ss::sstring(buf.begin(), buf.end()); + }; + std::ostringstream ostr; + manifest.serialize_json(ostr); + + vlog( + test_util_log.info, + "Rewriting manifest at {}:\n{}", + manifest.get_manifest_path(), + ostr.str()); + + auto manifest_url = "/" + manifest.get_manifest_path()().string(); + remove_expectations({manifest_url}); + add_expectations({ + cloud_storage_fixture::expectation{ + .url = manifest_url, .body = serialize_manifest(manifest)}, + }); + + auto base = segments[0].base_offset; + auto max = segments[segments.size() - 1].max_offset; + + vlog(test_log.debug, "offset range: {}-{}", base, max); + + BOOST_REQUIRE( + scan_remote_partition(*this, segments[1].base_offset, max).size() == 6); + + BOOST_REQUIRE_EXCEPTION( + scan_remote_partition(*this, base, max), + std::runtime_error, + [](const auto& ex) { + ss::sstring what{ex.what()}; + return what.find("Failed to query spillover manifests") != what.npos; + }); + + test_log.debug("Timestamp undershoots the partition"); + BOOST_TEST_REQUIRE(timequery(*this, model::timestamp(100), 6)); + + test_log.debug("Timestamp withing segment"); + BOOST_TEST_REQUIRE(timequery(*this, model::timestamp(1014), 5)); +} From 41eed623536a38cdfaf3f3bc193ffcef51a8fb71 Mon Sep 17 00:00:00 2001 From: Nicolae Vartolomei Date: Fri, 26 Apr 2024 14:27:11 +0100 Subject: [PATCH 05/13] cloud_storage: test querying with spillover Introduce a new test to show the existence of a bug. In particular, ```cpp // BUG: This assertion is disabled because it currently fails. // test_log.debug("Timestamp undershoots the partition"); // BOOST_TEST_REQUIRE(timequery(*this, model::timestamp(100), 3 * 6)); ``` The assertion is commented out because it is failing. --- .../tests/remote_partition_test.cc | 131 ++++++++++++++++++ src/v/cloud_storage/tests/util.cc | 8 ++ 2 files changed, 139 insertions(+) diff --git a/src/v/cloud_storage/tests/remote_partition_test.cc b/src/v/cloud_storage/tests/remote_partition_test.cc index b127612f752f..98e0cc6aa469 100644 --- a/src/v/cloud_storage/tests/remote_partition_test.cc +++ b/src/v/cloud_storage/tests/remote_partition_test.cc @@ -19,6 +19,7 @@ #include "cloud_storage/remote.h" #include "cloud_storage/remote_partition.h" #include "cloud_storage/remote_segment.h" +#include "cloud_storage/spillover_manifest.h" #include "cloud_storage/tests/cloud_storage_fixture.h" #include "cloud_storage/tests/common_def.h" #include "cloud_storage/tests/s3_imposter.h" @@ -2282,3 +2283,133 @@ FIXTURE_TEST(test_out_of_range_query, cloud_storage_fixture) { test_log.debug("Timestamp withing segment"); BOOST_TEST_REQUIRE(timequery(*this, model::timestamp(1014), 5)); } + +FIXTURE_TEST(test_out_of_range_spillover_query, cloud_storage_fixture) { + auto data = [&](size_t t) { + return batch_t{ + .num_records = 1, + .type = model::record_batch_type::raft_data, + .timestamp = model::timestamp(t)}; + }; + + const std::vector> batches = { + {data(1000), data(1002), data(1004), data(1006), data(1008), data(1010)}, + {data(1012), data(1014), data(1016), data(1018), data(1020), data(1022)}, + {data(1024), data(1026), data(1028), data(1030), data(1032), data(1034)}, + {data(1036), data(1038), data(1040), data(1042), data(1044), data(1046)}, + {data(1048), data(1050), data(1052), data(1054), data(1056), data(1058)}, + {data(1060), data(1062), data(1064), data(1066), data(1068), data(1070)}, + }; + + auto segments = make_segments(batches, false, false); + cloud_storage::partition_manifest manifest(manifest_ntp, manifest_revision); + + auto expectations = make_imposter_expectations(manifest, segments); + set_expectations_and_listen(expectations); + + for (int i = 0; i < 2; i++) { + spillover_manifest spm(manifest_ntp, manifest_revision); + + for (int j = 0; auto s : manifest) { + spm.add(s); + if (++j == 2) { + break; + } + } + manifest.spillover(spm.make_manifest_metadata()); + + std::ostringstream ostr; + spm.serialize_json(ostr); + + vlog( + test_util_log.info, + "Uploading spillover manifest at {}:\n{}", + spm.get_manifest_path(), + ostr.str()); + + auto s_data = spm.serialize().get(); + auto buf = s_data.stream.read_exactly(s_data.size_bytes).get(); + add_expectations({cloud_storage_fixture::expectation{ + .url = "/" + spm.get_manifest_path()().string(), + .body = ss::sstring(buf.begin(), buf.end()), + }}); + } + + // Advance start offset as-if archiver did apply retention but didn't + // run GC yet (the clean offset is not updated). + // + // We set it to the second segment of the second spillover manifest in an + // attempt to cover more potential edge cases. + auto archive_start = segments[3].base_offset; + manifest.set_archive_start_offset(archive_start, model::offset_delta(0)); + + // Upload latest manifest version. + auto serialize_manifest = [](const cloud_storage::partition_manifest& m) { + auto s_data = m.serialize().get(); + auto buf = s_data.stream.read_exactly(s_data.size_bytes).get(); + return ss::sstring(buf.begin(), buf.end()); + }; + std::ostringstream ostr; + manifest.serialize_json(ostr); + + vlog( + test_util_log.info, + "Rewriting manifest at {}:\n{}", + manifest.get_manifest_path(), + ostr.str()); + + auto manifest_url = "/" + manifest.get_manifest_path()().string(); + remove_expectations({manifest_url}); + add_expectations({ + cloud_storage_fixture::expectation{ + .url = manifest_url, .body = serialize_manifest(manifest)}, + }); + + auto base = segments[0].base_offset; + auto max = segments[segments.size() - 1].max_offset; + + vlog(test_log.debug, "offset range: {}-{}", base, max); + + // Can query from start of the archive. + BOOST_REQUIRE( + scan_remote_partition(*this, archive_start, max).size() == 3 * 6); + + // Can't query from the start of partition. + BOOST_REQUIRE_EXCEPTION( + scan_remote_partition(*this, base, max), + std::runtime_error, + [](const auto& ex) { + ss::sstring what{ex.what()}; + return what.find("Failed to query spillover manifests") != what.npos; + }); + + // Can't query from start of the still valid spillover manifest. + // Since we don't rewrite spillover manifests we want to be sure that + // we don't allow querying stale segments (below the start offset). + BOOST_REQUIRE_EXCEPTION( + scan_remote_partition(*this, segments[2].base_offset, max), + std::runtime_error, + [](const auto& ex) { + ss::sstring what{ex.what()}; + return what.find("Failed to query spillover manifests") != what.npos; + }); + + // BUG: This assertion is disabled because it currently fails. + // test_log.debug("Timestamp undershoots the partition"); + // BOOST_TEST_REQUIRE(timequery(*this, model::timestamp(100), 3 * 6)); + + test_log.debug("Timestamp within valid spillover but below archive start"); + BOOST_TEST_REQUIRE( + timequery(*this, segments[2].base_timestamp.value(), 3 * 6)); + + test_log.debug("Valid timestamp start of retention"); + BOOST_TEST_REQUIRE( + timequery(*this, batches[3][0].timestamp.value(), 3 * 6)); + + test_log.debug("Valid timestamp within retention"); + BOOST_TEST_REQUIRE( + timequery(*this, batches[3][1].timestamp.value(), 3 * 6 - 1)); + + test_log.debug("Timestamp overshoots the partition"); + BOOST_TEST_REQUIRE(timequery(*this, model::timestamp::max(), 0)); +} diff --git a/src/v/cloud_storage/tests/util.cc b/src/v/cloud_storage/tests/util.cc index aa1cd1ac8e6a..eec0925e507e 100644 --- a/src/v/cloud_storage/tests/util.cc +++ b/src/v/cloud_storage/tests/util.cc @@ -743,6 +743,10 @@ std::vector scan_remote_partition( partition_probe probe(manifest.get_ntp()); auto manifest_view = ss::make_shared( imposter.api, imposter.cache, manifest, bucket); + auto manifest_view_stop = ss::defer( + [&manifest_view] { manifest_view->stop().get(); }); + manifest_view->start().get(); + auto partition = ss::make_shared( manifest_view, imposter.api.local(), @@ -795,6 +799,10 @@ scan_result scan_remote_partition( partition_probe probe(manifest.get_ntp()); auto manifest_view = ss::make_shared( imposter.api, imposter.cache, manifest, bucket); + auto manifest_view_stop = ss::defer( + [&manifest_view] { manifest_view->stop().get(); }); + + manifest_view->start().get(); auto partition = ss::make_shared( manifest_view, imposter.api.local(), From c5eb52d80e7a4833ff03e14bb72502cb7c0373d8 Mon Sep 17 00:00:00 2001 From: Nicolae Vartolomei Date: Fri, 26 Apr 2024 15:05:59 +0100 Subject: [PATCH 06/13] cloud_storage: search for query from archive start not clean offset Starting the cursor from the clean offset is only required when computing retention because of an implementation detail which is documented in the added comment and referenced commits. In all other cases we must start searching from the archive start offset. This is particularly important for timequeries which must return the first visible batch above the timestamp. --- src/v/cloud_storage/async_manifest_view.cc | 22 +++++++--- src/v/cloud_storage/async_manifest_view.h | 14 ++++++- .../tests/async_manifest_view_test.cc | 11 +++++ .../tests/remote_partition_test.cc | 41 +++++++++++++++---- 4 files changed, 73 insertions(+), 15 deletions(-) diff --git a/src/v/cloud_storage/async_manifest_view.cc b/src/v/cloud_storage/async_manifest_view.cc index 8a723416f0d6..31836150f31c 100644 --- a/src/v/cloud_storage/async_manifest_view.cc +++ b/src/v/cloud_storage/async_manifest_view.cc @@ -44,6 +44,7 @@ #include #include #include +#include #include #include @@ -561,7 +562,8 @@ ss::future<> async_manifest_view::run_bg_loop() { ss::future, error_outcome>> async_manifest_view::get_cursor( async_view_search_query_t query, - std::optional end_inclusive) noexcept { + std::optional end_inclusive, + cursor_base_t cursor_base) noexcept { try { ss::gate::holder h(_gate); if ( @@ -581,7 +583,14 @@ async_manifest_view::get_cursor( if (_stm_manifest.get_archive_start_offset() == model::offset{}) { begin = _stm_manifest.get_start_offset().value_or(begin); } else { - begin = _stm_manifest.get_archive_clean_offset(); + switch (cursor_base) { + case cursor_base_t::archive_start_offset: + begin = _stm_manifest.get_archive_start_offset(); + break; + case cursor_base_t::archive_clean_offset: + begin = _stm_manifest.get_archive_clean_offset(); + break; + } } if (end < begin) { @@ -959,7 +968,8 @@ async_manifest_view::offset_based_retention() noexcept { archive_start_offset_advance result; try { auto boundary = _stm_manifest.get_start_kafka_offset_override(); - auto res = co_await get_cursor(boundary); + auto res = co_await get_cursor( + boundary, std::nullopt, cursor_base_t::archive_clean_offset); if (res.has_failure()) { if (res.error() == error_outcome::out_of_range) { vlog( @@ -1023,7 +1033,8 @@ async_manifest_view::time_based_retention( auto res = co_await get_cursor( _stm_manifest.get_archive_start_offset(), - model::prev_offset(_stm_manifest.get_start_offset().value())); + model::prev_offset(_stm_manifest.get_start_offset().value()), + cursor_base_t::archive_clean_offset); if (res.has_failure()) { if (res.error() == error_outcome::out_of_range) { // The cutoff point is outside of the offset range, no need to @@ -1149,7 +1160,8 @@ async_manifest_view::size_based_retention(size_t size_limit) noexcept { auto res = co_await get_cursor( _stm_manifest.get_archive_clean_offset(), - model::prev_offset(_stm_manifest.get_start_offset().value())); + model::prev_offset(_stm_manifest.get_start_offset().value()), + cursor_base_t::archive_clean_offset); if (res.has_failure()) { vlogl( _ctxlog, diff --git a/src/v/cloud_storage/async_manifest_view.h b/src/v/cloud_storage/async_manifest_view.h index a4f27f414edd..8221a6bcedcb 100644 --- a/src/v/cloud_storage/async_manifest_view.h +++ b/src/v/cloud_storage/async_manifest_view.h @@ -82,6 +82,17 @@ class async_manifest_view { ss::future<> start(); ss::future<> stop(); + enum class cursor_base_t { + archive_start_offset, + + /// Special case that is used when computing retention. + /// + /// For details, see: + /// GitHub: https://github.com/redpanda-data/redpanda/pull/12177 + /// Commit: 1b6ab7be8818e3878a32f9037694ae5c4cf4fea2 + archive_clean_offset, + }; + /// Get active spillover manifests asynchronously /// /// \note the method may hydrate manifests in the cache or @@ -91,7 +102,8 @@ class async_manifest_view { result, error_outcome>> get_cursor( async_view_search_query_t q, - std::optional end_inclusive = std::nullopt) noexcept; + std::optional end_inclusive = std::nullopt, + cursor_base_t cursor_base = cursor_base_t::archive_start_offset) noexcept; /// Get inactive spillover manifests which are waiting for /// retention diff --git a/src/v/cloud_storage/tests/async_manifest_view_test.cc b/src/v/cloud_storage/tests/async_manifest_view_test.cc index cf430d09afb5..3a48c10ccfc5 100644 --- a/src/v/cloud_storage/tests/async_manifest_view_test.cc +++ b/src/v/cloud_storage/tests/async_manifest_view_test.cc @@ -455,8 +455,19 @@ FIXTURE_TEST(test_async_manifest_view_truncate, async_manifest_view_fixture) { model::offset so = model::offset{0}; auto maybe_cursor = view.get_cursor(so).get(); + BOOST_REQUIRE( + maybe_cursor.has_error() + && maybe_cursor.error() == cloud_storage::error_outcome::out_of_range); + // The clean offset should still be accesible such that retention // can operate above it. + maybe_cursor = view + .get_cursor( + so, + std::nullopt, + cloud_storage::async_manifest_view::cursor_base_t:: + archive_clean_offset) + .get(); BOOST_REQUIRE(!maybe_cursor.has_failure()); maybe_cursor = view.get_cursor(new_so).get(); diff --git a/src/v/cloud_storage/tests/remote_partition_test.cc b/src/v/cloud_storage/tests/remote_partition_test.cc index 98e0cc6aa469..7bd90f818110 100644 --- a/src/v/cloud_storage/tests/remote_partition_test.cc +++ b/src/v/cloud_storage/tests/remote_partition_test.cc @@ -438,7 +438,14 @@ FIXTURE_TEST(test_scan_by_kafka_offset_truncated, cloud_storage_fixture) { *this, model::offset(6), model::offset_delta(3), batch_types); print_segments(segments); for (int i = 0; i <= 2; i++) { - BOOST_REQUIRE(check_fetch(*this, kafka::offset(i), false)); + BOOST_REQUIRE_EXCEPTION( + check_fetch(*this, kafka::offset(i), false), + std::runtime_error, + [](const std::runtime_error& e) { + return std::string(e.what()).find( + "Failed to query spillover manifests") + != std::string::npos; + }); } for (int i = 3; i <= 8; i++) { BOOST_REQUIRE(check_scan(*this, kafka::offset(i), 9 - i)); @@ -490,7 +497,14 @@ FIXTURE_TEST( auto segments = setup_s3_imposter( *this, model::offset(6), model::offset_delta(3), batch_types); print_segments(segments); - BOOST_REQUIRE(check_fetch(*this, kafka::offset(2), false)); + BOOST_REQUIRE_EXCEPTION( + check_fetch(*this, kafka::offset(2), false), + std::runtime_error, + [](const std::runtime_error& e) { + return std::string(e.what()).find( + "Failed to query spillover manifests") + != std::string::npos; + }); BOOST_REQUIRE(check_scan(*this, kafka::offset(3), 1)); BOOST_REQUIRE(check_fetch(*this, kafka::offset(3), true)); BOOST_REQUIRE(check_scan(*this, kafka::offset(4), 0)); @@ -538,7 +552,14 @@ FIXTURE_TEST( *this, model::offset(6), model::offset_delta(0), batch_types); print_segments(segments); for (int i = 0; i < 6; i++) { - BOOST_REQUIRE(check_fetch(*this, kafka::offset(i), false)); + BOOST_REQUIRE_EXCEPTION( + check_fetch(*this, kafka::offset(i), false), + std::runtime_error, + [](const std::runtime_error& e) { + return std::string(e.what()).find( + "Failed to query spillover manifests") + != std::string::npos; + }); } BOOST_REQUIRE(check_scan(*this, kafka::offset(6), 1)); BOOST_REQUIRE(check_fetch(*this, kafka::offset(6), true)); @@ -1243,12 +1264,14 @@ FIXTURE_TEST( vlog(test_log.debug, "Creating new reader {}", reader_config); // After truncation reading from the old end should be impossible - auto reader = partition->make_reader(reader_config).get().reader; - auto headers_read - = reader.consume(counting_batch_consumer(100), model::no_timeout) - .get(); - - BOOST_REQUIRE(headers_read.size() == 0); + BOOST_REQUIRE_EXCEPTION( + partition->make_reader(reader_config).get(), + std::runtime_error, + [](const std::runtime_error& e) { + return std::string(e.what()).find( + "Failed to query spillover manifests") + != std::string::npos; + }); } } From 680a67e5e644d5ddc65c6808bec3a50e0048dcda Mon Sep 17 00:00:00 2001 From: Nicolae Vartolomei Date: Fri, 26 Apr 2024 15:10:28 +0100 Subject: [PATCH 07/13] cloud_storage: throw on out of range timequery with spillover When reading from tiered storage, we create a `async_manifest_view_cursor` using a query (offset or timestamp) and a begin offset which is set to the start of the stm region or start of archive (spillover). There is a bug inside `async_manifest_view_cursor` which causes it to throw out_of_range error when spillover contains data which is logically prefix truncated but matches the timequery. This is mainly because the begin offset is not properly propagated together with the query which makes it possible for the query to match a spillover manifest which is below the begin offset. In this commit, we remove the logic to ignore the out of range error and propagate it to the caller. In a later commit, we will revisit the code so that this edge cases is handled correctly inside the async_manifest_view and it does seek to the correct offset rather than throwing an out of range exception up to the client. --- src/v/cloud_storage/remote_partition.cc | 21 ++++++------ .../tests/remote_partition_test.cc | 32 +++++++++++++++++-- 2 files changed, 38 insertions(+), 15 deletions(-) diff --git a/src/v/cloud_storage/remote_partition.cc b/src/v/cloud_storage/remote_partition.cc index a3c0cc7683ba..02964280c723 100644 --- a/src/v/cloud_storage/remote_partition.cc +++ b/src/v/cloud_storage/remote_partition.cc @@ -20,6 +20,7 @@ #include "cloud_storage/tx_range_manifest.h" #include "cloud_storage/types.h" #include "model/fundamental.h" +#include "model/timestamp.h" #include "net/connection.h" #include "ssx/future-util.h" #include "ssx/watchdog.h" @@ -605,27 +606,23 @@ class partition_record_batch_reader_impl final [&](model::timestamp query_ts) { // Special case, it can happen when a timequery falls below // the clean offset. Caused when the query races with - // retention/gc. log a warning, since the kafka client can - // handle a failed query + // retention/gc. auto const& spillovers = _partition->_manifest_view ->stm_manifest() .get_spillover_map(); - if ( - spillovers.empty() - || spillovers.get_max_timestamp_column() - .last_value() - .value_or(model::timestamp::max()()) - >= query_ts()) { + + bool timestamp_inside_spillover + = query_ts() <= spillovers.get_max_timestamp_column() + .last_value() + .value_or(model::timestamp::min()()); + + if (timestamp_inside_spillover) { vlog( _ctxlog.debug, "Manifest query raced with retention and the result " "is below the clean/start offset for {}", query_ts); - return true; } - - // query was not meant for archive region. fallthrough and - // log an error return false; })) { // error was handled diff --git a/src/v/cloud_storage/tests/remote_partition_test.cc b/src/v/cloud_storage/tests/remote_partition_test.cc index 7bd90f818110..caba7212298a 100644 --- a/src/v/cloud_storage/tests/remote_partition_test.cc +++ b/src/v/cloud_storage/tests/remote_partition_test.cc @@ -2417,9 +2417,17 @@ FIXTURE_TEST(test_out_of_range_spillover_query, cloud_storage_fixture) { return what.find("Failed to query spillover manifests") != what.npos; }); - // BUG: This assertion is disabled because it currently fails. - // test_log.debug("Timestamp undershoots the partition"); - // BOOST_TEST_REQUIRE(timequery(*this, model::timestamp(100), 3 * 6)); + // Timestamp undershoots the partition. + // It shouldn't throw. But as an interim workaround we throw until + // clean offset catches up with start offset. + // The clients will retry. + BOOST_REQUIRE_EXCEPTION( + timequery(*this, model::timestamp(100), 3 * 6), + std::runtime_error, + [](const auto& ex) { + ss::sstring what{ex.what()}; + return what.find("Failed to query spillover manifests") != what.npos; + }); test_log.debug("Timestamp within valid spillover but below archive start"); BOOST_TEST_REQUIRE( @@ -2435,4 +2443,22 @@ FIXTURE_TEST(test_out_of_range_spillover_query, cloud_storage_fixture) { test_log.debug("Timestamp overshoots the partition"); BOOST_TEST_REQUIRE(timequery(*this, model::timestamp::max(), 0)); + + // Rewrite the manifest with clean offset to match start offset. + manifest.set_archive_clean_offset( + archive_start, manifest.archive_size_bytes() / 2); + vlog( + test_util_log.info, + "Rewriting manifest at {}:\n{}", + manifest.get_manifest_path(), + ostr.str()); + + remove_expectations({manifest_url}); + add_expectations({ + cloud_storage_fixture::expectation{ + .url = manifest_url, .body = serialize_manifest(manifest)}, + }); + + // This query should now succeed. + BOOST_TEST_REQUIRE(timequery(*this, model::timestamp(100), 3 * 6)); } From 3a9058ab1fc0c475e89a2408325e3c1b835c0ebd Mon Sep 17 00:00:00 2001 From: Nicolae Vartolomei Date: Fri, 26 Apr 2024 15:11:46 +0100 Subject: [PATCH 08/13] cloud_storage: simplify the out_of_range logging code No functional changes. --- src/v/cloud_storage/remote_partition.cc | 100 ++++++++++++------------ 1 file changed, 49 insertions(+), 51 deletions(-) diff --git a/src/v/cloud_storage/remote_partition.cc b/src/v/cloud_storage/remote_partition.cc index 02964280c723..94fceafbf400 100644 --- a/src/v/cloud_storage/remote_partition.cc +++ b/src/v/cloud_storage/remote_partition.cc @@ -576,57 +576,55 @@ class partition_record_batch_reader_impl final // exception and let the caller deal with it. If the caller doesn't // handle it it leads to a closed kafka connection which the // end clients retry. - if ( - cur.error() == error_outcome::out_of_range - && ss::visit( - query, - [&](model::offset) { - vassert( - false, - "Unreachable code. Remote partition doesn't know how to " - "handle model::offset queries."); - return false; - }, - [&](kafka::offset query_offset) { - // Bug or retention racing with the query. - const auto log_start_offset - = _partition->_manifest_view->stm_manifest() - .full_log_start_kafka_offset(); - - if (log_start_offset && query_offset < *log_start_offset) { - vlog( - _ctxlog.warn, - "Manifest query below the log's start Kafka offset: " - "{} < {}", - query_offset(), - log_start_offset.value()()); - } - return false; - }, - [&](model::timestamp query_ts) { - // Special case, it can happen when a timequery falls below - // the clean offset. Caused when the query races with - // retention/gc. - auto const& spillovers = _partition->_manifest_view - ->stm_manifest() - .get_spillover_map(); - - bool timestamp_inside_spillover - = query_ts() <= spillovers.get_max_timestamp_column() - .last_value() - .value_or(model::timestamp::min()()); - - if (timestamp_inside_spillover) { - vlog( - _ctxlog.debug, - "Manifest query raced with retention and the result " - "is below the clean/start offset for {}", - query_ts); - } - return false; - })) { - // error was handled - co_return; + if (cur.error() == error_outcome::out_of_range) { + ss::visit( + query, + [&](model::offset) { + vassert( + false, + "Unreachable code. Remote partition doesn't know how " + "to " + "handle model::offset queries."); + }, + [&](kafka::offset query_offset) { + // Bug or retention racing with the query. + const auto log_start_offset + = _partition->_manifest_view->stm_manifest() + .full_log_start_kafka_offset(); + + if ( + log_start_offset && query_offset < *log_start_offset) { + vlog( + _ctxlog.warn, + "Manifest query below the log's start Kafka " + "offset: " + "{} < {}", + query_offset(), + log_start_offset.value()()); + } + }, + [&](model::timestamp query_ts) { + // Special case, it can happen when a timequery falls + // below the clean offset. Caused when the query races + // with retention/gc. + auto const& spillovers = _partition->_manifest_view + ->stm_manifest() + .get_spillover_map(); + + bool timestamp_inside_spillover + = query_ts() <= spillovers.get_max_timestamp_column() + .last_value() + .value_or(model::timestamp::min()()); + + if (timestamp_inside_spillover) { + vlog( + _ctxlog.debug, + "Manifest query raced with retention and the " + "result " + "is below the clean/start offset for {}", + query_ts); + } + }); } throw std::runtime_error(fmt::format( From 0735bdfdabf8dde7b95303e9b9b28d8851d9eb9e Mon Sep 17 00:00:00 2001 From: Nicolae Vartolomei Date: Tue, 30 Apr 2024 16:49:11 +0100 Subject: [PATCH 09/13] cloud_storage: carry min/max offset bounds into cloud storage timequery Tiered Storage physically has a superset of the addressable data. This can be caused at least by the following: a) trim-prefix, b) retention being applied but garbage collection not finishing yet. For offset queries this isn't problematic because the bounds can be applied at higher level. In particular, partition object does validate that offset is in range before passing control to the remote partition. For timequeries prior to this commit such bounds were not enforced leading to a bug where cloud storage would return an offset -1 (no data found) in result when there actually was data or returning a wrong offset. Wrong offset: it would be returned because reads could have started prior to the partition visible/addressable offset. E.g. after retention was applied but before GC was run. Or, after a trim-prefix with an offset which falls in a middle of a batch. Missing offset: would be returned when the higher level reader was created with visible/addressable partition offset bounds, say \[1000, 1200\] but cloud storage would find the offset in a manifest with bounds \[300, 400\] leading to an out of range error which used to be ignored. --- src/v/cloud_storage/async_manifest_view.cc | 127 +++++++++++++----- src/v/cloud_storage/async_manifest_view.h | 25 +++- src/v/cloud_storage/remote_partition.cc | 14 +- .../tests/async_manifest_view_test.cc | 16 ++- tests/rptest/tests/timequery_test.py | 98 +++++++++++++- 5 files changed, 227 insertions(+), 53 deletions(-) diff --git a/src/v/cloud_storage/async_manifest_view.cc b/src/v/cloud_storage/async_manifest_view.cc index 31836150f31c..e1c22232b04d 100644 --- a/src/v/cloud_storage/async_manifest_view.cc +++ b/src/v/cloud_storage/async_manifest_view.cc @@ -64,7 +64,9 @@ static ss::sstring to_string(const async_view_search_query_t& t) { t, [&](model::offset ro) { return ssx::sformat("[offset: {}]", ro); }, [&](kafka::offset ko) { return ssx::sformat("[kafka offset: {}]", ko); }, - [&](model::timestamp ts) { return ssx::sformat("[timestamp: {}]", ts); }); + [&](const async_view_timestamp_query& ts) { + return ssx::sformat("{}", ts); + }); } std::ostream& operator<<(std::ostream& s, const async_view_search_query_t& q) { @@ -84,9 +86,27 @@ contains(const partition_manifest& m, const async_view_search_query_t& query) { return k >= m.get_start_kafka_offset() && k < m.get_next_kafka_offset(); }, - [&](model::timestamp t) { - return m.size() > 0 && t >= m.begin()->base_timestamp - && t <= m.last_segment()->max_timestamp; + [&](const async_view_timestamp_query& ts_query) { + if (m.size() == 0) { + return false; + } + + auto kafka_start_offset = m.get_start_kafka_offset(); + if (!kafka_start_offset.has_value()) { + return false; + } + + auto kafka_last_offset = m.get_last_kafka_offset(); + if (!kafka_last_offset.has_value()) { + return false; + } + + auto range_overlaps = ts_query.min_offset <= kafka_last_offset.value() + && ts_query.max_offset + >= kafka_start_offset.value(); + + return range_overlaps && ts_query.ts >= m.begin()->base_timestamp + && ts_query.ts <= m.last_segment()->max_timestamp; }); } @@ -566,15 +586,7 @@ async_manifest_view::get_cursor( cursor_base_t cursor_base) noexcept { try { ss::gate::holder h(_gate); - if ( - !in_archive(query) && !in_stm(query) - && !std::holds_alternative(query)) { - // The view should contain manifest below archive start in - // order to be able to perform retention and advance metadata. - vlog( - _ctxlog.debug, - "query {} is out of valid range", - to_string(query)); + if (!in_archive(query) && !in_stm(query)) { co_return error_outcome::out_of_range; } model::offset begin; @@ -853,7 +865,7 @@ bool async_manifest_view::in_archive(async_view_search_query_t o) { && ko < _stm_manifest.get_start_kafka_offset().value_or( kafka::offset::min()); }, - [this](model::timestamp ts) { + [this](async_view_timestamp_query ts_query) { // The condition for timequery is tricky. With offsets there is a // clear pivot point. The start_offset of the STM manifest separates // the STM region from the archive. With timestamps it's not as @@ -861,8 +873,18 @@ bool async_manifest_view::in_archive(async_view_search_query_t o) { // and the first segment in the STM manifest. We need in_stm and // in_archive to be consistent with each other. To do this we can use // last timestamp in the archive as a pivot point. - return _stm_manifest.get_spillover_map().last_segment()->max_timestamp - >= ts; + bool range_overlaps + = ts_query.min_offset + < _stm_manifest.get_start_kafka_offset().value_or( + kafka::offset::min()) + && ts_query.max_offset + >= _stm_manifest.get_archive_start_kafka_offset(); + + return range_overlaps + && _stm_manifest.get_spillover_map() + .last_segment() + ->max_timestamp + >= ts_query.ts; }); } @@ -879,8 +901,9 @@ bool async_manifest_view::in_stm(async_view_search_query_t o) { kafka::offset::max()); return ko >= sko; }, - [this](model::timestamp ts) { - vlog(_ctxlog.debug, "Checking timestamp {} using timequery", ts); + [this](async_view_timestamp_query ts_query) { + vlog( + _ctxlog.debug, "Checking timestamp {} using timequery", ts_query); if (_stm_manifest.get_spillover_map().empty()) { // The STM manifest is empty, so the timestamp has to be directed // to the STM manifest. @@ -891,8 +914,19 @@ bool async_manifest_view::in_stm(async_view_search_query_t o) { } // The last timestamp in the archive is used as a pivot point. See // description in in_archive. - return _stm_manifest.get_spillover_map().last_segment()->max_timestamp - < ts; + bool range_overlaps + = ts_query.min_offset + <= _stm_manifest.get_last_kafka_offset().value_or( + kafka::offset::min()) + && ts_query.max_offset + >= _stm_manifest.get_start_kafka_offset().value_or( + kafka::offset::max()); + + return range_overlaps + && _stm_manifest.get_spillover_map() + .last_segment() + ->max_timestamp + < ts_query.ts; }); } @@ -1312,7 +1346,7 @@ async_manifest_view::get_materialized_manifest( } // query in not in the stm region if ( - std::holds_alternative(q) + std::holds_alternative(q) && _stm_manifest.get_archive_start_offset() == model::offset{}) { vlog(_ctxlog.debug, "Using STM manifest for timequery {}", q); co_return std::ref(_stm_manifest); @@ -1472,7 +1506,7 @@ std::optional async_manifest_view::search_spillover_manifests( } return -1; }, - [&](model::timestamp t) { + [&](const async_view_timestamp_query& ts_query) { if (manifests.empty()) { return -1; } @@ -1482,35 +1516,56 @@ std::optional async_manifest_view::search_spillover_manifests( "{}, last: {}", query, manifests.size(), - manifests.begin()->base_timestamp, - manifests.last_segment()->max_timestamp); + *manifests.begin(), + *manifests.last_segment()); - auto first_manifest = manifests.begin(); - auto base_t = first_manifest->base_timestamp; auto max_t = manifests.last_segment()->max_timestamp; // Edge cases - if (t < base_t || max_t == base_t) { - return 0; - } else if (t > max_t) { + if (ts_query.ts > max_t) { return -1; } + const auto& bo_col = manifests.get_base_offset_column(); + const auto& co_col = manifests.get_committed_offset_column(); + const auto& do_col = manifests.get_delta_offset_column(); + const auto& de_col = manifests.get_delta_offset_end_column(); const auto& bt_col = manifests.get_base_timestamp_column(); const auto& mt_col = manifests.get_max_timestamp_column(); - auto mt_it = mt_col.begin(); - auto bt_it = bt_col.begin(); + + auto bo_it = bo_col.begin(); + auto co_it = co_col.begin(); + auto do_it = do_col.begin(); + auto de_it = de_col.begin(); + auto max_ts_it = mt_col.begin(); + auto base_ts_it = bt_col.begin(); + int target_ix = -1; - while (!bt_it.is_end()) { - if (*mt_it >= t.value() || *bt_it > t.value()) { + while (!base_ts_it.is_end()) { + static constexpr int64_t min_delta = model::offset::min()(); + auto d_begin = *do_it == min_delta ? 0 : *do_it; + auto d_end = *de_it == min_delta ? d_begin : *de_it; + auto bko = kafka::offset(*bo_it - d_begin); + auto cko = kafka::offset(*co_it - d_end); + + auto range_overlaps = ts_query.min_offset <= cko + && ts_query.max_offset >= bko; + + if ( + range_overlaps + && (*max_ts_it >= ts_query.ts() || *base_ts_it > ts_query.ts())) { // Handle case when we're overshooting the target // (base_timestamp > t) or the case when the target is in the // middle of the manifest (max_timestamp >= t) - target_ix = static_cast(bt_it.index()); + target_ix = static_cast(base_ts_it.index()); break; } - ++bt_it; - ++mt_it; + ++bo_it; + ++co_it; + ++do_it; + ++de_it; + ++base_ts_it; + ++max_ts_it; } return target_ix; }); diff --git a/src/v/cloud_storage/async_manifest_view.h b/src/v/cloud_storage/async_manifest_view.h index 8221a6bcedcb..cf3e59930ce3 100644 --- a/src/v/cloud_storage/async_manifest_view.h +++ b/src/v/cloud_storage/async_manifest_view.h @@ -39,9 +39,32 @@ namespace cloud_storage { +struct async_view_timestamp_query { + async_view_timestamp_query( + kafka::offset min_offset, model::timestamp ts, kafka::offset max_offset) + : min_offset(min_offset) + , ts(ts) + , max_offset(max_offset) {} + + friend std::ostream& + operator<<(std::ostream& o, const async_view_timestamp_query& q) { + fmt::print( + o, + "async_view_timestamp_query{{min_offset:{}, ts:{}, max_offset:{}}}", + q.min_offset, + q.ts, + q.max_offset); + return o; + } + + kafka::offset min_offset; + model::timestamp ts; + kafka::offset max_offset; +}; + /// Search query type using async_view_search_query_t - = std::variant; + = std::variant; std::ostream& operator<<(std::ostream&, const async_view_search_query_t&); diff --git a/src/v/cloud_storage/remote_partition.cc b/src/v/cloud_storage/remote_partition.cc index 94fceafbf400..e790cc1d76d6 100644 --- a/src/v/cloud_storage/remote_partition.cc +++ b/src/v/cloud_storage/remote_partition.cc @@ -553,7 +553,10 @@ class partition_record_batch_reader_impl final async_view_search_query_t query; if (config.first_timestamp.has_value()) { - query = config.first_timestamp.value(); + query = async_view_timestamp_query( + model::offset_cast(config.start_offset), + config.first_timestamp.value(), + model::offset_cast(config.max_offset)); } else { // NOTE: config.start_offset actually contains kafka offset // stored using model::offset type. @@ -603,7 +606,7 @@ class partition_record_batch_reader_impl final log_start_offset.value()()); } }, - [&](model::timestamp query_ts) { + [&](const async_view_timestamp_query& query_ts) { // Special case, it can happen when a timequery falls // below the clean offset. Caused when the query races // with retention/gc. @@ -612,9 +615,10 @@ class partition_record_batch_reader_impl final .get_spillover_map(); bool timestamp_inside_spillover - = query_ts() <= spillovers.get_max_timestamp_column() - .last_value() - .value_or(model::timestamp::min()()); + = query_ts.ts() + <= spillovers.get_max_timestamp_column() + .last_value() + .value_or(model::timestamp::min()()); if (timestamp_inside_spillover) { vlog( diff --git a/src/v/cloud_storage/tests/async_manifest_view_test.cc b/src/v/cloud_storage/tests/async_manifest_view_test.cc index 3a48c10ccfc5..f82837218379 100644 --- a/src/v/cloud_storage/tests/async_manifest_view_test.cc +++ b/src/v/cloud_storage/tests/async_manifest_view_test.cc @@ -1117,7 +1117,8 @@ FIXTURE_TEST(test_async_manifest_view_timequery, async_manifest_view_fixture) { // Find exact matches for all segments for (const auto& meta : expected) { - auto target = meta.base_timestamp; + auto target = async_view_timestamp_query( + kafka::offset(0), meta.base_timestamp, kafka::offset::max()); auto maybe_cursor = view.get_cursor(target).get(); BOOST_REQUIRE(!maybe_cursor.has_failure()); auto cursor = std::move(maybe_cursor.value()); @@ -1130,9 +1131,9 @@ FIXTURE_TEST(test_async_manifest_view_timequery, async_manifest_view_fixture) { m.last_segment()->max_timestamp, stm_manifest.begin()->base_timestamp, stm_manifest.last_segment()->max_timestamp); - auto res = m.timequery(target); + auto res = m.timequery(target.ts); BOOST_REQUIRE(res.has_value()); - BOOST_REQUIRE(res.value().base_timestamp == target); + BOOST_REQUIRE(res.value().base_timestamp == target.ts); }) .get(); } @@ -1159,7 +1160,10 @@ FIXTURE_TEST( // that there is a gap between any two segments. for (const auto& meta : expected) { - auto target = model::timestamp(meta.base_timestamp.value() - 1); + auto target = async_view_timestamp_query( + kafka::offset(0), + model::timestamp(meta.base_timestamp() - 1), + kafka::offset::max()); auto maybe_cursor = view.get_cursor(target).get(); BOOST_REQUIRE(!maybe_cursor.has_failure()); auto cursor = std::move(maybe_cursor.value()); @@ -1173,11 +1177,11 @@ FIXTURE_TEST( m.last_segment()->max_timestamp, stm_manifest.begin()->base_timestamp, stm_manifest.last_segment()->max_timestamp); - auto res = m.timequery(target); + auto res = m.timequery(target.ts); BOOST_REQUIRE(res.has_value()); BOOST_REQUIRE( model::timestamp(res.value().base_timestamp.value() - 1) - == target); + == target.ts); }) .get(); } diff --git a/tests/rptest/tests/timequery_test.py b/tests/rptest/tests/timequery_test.py index 456619d8a846..c7e8c9297536 100644 --- a/tests/rptest/tests/timequery_test.py +++ b/tests/rptest/tests/timequery_test.py @@ -9,7 +9,6 @@ import concurrent.futures import re -import time import threading from logging import Logger from typing import Callable @@ -22,9 +21,8 @@ from rptest.clients.types import TopicSpec from rptest.clients.rpk import RpkTool from rptest.clients.kafka_cat import KafkaCat -from rptest.clients.rpk_remote import RpkRemoteTool -from rptest.util import (wait_until, segments_count, - wait_for_local_storage_truncate) +from rptest.util import (wait_until, wait_for_local_storage_truncate, + wait_until_result) from rptest.services.kgo_verifier_services import KgoVerifierProducer from rptest.utils.si_utils import BucketView, NTP @@ -38,7 +36,6 @@ from kafkatest.version import V_3_0_0 from ducktape.tests.test import Test from rptest.clients.default import DefaultClient -from rptest.utils.mode_checks import skip_debug_mode class BaseTimeQuery: @@ -518,6 +515,97 @@ def test_timequery_with_trim_prefix(self, cloud_storage: bool, offset = kcat.query_offset(topic.name, 0, timestamps[0] - 1000) assert offset == msg_count - 1, f"Expected {msg_count - 1}, got {offset}" + @cluster( + num_nodes=4, + log_allow_list=["Failed to upload spillover manifest {timed_out}"]) + def test_timequery_with_spillover_gc_delayed(self): + self.set_up_cluster(cloud_storage=True, + batch_cache=False, + spillover=True) + total_segments = 16 + record_size = 1024 + base_ts = 1664453149000 + msg_count = (self.log_segment_size * total_segments) // record_size + local_retention = self.log_segment_size * 4 + topic_retention = self.log_segment_size * 8 + topic, timestamps = self._create_and_produce(self.redpanda, True, + local_retention, base_ts, + record_size, msg_count) + + # Confirm messages written + rpk = RpkTool(self.redpanda) + p = next(rpk.describe_topic(topic.name)) + assert p.high_watermark == msg_count + + # If using cloud storage, we must wait for some segments + # to fall out of local storage, to ensure we are really + # hitting the cloud storage read path when querying. + wait_for_local_storage_truncate(redpanda=self.redpanda, + topic=topic.name, + target_bytes=local_retention) + + # Set timeout to 0 to prevent the cloud storage housekeeping from + # running, triggering gc, and advancing clean offset. + self.redpanda.set_cluster_config( + {"cloud_storage_manifest_upload_timeout_ms": 0}) + # Disable internal scrubbing as it won't be able to make progress. + self.si_settings.skip_end_of_test_scrubbing = True + + self.client().alter_topic_config(topic.name, 'retention.bytes', + topic_retention) + self.logger.info("Waiting for start offset to advance...") + start_offset = wait_until_result( + lambda: next(rpk.describe_topic(topic.name)).start_offset > 0, + timeout_sec=120, + backoff_sec=5, + err_msg="Start offset did not advance") + + start_offset = next(rpk.describe_topic(topic.name)).start_offset + + # Query below valid timestamps the offset of the first message. + kcat = KafkaCat(self.redpanda) + + test_cases = [ + (timestamps[0] - 1000, start_offset, "before start of log"), + (timestamps[0], start_offset, + "first message but out of retention now"), + (timestamps[start_offset - 1], start_offset, + "before new HWM, out of retention"), + (timestamps[start_offset], start_offset, "new HWM"), + (timestamps[start_offset + 10], start_offset + 10, + "few messages after new HWM"), + (timestamps[msg_count - 1] + 1000, -1, "after last message"), + ] + + # Basic time query cases. + for ts, expected_offset, desc in test_cases: + self.logger.info(f"Querying ts={ts} ({desc})") + offset = kcat.query_offset(topic.name, 0, ts) + self.logger.info(f"Time query returned offset {offset}") + assert offset == expected_offset, f"Expected {expected_offset}, got {offset}" + + # Now check every single one of them to make sure there are no + # off-by-one errors, iterators aren't getting stuck on segment and + # spillover boundaries, etc. The segment boundaries are not exact + # due to internal messages, segment roll logic, etc. but the tolerance + # should cover that. + boundary_ranges = [] + for i in range(1, total_segments): + boundary_ranges.append( + (int(i * self.log_segment_size / record_size - 100), + int(i * self.log_segment_size / record_size + 100))) + + for r in boundary_ranges: + self.logger.debug(f"Checking range {r}") + for o in range(int(r[0]), int(r[1])): + ts = timestamps[o] + self.logger.debug(f" Querying ts={ts}") + offset = kcat.query_offset(topic.name, 0, ts) + if o < start_offset: + assert offset == start_offset, f"Expected {start_offset}, got {offset}" + else: + assert offset == o, f"Expected {o}, got {offset}" + class TimeQueryKafkaTest(Test, BaseTimeQuery): """ From 9846ed93e5468b7655c55e5c376fb21c03bf1bb8 Mon Sep 17 00:00:00 2001 From: Nicolae Vartolomei Date: Tue, 30 Apr 2024 16:49:46 +0100 Subject: [PATCH 10/13] kafka: remove timequery retry hack from replicated partition It is not required anymore because we carry the actual partition start and end to the lowest layer of the systems. --- src/v/kafka/server/replicated_partition.cc | 39 +--------------------- 1 file changed, 1 insertion(+), 38 deletions(-) diff --git a/src/v/kafka/server/replicated_partition.cc b/src/v/kafka/server/replicated_partition.cc index 1548ea10ad53..4a089386ac48 100644 --- a/src/v/kafka/server/replicated_partition.cc +++ b/src/v/kafka/server/replicated_partition.cc @@ -320,44 +320,7 @@ ss::future> replicated_partition::timequery(storage::timequery_config cfg) { // cluster::partition::timequery returns a result in Kafka offsets, // no further offset translation is required here. - auto res = co_await _partition->timequery(cfg); - if (!res.has_value()) { - co_return std::nullopt; - } - const auto kafka_start_override = _partition->kafka_start_offset_override(); - if ( - !kafka_start_override.has_value() - || kafka_start_override.value() <= res.value().offset) { - // The start override doesn't affect the result of the timequery. - co_return res; - } - vlog( - klog.debug, - "{} timequery result {} clamped by start override, fetching result at " - "start {}", - ntp(), - res->offset, - kafka_start_override.value()); - storage::log_reader_config config( - kafka_start_override.value(), - cfg.max_offset, - 0, - 2048, // We just need one record batch - cfg.prio, - cfg.type_filter, - std::nullopt, // No timestamp, just use the offset - cfg.abort_source); - auto translating_reader = co_await make_reader(config, std::nullopt); - auto ot_state = std::move(translating_reader.ot_state); - model::record_batch_reader::storage_t data - = co_await model::consume_reader_to_memory( - std::move(translating_reader.reader), model::no_timeout); - auto& batches = std::get(data); - if (batches.empty()) { - co_return std::nullopt; - } - co_return storage::batch_timequery( - *(batches.begin()), cfg.min_offset, cfg.time, cfg.max_offset); + return _partition->timequery(cfg); } ss::future> replicated_partition::replicate( From 5ae5fcd91b4c1ae3dfa0da8e9db0748a6c797e2b Mon Sep 17 00:00:00 2001 From: Nicolae Vartolomei Date: Wed, 15 May 2024 21:11:09 +0100 Subject: [PATCH 11/13] cloud_storage: add additional timequery tests These cover more edge cases and highlight better an existing bug with the timequery in which start offset for timequery is ignored or handled inconsistently. See added code comments starting with "BUG:". --- .../tests/remote_partition_test.cc | 62 +++++++++++++++---- src/v/cloud_storage/tests/util.cc | 3 +- src/v/cloud_storage/tests/util.h | 1 + 3 files changed, 53 insertions(+), 13 deletions(-) diff --git a/src/v/cloud_storage/tests/remote_partition_test.cc b/src/v/cloud_storage/tests/remote_partition_test.cc index caba7212298a..cc0440b8acf1 100644 --- a/src/v/cloud_storage/tests/remote_partition_test.cc +++ b/src/v/cloud_storage/tests/remote_partition_test.cc @@ -2128,9 +2128,11 @@ FIXTURE_TEST(test_stale_reader, cloud_storage_fixture) { // Returns true if a kafka::offset scan returns the expected number of records. bool timequery( cloud_storage_fixture& fixture, + model::offset min, model::timestamp tm, int expected_num_records) { - auto scan_res = scan_remote_partition(fixture, tm); + auto scan_res = scan_remote_partition( + fixture, min, tm, model::offset::max()); int num_data_records = 0; size_t bytes_read_acc = 0; for (const auto& hdr : scan_res.headers) { @@ -2178,6 +2180,14 @@ bool timequery( return ret; } +// Returns true if a kafka::offset scan returns the expected number of records. +bool timequery( + cloud_storage_fixture& fixture, + model::timestamp tm, + int expected_num_records) { + return timequery(fixture, model::offset(0), tm, expected_num_records); +} + FIXTURE_TEST(test_scan_by_timestamp, cloud_storage_fixture) { // Build cloud partition with provided timestamps and query // it using the timequery. @@ -2397,6 +2407,10 @@ FIXTURE_TEST(test_out_of_range_spillover_query, cloud_storage_fixture) { BOOST_REQUIRE( scan_remote_partition(*this, archive_start, max).size() == 3 * 6); + // Can timequery from start of the archive. + BOOST_TEST_REQUIRE( + timequery(*this, archive_start, model::timestamp(100), 3 * 6)); + // Can't query from the start of partition. BOOST_REQUIRE_EXCEPTION( scan_remote_partition(*this, base, max), @@ -2406,29 +2420,41 @@ FIXTURE_TEST(test_out_of_range_spillover_query, cloud_storage_fixture) { return what.find("Failed to query spillover manifests") != what.npos; }); - // Can't query from start of the still valid spillover manifest. - // Since we don't rewrite spillover manifests we want to be sure that - // we don't allow querying stale segments (below the start offset). + // Can't timequery from the base offset. BOOST_REQUIRE_EXCEPTION( - scan_remote_partition(*this, segments[2].base_offset, max), + timequery(*this, base, model::timestamp(100), 3 * 6), std::runtime_error, [](const auto& ex) { ss::sstring what{ex.what()}; return what.find("Failed to query spillover manifests") != what.npos; }); - // Timestamp undershoots the partition. - // It shouldn't throw. But as an interim workaround we throw until - // clean offset catches up with start offset. - // The clients will retry. + // Can't query from start of the still valid spillover manifest. + // Since we don't rewrite spillover manifests we want to be sure that + // we don't allow querying stale segments (below the start offset). BOOST_REQUIRE_EXCEPTION( - timequery(*this, model::timestamp(100), 3 * 6), + scan_remote_partition(*this, segments[2].base_offset, max), std::runtime_error, [](const auto& ex) { ss::sstring what{ex.what()}; return what.find("Failed to query spillover manifests") != what.npos; }); + // Can't query from start of the still valid spillover manifest. + // Since we don't rewrite spillover manifests we want to be sure that + // we don't allow querying stale segments (below the start offset). + // BUG: Currently it succeeds. This is a bug and should be fixed. + // BOOST_REQUIRE_EXCEPTION( + // timequery(*this, segments[2].base_offset, model::timestamp(100), 3 * + // 6), std::runtime_error, + // [](const auto& ex) { + // ss::sstring what{ex.what()}; + // return what.find("Failed to query spillover manifests") != + // what.npos; + // }); + BOOST_TEST_REQUIRE( + timequery(*this, segments[2].base_offset, model::timestamp(100), 3 * 6)); + test_log.debug("Timestamp within valid spillover but below archive start"); BOOST_TEST_REQUIRE( timequery(*this, segments[2].base_timestamp.value(), 3 * 6)); @@ -2459,6 +2485,18 @@ FIXTURE_TEST(test_out_of_range_spillover_query, cloud_storage_fixture) { .url = manifest_url, .body = serialize_manifest(manifest)}, }); - // This query should now succeed. - BOOST_TEST_REQUIRE(timequery(*this, model::timestamp(100), 3 * 6)); + // Still can't query from the base offset. + BOOST_REQUIRE_EXCEPTION( + scan_remote_partition(*this, base, max), + std::runtime_error, + [](const auto& ex) { + ss::sstring what{ex.what()}; + return what.find("Failed to query spillover manifests") != what.npos; + }); + + // Timequery from base offset must fail too as the regular query. + // BUG: Currently it succeeds. This is a bug and should be fixed. + BOOST_TEST_REQUIRE(timequery(*this, base, model::timestamp(100), 3 * 6)); + BOOST_TEST_REQUIRE( + timequery(*this, segments[2].base_offset, model::timestamp(100), 3 * 6)); } diff --git a/src/v/cloud_storage/tests/util.cc b/src/v/cloud_storage/tests/util.cc index eec0925e507e..1c6171a685d2 100644 --- a/src/v/cloud_storage/tests/util.cc +++ b/src/v/cloud_storage/tests/util.cc @@ -769,6 +769,7 @@ std::vector scan_remote_partition( /// Similar to previous function but uses timequery to start the scan scan_result scan_remote_partition( cloud_storage_fixture& imposter, + model::offset min, model::timestamp timestamp, model::offset max, size_t maybe_max_segments, @@ -793,7 +794,7 @@ scan_result scan_remote_partition( } auto manifest = hydrate_manifest(imposter.api.local(), bucket); storage::log_reader_config reader_config( - model::offset(0), max, ss::default_priority_class()); + min, max, ss::default_priority_class()); reader_config.first_timestamp = timestamp; partition_probe probe(manifest.get_ntp()); diff --git a/src/v/cloud_storage/tests/util.h b/src/v/cloud_storage/tests/util.h index a247b1e74e94..8c85bc341c75 100644 --- a/src/v/cloud_storage/tests/util.h +++ b/src/v/cloud_storage/tests/util.h @@ -200,6 +200,7 @@ struct scan_result { /// Similar to prev function but uses timequery scan_result scan_remote_partition( cloud_storage_fixture& imposter, + model::offset min, model::timestamp timestamp, model::offset max = model::offset::max(), size_t maybe_max_segments = 0, From 943aa52273ae85a7dc37d55eff1ec768a9ef122e Mon Sep 17 00:00:00 2001 From: Nicolae Vartolomei Date: Wed, 15 May 2024 21:32:16 +0100 Subject: [PATCH 12/13] cloud_storage: reword code comment for in_stm with timequery I believe this makes it clearer. --- src/v/cloud_storage/async_manifest_view.cc | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/src/v/cloud_storage/async_manifest_view.cc b/src/v/cloud_storage/async_manifest_view.cc index e1c22232b04d..c088da306a33 100644 --- a/src/v/cloud_storage/async_manifest_view.cc +++ b/src/v/cloud_storage/async_manifest_view.cc @@ -905,11 +905,10 @@ bool async_manifest_view::in_stm(async_view_search_query_t o) { vlog( _ctxlog.debug, "Checking timestamp {} using timequery", ts_query); if (_stm_manifest.get_spillover_map().empty()) { - // The STM manifest is empty, so the timestamp has to be directed - // to the STM manifest. - // Implicitly, this case also handles the empty manifest case - // because the STM manifest with spillover segments is never - // empty. + // The spillover manifest is empty, so the timestamp query has to + // be directed to the STM manifest. Otherwise, we can safely + // direct the query either to spillover or stm because the + // STM manifest with spillover segments is never empty. return true; } // The last timestamp in the archive is used as a pivot point. See From 1d7a1e3e2827441b0749a0e40097c4be9d65a3a7 Mon Sep 17 00:00:00 2001 From: Nicolae Vartolomei Date: Wed, 15 May 2024 21:55:54 +0100 Subject: [PATCH 13/13] cloud_storage: document in_archive and in_stm methods --- src/v/cloud_storage/async_manifest_view.cc | 28 +++++++++++++++------- 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/src/v/cloud_storage/async_manifest_view.cc b/src/v/cloud_storage/async_manifest_view.cc index c088da306a33..044ca19dee81 100644 --- a/src/v/cloud_storage/async_manifest_view.cc +++ b/src/v/cloud_storage/async_manifest_view.cc @@ -866,6 +866,22 @@ bool async_manifest_view::in_archive(async_view_search_query_t o) { kafka::offset::min()); }, [this](async_view_timestamp_query ts_query) { + // For a query to be satisfiable by the archive the min offset must be + // in the archive. The same condition can be stated as: min offset + // must be before the start of the STM manifest. + // + // Otherwise, even though the last timestamp in the archive could + // satisfy the query, it can't be used because offset-wise it is + // outside of the queried range. + kafka::offset archive_end_offset = kafka::prev_offset( + _stm_manifest.get_start_kafka_offset().value_or( + kafka::offset::min())); + + bool range_overlaps + = ts_query.min_offset <= archive_end_offset + && ts_query.max_offset + >= _stm_manifest.get_archive_start_kafka_offset(); + // The condition for timequery is tricky. With offsets there is a // clear pivot point. The start_offset of the STM manifest separates // the STM region from the archive. With timestamps it's not as @@ -873,13 +889,6 @@ bool async_manifest_view::in_archive(async_view_search_query_t o) { // and the first segment in the STM manifest. We need in_stm and // in_archive to be consistent with each other. To do this we can use // last timestamp in the archive as a pivot point. - bool range_overlaps - = ts_query.min_offset - < _stm_manifest.get_start_kafka_offset().value_or( - kafka::offset::min()) - && ts_query.max_offset - >= _stm_manifest.get_archive_start_kafka_offset(); - return range_overlaps && _stm_manifest.get_spillover_map() .last_segment() @@ -911,8 +920,7 @@ bool async_manifest_view::in_stm(async_view_search_query_t o) { // STM manifest with spillover segments is never empty. return true; } - // The last timestamp in the archive is used as a pivot point. See - // description in in_archive. + bool range_overlaps = ts_query.min_offset <= _stm_manifest.get_last_kafka_offset().value_or( @@ -921,6 +929,8 @@ bool async_manifest_view::in_stm(async_view_search_query_t o) { >= _stm_manifest.get_start_kafka_offset().value_or( kafka::offset::max()); + // The last timestamp in the archive is used as a pivot point. See + // description in in_archive. return range_overlaps && _stm_manifest.get_spillover_map() .last_segment()