diff --git a/sync/election/leader_manager.go b/sync/election/leader_manager.go index c0a729e..b5fa261 100644 --- a/sync/election/leader_manager.go +++ b/sync/election/leader_manager.go @@ -93,11 +93,11 @@ type DlockLeaderManager struct { lockMan *dlock.LockManager // provides lock operations lockExpiryTime int64 // expiry timestamp for the dlock - mu sync.Mutex // mutex lock - cancel context.CancelFunc // function to cancel the campaign process. - electedCallbacks []ElectedCallback // leader elected callback functions - oustedCallbacks []OustedCallback // leader ousted callback functions - errorCallbacks []ErrorCallback // leader election error callback functions + mu sync.Mutex // mutex lock + cancel atomic.Value // function to cancel the campaign process. + electedCallbacks []ElectedCallback // leader elected callback functions + oustedCallbacks []OustedCallback // leader ousted callback functions + errorCallbacks []ErrorCallback // leader election error callback functions } // NewDlockLeaderManager creates a new `LeaderManager` with the provided configuration and election key. @@ -152,8 +152,8 @@ func (l *DlockLeaderManager) Extend(ctx context.Context) error { // Campaign starts the election process, which will run in a goroutine until contex canceled. func (l *DlockLeaderManager) Campaign(ctx context.Context) { - newCtx, cancel := context.WithCancel(ctx) - l.cancel = cancel + newCtx, cancelFn := context.WithCancel(ctx) + l.cancel.Store(cancelFn) logger := logutil.NewErrorTolerantLogger(logutil.DefaultETConfig) etlog := func(err error) { @@ -202,8 +202,10 @@ func (l *DlockLeaderManager) Campaign(ctx context.Context) { // Stop stops the leader election process and resigns from the leadership if appliable. func (l *DlockLeaderManager) Stop() error { - if l.cancel != nil { - l.cancel() + if v := l.cancel.Load(); v != nil { + if cancelFn, ok := v.(context.CancelFunc); ok { + cancelFn() + } } return l.lockMan.Release(context.Background(), l.lockIntent()) @@ -354,21 +356,48 @@ func (l *DlockLeaderManager) onError(ctx context.Context, err error) { // noopLeaderManager dummy leader manager that does nothing at all. type noopLeaderManager struct { mu sync.Mutex // mutex lock + cancel atomic.Value // function to cancel the campaign process. + oustedCallbacks []OustedCallback // leader ousted callback functions electedCallbacks []ElectedCallback // leader elected callback functions } func (l *noopLeaderManager) Identity() string { return "noop" } func (l *noopLeaderManager) Extend(ctx context.Context) error { return nil } func (l *noopLeaderManager) Await(ctx context.Context) bool { return true } -func (l *noopLeaderManager) Stop() error { return nil } -func (l *noopLeaderManager) OnOusted(cb OustedCallback) { /* do nothing */ } func (l *noopLeaderManager) OnError(cb ErrorCallback) { /* do nothing */ } +func (l *noopLeaderManager) Stop() error { + if v := l.cancel.Load(); v != nil { + if cancelFn, ok := v.(context.CancelFunc); ok { + cancelFn() + } + } + return nil +} + func (l *noopLeaderManager) Campaign(ctx context.Context) { + ctx, cancelFn := context.WithCancel(ctx) + l.cancel.Store(cancelFn) + + l.onElected(ctx) + + <-ctx.Done() + + l.onOusted(ctx) +} + +func (l *noopLeaderManager) OnOusted(cb OustedCallback) { l.mu.Lock() defer l.mu.Unlock() - for _, cb := range l.electedCallbacks { + l.oustedCallbacks = append(l.oustedCallbacks, cb) +} + +func (l *noopLeaderManager) onOusted(ctx context.Context) { + l.mu.Lock() + defer l.mu.Unlock() + + for _, cb := range l.oustedCallbacks { cb(ctx, l) } } @@ -379,3 +408,12 @@ func (l *noopLeaderManager) OnElected(cb ElectedCallback) { l.electedCallbacks = append(l.electedCallbacks, cb) } + +func (l *noopLeaderManager) onElected(ctx context.Context) { + l.mu.Lock() + defer l.mu.Unlock() + + for _, cb := range l.electedCallbacks { + cb(ctx, l) + } +}