Skip to content

Commit

Permalink
Merge pull request #2526 from OffchainLabs/Redis_switch
Browse files Browse the repository at this point in the history
Support seamless Redis switchover for the sequencer coordinator
  • Loading branch information
PlasmaPower authored Oct 17, 2024
2 parents fc2c2d8 + 12f40eb commit 3f019ad
Show file tree
Hide file tree
Showing 6 changed files with 198 additions and 37 deletions.
2 changes: 1 addition & 1 deletion arbnode/maintenance.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func NewMaintenanceRunner(config MaintenanceConfigFetcher, seqCoordinator *SeqCo
if seqCoordinator != nil {
c := func() *redislock.SimpleCfg { return &cfg.Lock }
r := func() bool { return true } // always ready to lock
rl, err := redislock.NewSimple(seqCoordinator.Client, c, r)
rl, err := redislock.NewSimple(seqCoordinator.RedisCoordinator().Client, c, r)
if err != nil {
return nil, fmt.Errorf("creating new simple redis lock: %w", err)
}
Expand Down
138 changes: 112 additions & 26 deletions arbnode/seq_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ var (
type SeqCoordinator struct {
stopwaiter.StopWaiter

redisutil.RedisCoordinator
redisCoordinatorMutex sync.RWMutex
redisCoordinator redisutil.RedisCoordinator
prevRedisCoordinator *redisutil.RedisCoordinator
prevRedisMessageCount arbutil.MessageIndex

sync *SyncMonitor
streamer *TransactionStreamer
Expand All @@ -61,6 +64,7 @@ type SeqCoordinatorConfig struct {
Enable bool `koanf:"enable"`
ChosenHealthcheckAddr string `koanf:"chosen-healthcheck-addr"`
RedisUrl string `koanf:"redis-url"`
NewRedisUrl string `koanf:"new-redis-url"`
LockoutDuration time.Duration `koanf:"lockout-duration"`
LockoutSpare time.Duration `koanf:"lockout-spare"`
SeqNumDuration time.Duration `koanf:"seq-num-duration"`
Expand All @@ -86,6 +90,7 @@ func (c *SeqCoordinatorConfig) Url() string {
func SeqCoordinatorConfigAddOptions(prefix string, f *flag.FlagSet) {
f.Bool(prefix+".enable", DefaultSeqCoordinatorConfig.Enable, "enable sequence coordinator")
f.String(prefix+".redis-url", DefaultSeqCoordinatorConfig.RedisUrl, "the Redis URL to coordinate via")
f.String(prefix+".new-redis-url", DefaultSeqCoordinatorConfig.NewRedisUrl, "switch to the new Redis URL to coordinate via")
f.String(prefix+".chosen-healthcheck-addr", DefaultSeqCoordinatorConfig.ChosenHealthcheckAddr, "if non-empty, launch an HTTP service binding to this address that returns status code 200 when chosen and 503 otherwise")
f.Duration(prefix+".lockout-duration", DefaultSeqCoordinatorConfig.LockoutDuration, "")
f.Duration(prefix+".lockout-spare", DefaultSeqCoordinatorConfig.LockoutSpare, "")
Expand All @@ -105,6 +110,7 @@ var DefaultSeqCoordinatorConfig = SeqCoordinatorConfig{
Enable: false,
ChosenHealthcheckAddr: "",
RedisUrl: "",
NewRedisUrl: "",
LockoutDuration: time.Minute,
LockoutSpare: 30 * time.Second,
SeqNumDuration: 10 * 24 * time.Hour,
Expand All @@ -122,6 +128,7 @@ var DefaultSeqCoordinatorConfig = SeqCoordinatorConfig{
var TestSeqCoordinatorConfig = SeqCoordinatorConfig{
Enable: false,
RedisUrl: "",
NewRedisUrl: "",
LockoutDuration: time.Second * 2,
LockoutSpare: time.Millisecond * 10,
SeqNumDuration: time.Minute * 10,
Expand Down Expand Up @@ -153,7 +160,7 @@ func NewSeqCoordinator(
return nil, err
}
coordinator := &SeqCoordinator{
RedisCoordinator: *redisCoordinator,
redisCoordinator: *redisCoordinator,
sync: sync,
streamer: streamer,
sequencer: sequencer,
Expand All @@ -174,6 +181,19 @@ func (c *SeqCoordinator) SetDelayedSequencer(delayedSequencer *DelayedSequencer)
c.delayedSequencer = delayedSequencer
}

// RedisCoordinator returns the currently active Redis coordinator, guarded by
// the read lock so it is safe against a concurrent setRedisCoordinator.
// NOTE(review): the returned pointer aliases the struct field itself, so the
// value it points at can change after the lock is released — confirm callers
// only use it transiently.
func (c *SeqCoordinator) RedisCoordinator() *redisutil.RedisCoordinator {
	c.redisCoordinatorMutex.RLock()
	active := &c.redisCoordinator
	c.redisCoordinatorMutex.RUnlock()
	return active
}

// setRedisCoordinator installs redisCoordinator as the active coordinator and
// remembers the one it replaces in prevRedisCoordinator, so the update loop
// can keep reading messages that only exist on the old Redis instance until
// the local message count catches up.
func (c *SeqCoordinator) setRedisCoordinator(redisCoordinator *redisutil.RedisCoordinator) {
	c.redisCoordinatorMutex.Lock()
	defer c.redisCoordinatorMutex.Unlock()
	// Copy the current coordinator into a fresh variable before overwriting
	// the field. Taking &c.redisCoordinator directly would make
	// prevRedisCoordinator alias the field itself, so after the assignment
	// below it would point at the NEW coordinator and the drain-from-previous
	// logic would silently read from the wrong Redis instance.
	prev := c.redisCoordinator
	c.prevRedisCoordinator = &prev
	c.redisCoordinator = *redisCoordinator
}

func StandaloneSeqCoordinatorInvalidateMsgIndex(ctx context.Context, redisClient redis.UniversalClient, keyConfig string, msgIndex arbutil.MessageIndex) error {
signerConfig := signature.EmptySimpleHmacConfig
if keyConfig == "" {
Expand Down Expand Up @@ -276,7 +296,7 @@ func (c *SeqCoordinator) acquireLockoutAndWriteMessage(ctx context.Context, msgC
defer c.wantsLockoutMutex.Unlock()
setWantsLockout := c.avoidLockout <= 0
lockoutUntil := time.Now().Add(c.config.LockoutDuration)
err = c.Client.Watch(ctx, func(tx *redis.Tx) error {
err = c.RedisCoordinator().Client.Watch(ctx, func(tx *redis.Tx) error {
current, err := tx.Get(ctx, redisutil.CHOSENSEQ_KEY).Result()
var wasEmpty bool
if errors.Is(err, redis.Nil) {
Expand Down Expand Up @@ -345,7 +365,7 @@ func (c *SeqCoordinator) acquireLockoutAndWriteMessage(ctx context.Context, msgC
}

func (c *SeqCoordinator) getRemoteFinalizedMsgCount(ctx context.Context) (arbutil.MessageIndex, error) {
resStr, err := c.Client.Get(ctx, redisutil.FINALIZED_MSG_COUNT_KEY).Result()
resStr, err := c.RedisCoordinator().Client.Get(ctx, redisutil.FINALIZED_MSG_COUNT_KEY).Result()
if err != nil {
return 0, err
}
Expand All @@ -364,23 +384,23 @@ func (c *SeqCoordinator) getRemoteMsgCountImpl(ctx context.Context, r redis.Cmda
}

func (c *SeqCoordinator) GetRemoteMsgCount() (arbutil.MessageIndex, error) {
return c.getRemoteMsgCountImpl(c.GetContext(), c.Client)
return c.getRemoteMsgCountImpl(c.GetContext(), c.RedisCoordinator().Client)
}

func (c *SeqCoordinator) wantsLockoutUpdate(ctx context.Context) error {
func (c *SeqCoordinator) wantsLockoutUpdate(ctx context.Context, client redis.UniversalClient) error {
c.wantsLockoutMutex.Lock()
defer c.wantsLockoutMutex.Unlock()
return c.wantsLockoutUpdateWithMutex(ctx)
return c.wantsLockoutUpdateWithMutex(ctx, client)
}

// Requires the caller hold the wantsLockoutMutex
func (c *SeqCoordinator) wantsLockoutUpdateWithMutex(ctx context.Context) error {
func (c *SeqCoordinator) wantsLockoutUpdateWithMutex(ctx context.Context, client redis.UniversalClient) error {
if c.avoidLockout > 0 {
return nil
}
myWantsLockoutKey := redisutil.WantsLockoutKeyFor(c.config.Url())
wantsLockoutUntil := time.Now().Add(c.config.LockoutDuration)
pipe := c.Client.TxPipeline()
pipe := client.TxPipeline()
initialDuration := c.config.LockoutDuration
if initialDuration < 2*time.Second {
initialDuration = 2 * time.Second
Expand All @@ -398,7 +418,7 @@ func (c *SeqCoordinator) wantsLockoutUpdateWithMutex(ctx context.Context) error
func (c *SeqCoordinator) chosenOneRelease(ctx context.Context) error {
atomicTimeWrite(&c.lockoutUntil, time.Time{})
isActiveSequencer.Update(0)
releaseErr := c.Client.Watch(ctx, func(tx *redis.Tx) error {
releaseErr := c.RedisCoordinator().Client.Watch(ctx, func(tx *redis.Tx) error {
current, err := tx.Get(ctx, redisutil.CHOSENSEQ_KEY).Result()
if errors.Is(err, redis.Nil) {
return nil
Expand All @@ -421,7 +441,7 @@ func (c *SeqCoordinator) chosenOneRelease(ctx context.Context) error {
return nil
}
// got error - was it still released?
current, readErr := c.Client.Get(ctx, redisutil.CHOSENSEQ_KEY).Result()
current, readErr := c.RedisCoordinator().Client.Get(ctx, redisutil.CHOSENSEQ_KEY).Result()
if errors.Is(readErr, redis.Nil) {
return nil
}
Expand All @@ -438,10 +458,10 @@ func (c *SeqCoordinator) wantsLockoutRelease(ctx context.Context) error {
return nil
}
myWantsLockoutKey := redisutil.WantsLockoutKeyFor(c.config.Url())
releaseErr := c.Client.Del(ctx, myWantsLockoutKey).Err()
releaseErr := c.RedisCoordinator().Client.Del(ctx, myWantsLockoutKey).Err()
if releaseErr != nil {
// got error - was it still deleted?
readErr := c.Client.Get(ctx, myWantsLockoutKey).Err()
readErr := c.RedisCoordinator().Client.Get(ctx, myWantsLockoutKey).Err()
if !errors.Is(readErr, redis.Nil) {
return releaseErr
}
Expand Down Expand Up @@ -525,7 +545,7 @@ func (c *SeqCoordinator) deleteFinalizedMsgsFromRedis(ctx context.Context, final
// In non-init cases it doesn't matter how we delete as we always try to delete from prevFinalized to finalized
batchDeleteCount := 1000
for i := len(keys); i > 0; i -= batchDeleteCount {
if err := c.Client.Del(ctx, keys[max(0, i-batchDeleteCount):i]...).Err(); err != nil {
if err := c.RedisCoordinator().Client.Del(ctx, keys[max(0, i-batchDeleteCount):i]...).Err(); err != nil {
return fmt.Errorf("error deleting finalized messages and their signatures from redis: %w", err)
}
}
Expand All @@ -534,7 +554,7 @@ func (c *SeqCoordinator) deleteFinalizedMsgsFromRedis(ctx context.Context, final
if err != nil {
return err
}
if err = c.Client.Set(ctx, redisutil.FINALIZED_MSG_COUNT_KEY, finalizedBytes, c.config.SeqNumDuration).Err(); err != nil {
if err = c.RedisCoordinator().Client.Set(ctx, redisutil.FINALIZED_MSG_COUNT_KEY, finalizedBytes, c.config.SeqNumDuration).Err(); err != nil {
return fmt.Errorf("couldn't set %s key to current finalizedMsgCount in redis: %w", redisutil.FINALIZED_MSG_COUNT_KEY, err)
}
return nil
Expand All @@ -543,7 +563,7 @@ func (c *SeqCoordinator) deleteFinalizedMsgsFromRedis(ctx context.Context, final
if errors.Is(err, redis.Nil) {
var keys []string
for msg := finalized - 1; msg > 0; msg-- {
exists, err := c.Client.Exists(ctx, redisutil.MessageKeyFor(msg), redisutil.MessageSigKeyFor(msg)).Result()
exists, err := c.RedisCoordinator().Client.Exists(ctx, redisutil.MessageKeyFor(msg), redisutil.MessageSigKeyFor(msg)).Result()
if err != nil {
// If there is an error deleting finalized messages during init, we retry later either from this sequencer or from another
return err
Expand All @@ -558,7 +578,7 @@ func (c *SeqCoordinator) deleteFinalizedMsgsFromRedis(ctx context.Context, final
} else if err != nil {
return fmt.Errorf("error getting finalizedMsgCount value from redis: %w", err)
}
remoteMsgCount, err := c.getRemoteMsgCountImpl(ctx, c.Client)
remoteMsgCount, err := c.getRemoteMsgCountImpl(ctx, c.RedisCoordinator().Client)
if err != nil {
return fmt.Errorf("cannot get remote message count: %w", err)
}
Expand All @@ -574,7 +594,7 @@ func (c *SeqCoordinator) deleteFinalizedMsgsFromRedis(ctx context.Context, final
}

func (c *SeqCoordinator) update(ctx context.Context) time.Duration {
chosenSeq, err := c.RecommendSequencerWantingLockout(ctx)
chosenSeq, err := c.RedisCoordinator().RecommendSequencerWantingLockout(ctx)
if err != nil {
log.Warn("coordinator failed finding sequencer wanting lockout", "err", err)
return c.retryAfterRedisError()
Expand Down Expand Up @@ -603,6 +623,15 @@ func (c *SeqCoordinator) update(ctx context.Context) time.Duration {
log.Error("cannot read message count", "err", err)
return c.config.UpdateInterval
}
// Cache the previous redis coordinator's message count
if c.prevRedisCoordinator != nil && c.prevRedisMessageCount == 0 {
prevRemoteMsgCount, err := c.getRemoteMsgCountImpl(ctx, c.prevRedisCoordinator.Client)
if err != nil {
log.Warn("cannot get remote message count", "err", err)
return c.retryAfterRedisError()
}
c.prevRedisMessageCount = prevRemoteMsgCount
}
remoteFinalizedMsgCount, err := c.getRemoteFinalizedMsgCount(ctx)
if err != nil {
loglevel := log.Error
Expand All @@ -617,12 +646,22 @@ func (c *SeqCoordinator) update(ctx context.Context) time.Duration {
return c.retryAfterRedisError()
}
readUntil := min(localMsgCount+c.config.MsgPerPoll, remoteMsgCount)
client := c.RedisCoordinator().Client
// If we have a previous redis coordinator,
// we can read from it until the local message count catches up to the prev coordinator's message count
if c.prevRedisMessageCount > localMsgCount {
readUntil = min(readUntil, c.prevRedisMessageCount)
client = c.prevRedisCoordinator.Client
}
if c.prevRedisMessageCount != 0 && localMsgCount >= c.prevRedisMessageCount {
log.Info("coordinator caught up to prev redis coordinator", "msgcount", localMsgCount, "prevMsgCount", c.prevRedisMessageCount)
}
var messages []arbostypes.MessageWithMetadata
msgToRead := localMsgCount
var msgReadErr error
for msgToRead < readUntil && localMsgCount >= remoteFinalizedMsgCount {
var resString string
resString, msgReadErr = c.Client.Get(ctx, redisutil.MessageKeyFor(msgToRead)).Result()
resString, msgReadErr = client.Get(ctx, redisutil.MessageKeyFor(msgToRead)).Result()
if msgReadErr != nil {
log.Warn("coordinator failed reading message", "pos", msgToRead, "err", msgReadErr)
break
Expand All @@ -631,7 +670,7 @@ func (c *SeqCoordinator) update(ctx context.Context) time.Duration {
var sigString string
var sigBytes []byte
sigSeparateKey := true
sigString, msgReadErr = c.Client.Get(ctx, redisutil.MessageSigKeyFor(msgToRead)).Result()
sigString, msgReadErr = client.Get(ctx, redisutil.MessageSigKeyFor(msgToRead)).Result()
if errors.Is(msgReadErr, redis.Nil) {
// no separate signature. Try reading old-style sig
if len(rsBytes) < 32 {
Expand Down Expand Up @@ -722,7 +761,7 @@ func (c *SeqCoordinator) update(ctx context.Context) time.Duration {
// this could be just new messages we didn't get yet - even then, we should retry soon
log.Info("sequencer failed to become chosen", "err", err, "msgcount", localMsgCount)
// make sure we're marked as wanting the lockout
if err := c.wantsLockoutUpdate(ctx); err != nil {
if err := c.wantsLockoutUpdate(ctx, c.RedisCoordinator().Client); err != nil {
log.Warn("failed to update wants lockout key", "err", err)
}
c.prevChosenSequencer = ""
Expand Down Expand Up @@ -750,7 +789,7 @@ func (c *SeqCoordinator) update(ctx context.Context) time.Duration {
// update wanting the lockout
var wantsLockoutErr error
if synced && !c.AvoidingLockout() {
wantsLockoutErr = c.wantsLockoutUpdate(ctx)
wantsLockoutErr = c.wantsLockoutUpdate(ctx, c.RedisCoordinator().Client)
} else {
wantsLockoutErr = c.wantsLockoutRelease(ctx)
}
Expand Down Expand Up @@ -817,12 +856,59 @@ func (c *SeqCoordinator) launchHealthcheckServer(ctx context.Context) {

// Start launches the coordinator's iterative update loop and, when configured,
// the chosen-sequencer healthcheck server. If new-redis-url is set, it also
// builds the coordinator for the Redis instance we intend to switch to and
// threads it into every loop iteration via chooseRedisAndUpdate.
func (c *SeqCoordinator) Start(ctxIn context.Context) {
	c.StopWaiter.Start(ctxIn, c)
	var newRedisCoordinator *redisutil.RedisCoordinator
	if c.config.NewRedisUrl != "" {
		var err error
		newRedisCoordinator, err = redisutil.NewRedisCoordinator(c.config.NewRedisUrl)
		if err != nil {
			// NOTE(review): on failure newRedisCoordinator stays nil but is
			// still passed to chooseRedisAndUpdate on every iteration, and
			// trySwitchingRedis dereferences it (newRedisCoordinator.Client)
			// — confirm this cannot panic, or guard for nil downstream.
			log.Warn("failed to create new redis coordinator", "err",
				err, "newRedisUrl", c.config.NewRedisUrl)
		}
	}
	c.CallIteratively(func(ctx context.Context) time.Duration { return c.chooseRedisAndUpdate(ctx, newRedisCoordinator) })
	if c.config.ChosenHealthcheckAddr != "" {
		c.StopWaiter.LaunchThread(c.launchHealthcheckServer)
	}
}

// chooseRedisAndUpdate runs one iteration of the coordinator loop: while a
// Redis switchover is configured but not yet performed (prevRedisCoordinator
// is only set by setRedisCoordinator once we switch), it first attempts the
// switch, then runs the regular update. Returns the delay until the next
// iteration.
// NOTE(review): newRedisCoordinator may be nil if its creation failed in
// Start; verify trySwitchingRedis tolerates that.
func (c *SeqCoordinator) chooseRedisAndUpdate(ctx context.Context, newRedisCoordinator *redisutil.RedisCoordinator) time.Duration {
	// If we have a new redis coordinator, and we haven't switched to it yet, try to switch.
	if c.config.NewRedisUrl != "" && c.prevRedisCoordinator == nil {
		// If we fail to try to switch, we'll retry soon.
		if err := c.trySwitchingRedis(ctx, newRedisCoordinator); err != nil {
			log.Warn("error while trying to switch redis coordinator", "err", err)
			return c.retryAfterRedisError()
		}
	}
	return c.update(ctx)
}

// trySwitchingRedis advertises this sequencer's wants-lockout key on the new
// Redis instance and, once the old instance's chosen key carries the
// SWITCHED_REDIS sentinel, installs newRedisCoordinator as the active
// coordinator. Returns an error when the switch attempt should be retried.
func (c *SeqCoordinator) trySwitchingRedis(ctx context.Context, newRedisCoordinator *redisutil.RedisCoordinator) error {
	// Start passes nil when creating the new coordinator failed; without this
	// guard the dereference below would panic on every poll iteration.
	// Returning an error routes us through the caller's retry path instead.
	if newRedisCoordinator == nil {
		return errors.New("new redis coordinator unavailable")
	}
	// Register our presence on the new instance first, so a chosen sequencer
	// can be elected there before we actually cut over.
	err := c.wantsLockoutUpdate(ctx, newRedisCoordinator.Client)
	if err != nil {
		return err
	}
	current, err := c.RedisCoordinator().Client.Get(ctx, redisutil.CHOSENSEQ_KEY).Result()
	var wasEmpty bool
	if errors.Is(err, redis.Nil) {
		// No chosen sequencer recorded on the old instance; nothing signals a
		// switch yet.
		wasEmpty = true
		err = nil
	}
	if err != nil {
		log.Warn("failed to get current chosen sequencer", "err", err)
		return err
	}
	// If the chosen key is set to switch, we need to switch to the new redis coordinator.
	if !wasEmpty && (current == redisutil.SWITCHED_REDIS) {
		// Refresh our wants-lockout key on the old instance one last time
		// before swapping coordinators.
		err = c.wantsLockoutUpdate(ctx, c.RedisCoordinator().Client)
		if err != nil {
			return err
		}
		c.setRedisCoordinator(newRedisCoordinator)
	}
	return nil
}

// Calls check() every c.config.RetryInterval until it returns true, or the context times out.
func (c *SeqCoordinator) waitFor(ctx context.Context, check func() bool) bool {
for {
Expand Down Expand Up @@ -872,7 +958,7 @@ func (c *SeqCoordinator) StopAndWait() {
time.Sleep(c.retryAfterRedisError())
}
}
_ = c.Client.Close()
_ = c.RedisCoordinator().Client.Close()
}

func (c *SeqCoordinator) CurrentlyChosen() bool {
Expand Down Expand Up @@ -914,7 +1000,7 @@ func (c *SeqCoordinator) TryToHandoffChosenOne(ctx context.Context) bool {
return !c.CurrentlyChosen()
})
if success {
wantsLockout, err := c.RecommendSequencerWantingLockout(ctx)
wantsLockout, err := c.RedisCoordinator().RecommendSequencerWantingLockout(ctx)
if err == nil {
log.Info("released chosen one status; a new sequencer hopefully wants to acquire it", "delay", c.config.SafeShutdownDelay, "wantsLockout", wantsLockout)
} else {
Expand All @@ -936,7 +1022,7 @@ func (c *SeqCoordinator) SeekLockout(ctx context.Context) {
log.Info("seeking lockout", "myUrl", c.config.Url())
if c.sequencer.Synced() {
// Even if this errors we still internally marked ourselves as wanting the lockout
err := c.wantsLockoutUpdateWithMutex(ctx)
err := c.wantsLockoutUpdateWithMutex(ctx, c.RedisCoordinator().Client)
if err != nil {
log.Warn("failed to set wants lockout key in redis after seeking lockout again", "err", err)
}
Expand Down
Loading

0 comments on commit 3f019ad

Please sign in to comment.