diff --git a/src/v/cloud_storage/remote.cc b/src/v/cloud_storage/remote.cc index ddee4e81956c6..9339775028d53 100644 --- a/src/v/cloud_storage/remote.cc +++ b/src/v/cloud_storage/remote.cc @@ -1016,7 +1016,9 @@ ss::future remote::list_objects( retry_chain_node& parent, std::optional prefix, std::optional delimiter, - std::optional item_filter) { + std::optional item_filter, + std::optional max_keys, + std::optional continuation_token) { ss::gate::holder gh{_gate}; retry_chain_node fib(&parent); retry_chain_logger ctxlog(cst_log, fib); @@ -1026,18 +1028,23 @@ ss::future remote::list_objects( std::optional result; bool items_remaining = true; - std::optional continuation_token = std::nullopt; // Gathers the items from a series of successful ListObjectsV2 calls cloud_storage_clients::client::list_bucket_result list_bucket_result; - // Keep iterating until the ListObjectsV2 calls has more items to return + const auto caller_handle_truncation = max_keys.has_value(); + + if (caller_handle_truncation) { + vassert(max_keys.value() > 0, "Max keys must be greater than 0."); + } + + // Keep iterating while the ListObjectsV2 calls has more items to return while (!_gate.is_closed() && permit.is_allowed && !result) { auto res = co_await lease.client->list_objects( bucket, prefix, std::nullopt, - std::nullopt, + max_keys, continuation_token, fib.get_timeout(), delimiter, @@ -1070,6 +1077,14 @@ ss::future remote::list_objects( // Continue to list the remaining items if (items_remaining) { + // But, return early if max_keys was specified (caller will + // handle truncation) + if (caller_handle_truncation) { + list_bucket_result.is_truncated = true; + list_bucket_result.next_continuation_token + = continuation_token.value(); + co_return list_bucket_result; + } continue; } diff --git a/src/v/cloud_storage/remote.h b/src/v/cloud_storage/remote.h index 44341499e76dd..921c602a78483 100644 --- a/src/v/cloud_storage/remote.h +++ b/src/v/cloud_storage/remote.h @@ -325,15 +325,29 @@ class remote : public ss::peering_sharded_service { /// \param prefix Optional prefix to restrict listing of objects /// \param delimiter A character to use as a delimiter when grouping list /// results - /// \param item_filter Optional filter to apply to items before - /// collecting + /// \param max_keys The maximum number of keys to return. If left + /// unspecified, all object keys that fulfill the request will be collected, + /// and the result will not be truncated (truncation not allowed). If + /// specified, it will be up to the user to deal with a possibly-truncated + /// result (using list_result.is_truncated) at the call site, most likely in + /// a while loop. The continuation-token generated by that request will be + /// available through list_result.next_continuation_token for future + /// requests. It is also important to note that the value for max_keys will + /// be capped by the cloud provider default (which may vary between + /// providers, e.g AWS has a limit of 1000 keys per ListObjects request). + /// \param continuation_token The token hopefully passed back to the user + /// from a prior list_objects() request, in the case that they are handling + /// a truncated result manually. + /// \param item_filter Optional filter to apply to items before collecting ss::future list_objects( const cloud_storage_clients::bucket_name& name, retry_chain_node& parent, std::optional prefix = std::nullopt, std::optional delimiter = std::nullopt, std::optional item_filter - = std::nullopt); + = std::nullopt, + std::optional max_keys = std::nullopt, + std::optional continuation_token = std::nullopt); /// \brief Upload small objects to bucket. Suitable for uploading simple /// strings, does not check for leadership before upload like the segment diff --git a/src/v/cloud_storage/tests/remote_test.cc b/src/v/cloud_storage/tests/remote_test.cc index 039d8e6194dfc..b9e5a2d74ab97 100644 --- a/src/v/cloud_storage/tests/remote_test.cc +++ b/src/v/cloud_storage/tests/remote_test.cc @@ -641,6 +641,130 @@ FIXTURE_TEST(test_list_bucket, remote_fixture) { } } +FIXTURE_TEST(test_list_bucket_with_max_keys, remote_fixture) { + set_expectations_and_listen({}); + cloud_storage_clients::bucket_name bucket{"test"}; + retry_chain_node fib(never_abort, 10s, 20ms); + + const auto s3_imposter_max_keys = s3_imposter_fixture::default_max_keys; + const auto size = s3_imposter_max_keys + 50; + for (int i = 0; i < size; i++) { + cloud_storage_clients::object_key path{fmt::format("{}", i)}; + auto result + = remote.local().upload_object(bucket, path, iobuf{}, fib).get(); + BOOST_REQUIRE_EQUAL(cloud_storage::upload_result::success, result); + } + + { + // Passing max_keys indicates we, as a user, will handle truncation + // results. Here, we know that that size > s3_imposter_max_keys, and the + // result will end up truncated. + auto max_keys = s3_imposter_max_keys; + auto result + = remote.local() + .list_objects( + bucket, fib, std::nullopt, std::nullopt, std::nullopt, max_keys) + .get(); + BOOST_REQUIRE(result.has_value()); + BOOST_REQUIRE(result.value().is_truncated); + // This continuation token is /54 because objects are sorted + // lexicographically. + BOOST_REQUIRE_EQUAL(result.value().next_continuation_token, "/54"); + BOOST_REQUIRE_EQUAL( + result.value().contents.size(), s3_imposter_max_keys); + BOOST_REQUIRE(result.value().common_prefixes.empty()); + + // Now, we can use the next_continuation_token from the previous, + // truncated result in order to query for the rest of the objects. We + // should expect to get the rest of the objects in "storage", and that + // this request is not truncated. + auto next_result = remote.local() + .list_objects( + bucket, + fib, + std::nullopt, + std::nullopt, + std::nullopt, + max_keys, + result.value().next_continuation_token) + .get(); + BOOST_REQUIRE(next_result.has_value()); + BOOST_REQUIRE(!next_result.value().is_truncated); + BOOST_REQUIRE_EQUAL( + next_result.value().contents.size(), size - s3_imposter_max_keys); + BOOST_REQUIRE(next_result.value().common_prefixes.empty()); + } + { + // On the other hand, passing max_keys as std::nullopt means + // truncation will be handled by the remote API, (all object keys will + // be read in a loop, we should expect no truncation in the return + // value), and the result contents should be full. + auto max_keys = std::nullopt; + auto result + = remote.local() + .list_objects( + bucket, fib, std::nullopt, std::nullopt, std::nullopt, max_keys) + .get(); + BOOST_REQUIRE(result.has_value()); + BOOST_REQUIRE(!result.value().is_truncated); + BOOST_REQUIRE_EQUAL(result.value().contents.size(), size); + BOOST_REQUIRE(result.value().common_prefixes.empty()); + } + { + auto max_keys = 2; + auto result + = remote.local() + .list_objects( + bucket, fib, std::nullopt, std::nullopt, std::nullopt, max_keys) + .get(); + BOOST_REQUIRE(result.has_value()); + BOOST_REQUIRE(result.value().is_truncated); + // This continuation token is /10 because objects are sorted + // lexicographically. + BOOST_REQUIRE_EQUAL(result.value().next_continuation_token, "/10"); + const auto& contents = result.value().contents; + BOOST_REQUIRE_EQUAL(contents.size(), max_keys); + BOOST_REQUIRE_EQUAL(contents[0].key, "0"); + BOOST_REQUIRE_EQUAL(contents[1].key, "1"); + BOOST_REQUIRE(result.value().common_prefixes.empty()); + } + { + // This will also be truncated, since size > s3_imposter_max_keys. + auto max_keys = size; + auto result + = remote.local() + .list_objects( + bucket, fib, std::nullopt, std::nullopt, std::nullopt, max_keys) + .get(); + BOOST_REQUIRE(result.has_value()); + BOOST_REQUIRE(result.value().is_truncated); + BOOST_REQUIRE_EQUAL( + result.value().contents.size(), s3_imposter_max_keys); + // This continuation token is /54 because objects are sorted + // lexicographically. + BOOST_REQUIRE_EQUAL(result.value().next_continuation_token, "/54"); + BOOST_REQUIRE(result.value().common_prefixes.empty()); + + // Reissue another request with continuation-token. This should capture + // the rest of the object keys, we expect a non-truncated result. + auto next_result = remote.local() + .list_objects( + bucket, + fib, + std::nullopt, + std::nullopt, + std::nullopt, + max_keys, + result.value().next_continuation_token) + .get(); + BOOST_REQUIRE(next_result.has_value()); + BOOST_REQUIRE(!next_result.value().is_truncated); + BOOST_REQUIRE_EQUAL( + next_result.value().contents.size(), size - s3_imposter_max_keys); + BOOST_REQUIRE(next_result.value().common_prefixes.empty()); + } +} + FIXTURE_TEST(test_list_bucket_with_prefix, remote_fixture) { set_expectations_and_listen({}); cloud_storage_clients::bucket_name bucket{"test"}; @@ -668,7 +792,6 @@ FIXTURE_TEST(test_list_bucket_with_prefix, remote_fixture) { BOOST_REQUIRE_EQUAL(request.method, "GET"); BOOST_REQUIRE_EQUAL(request.q_list_type, "2"); BOOST_REQUIRE_EQUAL(request.q_prefix, "x/"); - BOOST_REQUIRE_EQUAL(request.h_prefix, "x/"); } FIXTURE_TEST(test_list_bucket_with_filter, remote_fixture) { diff --git a/src/v/cloud_storage/tests/s3_imposter.cc b/src/v/cloud_storage/tests/s3_imposter.cc index fc863fc25d4c0..c9c7c9a200b5a 100644 --- a/src/v/cloud_storage/tests/s3_imposter.cc +++ b/src/v/cloud_storage/tests/s3_imposter.cc @@ -42,17 +42,39 @@ uint16_t unit_test_httpd_port_number() { return 4442; } namespace { +using expectation_map_t + = std::map; + // Takes the input map of keys to expectations and returns a stringified XML // corresponding to the appropriate S3 response. ss::sstring list_objects_resp( - const std::map& objects, + const expectation_map_t& objects, ss::sstring prefix, - ss::sstring delimiter) { + ss::sstring delimiter, + std::optional max_keys_opt, + std::optional continuation_token_opt) { std::map content_key_to_size; std::set common_prefixes; // Filter by prefix and group by the substring between the prefix and first // delimiter. - for (const auto& [_, expectation] : objects) { + auto max_keys = max_keys_opt.has_value() + ? std::min( + max_keys_opt.value(), + s3_imposter_fixture::default_max_keys) + : s3_imposter_fixture::default_max_keys; + auto it = (continuation_token_opt.has_value()) + ? objects.find(continuation_token_opt.value()) + : objects.begin(); + auto end_it = objects.end(); + ss::sstring next_continuation_token = ""; + for (; it != end_it; ++it) { + const auto& expectation = it->second; + + if (content_key_to_size.size() == max_keys) { + next_continuation_token = it->first; + break; + } + auto key = expectation.url; if (!key.empty() && key[0] == '/') { // Remove / character that S3 client adds @@ -89,6 +111,8 @@ ss::sstring list_objects_resp( prefix + key.substr(prefix.size(), delimiter_pos - prefix.size() + 1)); } + + const bool is_truncated = (it != end_it); // Populate the returned XML. ss::sstring ret; ret += fmt::format( @@ -97,14 +121,17 @@ ss::sstring list_objects_resp( test-bucket {} {} - 1000 + {} {} - false - next + {} + {} )xml", prefix, content_key_to_size.size(), - delimiter); + max_keys, + delimiter, + is_truncated, + next_continuation_token); for (const auto& [key, size] : content_key_to_size) { ret += fmt::format( R"xml( @@ -241,15 +268,33 @@ void s3_imposter_fixture::set_routes( if ( fixture._search_on_get_list && request.get_query_param("list-type") == "2") { - auto prefix = request.get_header("prefix"); - auto delimiter = request.get_header("delimiter"); + auto prefix = request.get_query_param("prefix"); + auto delimiter = request.get_query_param("delimiter"); + auto max_keys_str = request.get_query_param("max-keys"); + auto continuation_token_str = request.get_query_param( + "continuation-token"); + std::optional max_keys = (max_keys_str.empty()) + ? std::optional{} + : std::stoi( + max_keys_str); + std::optional continuation_token + = (continuation_token_str.empty()) + ? std::optional{} + : continuation_token_str; vlog( fixt_log.trace, - "S3 imposter list request {} - {} - {}", + "S3 imposter list request {} - {} - {} - {} - {}", prefix, delimiter, + max_keys, + continuation_token, request._method); - return list_objects_resp(expectations, prefix, delimiter); + return list_objects_resp( + expectations, + prefix, + delimiter, + max_keys, + continuation_token); } auto it = expectations.find(request._url); if (it == expectations.end() || !it->second.body.has_value()) { diff --git a/src/v/cloud_storage/tests/s3_imposter.h b/src/v/cloud_storage/tests/s3_imposter.h index f650097a168bf..f51833c2daf84 100644 --- a/src/v/cloud_storage/tests/s3_imposter.h +++ b/src/v/cloud_storage/tests/s3_imposter.h @@ -38,6 +38,7 @@ /// be retrieved using the GET request or deleted using the DELETE request. class s3_imposter_fixture { public: + static constexpr size_t default_max_keys = 100; uint16_t httpd_port_number(); static constexpr const char* httpd_host_name = "127.0.0.1"; diff --git a/src/v/cloud_storage_clients/abs_client.cc b/src/v/cloud_storage_clients/abs_client.cc index 06c8b593a33af..429a9f786e64c 100644 --- a/src/v/cloud_storage_clients/abs_client.cc +++ b/src/v/cloud_storage_clients/abs_client.cc @@ -285,11 +285,11 @@ abs_request_creator::make_list_blobs_request( const bucket_name& name, bool files_only, std::optional prefix, - [[maybe_unused]] std::optional start_after, - std::optional max_keys, + std::optional max_results, + std::optional marker, std::optional delimiter) { // GET /{container-id}?restype=container&comp=list&prefix={prefix}... - // ...&max_results{max_keys} + // ...&maxresults{max_keys} // HTTP/1.1 Host: {storage-account-id}.blob.core.windows.net // x-ms-date:{req-datetime in RFC9110} # added by 'add_auth' // x-ms-version:"2023-01-23" # added by 'add_auth' @@ -299,14 +299,18 @@ abs_request_creator::make_list_blobs_request( target += fmt::format("&prefix={}", prefix.value()().string()); } - if (max_keys) { - target += fmt::format("&max_results={}", max_keys.value()); + if (max_results) { + target += fmt::format("&maxresults={}", max_results.value()); } if (delimiter) { target += fmt::format("&delimiter={}", delimiter.value()); } + if (marker.has_value()) { + target += fmt::format("&marker={}", marker.value()); + } + if (files_only) { target += fmt::format("&showonly=files"); } @@ -746,7 +750,7 @@ ss::future> abs_client::list_objects( const bucket_name& name, std::optional prefix, - std::optional start_after, + [[maybe_unused]] std::optional start_after, std::optional max_keys, std::optional continuation_token, ss::lowres_clock::duration timeout, @@ -756,7 +760,6 @@ abs_client::list_objects( do_list_objects( name, std::move(prefix), - std::move(start_after), max_keys, std::move(continuation_token), timeout, @@ -768,9 +771,8 @@ abs_client::list_objects( ss::future abs_client::do_list_objects( const bucket_name& name, std::optional prefix, - std::optional start_after, - std::optional max_keys, - [[maybe_unused]] std::optional continuation_token, + std::optional max_results, + std::optional marker, ss::lowres_clock::duration timeout, std::optional delimiter, std::optional gather_item_if) { @@ -778,8 +780,8 @@ ss::future abs_client::do_list_objects( name, _adls_client.has_value(), std::move(prefix), - std::move(start_after), - max_keys, + max_results, + std::move(marker), delimiter); if (!header) { vlog( diff --git a/src/v/cloud_storage_clients/abs_client.h b/src/v/cloud_storage_clients/abs_client.h index cdfbe884349b4..34943016bbd4f 100644 --- a/src/v/cloud_storage_clients/abs_client.h +++ b/src/v/cloud_storage_clients/abs_client.h @@ -71,7 +71,8 @@ class abs_request_creator { /// \param name of the container /// \param files_only should always be set to true when HNS is enabled and false otherwise /// \param prefix prefix of returned blob's names - /// \param start_after is always ignored \param max_keys is the max number of returned objects + /// \param max_results is the max number of returned objects + /// \param marker is the "continuation-token" /// \param delimiter used to group common prefixes /// \return initialized and signed http header or error // clang-format on @@ -79,8 +80,8 @@ class abs_request_creator { const bucket_name& name, bool files_only, std::optional prefix, - std::optional start_after, - std::optional max_keys, + std::optional max_results, + std::optional marker, std::optional delimiter = std::nullopt); /// \brief Init http header for 'Get Account Information' request @@ -269,9 +270,8 @@ class abs_client : public client { ss::future do_list_objects( const bucket_name& name, std::optional prefix, - std::optional start_after, - std::optional max_keys, - std::optional continuation_token, + std::optional max_results, + std::optional marker, ss::lowres_clock::duration timeout, std::optional delimiter = std::nullopt, std::optional collect_item_if = std::nullopt); diff --git a/src/v/cloud_storage_clients/client.h b/src/v/cloud_storage_clients/client.h index fef5e51228b2f..b6755bfbfb242 100644 --- a/src/v/cloud_storage_clients/client.h +++ b/src/v/cloud_storage_clients/client.h @@ -96,7 +96,7 @@ class client { ss::sstring etag; }; struct list_bucket_result { - bool is_truncated; + bool is_truncated = false; ss::sstring prefix; ss::sstring next_continuation_token; std::vector contents; diff --git a/src/v/cloud_storage_clients/s3_client.cc b/src/v/cloud_storage_clients/s3_client.cc index 2e29c991ba453..11c36ef39f0ed 100644 --- a/src/v/cloud_storage_clients/s3_client.cc +++ b/src/v/cloud_storage_clients/s3_client.cc @@ -182,6 +182,18 @@ request_creator::make_list_objects_v2_request( if (prefix.has_value()) { target = fmt::format("{}&prefix={}", target, (*prefix)().string()); } + + if (start_after.has_value()) { + target = fmt::format("{}&start-after={}", target, *start_after); + } + + if (max_keys.has_value()) { + target = fmt::format("{}&max-keys={}", target, *max_keys); + } + if (continuation_token.has_value()) { + target = fmt::format( + "{}&continuation-token={}", target, *continuation_token); + } if (delimiter.has_value()) { target = fmt::format("{}&delimiter={}", target, *delimiter); } @@ -192,26 +204,6 @@ request_creator::make_list_objects_v2_request( header.insert(boost::beast::http::field::host, host); header.insert(boost::beast::http::field::content_length, "0"); - if (prefix) { - header.insert(aws_header_names::prefix, (*prefix)().string()); - } - if (start_after) { - header.insert(aws_header_names::start_after, (*start_after)().string()); - } - if (max_keys) { - header.insert(aws_header_names::max_keys, std::to_string(*max_keys)); - } - if (continuation_token) { - header.insert( - aws_header_names::continuation_token, - {continuation_token->data(), continuation_token->size()}); - } - - if (delimiter) { - header.insert( - aws_header_names::delimiter, std::string(1, delimiter.value())); - } - auto ec = _apply_credentials->add_auth(header); vlog(s3_log.trace, "ListObjectsV2:\n {}", header); if (ec) { @@ -751,7 +743,7 @@ ss::future s3_client::do_list_objects_v2( name, std::move(prefix), std::move(start_after), - max_keys, + std::move(max_keys), std::move(continuation_token), delimiter); if (!header) { diff --git a/src/v/cloud_storage_clients/tests/s3_client_test.cc b/src/v/cloud_storage_clients/tests/s3_client_test.cc index 8cdda0331082e..3a2dd0a06c8a6 100644 --- a/src/v/cloud_storage_clients/tests/s3_client_test.cc +++ b/src/v/cloud_storage_clients/tests/s3_client_test.cc @@ -164,7 +164,7 @@ void set_routes(ss::httpd::routes& r) { [](const_req req, reply& reply) { BOOST_REQUIRE(!req.get_header("x-amz-content-sha256").empty()); BOOST_REQUIRE_EQUAL(req.get_query_param("list-type"), "2"); - auto prefix = req.get_header("prefix"); + auto prefix = req.get_query_param("prefix"); if (prefix == "test") { // normal response return list_objects_payload; @@ -173,7 +173,8 @@ void set_routes(ss::httpd::routes& r) { reply.set_status(reply::status_type::internal_server_error); return error_payload; } else if (prefix == "test-cont") { - BOOST_REQUIRE_EQUAL(req.get_header("continuation-token"), "ctok"); + BOOST_REQUIRE_EQUAL( + req.get_query_param("continuation-token"), "ctok"); return list_objects_payload; } return ""; diff --git a/src/v/http/tests/registered_urls.h b/src/v/http/tests/registered_urls.h index e9a037db2917b..62f63ad1d1454 100644 --- a/src/v/http/tests/registered_urls.h +++ b/src/v/http/tests/registered_urls.h @@ -45,7 +45,6 @@ struct request_info { */ ss::sstring q_list_type; ss::sstring q_prefix; - ss::sstring h_prefix; bool has_q_delete; explicit request_info(const ss::http::request& req) @@ -55,7 +54,6 @@ struct request_info { , content_length(req.content_length) { q_list_type = req.get_query_param("list-type"); q_prefix = req.get_query_param("prefix"); - h_prefix = req.get_header("prefix"); has_q_delete = req.query_parameters.contains("delete"); }