Skip to content

Commit

Permalink
cluster: avoid duplicate partition registration callbacks
Browse files Browse the repository at this point in the history
If a watcher is registered while a partition is still suspended in its
co_await on start, the partition is already present in the ntp_table, so the
upcalls happen both inline with registration and again after the partition is started.

This patch skips the callback at the time of registration if the partition is
still in the process of starting, and relies instead on the callback that is
invoked after the partition is started.

Signed-off-by: Noah Watkins <[email protected]>
  • Loading branch information
dotnwat committed Oct 23, 2024
1 parent 0c7a11f commit 2311c1f
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 2 deletions.
5 changes: 5 additions & 0 deletions src/v/cluster/partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,9 @@ class partition : public ss::enable_lw_shared_from_this<partition> {
partition_properties_stm::writes_disabled disable,
model::timeout_clock::time_point deadline);

// Reports the current value of the started flag: false by default, true
// once mark_started() has been called.
bool started() const noexcept {
    return _started;
}
// Sets the started flag so that subsequent started() calls return true.
// There is no way to clear the flag through this interface.
void mark_started() noexcept {
    _started = true;
}

private:
ss::future<result<ssx::rwlock_unit>> hold_writes_enabled();

Expand Down Expand Up @@ -410,6 +413,8 @@ class partition : public ss::enable_lw_shared_from_this<partition> {
// exclusive ("write") for enabling/disabling writes
ssx::rwlock _produce_lock;

bool _started{false};

friend std::ostream& operator<<(std::ostream& o, const partition& x);
};
} // namespace cluster
Expand Down
6 changes: 6 additions & 0 deletions src/v/cluster/partition_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,12 @@ ss::future<consensus_ptr> partition_manager::manage(

co_await p->start(_stm_registry, xst_state);

// this is not done in partition::start itself because the purpose of this
// flag is to operate in an uninterruptible context with watcher
// notification below to avoid registration while this fiber is blocked,
// leading to double notifications.
p->mark_started();

_manage_watchers.notify(p->ntp(), p);

co_return c;
Expand Down
8 changes: 6 additions & 2 deletions src/v/cluster/partition_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,9 @@ class partition_manager
cb(std::move(p));
});
for (auto& e : _ntp_table) {
init.notify(e.first, e.second);
if (e.second->started()) {
init.notify(e.first, e.second);
}
}

// now set up the permanent callback for new partitions
Expand All @@ -134,7 +136,9 @@ class partition_manager
init.register_notify(
ns, [&cb](ss::lw_shared_ptr<partition> p) { cb(std::move(p)); });
for (auto& e : _ntp_table) {
init.notify(e.first, e.second);
if (e.second->started()) {
init.notify(e.first, e.second);
}
}
return _manage_watchers.register_notify(ns, std::move(cb));
}
Expand Down

0 comments on commit 2311c1f

Please sign in to comment.