Skip to content

Commit

Permalink
Merge pull request #23898 from dotnwat/partition-manager-notify
Browse files Browse the repository at this point in the history
cluster: avoid duplicate partition registration callbacks
  • Loading branch information
dotnwat authored Oct 24, 2024
2 parents 9fbcd7b + 2311c1f commit aa74fcb
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 2 deletions.
5 changes: 5 additions & 0 deletions src/v/cluster/partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,9 @@ class partition : public ss::enable_lw_shared_from_this<partition> {
partition_properties_stm::writes_disabled disable,
model::timeout_clock::time_point deadline);

bool started() const noexcept { return _started; }
void mark_started() noexcept { _started = true; }

private:
ss::future<result<ssx::rwlock_unit>> hold_writes_enabled();

Expand Down Expand Up @@ -410,6 +413,8 @@ class partition : public ss::enable_lw_shared_from_this<partition> {
// exclusive ("write") for enabling/disabling writes
ssx::rwlock _produce_lock;

bool _started{false};

friend std::ostream& operator<<(std::ostream& o, const partition& x);
};
} // namespace cluster
Expand Down
6 changes: 6 additions & 0 deletions src/v/cluster/partition_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,12 @@ ss::future<consensus_ptr> partition_manager::manage(

co_await p->start(_stm_registry, xst_state);

// this is not done in partition::start itself because the purpose of this
// flag is to operate in an uninterruptible context with watcher
// notification below to avoid registration while this fiber is blocked,
// leading to double notifications.
p->mark_started();

_manage_watchers.notify(p->ntp(), p);

co_return c;
Expand Down
8 changes: 6 additions & 2 deletions src/v/cluster/partition_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,9 @@ class partition_manager
cb(std::move(p));
});
for (auto& e : _ntp_table) {
init.notify(e.first, e.second);
if (e.second->started()) {
init.notify(e.first, e.second);
}
}

// now setup the permenant callback for new partitions
Expand All @@ -134,7 +136,9 @@ class partition_manager
init.register_notify(
ns, [&cb](ss::lw_shared_ptr<partition> p) { cb(std::move(p)); });
for (auto& e : _ntp_table) {
init.notify(e.first, e.second);
if (e.second->started()) {
init.notify(e.first, e.second);
}
}
return _manage_watchers.register_notify(ns, std::move(cb));
}
Expand Down

0 comments on commit aa74fcb

Please sign in to comment.