Skip to content

Commit

Permalink
Fix race condition and event trigger in leader manager (#232)
Browse files Browse the repository at this point in the history
- Trigger ousted callback in `noopLeaderManager`
- Avoid race condition using `atomic.Value` for context cancel function
  • Loading branch information
wanliqun authored Oct 9, 2024
1 parent fa97505 commit fa85473
Showing 1 changed file with 50 additions and 12 deletions.
62 changes: 50 additions & 12 deletions sync/election/leader_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,11 @@ type DlockLeaderManager struct {
lockMan *dlock.LockManager // provides lock operations
lockExpiryTime int64 // expiry timestamp for the dlock

mu sync.Mutex // mutex lock
cancel context.CancelFunc // function to cancel the campaign process.
electedCallbacks []ElectedCallback // leader elected callback functions
oustedCallbacks []OustedCallback // leader ousted callback functions
errorCallbacks []ErrorCallback // leader election error callback functions
mu sync.Mutex // mutex lock
cancel atomic.Value // function to cancel the campaign process.
electedCallbacks []ElectedCallback // leader elected callback functions
oustedCallbacks []OustedCallback // leader ousted callback functions
errorCallbacks []ErrorCallback // leader election error callback functions
}

// NewDlockLeaderManager creates a new `LeaderManager` with the provided configuration and election key.
Expand Down Expand Up @@ -152,8 +152,8 @@ func (l *DlockLeaderManager) Extend(ctx context.Context) error {

// Campaign starts the election process, which will run in a goroutine until the context is canceled.
func (l *DlockLeaderManager) Campaign(ctx context.Context) {
newCtx, cancel := context.WithCancel(ctx)
l.cancel = cancel
newCtx, cancelFn := context.WithCancel(ctx)
l.cancel.Store(cancelFn)

logger := logutil.NewErrorTolerantLogger(logutil.DefaultETConfig)
etlog := func(err error) {
Expand Down Expand Up @@ -202,8 +202,10 @@ func (l *DlockLeaderManager) Campaign(ctx context.Context) {

// Stop stops the leader election process and resigns from the leadership if applicable.
func (l *DlockLeaderManager) Stop() error {
if l.cancel != nil {
l.cancel()
if v := l.cancel.Load(); v != nil {
if cancelFn, ok := v.(context.CancelFunc); ok {
cancelFn()
}
}

return l.lockMan.Release(context.Background(), l.lockIntent())
Expand Down Expand Up @@ -354,21 +356,48 @@ func (l *DlockLeaderManager) onError(ctx context.Context, err error) {
// noopLeaderManager is a dummy leader manager that runs no real election.
// Its Campaign method fires the elected callbacks immediately and the ousted
// callbacks once the campaign context is done.
type noopLeaderManager struct {
mu sync.Mutex // guards oustedCallbacks and electedCallbacks
cancel atomic.Value // holds the context.CancelFunc stored by Campaign; loaded by Stop
oustedCallbacks []OustedCallback // callbacks run by onOusted when the campaign context ends
electedCallbacks []ElectedCallback // callbacks run by onElected at the start of Campaign
}

// Identity returns the fixed identity string "noop".
func (l *noopLeaderManager) Identity() string { return "noop" }

// Extend is a no-op that always reports success.
func (l *noopLeaderManager) Extend(ctx context.Context) error { return nil }

// Await reports true immediately; there is nothing to wait for.
func (l *noopLeaderManager) Await(ctx context.Context) bool { return true }

// OnError discards cb.
//
// Stop and OnOusted are intentionally not declared here: the same receiver
// defines them in full further down, and Go rejects duplicate method
// declarations.
func (l *noopLeaderManager) OnError(cb ErrorCallback) { /* do nothing */ }

// Stop cancels the campaign context if Campaign has already stored a cancel
// function, and always returns nil.
func (l *noopLeaderManager) Stop() error {
	// A comma-ok assertion on a nil Load result yields ok == false, so the
	// "Campaign never ran" case needs no separate nil check.
	if stop, ok := l.cancel.Load().(context.CancelFunc); ok {
		stop()
	}
	return nil
}

// Campaign simulates an election that is won immediately. It fires the elected
// callbacks, blocks until the derived context is done (the parent ctx ends or
// Stop invokes the stored cancel function), and then fires the ousted
// callbacks with that finished context.
func (l *noopLeaderManager) Campaign(ctx context.Context) {
	campaignCtx, stop := context.WithCancel(ctx)
	// Publish the cancel function so Stop can end the campaign.
	l.cancel.Store(stop)

	l.onElected(campaignCtx)
	<-campaignCtx.Done()
	l.onOusted(campaignCtx)
}

func (l *noopLeaderManager) OnOusted(cb OustedCallback) {
l.mu.Lock()
defer l.mu.Unlock()

for _, cb := range l.electedCallbacks {
l.oustedCallbacks = append(l.oustedCallbacks, cb)
}

// onOusted invokes each registered ousted callback in slice order, passing ctx
// and the manager itself.
//
// l.mu is held for the whole pass, so a callback must not call a method on
// this manager that also takes l.mu (such as OnOusted); sync.Mutex is not
// reentrant and the call would deadlock.
func (l *noopLeaderManager) onOusted(ctx context.Context) {
	l.mu.Lock()
	defer l.mu.Unlock()

	callbacks := l.oustedCallbacks
	for i := range callbacks {
		callbacks[i](ctx, l)
	}
}
Expand All @@ -379,3 +408,12 @@ func (l *noopLeaderManager) OnElected(cb ElectedCallback) {

l.electedCallbacks = append(l.electedCallbacks, cb)
}

// onElected invokes each registered elected callback in slice order, passing
// ctx and the manager itself. l.mu is held for the whole pass.
func (l *noopLeaderManager) onElected(ctx context.Context) {
	l.mu.Lock()
	defer l.mu.Unlock()

	callbacks := l.electedCallbacks
	for i := range callbacks {
		callbacks[i](ctx, l)
	}
}

0 comments on commit fa85473

Please sign in to comment.