Skip to content

Commit

Permalink
Merge pull request #24265 from Lazin/fix/CORE-7257
Browse files Browse the repository at this point in the history
[CORE-7257] cloud_storage: Remove assertion in `remote_segment`
  • Loading branch information
Lazin authored Nov 25, 2024
2 parents 531a357 + c2039dd commit 6ebd81f
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 1 deletion.
3 changes: 3 additions & 0 deletions src/v/cloud_storage/cache_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
#include <set>
#include <string_view>

struct cloud_storage_fixture;

namespace cloud_storage {

// These timeout/backoff settings are for S3 requests
Expand Down Expand Up @@ -360,6 +362,7 @@ class cache
ss::condition_variable _block_puts_cond;

friend class cache_test_fixture;
friend struct ::cloud_storage_fixture;

// List of probable deletion candidates from the last trim.
std::optional<fragmented_vector<file_list_item>> _last_trim_carryover;
Expand Down
15 changes: 14 additions & 1 deletion src/v/cloud_storage/remote_segment.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

#include "cloud_storage/remote_segment.h"

#include "base/vlog.h"
#include "bytes/iobuf.h"
#include "bytes/iostream.h"
#include "cloud_storage/cache_service.h"
Expand Down Expand Up @@ -37,6 +38,7 @@
#include <seastar/core/abort_source.hh>
#include <seastar/core/circular_buffer.hh>
#include <seastar/core/fstream.hh>
#include <seastar/core/future.hh>
#include <seastar/core/io_priority_class.hh>
#include <seastar/core/loop.hh>
#include <seastar/core/lowres_clock.hh>
Expand Down Expand Up @@ -1663,7 +1665,18 @@ ss::future<> hydration_loop_state::hydrate(size_t wait_list_size) {
fs.push_back(state.hydrate_action());
break;
case cache_element_status::in_progress:
vassert(false, "{} is already in progress", state.path);
// Ths means that we have two remote_segment instances running
// in parallel. This is possible in case of extreme contention
// when the materialized segment gets evicted and then
// materialized again. The underlying cache service is a global
// state that all instances of the 'remote_segment' share.
vlog(_ctxlog.warn, "{} is already in progress", state.path);
fs.push_back(
ss::make_exception_future<>(std::runtime_error(fmt_with_ctx(
fmt::format,
"Concurrency violation. {} is already in progress.",
state.path))));
break;
}
}

Expand Down
4 changes: 4 additions & 0 deletions src/v/cloud_storage/tests/cloud_storage_fixture.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,10 @@ struct cloud_storage_fixture : s3_imposter_fixture {
cloud_storage_fixture(cloud_storage_fixture&&) = delete;
cloud_storage_fixture operator=(cloud_storage_fixture&&) = delete;

void mark_as_in_progress(remote_segment_path path) {
cache.local()._files_in_progress.insert(path);
}

ss::tmp_dir tmp_directory;
ss::sharded<cloud_storage::cache> cache;
ss::sharded<cloud_storage_clients::client_pool> pool;
Expand Down
55 changes: 55 additions & 0 deletions src/v/cloud_storage/tests/remote_segment_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include "base/seastarx.h"
#include "bytes/iobuf.h"
#include "bytes/iostream.h"
#include "cloud_storage/cache_service.h"
#include "cloud_storage/download_exception.h"
#include "cloud_storage/materialized_resources.h"
#include "cloud_storage/partition_manifest.h"
Expand Down Expand Up @@ -42,6 +43,8 @@
#include <boost/test/tools/old/interface.hpp>
#include <boost/test/unit_test.hpp>

#include <stdexcept>

using namespace std::chrono_literals;
using namespace cloud_storage;

Expand Down Expand Up @@ -503,3 +506,55 @@ FIXTURE_TEST(
reader.stop().get();
segment->stop().get();
}

FIXTURE_TEST(
test_remote_segment_concurrent_download, cloud_storage_fixture) { // NOLINT
auto conf = get_configuration();
partition_manifest m(manifest_ntp, manifest_revision);
model::initial_revision_id segment_ntp_revision{777};
iobuf segment_bytes = generate_segment(model::offset(1), 20);
uint64_t clen = segment_bytes.size_bytes();
auto reset_stream = make_reset_fn(segment_bytes);
retry_chain_node fib(never_abort, 1000ms, 200ms);
partition_manifest::segment_meta meta{
.is_compacted = false,
.size_bytes = segment_bytes.size_bytes(),
.base_offset = model::offset(1),
.committed_offset = model::offset(20),
.base_timestamp = {},
.max_timestamp = {},
.delta_offset = model::offset_delta(0),
.ntp_revision = segment_ntp_revision,
.sname_format = segment_name_format::v2};
auto path = m.generate_segment_path(meta, path_provider);
set_expectations_and_listen({});
auto upl_res
= api.local()
.upload_segment(
bucket_name, path, clen, reset_stream, fib, always_continue)
.get();
BOOST_REQUIRE(upl_res == upload_result::success);
m.add(meta);

partition_probe probe{manifest_ntp};
auto& ts_probe = api.local().materialized().get_read_path_probe();

auto name = m.generate_segment_path(meta, path_provider);
mark_as_in_progress(name);
remote_segment segment(
api.local(),
cache.local(),
bucket_name,
name,
m.get_ntp(),
meta,
fib,
probe,
ts_probe);

auto d = ss::defer([&segment] { segment.stop().get(); });

BOOST_REQUIRE_THROW(
segment.data_stream(0, ss::default_priority_class()).get(),
std::runtime_error);
}

0 comments on commit 6ebd81f

Please sign in to comment.