storage_service, tablets: Prevent stale RPCs from running beyond their stage

Example scenario:

  1. coordinator A sends RPC #1 to trigger streaming
  2. coordinator fails over to B
  3. coordinator B performs streaming successfully
  4. RPC #1 arrives and starts streaming
  5. coordinator B commits the transition to the post-streaming stage
  6. coordinator B executes global token metadata barrier

We end up with streaming still running even though the current
coordinator has moved on. Currently this won't happen, because streaming
holds on to erm (the effective replication map). But we want to change that
(see scylladb#14995), so that it does not block barriers for migrations of
other tablets. The same problem applies to tablet cleanup.

The fix is to use tablet_metadata_guard around such long-running
operations, which keeps hold of erm so that in the above scenario
coordinator B will wait for it in step 6. The guard doesn't make erm
block other migrations, because it switches to the latest erm whenever
the new one is compatible. If it's not compatible, the guard signals its
abort_source so that the stale operation aborts soon and the barrier in
step 6 doesn't wait for long.
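
To illustrate the pattern (a sketch only, not part of this commit): the guard
scopes the long-running work and supplies the abort_source used to drive it.
The function run_long_operation and the do_streaming() helper are hypothetical
stand-ins for the streaming/cleanup code; the guard API (get_abort_source,
get_token_metadata, get_tablet_map) is the one used in the diff below. In the
real code the table reference comes from _db.local().find_column_family(tablet.table)
and the guard's abort_source is additionally chained to the service-wide _abort_source.

  #include "locator/tablet_metadata_guard.hh"
  #include "locator/tablets.hh"

  // Sketch: a long-running tablet operation scoped by tablet_metadata_guard.
  // If the coordinator moves the tablet to an incompatible stage, the guard
  // signals its abort_source, so the stale operation stops quickly instead of
  // blocking the coordinator's global token metadata barrier.
  future<> run_long_operation(replica::table& table, locator::global_tablet_id tablet) {
      locator::tablet_metadata_guard guard(table, tablet);
      auto& as = guard.get_abort_source();

      // Read tablet metadata through the guard so the view matches the erm it holds.
      auto tm = guard.get_token_metadata();
      auto& tmap = guard.get_tablet_map();
      auto* trinfo = tmap.get_tablet_transition_info(tablet.tablet);
      if (!trinfo) {
          co_return; // stale trigger: the migration has already moved on
      }

      // Drive the actual work (e.g. dht::range_streamer) with the guard's
      // abort_source rather than the service-wide one.
      co_await do_streaming(tm, as); // hypothetical helper
  }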
tgrabiec committed Sep 14, 2023
1 parent 6a62aca commit 5cf0358
Showing 2 changed files with 52 additions and 7 deletions.
56 changes: 50 additions & 6 deletions service/storage_service.cc
@@ -22,6 +22,7 @@
#include "db/consistency_level.hh"
#include "service/tablet_allocator.hh"
#include "locator/tablets.hh"
#include "locator/tablet_metadata_guard.hh"
#include "replica/tablet_mutation_builder.hh"
#include <seastar/core/smp.hh>
#include "mutation/canonical_mutation.hh"
@@ -6083,9 +6084,46 @@ inet_address storage_service::host2ip(locator::host_id host) {
// the actual operation performed may be different than intended, it may
// be the one intended by the new coordinator. This is not a problem
// because the old coordinator should do nothing with such result.
//
// The triggers may be retried. They may also be reordered with older triggers, from
// the same or a different coordinator. There is a protocol which ensures that
// stale triggers won't cause operations to run beyond the migration stage they were
// intended for. For example, that streaming is not still running after the coordinator
// moved past the "streaming" stage, and that it won't be started when the stage is not appropriate.
// A non-stale trigger is one which completed successfully and caused the valid coordinator
// to advance the tablet migration to the next stage. All other triggers are called stale.
// We can divide stale triggers into categories:
//   (1) Those which start after the tablet was moved to the next stage
//   Those which start before the tablet was moved to the next stage,
//     (2) ...but after the non-stale trigger finished
//     (3) ...but before the non-stale trigger finished
//
// By "start" I mean the atomic block which inserts into _tablet_ops, and by "finish" I mean
// removal from _tablet_ops.
// So event ordering is local from the perspective of this replica, and is linear because
// this happens on the same shard.
//
// What prevents (1) from running is the fact that triggers check the state of tablet
// metadata, and will fail immediately if the stage is not appropriate. It can happen
// that a trigger is so stale that it matches an appropriate stage of the next
// migration of the same tablet. This is not a problem, because it then falls into the same
// category as a stale trigger started in the new migration, so case (2) or (3) applies.
//
// What prevents (2) from running is the fact that after the coordinator moves on to
// the next stage, it executes a token metadata barrier, which will wait for such triggers
// to complete, as they hold on to erm via tablet_metadata_guard. They should be aborted
// soon after the coordinator changes the stage by means of tablet_metadata_guard::get_abort_source().
//
// What prevents (3) from running is that such triggers join with the non-stale trigger, or the
// non-stale trigger joins with them, depending on which came first. In that case they finish at the same time.
//
// It's very important that the global token metadata barrier involves all nodes which
// may receive stale triggers started in the previous stage, so that those nodes will
// see tablet metadata which reflects group0 state. This cuts off stale triggers
// as soon as the coordinator moves to the next stage.
future<> storage_service::do_tablet_operation(locator::global_tablet_id tablet,
sstring op_name,
std::function<future<>()> op) {
std::function<future<>(locator::tablet_metadata_guard&)> op) {
// The coordinator may not execute global token metadata barrier before triggering the operation, so we need
// a barrier here to see the token metadata which is at least as recent as that of the sender.
auto& raft_server = _group0->group0_server();
@@ -6097,6 +6135,12 @@
co_return;
}

locator::tablet_metadata_guard guard(_db.local().find_column_family(tablet.table), tablet);
auto& as = guard.get_abort_source();
auto sub = _abort_source.subscribe([&as] () noexcept {
as.request_abort();
});

auto async_gate_holder = _async_gate.hold();
promise<> p;
_tablet_ops.emplace(tablet, tablet_operation {
@@ -6107,7 +6151,7 @@
});

try {
co_await op();
co_await op(guard);
p.set_value();
slogger.debug("{} for tablet migration of {} successful", op_name, tablet);
} catch (...) {
@@ -6120,9 +6164,9 @@ future<> storage_service::do_tablet_operation(locator::global_tablet_id tablet,
// Streams data to the pending tablet replica of a given tablet on this node.
// The source tablet replica is determined from the current transition info of the tablet.
future<> storage_service::stream_tablet(locator::global_tablet_id tablet) {
return do_tablet_operation(tablet, "Streaming", [this, tablet] () -> future<> {
auto tm = _shared_token_metadata.get();
auto& tmap = tm->tablets().get_tablet_map(tablet.table);
return do_tablet_operation(tablet, "Streaming", [this, tablet] (locator::tablet_metadata_guard& guard) -> future<> {
auto tm = guard.get_token_metadata();
auto& tmap = guard.get_tablet_map();
auto* trinfo = tmap.get_tablet_transition_info(tablet.tablet);

// Check if the request is still valid.
@@ -6150,7 +6194,7 @@ future<> storage_service::stream_tablet(locator::global_tablet_id tablet) {

auto& table = _db.local().find_column_family(tablet.table);
std::vector<sstring> tables = {table.schema()->cf_name()};
auto streamer = make_lw_shared<dht::range_streamer>(_db, _stream_manager, tm, _abort_source,
auto streamer = make_lw_shared<dht::range_streamer>(_db, _stream_manager, std::move(tm), guard.get_abort_source(),
get_broadcast_address(), _snitch.local()->get_location(),
"Tablet migration", streaming::stream_reason::tablet_migration, std::move(tables));
streamer->add_source_filter(std::make_unique<dht::range_streamer::failure_detector_source_filter>(
3 changes: 2 additions & 1 deletion service/storage_service.hh
@@ -16,6 +16,7 @@
#include "service/endpoint_lifecycle_subscriber.hh"
#include "locator/abstract_replication_strategy.hh"
#include "locator/tablets.hh"
#include "locator/tablet_metadata_guard.hh"
#include "inet_address_vectors.hh"
#include <seastar/core/distributed.hh>
#include <seastar/core/condition-variable.hh>
@@ -156,7 +157,7 @@
future<> node_ops_abort_thread();
future<> do_tablet_operation(locator::global_tablet_id tablet,
sstring op_name,
std::function<future<>()> op);
std::function<future<>(locator::tablet_metadata_guard&)> op);
future<> stream_tablet(locator::global_tablet_id);
inet_address host2ip(locator::host_id);
public:
