diff --git a/src/v/archival/ntp_archiver_service.cc b/src/v/archival/ntp_archiver_service.cc index 03d92e98ee63..aee221979c69 100644 --- a/src/v/archival/ntp_archiver_service.cc +++ b/src/v/archival/ntp_archiver_service.cc @@ -496,7 +496,7 @@ ss::future<> ntp_archiver::upload_topic_manifest() { try { retry_chain_node fib( - _conf->manifest_upload_timeout, + _conf->manifest_upload_timeout(), _conf->cloud_storage_initial_backoff, &_rtcnode); retry_chain_logger ctxlog(archival_log, fib); @@ -912,7 +912,7 @@ ss::future< ntp_archiver::download_manifest() { auto guard = _gate.hold(); retry_chain_node fib( - _conf->manifest_upload_timeout, + _conf->manifest_upload_timeout(), _conf->cloud_storage_initial_backoff, &_rtcnode); cloud_storage::partition_manifest tmp(_ntp, _rev); @@ -1066,7 +1066,7 @@ ss::future ntp_archiver::upload_manifest( auto guard = _gate.hold(); auto rtc = source_rtc.value_or(std::ref(_rtcnode)); retry_chain_node fib( - _conf->manifest_upload_timeout, + _conf->manifest_upload_timeout(), _conf->cloud_storage_initial_backoff, &rtc.get()); retry_chain_logger ctxlog(archival_log, fib, _ntp.path()); @@ -1907,7 +1907,7 @@ ss::future ntp_archiver::wait_uploads( _ntp.path()); auto deadline = ss::lowres_clock::now() - + _conf->manifest_upload_timeout; + + _conf->manifest_upload_timeout(); std::optional manifest_clean_offset; if ( @@ -2096,7 +2096,7 @@ ntp_archiver::maybe_truncate_manifest() { const auto& m = manifest(); for (const auto& meta : m) { retry_chain_node fib( - _conf->manifest_upload_timeout, + _conf->manifest_upload_timeout(), _conf->upload_loop_initial_backoff, &rtc); auto sname = cloud_storage::generate_local_segment_name( @@ -2125,12 +2125,12 @@ ntp_archiver::maybe_truncate_manifest() { "manifest, start offset before cleanup: {}", manifest().get_start_offset()); retry_chain_node rc_node( - _conf->manifest_upload_timeout, + _conf->manifest_upload_timeout(), _conf->upload_loop_initial_backoff, &rtc); auto error = co_await _parent.archival_meta_stm()->truncate( adjusted_start_offset, - ss::lowres_clock::now() + _conf->manifest_upload_timeout, + ss::lowres_clock::now() + _conf->manifest_upload_timeout(), _as); if (error != cluster::errc::success) { vlog( @@ -3030,7 +3030,7 @@ ss::future ntp_archiver::do_upload_local( features::feature::cloud_metadata_cluster_recovery) ? 
_parent.highest_producer_id() : model::producer_id{}; - auto deadline = ss::lowres_clock::now() + _conf->manifest_upload_timeout; + auto deadline = ss::lowres_clock::now() + _conf->manifest_upload_timeout(); auto error = co_await _parent.archival_meta_stm()->add_segments( {meta}, std::nullopt, diff --git a/src/v/archival/tests/archival_service_fixture.h b/src/v/archival/tests/archival_service_fixture.h index b22bc5e51343..1f0ac03b9690 100644 --- a/src/v/archival/tests/archival_service_fixture.h +++ b/src/v/archival/tests/archival_service_fixture.h @@ -19,6 +19,7 @@ #include "cluster/tests/utils.h" #include "cluster/types.h" #include "config/configuration.h" +#include "config/property.h" #include "container/fragmented_vector.h" #include "http/tests/http_imposter.h" #include "model/fundamental.h" @@ -68,13 +69,14 @@ class archiver_cluster_fixture cloud_storage_clients::endpoint_url{}); s3_conf.server_addr = server_addr; - archival::configuration a_conf; + archival::configuration a_conf{ + .manifest_upload_timeout = config::mock_binding(1000ms), + }; a_conf.bucket_name = cloud_storage_clients::bucket_name("test-bucket"); a_conf.ntp_metrics_disabled = archival::per_ntp_metrics_disabled::yes; a_conf.svc_metrics_disabled = archival::service_metrics_disabled::yes; a_conf.cloud_storage_initial_backoff = 100ms; a_conf.segment_upload_timeout = 1s; - a_conf.manifest_upload_timeout = 1s; a_conf.garbage_collect_timeout = 1s; a_conf.upload_loop_initial_backoff = 100ms; a_conf.upload_loop_max_backoff = 5s; diff --git a/src/v/archival/tests/service_fixture.cc b/src/v/archival/tests/service_fixture.cc index 10535a8def8d..5342a2c94f65 100644 --- a/src/v/archival/tests/service_fixture.cc +++ b/src/v/archival/tests/service_fixture.cc @@ -193,13 +193,14 @@ archiver_fixture::get_configurations() { cloud_storage_clients::endpoint_url{}); s3conf.server_addr = server_addr; - archival::configuration aconf; + archival::configuration aconf{ + .manifest_upload_timeout = config::mock_binding(1000ms), + }; aconf.bucket_name = cloud_storage_clients::bucket_name("test-bucket"); aconf.ntp_metrics_disabled = archival::per_ntp_metrics_disabled::yes; aconf.svc_metrics_disabled = archival::service_metrics_disabled::yes; aconf.cloud_storage_initial_backoff = 100ms; aconf.segment_upload_timeout = 1s; - aconf.manifest_upload_timeout = 1s; aconf.garbage_collect_timeout = 1s; aconf.upload_loop_initial_backoff = 100ms; aconf.upload_loop_max_backoff = 5s; diff --git a/src/v/archival/types.cc b/src/v/archival/types.cc index c14d24c443c0..9fbc92d2e6f0 100644 --- a/src/v/archival/types.cc +++ b/src/v/archival/types.cc @@ -51,7 +51,7 @@ std::ostream& operator<<(std::ostream& o, const configuration& cfg) { std::chrono::duration_cast( cfg.segment_upload_timeout), std::chrono::duration_cast( - cfg.manifest_upload_timeout), + cfg.manifest_upload_timeout()), cfg.time_limit); return o; } @@ -103,7 +103,7 @@ get_archival_service_config(ss::scheduling_group sg, ss::io_priority_class p) { .cloud_storage_segment_upload_timeout_ms.value(), .manifest_upload_timeout = config::shard_local_cfg() - .cloud_storage_manifest_upload_timeout_ms.value(), + .cloud_storage_manifest_upload_timeout_ms.bind(), .garbage_collect_timeout = config::shard_local_cfg() .cloud_storage_garbage_collect_timeout_ms.value(), diff --git a/src/v/archival/types.h b/src/v/archival/types.h index 8b83f9d16320..e7fe3cf59343 100644 --- a/src/v/archival/types.h +++ b/src/v/archival/types.h @@ -46,7 +46,7 @@ struct configuration { /// Long upload timeout ss::lowres_clock::duration 
segment_upload_timeout; /// Shor upload timeout - ss::lowres_clock::duration manifest_upload_timeout; + config::binding manifest_upload_timeout; /// Timeout for running delete operations during the GC phase ss::lowres_clock::duration garbage_collect_timeout; /// Initial backoff for upload loop in case there is nothing to upload diff --git a/src/v/cloud_storage/async_manifest_view.cc b/src/v/cloud_storage/async_manifest_view.cc index 8a723416f0d6..044ca19dee81 100644 --- a/src/v/cloud_storage/async_manifest_view.cc +++ b/src/v/cloud_storage/async_manifest_view.cc @@ -44,6 +44,7 @@ #include #include #include +#include #include #include @@ -63,7 +64,9 @@ static ss::sstring to_string(const async_view_search_query_t& t) { t, [&](model::offset ro) { return ssx::sformat("[offset: {}]", ro); }, [&](kafka::offset ko) { return ssx::sformat("[kafka offset: {}]", ko); }, - [&](model::timestamp ts) { return ssx::sformat("[timestamp: {}]", ts); }); + [&](const async_view_timestamp_query& ts) { + return ssx::sformat("{}", ts); + }); } std::ostream& operator<<(std::ostream& s, const async_view_search_query_t& q) { @@ -83,9 +86,27 @@ contains(const partition_manifest& m, const async_view_search_query_t& query) { return k >= m.get_start_kafka_offset() && k < m.get_next_kafka_offset(); }, - [&](model::timestamp t) { - return m.size() > 0 && t >= m.begin()->base_timestamp - && t <= m.last_segment()->max_timestamp; + [&](const async_view_timestamp_query& ts_query) { + if (m.size() == 0) { + return false; + } + + auto kafka_start_offset = m.get_start_kafka_offset(); + if (!kafka_start_offset.has_value()) { + return false; + } + + auto kafka_last_offset = m.get_last_kafka_offset(); + if (!kafka_last_offset.has_value()) { + return false; + } + + auto range_overlaps = ts_query.min_offset <= kafka_last_offset.value() + && ts_query.max_offset + >= kafka_start_offset.value(); + + return range_overlaps && ts_query.ts >= m.begin()->base_timestamp + && ts_query.ts <= m.last_segment()->max_timestamp; }); } @@ -561,18 +582,11 @@ ss::future<> async_manifest_view::run_bg_loop() { ss::future, error_outcome>> async_manifest_view::get_cursor( async_view_search_query_t query, - std::optional end_inclusive) noexcept { + std::optional end_inclusive, + cursor_base_t cursor_base) noexcept { try { ss::gate::holder h(_gate); - if ( - !in_archive(query) && !in_stm(query) - && !std::holds_alternative(query)) { - // The view should contain manifest below archive start in - // order to be able to perform retention and advance metadata. 
- vlog( - _ctxlog.debug, - "query {} is out of valid range", - to_string(query)); + if (!in_archive(query) && !in_stm(query)) { co_return error_outcome::out_of_range; } model::offset begin; @@ -581,7 +595,14 @@ async_manifest_view::get_cursor( if (_stm_manifest.get_archive_start_offset() == model::offset{}) { begin = _stm_manifest.get_start_offset().value_or(begin); } else { - begin = _stm_manifest.get_archive_clean_offset(); + switch (cursor_base) { + case cursor_base_t::archive_start_offset: + begin = _stm_manifest.get_archive_start_offset(); + break; + case cursor_base_t::archive_clean_offset: + begin = _stm_manifest.get_archive_clean_offset(); + break; + } } if (end < begin) { @@ -844,7 +865,23 @@ bool async_manifest_view::in_archive(async_view_search_query_t o) { && ko < _stm_manifest.get_start_kafka_offset().value_or( kafka::offset::min()); }, - [this](model::timestamp ts) { + [this](async_view_timestamp_query ts_query) { + // For a query to be satisfiable by the archive the min offset must be + // in the archive. The same condition can be stated as: min offset + // must be before the start of the STM manifest. + // + // Otherwise, even though the last timestamp in the archive could + // satisfy the query, it can't be used because offset-wise it is + // outside of the queried range. + kafka::offset archive_end_offset = kafka::prev_offset( + _stm_manifest.get_start_kafka_offset().value_or( + kafka::offset::min())); + + bool range_overlaps + = ts_query.min_offset <= archive_end_offset + && ts_query.max_offset + >= _stm_manifest.get_archive_start_kafka_offset(); + // The condition for timequery is tricky. With offsets there is a // clear pivot point. The start_offset of the STM manifest separates // the STM region from the archive. With timestamps it's not as @@ -852,8 +889,11 @@ bool async_manifest_view::in_archive(async_view_search_query_t o) { // and the first segment in the STM manifest. We need in_stm and // in_archive to be consistent with each other. To do this we can use // last timestamp in the archive as a pivot point. - return _stm_manifest.get_spillover_map().last_segment()->max_timestamp - >= ts; + return range_overlaps + && _stm_manifest.get_spillover_map() + .last_segment() + ->max_timestamp + >= ts_query.ts; }); } @@ -870,20 +910,32 @@ bool async_manifest_view::in_stm(async_view_search_query_t o) { kafka::offset::max()); return ko >= sko; }, - [this](model::timestamp ts) { - vlog(_ctxlog.debug, "Checking timestamp {} using timequery", ts); + [this](async_view_timestamp_query ts_query) { + vlog( + _ctxlog.debug, "Checking timestamp {} using timequery", ts_query); if (_stm_manifest.get_spillover_map().empty()) { - // The STM manifest is empty, so the timestamp has to be directed - // to the STM manifest. - // Implicitly, this case also handles the empty manifest case - // because the STM manifest with spillover segments is never - // empty. + // The spillover manifest is empty, so the timestamp query has to + // be directed to the STM manifest. Otherwise, we can safely + // direct the query either to spillover or stm because the + // STM manifest with spillover segments is never empty. return true; } + + bool range_overlaps + = ts_query.min_offset + <= _stm_manifest.get_last_kafka_offset().value_or( + kafka::offset::min()) + && ts_query.max_offset + >= _stm_manifest.get_start_kafka_offset().value_or( + kafka::offset::max()); + // The last timestamp in the archive is used as a pivot point. See // description in in_archive. 
- return _stm_manifest.get_spillover_map().last_segment()->max_timestamp - < ts; + return range_overlaps + && _stm_manifest.get_spillover_map() + .last_segment() + ->max_timestamp + < ts_query.ts; }); } @@ -959,7 +1011,8 @@ async_manifest_view::offset_based_retention() noexcept { archive_start_offset_advance result; try { auto boundary = _stm_manifest.get_start_kafka_offset_override(); - auto res = co_await get_cursor(boundary); + auto res = co_await get_cursor( + boundary, std::nullopt, cursor_base_t::archive_clean_offset); if (res.has_failure()) { if (res.error() == error_outcome::out_of_range) { vlog( @@ -1023,7 +1076,8 @@ async_manifest_view::time_based_retention( auto res = co_await get_cursor( _stm_manifest.get_archive_start_offset(), - model::prev_offset(_stm_manifest.get_start_offset().value())); + model::prev_offset(_stm_manifest.get_start_offset().value()), + cursor_base_t::archive_clean_offset); if (res.has_failure()) { if (res.error() == error_outcome::out_of_range) { // The cutoff point is outside of the offset range, no need to @@ -1149,7 +1203,8 @@ async_manifest_view::size_based_retention(size_t size_limit) noexcept { auto res = co_await get_cursor( _stm_manifest.get_archive_clean_offset(), - model::prev_offset(_stm_manifest.get_start_offset().value())); + model::prev_offset(_stm_manifest.get_start_offset().value()), + cursor_base_t::archive_clean_offset); if (res.has_failure()) { vlogl( _ctxlog, @@ -1300,7 +1355,7 @@ async_manifest_view::get_materialized_manifest( } // query in not in the stm region if ( - std::holds_alternative(q) + std::holds_alternative(q) && _stm_manifest.get_archive_start_offset() == model::offset{}) { vlog(_ctxlog.debug, "Using STM manifest for timequery {}", q); co_return std::ref(_stm_manifest); @@ -1460,7 +1515,7 @@ std::optional async_manifest_view::search_spillover_manifests( } return -1; }, - [&](model::timestamp t) { + [&](const async_view_timestamp_query& ts_query) { if (manifests.empty()) { return -1; } @@ -1470,35 +1525,56 @@ std::optional async_manifest_view::search_spillover_manifests( "{}, last: {}", query, manifests.size(), - manifests.begin()->base_timestamp, - manifests.last_segment()->max_timestamp); + *manifests.begin(), + *manifests.last_segment()); - auto first_manifest = manifests.begin(); - auto base_t = first_manifest->base_timestamp; auto max_t = manifests.last_segment()->max_timestamp; // Edge cases - if (t < base_t || max_t == base_t) { - return 0; - } else if (t > max_t) { + if (ts_query.ts > max_t) { return -1; } + const auto& bo_col = manifests.get_base_offset_column(); + const auto& co_col = manifests.get_committed_offset_column(); + const auto& do_col = manifests.get_delta_offset_column(); + const auto& de_col = manifests.get_delta_offset_end_column(); const auto& bt_col = manifests.get_base_timestamp_column(); const auto& mt_col = manifests.get_max_timestamp_column(); - auto mt_it = mt_col.begin(); - auto bt_it = bt_col.begin(); + + auto bo_it = bo_col.begin(); + auto co_it = co_col.begin(); + auto do_it = do_col.begin(); + auto de_it = de_col.begin(); + auto max_ts_it = mt_col.begin(); + auto base_ts_it = bt_col.begin(); + int target_ix = -1; - while (!bt_it.is_end()) { - if (*mt_it >= t.value() || *bt_it > t.value()) { + while (!base_ts_it.is_end()) { + static constexpr int64_t min_delta = model::offset::min()(); + auto d_begin = *do_it == min_delta ? 0 : *do_it; + auto d_end = *de_it == min_delta ? 
d_begin : *de_it; + auto bko = kafka::offset(*bo_it - d_begin); + auto cko = kafka::offset(*co_it - d_end); + + auto range_overlaps = ts_query.min_offset <= cko + && ts_query.max_offset >= bko; + + if ( + range_overlaps + && (*max_ts_it >= ts_query.ts() || *base_ts_it > ts_query.ts())) { // Handle case when we're overshooting the target // (base_timestamp > t) or the case when the target is in the // middle of the manifest (max_timestamp >= t) - target_ix = static_cast(bt_it.index()); + target_ix = static_cast(base_ts_it.index()); break; } - ++bt_it; - ++mt_it; + ++bo_it; + ++co_it; + ++do_it; + ++de_it; + ++base_ts_it; + ++max_ts_it; } return target_ix; }); diff --git a/src/v/cloud_storage/async_manifest_view.h b/src/v/cloud_storage/async_manifest_view.h index a4f27f414edd..cf3e59930ce3 100644 --- a/src/v/cloud_storage/async_manifest_view.h +++ b/src/v/cloud_storage/async_manifest_view.h @@ -39,9 +39,32 @@ namespace cloud_storage { +struct async_view_timestamp_query { + async_view_timestamp_query( + kafka::offset min_offset, model::timestamp ts, kafka::offset max_offset) + : min_offset(min_offset) + , ts(ts) + , max_offset(max_offset) {} + + friend std::ostream& + operator<<(std::ostream& o, const async_view_timestamp_query& q) { + fmt::print( + o, + "async_view_timestamp_query{{min_offset:{}, ts:{}, max_offset:{}}}", + q.min_offset, + q.ts, + q.max_offset); + return o; + } + + kafka::offset min_offset; + model::timestamp ts; + kafka::offset max_offset; +}; + /// Search query type using async_view_search_query_t - = std::variant; + = std::variant; std::ostream& operator<<(std::ostream&, const async_view_search_query_t&); @@ -82,6 +105,17 @@ class async_manifest_view { ss::future<> start(); ss::future<> stop(); + enum class cursor_base_t { + archive_start_offset, + + /// Special case that is used when computing retention. + /// + /// For details, see: + /// GitHub: https://github.com/redpanda-data/redpanda/pull/12177 + /// Commit: 1b6ab7be8818e3878a32f9037694ae5c4cf4fea2 + archive_clean_offset, + }; + /// Get active spillover manifests asynchronously /// /// \note the method may hydrate manifests in the cache or @@ -91,7 +125,8 @@ class async_manifest_view { result, error_outcome>> get_cursor( async_view_search_query_t q, - std::optional end_inclusive = std::nullopt) noexcept; + std::optional end_inclusive = std::nullopt, + cursor_base_t cursor_base = cursor_base_t::archive_start_offset) noexcept; /// Get inactive spillover manifests which are waiting for /// retention diff --git a/src/v/cloud_storage/remote_partition.cc b/src/v/cloud_storage/remote_partition.cc index 840ab82357f0..e790cc1d76d6 100644 --- a/src/v/cloud_storage/remote_partition.cc +++ b/src/v/cloud_storage/remote_partition.cc @@ -20,6 +20,7 @@ #include "cloud_storage/tx_range_manifest.h" #include "cloud_storage/types.h" #include "model/fundamental.h" +#include "model/timestamp.h" #include "net/connection.h" #include "ssx/future-util.h" #include "ssx/watchdog.h" @@ -552,7 +553,10 @@ class partition_record_batch_reader_impl final async_view_search_query_t query; if (config.first_timestamp.has_value()) { - query = config.first_timestamp.value(); + query = async_view_timestamp_query( + model::offset_cast(config.start_offset), + config.first_timestamp.value(), + model::offset_cast(config.max_offset)); } else { // NOTE: config.start_offset actually contains kafka offset // stored using model::offset type. 
@@ -565,66 +569,72 @@ class partition_record_batch_reader_impl final co_return; } - if ( - cur.error() == error_outcome::out_of_range - && ss::visit( - query, - [&](model::offset) { return false; }, - [&](kafka::offset query_offset) { - // Special case queries below the start offset of the log. - // The start offset may have advanced while the request was - // in progress. This is expected, so log at debug level. - const auto log_start_offset - = _partition->_manifest_view->stm_manifest() - .full_log_start_kafka_offset(); - - if (log_start_offset && query_offset < *log_start_offset) { - vlog( - _ctxlog.debug, - "Manifest query below the log's start Kafka offset: " - "{} < {}", - query_offset(), - log_start_offset.value()()); - return true; - } - return false; - }, - [&](model::timestamp query_ts) { - // Special case, it can happen when a timequery falls below - // the clean offset. Caused when the query races with - // retention/gc. log a warning, since the kafka client can - // handle a failed query - auto const& spillovers = _partition->_manifest_view - ->stm_manifest() - .get_spillover_map(); - if ( - spillovers.empty() - || spillovers.get_max_timestamp_column() - .last_value() - .value_or(model::timestamp::max()()) - >= query_ts()) { - vlog( - _ctxlog.debug, - "Manifest query raced with retention and the result " - "is below the clean/start offset for {}", - query_ts); - return true; - } - - // query was not meant for archive region. fallthrough and - // log an error - return false; - })) { - // error was handled - co_return; + // Out of range queries are unexpected. The caller must take care + // to send only valid queries to remote_partition. I.e. the fetch + // handler does such validation. Similar validation is done inside + // remote partition. + // + // Out of range at this point is due to a race condition or due to + // a bug. In both cases the only valid action is to throw an + // exception and let the caller deal with it. If the caller doesn't + // handle it, it leads to a closed kafka connection which the + // end clients retry. + if (cur.error() == error_outcome::out_of_range) { + ss::visit( + query, + [&](model::offset) { + vassert( + false, + "Unreachable code. Remote partition doesn't know how " + "to " + "handle model::offset queries."); + }, + [&](kafka::offset query_offset) { + // Bug or retention racing with the query. + const auto log_start_offset + = _partition->_manifest_view->stm_manifest() + .full_log_start_kafka_offset(); + + if ( + log_start_offset && query_offset < *log_start_offset) { + vlog( + _ctxlog.warn, + "Manifest query below the log's start Kafka " + "offset: " + "{} < {}", + query_offset(), + log_start_offset.value()()); + } + }, + [&](const async_view_timestamp_query& query_ts) { + // Special case, it can happen when a timequery falls + // below the clean offset. Caused when the query races + // with retention/gc. 
+ auto const& spillovers = _partition->_manifest_view + ->stm_manifest() + .get_spillover_map(); + + bool timestamp_inside_spillover + = query_ts.ts() + <= spillovers.get_max_timestamp_column() + .last_value() + .value_or(model::timestamp::min()()); + + if (timestamp_inside_spillover) { + vlog( + _ctxlog.debug, + "Manifest query raced with retention and the " + "result " + "is below the clean/start offset for {}", + query_ts); + } + }); } - vlog( - _ctxlog.error, + throw std::runtime_error(fmt::format( "Failed to query spillover manifests: {}, query: {}", cur.error(), - query); - co_return; + query)); } _view_cursor = std::move(cur.value()); co_await _view_cursor->with_manifest( diff --git a/src/v/cloud_storage/tests/async_manifest_view_test.cc b/src/v/cloud_storage/tests/async_manifest_view_test.cc index cf430d09afb5..f82837218379 100644 --- a/src/v/cloud_storage/tests/async_manifest_view_test.cc +++ b/src/v/cloud_storage/tests/async_manifest_view_test.cc @@ -455,8 +455,19 @@ FIXTURE_TEST(test_async_manifest_view_truncate, async_manifest_view_fixture) { model::offset so = model::offset{0}; auto maybe_cursor = view.get_cursor(so).get(); + BOOST_REQUIRE( + maybe_cursor.has_error() + && maybe_cursor.error() == cloud_storage::error_outcome::out_of_range); + // The clean offset should still be accesible such that retention // can operate above it. + maybe_cursor = view + .get_cursor( + so, + std::nullopt, + cloud_storage::async_manifest_view::cursor_base_t:: + archive_clean_offset) + .get(); BOOST_REQUIRE(!maybe_cursor.has_failure()); maybe_cursor = view.get_cursor(new_so).get(); @@ -1106,7 +1117,8 @@ FIXTURE_TEST(test_async_manifest_view_timequery, async_manifest_view_fixture) { // Find exact matches for all segments for (const auto& meta : expected) { - auto target = meta.base_timestamp; + auto target = async_view_timestamp_query( + kafka::offset(0), meta.base_timestamp, kafka::offset::max()); auto maybe_cursor = view.get_cursor(target).get(); BOOST_REQUIRE(!maybe_cursor.has_failure()); auto cursor = std::move(maybe_cursor.value()); @@ -1119,9 +1131,9 @@ FIXTURE_TEST(test_async_manifest_view_timequery, async_manifest_view_fixture) { m.last_segment()->max_timestamp, stm_manifest.begin()->base_timestamp, stm_manifest.last_segment()->max_timestamp); - auto res = m.timequery(target); + auto res = m.timequery(target.ts); BOOST_REQUIRE(res.has_value()); - BOOST_REQUIRE(res.value().base_timestamp == target); + BOOST_REQUIRE(res.value().base_timestamp == target.ts); }) .get(); } @@ -1148,7 +1160,10 @@ FIXTURE_TEST( // that there is a gap between any two segments. 
for (const auto& meta : expected) { - auto target = model::timestamp(meta.base_timestamp.value() - 1); + auto target = async_view_timestamp_query( + kafka::offset(0), + model::timestamp(meta.base_timestamp() - 1), + kafka::offset::max()); auto maybe_cursor = view.get_cursor(target).get(); BOOST_REQUIRE(!maybe_cursor.has_failure()); auto cursor = std::move(maybe_cursor.value()); @@ -1162,11 +1177,11 @@ FIXTURE_TEST( m.last_segment()->max_timestamp, stm_manifest.begin()->base_timestamp, stm_manifest.last_segment()->max_timestamp); - auto res = m.timequery(target); + auto res = m.timequery(target.ts); BOOST_REQUIRE(res.has_value()); BOOST_REQUIRE( model::timestamp(res.value().base_timestamp.value() - 1) - == target); + == target.ts); }) .get(); } diff --git a/src/v/cloud_storage/tests/remote_partition_test.cc b/src/v/cloud_storage/tests/remote_partition_test.cc index a7389baaa339..cc0440b8acf1 100644 --- a/src/v/cloud_storage/tests/remote_partition_test.cc +++ b/src/v/cloud_storage/tests/remote_partition_test.cc @@ -19,6 +19,7 @@ #include "cloud_storage/remote.h" #include "cloud_storage/remote_partition.h" #include "cloud_storage/remote_segment.h" +#include "cloud_storage/spillover_manifest.h" #include "cloud_storage/tests/cloud_storage_fixture.h" #include "cloud_storage/tests/common_def.h" #include "cloud_storage/tests/s3_imposter.h" @@ -58,6 +59,7 @@ #include #include #include +#include #include using namespace std::chrono_literals; @@ -436,7 +438,14 @@ FIXTURE_TEST(test_scan_by_kafka_offset_truncated, cloud_storage_fixture) { *this, model::offset(6), model::offset_delta(3), batch_types); print_segments(segments); for (int i = 0; i <= 2; i++) { - BOOST_REQUIRE(check_fetch(*this, kafka::offset(i), false)); + BOOST_REQUIRE_EXCEPTION( + check_fetch(*this, kafka::offset(i), false), + std::runtime_error, + [](const std::runtime_error& e) { + return std::string(e.what()).find( + "Failed to query spillover manifests") + != std::string::npos; + }); } for (int i = 3; i <= 8; i++) { BOOST_REQUIRE(check_scan(*this, kafka::offset(i), 9 - i)); @@ -488,7 +497,14 @@ FIXTURE_TEST( auto segments = setup_s3_imposter( *this, model::offset(6), model::offset_delta(3), batch_types); print_segments(segments); - BOOST_REQUIRE(check_fetch(*this, kafka::offset(2), false)); + BOOST_REQUIRE_EXCEPTION( + check_fetch(*this, kafka::offset(2), false), + std::runtime_error, + [](const std::runtime_error& e) { + return std::string(e.what()).find( + "Failed to query spillover manifests") + != std::string::npos; + }); BOOST_REQUIRE(check_scan(*this, kafka::offset(3), 1)); BOOST_REQUIRE(check_fetch(*this, kafka::offset(3), true)); BOOST_REQUIRE(check_scan(*this, kafka::offset(4), 0)); @@ -536,7 +552,14 @@ FIXTURE_TEST( *this, model::offset(6), model::offset_delta(0), batch_types); print_segments(segments); for (int i = 0; i < 6; i++) { - BOOST_REQUIRE(check_fetch(*this, kafka::offset(i), false)); + BOOST_REQUIRE_EXCEPTION( + check_fetch(*this, kafka::offset(i), false), + std::runtime_error, + [](const std::runtime_error& e) { + return std::string(e.what()).find( + "Failed to query spillover manifests") + != std::string::npos; + }); } BOOST_REQUIRE(check_scan(*this, kafka::offset(6), 1)); BOOST_REQUIRE(check_fetch(*this, kafka::offset(6), true)); @@ -1241,12 +1264,14 @@ FIXTURE_TEST( vlog(test_log.debug, "Creating new reader {}", reader_config); // After truncation reading from the old end should be impossible - auto reader = partition->make_reader(reader_config).get().reader; - auto headers_read - = 
reader.consume(counting_batch_consumer(100), model::no_timeout) - .get(); - - BOOST_REQUIRE(headers_read.size() == 0); + BOOST_REQUIRE_EXCEPTION( + partition->make_reader(reader_config).get(), + std::runtime_error, + [](const std::runtime_error& e) { + return std::string(e.what()).find( + "Failed to query spillover manifests") + != std::string::npos; + }); } } @@ -2103,9 +2128,11 @@ FIXTURE_TEST(test_stale_reader, cloud_storage_fixture) { // Returns true if a kafka::offset scan returns the expected number of records. bool timequery( cloud_storage_fixture& fixture, + model::offset min, model::timestamp tm, int expected_num_records) { - auto scan_res = scan_remote_partition(fixture, tm); + auto scan_res = scan_remote_partition( + fixture, min, tm, model::offset::max()); int num_data_records = 0; size_t bytes_read_acc = 0; for (const auto& hdr : scan_res.headers) { @@ -2153,6 +2180,14 @@ bool timequery( return ret; } +// Returns true if a timequery returns the expected number of records. +bool timequery( + cloud_storage_fixture& fixture, + model::timestamp tm, + int expected_num_records) { + return timequery(fixture, model::offset(0), tm, expected_num_records); +} + FIXTURE_TEST(test_scan_by_timestamp, cloud_storage_fixture) { // Build cloud partition with provided timestamps and query // it using the timequery. @@ -2215,3 +2250,253 @@ FIXTURE_TEST(test_scan_by_timestamp, cloud_storage_fixture) { test_log.debug("Timestamp undershoots the partition"); BOOST_REQUIRE(timequery(*this, model::timestamp(100), num_data_batches)); } + +FIXTURE_TEST(test_out_of_range_query, cloud_storage_fixture) { + auto data = [&](size_t t) { + return batch_t{ + .num_records = 1, + .type = model::record_batch_type::raft_data, + .timestamp = model::timestamp(t)}; + }; + + const std::vector> batches = { + {data(1000), data(1002), data(1004), data(1006), data(1008), data(1010)}, + {data(1012), data(1014), data(1016), data(1018), data(1020), data(1022)}, + }; + + auto segments = make_segments(batches, false, false); + cloud_storage::partition_manifest manifest(manifest_ntp, manifest_revision); + + auto expectations = make_imposter_expectations(manifest, segments); + set_expectations_and_listen(expectations); + + // Advance start offset as-if archiver did apply retention but didn't + // run GC yet (the clean offset is not updated). 
+ BOOST_REQUIRE(manifest.advance_start_offset(segments[1].base_offset)); + auto serialize_manifest = [](const cloud_storage::partition_manifest& m) { + auto s_data = m.serialize().get(); + auto buf = s_data.stream.read_exactly(s_data.size_bytes).get(); + return ss::sstring(buf.begin(), buf.end()); + }; + std::ostringstream ostr; + manifest.serialize_json(ostr); + + vlog( + test_util_log.info, + "Rewriting manifest at {}:\n{}", + manifest.get_manifest_path(), + ostr.str()); + + auto manifest_url = "/" + manifest.get_manifest_path()().string(); + remove_expectations({manifest_url}); + add_expectations({ + cloud_storage_fixture::expectation{ + .url = manifest_url, .body = serialize_manifest(manifest)}, + }); + + auto base = segments[0].base_offset; + auto max = segments[segments.size() - 1].max_offset; + + vlog(test_log.debug, "offset range: {}-{}", base, max); + + BOOST_REQUIRE( + scan_remote_partition(*this, segments[1].base_offset, max).size() == 6); + + BOOST_REQUIRE_EXCEPTION( + scan_remote_partition(*this, base, max), + std::runtime_error, + [](const auto& ex) { + ss::sstring what{ex.what()}; + return what.find("Failed to query spillover manifests") != what.npos; + }); + + test_log.debug("Timestamp undershoots the partition"); + BOOST_TEST_REQUIRE(timequery(*this, model::timestamp(100), 6)); + + test_log.debug("Timestamp within segment"); + BOOST_TEST_REQUIRE(timequery(*this, model::timestamp(1014), 5)); +} + +FIXTURE_TEST(test_out_of_range_spillover_query, cloud_storage_fixture) { + auto data = [&](size_t t) { + return batch_t{ + .num_records = 1, + .type = model::record_batch_type::raft_data, + .timestamp = model::timestamp(t)}; + }; + + const std::vector> batches = { + {data(1000), data(1002), data(1004), data(1006), data(1008), data(1010)}, + {data(1012), data(1014), data(1016), data(1018), data(1020), data(1022)}, + {data(1024), data(1026), data(1028), data(1030), data(1032), data(1034)}, + {data(1036), data(1038), data(1040), data(1042), data(1044), data(1046)}, + {data(1048), data(1050), data(1052), data(1054), data(1056), data(1058)}, + {data(1060), data(1062), data(1064), data(1066), data(1068), data(1070)}, + }; + + auto segments = make_segments(batches, false, false); + cloud_storage::partition_manifest manifest(manifest_ntp, manifest_revision); + + auto expectations = make_imposter_expectations(manifest, segments); + set_expectations_and_listen(expectations); + + for (int i = 0; i < 2; i++) { + spillover_manifest spm(manifest_ntp, manifest_revision); + + for (int j = 0; auto s : manifest) { + spm.add(s); + if (++j == 2) { + break; + } + } + manifest.spillover(spm.make_manifest_metadata()); + + std::ostringstream ostr; + spm.serialize_json(ostr); + + vlog( + test_util_log.info, + "Uploading spillover manifest at {}:\n{}", + spm.get_manifest_path(), + ostr.str()); + + auto s_data = spm.serialize().get(); + auto buf = s_data.stream.read_exactly(s_data.size_bytes).get(); + add_expectations({cloud_storage_fixture::expectation{ + .url = "/" + spm.get_manifest_path()().string(), + .body = ss::sstring(buf.begin(), buf.end()), + }}); + } + + // Advance start offset as-if archiver did apply retention but didn't + // run GC yet (the clean offset is not updated). + // + // We set it to the second segment of the second spillover manifest in an + // attempt to cover more potential edge cases. + auto archive_start = segments[3].base_offset; + manifest.set_archive_start_offset(archive_start, model::offset_delta(0)); + + // Upload latest manifest version. 
+ auto serialize_manifest = [](const cloud_storage::partition_manifest& m) { + auto s_data = m.serialize().get(); + auto buf = s_data.stream.read_exactly(s_data.size_bytes).get(); + return ss::sstring(buf.begin(), buf.end()); + }; + std::ostringstream ostr; + manifest.serialize_json(ostr); + + vlog( + test_util_log.info, + "Rewriting manifest at {}:\n{}", + manifest.get_manifest_path(), + ostr.str()); + + auto manifest_url = "/" + manifest.get_manifest_path()().string(); + remove_expectations({manifest_url}); + add_expectations({ + cloud_storage_fixture::expectation{ + .url = manifest_url, .body = serialize_manifest(manifest)}, + }); + + auto base = segments[0].base_offset; + auto max = segments[segments.size() - 1].max_offset; + + vlog(test_log.debug, "offset range: {}-{}", base, max); + + // Can query from start of the archive. + BOOST_REQUIRE( + scan_remote_partition(*this, archive_start, max).size() == 3 * 6); + + // Can timequery from start of the archive. + BOOST_TEST_REQUIRE( + timequery(*this, archive_start, model::timestamp(100), 3 * 6)); + + // Can't query from the start of partition. + BOOST_REQUIRE_EXCEPTION( + scan_remote_partition(*this, base, max), + std::runtime_error, + [](const auto& ex) { + ss::sstring what{ex.what()}; + return what.find("Failed to query spillover manifests") != what.npos; + }); + + // Can't timequery from the base offset. + BOOST_REQUIRE_EXCEPTION( + timequery(*this, base, model::timestamp(100), 3 * 6), + std::runtime_error, + [](const auto& ex) { + ss::sstring what{ex.what()}; + return what.find("Failed to query spillover manifests") != what.npos; + }); + + // Can't query from start of the still valid spillover manifest. + // Since we don't rewrite spillover manifests we want to be sure that + // we don't allow querying stale segments (below the start offset). + BOOST_REQUIRE_EXCEPTION( + scan_remote_partition(*this, segments[2].base_offset, max), + std::runtime_error, + [](const auto& ex) { + ss::sstring what{ex.what()}; + return what.find("Failed to query spillover manifests") != what.npos; + }); + + // Can't query from start of the still valid spillover manifest. + // Since we don't rewrite spillover manifests we want to be sure that + // we don't allow querying stale segments (below the start offset). + // BUG: Currently it succeeds. This is a bug and should be fixed. + // BOOST_REQUIRE_EXCEPTION( + // timequery(*this, segments[2].base_offset, model::timestamp(100), 3 * + // 6), std::runtime_error, + // [](const auto& ex) { + // ss::sstring what{ex.what()}; + // return what.find("Failed to query spillover manifests") != + // what.npos; + // }); + BOOST_TEST_REQUIRE( + timequery(*this, segments[2].base_offset, model::timestamp(100), 3 * 6)); + + test_log.debug("Timestamp within valid spillover but below archive start"); + BOOST_TEST_REQUIRE( + timequery(*this, segments[2].base_timestamp.value(), 3 * 6)); + + test_log.debug("Valid timestamp start of retention"); + BOOST_TEST_REQUIRE( + timequery(*this, batches[3][0].timestamp.value(), 3 * 6)); + + test_log.debug("Valid timestamp within retention"); + BOOST_TEST_REQUIRE( + timequery(*this, batches[3][1].timestamp.value(), 3 * 6 - 1)); + + test_log.debug("Timestamp overshoots the partition"); + BOOST_TEST_REQUIRE(timequery(*this, model::timestamp::max(), 0)); + + // Rewrite the manifest with clean offset to match start offset. 
+ manifest.set_archive_clean_offset( + archive_start, manifest.archive_size_bytes() / 2); + vlog( + test_util_log.info, + "Rewriting manifest at {}:\n{}", + manifest.get_manifest_path(), + ostr.str()); + + remove_expectations({manifest_url}); + add_expectations({ + cloud_storage_fixture::expectation{ + .url = manifest_url, .body = serialize_manifest(manifest)}, + }); + + // Still can't query from the base offset. + BOOST_REQUIRE_EXCEPTION( + scan_remote_partition(*this, base, max), + std::runtime_error, + [](const auto& ex) { + ss::sstring what{ex.what()}; + return what.find("Failed to query spillover manifests") != what.npos; + }); + + // Timequery from base offset must fail too as the regular query. + // BUG: Currently it succeeds. This is a bug and should be fixed. + BOOST_TEST_REQUIRE(timequery(*this, base, model::timestamp(100), 3 * 6)); + BOOST_TEST_REQUIRE( + timequery(*this, segments[2].base_offset, model::timestamp(100), 3 * 6)); +} diff --git a/src/v/cloud_storage/tests/util.cc b/src/v/cloud_storage/tests/util.cc index aa1cd1ac8e6a..1c6171a685d2 100644 --- a/src/v/cloud_storage/tests/util.cc +++ b/src/v/cloud_storage/tests/util.cc @@ -743,6 +743,10 @@ std::vector scan_remote_partition( partition_probe probe(manifest.get_ntp()); auto manifest_view = ss::make_shared( imposter.api, imposter.cache, manifest, bucket); + auto manifest_view_stop = ss::defer( + [&manifest_view] { manifest_view->stop().get(); }); + manifest_view->start().get(); + auto partition = ss::make_shared( manifest_view, imposter.api.local(), @@ -765,6 +769,7 @@ std::vector scan_remote_partition( /// Similar to previous function but uses timequery to start the scan scan_result scan_remote_partition( cloud_storage_fixture& imposter, + model::offset min, model::timestamp timestamp, model::offset max, size_t maybe_max_segments, @@ -789,12 +794,16 @@ scan_result scan_remote_partition( } auto manifest = hydrate_manifest(imposter.api.local(), bucket); storage::log_reader_config reader_config( - model::offset(0), max, ss::default_priority_class()); + min, max, ss::default_priority_class()); reader_config.first_timestamp = timestamp; partition_probe probe(manifest.get_ntp()); auto manifest_view = ss::make_shared( imposter.api, imposter.cache, manifest, bucket); + auto manifest_view_stop = ss::defer( + [&manifest_view] { manifest_view->stop().get(); }); + + manifest_view->start().get(); auto partition = ss::make_shared( manifest_view, imposter.api.local(), diff --git a/src/v/cloud_storage/tests/util.h b/src/v/cloud_storage/tests/util.h index a247b1e74e94..8c85bc341c75 100644 --- a/src/v/cloud_storage/tests/util.h +++ b/src/v/cloud_storage/tests/util.h @@ -200,6 +200,7 @@ struct scan_result { /// Similar to prev function but uses timequery scan_result scan_remote_partition( cloud_storage_fixture& imposter, + model::offset min, model::timestamp timestamp, model::offset max = model::offset::max(), size_t maybe_max_segments = 0, diff --git a/src/v/kafka/server/replicated_partition.cc b/src/v/kafka/server/replicated_partition.cc index 1548ea10ad53..4a089386ac48 100644 --- a/src/v/kafka/server/replicated_partition.cc +++ b/src/v/kafka/server/replicated_partition.cc @@ -320,44 +320,7 @@ ss::future> replicated_partition::timequery(storage::timequery_config cfg) { // cluster::partition::timequery returns a result in Kafka offsets, // no further offset translation is required here. 
- auto res = co_await _partition->timequery(cfg); - if (!res.has_value()) { - co_return std::nullopt; - } - const auto kafka_start_override = _partition->kafka_start_offset_override(); - if ( - !kafka_start_override.has_value() - || kafka_start_override.value() <= res.value().offset) { - // The start override doesn't affect the result of the timequery. - co_return res; - } - vlog( - klog.debug, - "{} timequery result {} clamped by start override, fetching result at " - "start {}", - ntp(), - res->offset, - kafka_start_override.value()); - storage::log_reader_config config( - kafka_start_override.value(), - cfg.max_offset, - 0, - 2048, // We just need one record batch - cfg.prio, - cfg.type_filter, - std::nullopt, // No timestamp, just use the offset - cfg.abort_source); - auto translating_reader = co_await make_reader(config, std::nullopt); - auto ot_state = std::move(translating_reader.ot_state); - model::record_batch_reader::storage_t data - = co_await model::consume_reader_to_memory( - std::move(translating_reader.reader), model::no_timeout); - auto& batches = std::get(data); - if (batches.empty()) { - co_return std::nullopt; - } - co_return storage::batch_timequery( - *(batches.begin()), cfg.min_offset, cfg.time, cfg.max_offset); + return _partition->timequery(cfg); } ss::future> replicated_partition::replicate( diff --git a/src/v/redpanda/tests/fixture.h b/src/v/redpanda/tests/fixture.h index 7193bf20ea3c..3c201fbbf60f 100644 --- a/src/v/redpanda/tests/fixture.h +++ b/src/v/redpanda/tests/fixture.h @@ -288,13 +288,14 @@ class redpanda_thread_fixture { } static archival::configuration get_archival_config() { - archival::configuration aconf; + archival::configuration aconf{ + .manifest_upload_timeout = config::mock_binding(1000ms), + }; aconf.bucket_name = cloud_storage_clients::bucket_name("test-bucket"); aconf.ntp_metrics_disabled = archival::per_ntp_metrics_disabled::yes; aconf.svc_metrics_disabled = archival::service_metrics_disabled::yes; aconf.cloud_storage_initial_backoff = 100ms; aconf.segment_upload_timeout = 1s; - aconf.manifest_upload_timeout = 1s; aconf.garbage_collect_timeout = 1s; aconf.time_limit = std::nullopt; return aconf; @@ -327,19 +328,7 @@ class redpanda_thread_fixture { bool data_transforms_enabled = false, bool legacy_upload_mode_enabled = true) { auto base_path = std::filesystem::path(data_dir); - ss::smp::invoke_on_all([node_id, - kafka_port, - rpc_port, - seed_servers = std::move(seed_servers), - base_path, - s3_config, - archival_cfg, - cloud_cfg, - use_node_id, - empty_seed_starts_cluster_val, - kafka_admin_topic_api_rate, - data_transforms_enabled, - legacy_upload_mode_enabled]() mutable { + ss::smp::invoke_on_all([=]() { auto& config = config::shard_local_cfg(); config.get("enable_pid_file").set_value(false); @@ -386,25 +375,29 @@ class redpanda_thread_fixture { static_cast(s3_config->server_addr.port())); } if (archival_cfg) { + // Copy archival config to this shard to avoid `config::binding` + // asserting on cross-shard access. 
+ auto local_cfg = archival_cfg; + config.get("cloud_storage_disable_tls").set_value(true); config.get("cloud_storage_bucket") - .set_value(std::make_optional(archival_cfg->bucket_name())); + .set_value(std::make_optional(local_cfg->bucket_name())); config.get("cloud_storage_initial_backoff_ms") .set_value( std::chrono::duration_cast( - archival_cfg->cloud_storage_initial_backoff)); + local_cfg->cloud_storage_initial_backoff)); config.get("cloud_storage_manifest_upload_timeout_ms") .set_value( std::chrono::duration_cast( - archival_cfg->manifest_upload_timeout)); + local_cfg->manifest_upload_timeout())); config.get("cloud_storage_segment_upload_timeout_ms") .set_value( std::chrono::duration_cast( - archival_cfg->segment_upload_timeout)); + local_cfg->segment_upload_timeout)); config.get("cloud_storage_garbage_collect_timeout_ms") .set_value( std::chrono::duration_cast( - archival_cfg->garbage_collect_timeout)); + local_cfg->garbage_collect_timeout)); } if (cloud_cfg) { config.get("cloud_storage_enable_remote_read").set_value(true); diff --git a/tests/rptest/tests/timequery_test.py b/tests/rptest/tests/timequery_test.py index 456619d8a846..c7e8c9297536 100644 --- a/tests/rptest/tests/timequery_test.py +++ b/tests/rptest/tests/timequery_test.py @@ -9,7 +9,6 @@ import concurrent.futures import re -import time import threading from logging import Logger from typing import Callable @@ -22,9 +21,8 @@ from rptest.clients.types import TopicSpec from rptest.clients.rpk import RpkTool from rptest.clients.kafka_cat import KafkaCat -from rptest.clients.rpk_remote import RpkRemoteTool -from rptest.util import (wait_until, segments_count, - wait_for_local_storage_truncate) +from rptest.util import (wait_until, wait_for_local_storage_truncate, + wait_until_result) from rptest.services.kgo_verifier_services import KgoVerifierProducer from rptest.utils.si_utils import BucketView, NTP @@ -38,7 +36,6 @@ from kafkatest.version import V_3_0_0 from ducktape.tests.test import Test from rptest.clients.default import DefaultClient -from rptest.utils.mode_checks import skip_debug_mode class BaseTimeQuery: @@ -518,6 +515,97 @@ def test_timequery_with_trim_prefix(self, cloud_storage: bool, offset = kcat.query_offset(topic.name, 0, timestamps[0] - 1000) assert offset == msg_count - 1, f"Expected {msg_count - 1}, got {offset}" + @cluster( + num_nodes=4, + log_allow_list=["Failed to upload spillover manifest {timed_out}"]) + def test_timequery_with_spillover_gc_delayed(self): + self.set_up_cluster(cloud_storage=True, + batch_cache=False, + spillover=True) + total_segments = 16 + record_size = 1024 + base_ts = 1664453149000 + msg_count = (self.log_segment_size * total_segments) // record_size + local_retention = self.log_segment_size * 4 + topic_retention = self.log_segment_size * 8 + topic, timestamps = self._create_and_produce(self.redpanda, True, + local_retention, base_ts, + record_size, msg_count) + + # Confirm messages written + rpk = RpkTool(self.redpanda) + p = next(rpk.describe_topic(topic.name)) + assert p.high_watermark == msg_count + + # If using cloud storage, we must wait for some segments + # to fall out of local storage, to ensure we are really + # hitting the cloud storage read path when querying. + wait_for_local_storage_truncate(redpanda=self.redpanda, + topic=topic.name, + target_bytes=local_retention) + + # Set timeout to 0 to prevent the cloud storage housekeeping from + # running, triggering gc, and advancing clean offset. 
+ self.redpanda.set_cluster_config( + {"cloud_storage_manifest_upload_timeout_ms": 0}) + # Disable internal scrubbing as it won't be able to make progress. + self.si_settings.skip_end_of_test_scrubbing = True + + self.client().alter_topic_config(topic.name, 'retention.bytes', + topic_retention) + self.logger.info("Waiting for start offset to advance...") + start_offset = wait_until_result( + lambda: next(rpk.describe_topic(topic.name)).start_offset > 0, + timeout_sec=120, + backoff_sec=5, + err_msg="Start offset did not advance") + + start_offset = next(rpk.describe_topic(topic.name)).start_offset + + # Query below valid timestamps the offset of the first message. + kcat = KafkaCat(self.redpanda) + + test_cases = [ + (timestamps[0] - 1000, start_offset, "before start of log"), + (timestamps[0], start_offset, + "first message but out of retention now"), + (timestamps[start_offset - 1], start_offset, + "before new HWM, out of retention"), + (timestamps[start_offset], start_offset, "new HWM"), + (timestamps[start_offset + 10], start_offset + 10, + "few messages after new HWM"), + (timestamps[msg_count - 1] + 1000, -1, "after last message"), + ] + + # Basic time query cases. + for ts, expected_offset, desc in test_cases: + self.logger.info(f"Querying ts={ts} ({desc})") + offset = kcat.query_offset(topic.name, 0, ts) + self.logger.info(f"Time query returned offset {offset}") + assert offset == expected_offset, f"Expected {expected_offset}, got {offset}" + + # Now check every single one of them to make sure there are no + # off-by-one errors, iterators aren't getting stuck on segment and + # spillover boundaries, etc. The segment boundaries are not exact + # due to internal messages, segment roll logic, etc. but the tolerance + # should cover that. + boundary_ranges = [] + for i in range(1, total_segments): + boundary_ranges.append( + (int(i * self.log_segment_size / record_size - 100), + int(i * self.log_segment_size / record_size + 100))) + + for r in boundary_ranges: + self.logger.debug(f"Checking range {r}") + for o in range(int(r[0]), int(r[1])): + ts = timestamps[o] + self.logger.debug(f" Querying ts={ts}") + offset = kcat.query_offset(topic.name, 0, ts) + if o < start_offset: + assert offset == start_offset, f"Expected {start_offset}, got {offset}" + else: + assert offset == o, f"Expected {o}, got {offset}" + class TimeQueryKafkaTest(Test, BaseTimeQuery): """ diff --git a/tests/rptest/util.py b/tests/rptest/util.py index 1e9309189601..6a5cc5b18a58 100644 --- a/tests/rptest/util.py +++ b/tests/rptest/util.py @@ -375,7 +375,8 @@ def __enter__(self): """Isolate certain ips from the nodes using firewall rules""" cmd = [ f"iptables -A INPUT -p tcp --{self.mode_for_input} {self._port} -j DROP", - f"iptables -A OUTPUT -p tcp --dport {self._port} -j DROP" + f"iptables -A OUTPUT -p tcp --dport {self._port} -j DROP", + f"ss -K dport {self._port}", ] cmd = " && ".join(cmd) for node in self._nodes: