From 45bf4f00bc8fb8cb27019b9fbc58060c66be08d7 Mon Sep 17 00:00:00 2001 From: Andrew Wong Date: Fri, 6 Sep 2024 10:01:14 -0700 Subject: [PATCH] cloud_io: put io_result in cloud_io namespace I didn't do this originally because I didn't want to update a ton of code via s/cloud_storage::upload_result/cloud_io::upload_result. But instead this just adds a using declaration for cloud_storage, and moves the result types into the cloud_io namespace. --- src/v/cloud_io/io_result.cc | 4 +- src/v/cloud_io/io_result.h | 5 +- src/v/cloud_io/remote.cc | 113 +++++++++++++------------- src/v/cloud_io/remote.h | 19 ++--- src/v/cloud_storage/inventory/types.h | 1 - src/v/cloud_storage/types.h | 3 + 6 files changed, 72 insertions(+), 73 deletions(-) diff --git a/src/v/cloud_io/io_result.cc b/src/v/cloud_io/io_result.cc index 50276cc4fb95d..48e1868ebaad6 100644 --- a/src/v/cloud_io/io_result.cc +++ b/src/v/cloud_io/io_result.cc @@ -9,7 +9,7 @@ */ #include "cloud_io/io_result.h" -namespace cloud_storage { +namespace cloud_io { std::ostream& operator<<(std::ostream& o, const download_result& r) { switch (r) { @@ -47,4 +47,4 @@ std::ostream& operator<<(std::ostream& o, const upload_result& r) { return o; } -} // namespace cloud_storage +} // namespace cloud_io diff --git a/src/v/cloud_io/io_result.h b/src/v/cloud_io/io_result.h index 2a523f4ab1ff0..a9f8e7c296b2e 100644 --- a/src/v/cloud_io/io_result.h +++ b/src/v/cloud_io/io_result.h @@ -12,8 +12,7 @@ #include #include -// TODO: move into cloud_io namespace. -namespace cloud_storage { +namespace cloud_io { enum class [[nodiscard]] download_result : int32_t { success, @@ -31,4 +30,4 @@ enum class [[nodiscard]] upload_result : int32_t { std::ostream& operator<<(std::ostream& o, const download_result& r); std::ostream& operator<<(std::ostream& o, const upload_result& r); -} // namespace cloud_storage +} // namespace cloud_io diff --git a/src/v/cloud_io/remote.cc b/src/v/cloud_io/remote.cc index 7430885bdfa90..275c9b73cf7fd 100644 --- a/src/v/cloud_io/remote.cc +++ b/src/v/cloud_io/remote.cc @@ -184,7 +184,7 @@ int remote::delete_objects_max_keys() const { } } -ss::future remote::upload_stream( +ss::future remote::upload_stream( transfer_details transfer_details, uint64_t content_length, const reset_input_stream& reset_str, @@ -203,7 +203,7 @@ ss::future remote::upload_stream( stream_label, transfer_details.key, content_length); - std::optional result; + std::optional result; while (!_gate.is_closed() && permit.is_allowed && !result && max_retries.value_or(1) > 0) { if (max_retries.has_value()) { @@ -222,7 +222,7 @@ ss::future remote::upload_stream( path, bucket); transfer_details.on_failure(); - co_return cloud_storage::upload_result::cancelled; + co_return upload_result::cancelled; } auto reader_handle = co_await reset_str(); @@ -240,7 +240,7 @@ ss::future remote::upload_stream( if (res) { transfer_details.on_success_size(content_length); - co_return cloud_storage::upload_result::success; + co_return upload_result::success; } lease.client->shutdown(); @@ -267,7 +267,7 @@ ss::future remote::upload_stream( // not expected during upload [[fallthrough]]; case cloud_storage_clients::error_outcome::fail: - result = cloud_storage::upload_result::failed; + result = upload_result::failed; break; } } @@ -292,10 +292,10 @@ ss::future remote::upload_stream( bucket, *result); } - co_return cloud_storage::upload_result::timedout; + co_return upload_result::timedout; } -ss::future remote::download_stream( +ss::future remote::download_stream( transfer_details transfer_details, const try_consume_stream& cons_str, const std::string_view stream_label, @@ -321,7 +321,7 @@ ss::future remote::download_stream( auto permit = fib.retry(); vlog(ctxlog.debug, "Download {} {}", stream_label, path); - std::optional result; + std::optional result; while (!_gate.is_closed() && permit.is_allowed && !result) { transfer_details.on_request(fib.retry_count()); @@ -342,7 +342,7 @@ ss::future remote::download_stream( uint64_t content_length = co_await cons_str( length, std::move(throttled_st)); transfer_details.on_success_size(content_length); - co_return cloud_storage::download_result::success; + co_return download_result::success; } catch (...) { const auto ex = std::current_exception(); vlog( @@ -375,10 +375,10 @@ ss::future remote::download_stream( case cloud_storage_clients::error_outcome::operation_not_supported: [[fallthrough]]; case cloud_storage_clients::error_outcome::fail: - result = cloud_storage::download_result::failed; + result = download_result::failed; break; case cloud_storage_clients::error_outcome::key_not_found: - result = cloud_storage::download_result::notfound; + result = download_result::notfound; break; } } @@ -391,7 +391,7 @@ ss::future remote::download_stream( stream_label, bucket, path); - result = cloud_storage::download_result::timedout; + result = download_result::timedout; } else { vlog( ctxlog.warn, @@ -404,7 +404,7 @@ ss::future remote::download_stream( co_return *result; } -ss::future +ss::future remote::download_object(download_request download_request) { auto guard = _gate.hold(); auto& transfer_details = download_request.transfer_details; @@ -420,7 +420,7 @@ remote::download_object(download_request download_request) { auto permit = fib.retry(); vlog(ctxlog.debug, "Downloading {} from {}", object_type, path); - std::optional result; + std::optional result; while (!_gate.is_closed() && permit.is_allowed && !result) { download_request.transfer_details.on_request(fib.retry_count()); auto resp = co_await lease.client->get_object( @@ -433,7 +433,7 @@ remote::download_object(download_request download_request) { resp.value()); download_request.payload.append_fragments(std::move(buffer)); transfer_details.on_success(); - co_return cloud_storage::download_result::success; + co_return download_result::success; } lease.client->shutdown(); @@ -454,10 +454,10 @@ remote::download_object(download_request download_request) { case cloud_storage_clients::error_outcome::operation_not_supported: [[fallthrough]]; case cloud_storage_clients::error_outcome::fail: - result = cloud_storage::download_result::failed; + result = download_result::failed; break; case cloud_storage_clients::error_outcome::key_not_found: - result = cloud_storage::download_result::notfound; + result = download_result::notfound; break; } } @@ -471,7 +471,7 @@ remote::download_object(download_request download_request) { bucket, object_type, path); - result = cloud_storage::download_result::timedout; + result = download_result::timedout; } else { vlog( ctxlog.warn, @@ -485,7 +485,7 @@ remote::download_object(download_request download_request) { co_return *result; } -ss::future remote::object_exists( +ss::future remote::object_exists( const cloud_storage_clients::bucket_name& bucket, const cloud_storage_clients::object_key& path, retry_chain_node& parent, @@ -496,7 +496,7 @@ ss::future remote::object_exists( auto lease = co_await _pool.local().acquire(fib.root_abort_source()); auto permit = fib.retry(); vlog(ctxlog.debug, "Check {} {}", object_type, path); - std::optional result; + std::optional result; while (!_gate.is_closed() && permit.is_allowed && !result) { auto resp = co_await lease.client->head_object( bucket, path, fib.get_timeout()); @@ -508,7 +508,7 @@ ss::future remote::object_exists( path, resp.value().object_size, resp.value().etag); - co_return cloud_storage::download_result::success; + co_return download_result::success; } // Error path @@ -527,10 +527,10 @@ ss::future remote::object_exists( case cloud_storage_clients::error_outcome::operation_not_supported: [[fallthrough]]; case cloud_storage_clients::error_outcome::fail: - result = cloud_storage::download_result::failed; + result = download_result::failed; break; case cloud_storage_clients::error_outcome::key_not_found: - result = cloud_storage::download_result::notfound; + result = download_result::notfound; break; } } @@ -542,7 +542,7 @@ ss::future remote::object_exists( bucket, object_type, path); - result = cloud_storage::download_result::timedout; + result = download_result::timedout; } else { vlog( ctxlog.warn, @@ -555,7 +555,7 @@ ss::future remote::object_exists( co_return *result; } -ss::future +ss::future remote::delete_object(transfer_details transfer_details) { const auto& bucket = transfer_details.bucket; const auto& path = transfer_details.key; @@ -566,19 +566,19 @@ remote::delete_object(transfer_details transfer_details) { auto lease = co_await _pool.local().acquire(fib.root_abort_source()); auto permit = fib.retry(); vlog(ctxlog.debug, "Delete object {}", path); - std::optional result; + std::optional result; while (!_gate.is_closed() && permit.is_allowed && !result) { // NOTE: DeleteObject in S3 doesn't return an error // if the object doesn't exist. Because of that we're - // using 'cloud_storage::upload_result' type as a return type. No need - // to handle NoSuchKey error. The 'cloud_storage::upload_result' + // using 'upload_result' type as a return type. No need + // to handle NoSuchKey error. The 'upload_result' // represents any mutable operation. transfer_details.on_request(fib.retry_count()); auto res = co_await lease.client->delete_object( bucket, path, fib.get_timeout()); if (res) { - co_return cloud_storage::upload_result::success; + co_return upload_result::success; } lease.client->shutdown(); @@ -597,7 +597,7 @@ remote::delete_object(transfer_details transfer_details) { case cloud_storage_clients::error_outcome::operation_not_supported: [[fallthrough]]; case cloud_storage_clients::error_outcome::fail: - result = cloud_storage::upload_result::failed; + result = upload_result::failed; break; case cloud_storage_clients::error_outcome::key_not_found: vassert( @@ -615,7 +615,7 @@ remote::delete_object(transfer_details transfer_details) { "DeleteObject {}, {}, backoff quota exceded, object not deleted", path, bucket); - result = cloud_storage::upload_result::timedout; + result = upload_result::timedout; } else { vlog( ctxlog.warn, @@ -632,7 +632,7 @@ requires std::ranges::range && std::same_as< std::ranges::range_value_t, cloud_storage_clients::object_key> -ss::future remote::delete_objects( +ss::future remote::delete_objects( const cloud_storage_clients::bucket_name& bucket, R keys, retry_chain_node& parent, @@ -642,7 +642,7 @@ ss::future remote::delete_objects( if (keys.empty()) { vlog(ctxlog.info, "No keys to delete, returning"); - co_return cloud_storage::upload_result::success; + co_return upload_result::success; } vlog(ctxlog.debug, "Deleting objects count {}", keys.size()); @@ -653,7 +653,7 @@ ss::future remote::delete_objects( } const auto batches_to_delete = num_chunks(keys, delete_objects_max_keys()); - std::vector results; + std::vector results; results.reserve(batches_to_delete); co_await ss::max_concurrent_for_each( @@ -691,15 +691,15 @@ ss::future remote::delete_objects( if (results.empty()) { vlog(ctxlog.error, "No keys were deleted"); - co_return cloud_storage::upload_result::failed; + co_return upload_result::failed; } co_return std::reduce( results.begin(), results.end(), - cloud_storage::upload_result::success, + upload_result::success, [](auto res_a, auto res_b) { - if (res_a != cloud_storage::upload_result::success) { + if (res_a != upload_result::success) { return res_a; } @@ -707,7 +707,7 @@ ss::future remote::delete_objects( }); } -ss::future remote::delete_object_batch( +ss::future remote::delete_object_batch( const cloud_storage_clients::bucket_name& bucket, std::vector keys, retry_chain_node& parent, @@ -719,7 +719,7 @@ ss::future remote::delete_object_batch( auto lease = co_await _pool.local().acquire(fib.root_abort_source()); auto permit = fib.retry(); vlog(ctxlog.debug, "Deleting a batch of size {}", keys.size()); - std::optional result; + std::optional result; while (!_gate.is_closed() && permit.is_allowed && !result) { req_cb(fib.retry_count()); auto res = co_await lease.client->delete_objects( @@ -735,10 +735,10 @@ ss::future remote::delete_object_batch( res.value().undeleted_keys.front().key, res.value().undeleted_keys.front().reason); - co_return cloud_storage::upload_result::failed; + co_return upload_result::failed; } - co_return cloud_storage::upload_result::success; + co_return upload_result::success; } lease.client->shutdown(); @@ -757,7 +757,7 @@ ss::future remote::delete_object_batch( case cloud_storage_clients::error_outcome::operation_not_supported: [[fallthrough]]; case cloud_storage_clients::error_outcome::fail: - result = cloud_storage::upload_result::failed; + result = upload_result::failed; break; case cloud_storage_clients::error_outcome::key_not_found: vassert( @@ -777,7 +777,7 @@ ss::future remote::delete_object_batch( "not deleted", keys.size(), bucket); - result = cloud_storage::upload_result::timedout; + result = upload_result::timedout; } else { vlog( ctxlog.warn, @@ -790,14 +790,14 @@ ss::future remote::delete_object_batch( co_return *result; } -template ss::future +template ss::future remote::delete_objects>( const cloud_storage_clients::bucket_name& bucket, std::vector keys, retry_chain_node& parent, std::function); -template ss::future +template ss::future remote::delete_objects>( const cloud_storage_clients::bucket_name& bucket, std::deque keys, @@ -809,7 +809,7 @@ requires std::ranges::range && std::same_as< std::ranges::range_value_t, cloud_storage_clients::object_key> -ss::future remote::delete_objects_sequentially( +ss::future remote::delete_objects_sequentially( const cloud_storage_clients::bucket_name& bucket, R keys, retry_chain_node& parent, @@ -835,7 +835,7 @@ ss::future remote::delete_objects_sequentially( .node = std::make_unique(&parent)}; }); - std::vector results; + std::vector results; results.reserve(key_nodes.size()); auto fut = co_await ss::coroutine::as_future(ss::max_concurrent_for_each( key_nodes.begin(), @@ -858,12 +858,12 @@ ss::future remote::delete_objects_sequentially( } else { vlog(ctxlog.error, "Failed to delete keys: {}", eptr); } - co_return cloud_storage::upload_result::failed; + co_return upload_result::failed; } if (results.empty()) { vlog(ctxlog.error, "No keys were deleted"); - co_return cloud_storage::upload_result::failed; + co_return upload_result::failed; } // This is not ideal, we lose all non-failures but the first one, but @@ -872,9 +872,9 @@ ss::future remote::delete_objects_sequentially( co_return std::reduce( results.begin(), results.end(), - cloud_storage::upload_result::success, + upload_result::success, [](auto res_a, auto res_b) { - if (res_a != cloud_storage::upload_result::success) { + if (res_a != upload_result::success) { return res_a; } @@ -1002,8 +1002,7 @@ ss::future remote::list_objects( co_return *result; } -ss::future -remote::upload_object(upload_request upload_request) { +ss::future remote::upload_object(upload_request upload_request) { auto guard = _gate.hold(); auto& transfer_details = upload_request.transfer_details; @@ -1015,7 +1014,7 @@ remote::upload_object(upload_request upload_request) { auto path = cloud_storage_clients::object_key(transfer_details.key()); auto upload_type = upload_request.display_str; - std::optional result; + std::optional result; while (!_gate.is_closed() && permit.is_allowed && !result) { auto lease = co_await _pool.local().acquire(fib.root_abort_source()); @@ -1038,7 +1037,7 @@ remote::upload_object(upload_request upload_request) { if (res) { transfer_details.on_success(); - co_return cloud_storage::upload_result::success; + co_return upload_result::success; } lease.client->shutdown(); @@ -1061,7 +1060,7 @@ remote::upload_object(upload_request upload_request) { case cloud_storage_clients::error_outcome::key_not_found: [[fallthrough]]; case cloud_storage_clients::error_outcome::fail: - result = cloud_storage::upload_result::failed; + result = upload_result::failed; break; } } @@ -1086,7 +1085,7 @@ remote::upload_object(upload_request upload_request) { *result, upload_type); } - co_return cloud_storage::upload_result::timedout; + co_return upload_result::timedout; } ss::future<> diff --git a/src/v/cloud_io/remote.h b/src/v/cloud_io/remote.h index 3cf34d701d033..1315f5f2e101a 100644 --- a/src/v/cloud_io/remote.h +++ b/src/v/cloud_io/remote.h @@ -113,10 +113,10 @@ class remote : public ss::peering_sharded_service { /// \param download_request holds a reference to an iobuf in the `payload` /// field which will hold the downloaded object if the download was /// successful - ss::future + ss::future download_object(download_request download_request); - ss::future object_exists( + ss::future object_exists( const cloud_storage_clients::bucket_name& bucket, const cloud_storage_clients::object_key& path, retry_chain_node& parent, @@ -128,7 +128,7 @@ class remote : public ss::peering_sharded_service { /// /// \param path is a full S3 object path /// \param bucket is a name of the S3 bucket - ss::future delete_object(transfer_details); + ss::future delete_object(transfer_details); /// \brief Delete multiple objects from S3 /// @@ -147,7 +147,7 @@ class remote : public ss::peering_sharded_service { && std::same_as< std::ranges::range_value_t, cloud_storage_clients::object_key> - ss::future delete_objects( + ss::future delete_objects( const cloud_storage_clients::bucket_name& bucket, R keys, retry_chain_node& parent, @@ -187,15 +187,14 @@ class remote : public ss::peering_sharded_service { /// \brief Upload small objects to bucket. Suitable for uploading simple /// strings, does not check for leadership before upload like the segment /// upload function. - ss::future - upload_object(upload_request upload_request); + ss::future upload_object(upload_request upload_request); // If you need to spawn a background task that relies on // this object staying alive, spawn it with this gate. seastar::gate& gate() { return _gate; }; ss::abort_source& as() { return _as; } - ss::future upload_stream( + ss::future upload_stream( transfer_details transfer_details, uint64_t content_length, const reset_input_stream& reset_str, @@ -203,7 +202,7 @@ class remote : public ss::peering_sharded_service { const std::string_view stream_label, std::optional max_retries); - ss::future download_stream( + ss::future download_stream( transfer_details transfer_details, const try_consume_stream& cons_str, const std::string_view stream_label, @@ -217,7 +216,7 @@ class remote : public ss::peering_sharded_service { && std::same_as< std::ranges::range_value_t, cloud_storage_clients::object_key> - ss::future delete_objects_sequentially( + ss::future delete_objects_sequentially( const cloud_storage_clients::bucket_name& bucket, R keys, retry_chain_node& parent, @@ -227,7 +226,7 @@ class remote : public ss::peering_sharded_service { /// backend limits. /// /// \pre the number of keys is <= delete_objects_max_keys - ss::future delete_object_batch( + ss::future delete_object_batch( const cloud_storage_clients::bucket_name& bucket, std::vector keys, retry_chain_node& parent, diff --git a/src/v/cloud_storage/inventory/types.h b/src/v/cloud_storage/inventory/types.h index 9fe67f82e2350..840018814d0c9 100644 --- a/src/v/cloud_storage/inventory/types.h +++ b/src/v/cloud_storage/inventory/types.h @@ -23,7 +23,6 @@ class retry_chain_node; namespace cloud_storage { class cloud_storage_api; -enum class upload_result; } // namespace cloud_storage namespace cloud_storage::inventory { diff --git a/src/v/cloud_storage/types.h b/src/v/cloud_storage/types.h index a8850a0289802..698b394b91417 100644 --- a/src/v/cloud_storage/types.h +++ b/src/v/cloud_storage/types.h @@ -34,6 +34,9 @@ namespace cloud_storage { +using upload_result = cloud_io::upload_result; +using download_result = cloud_io::download_result; + using remote_metrics_disabled = ss::bool_class;