Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delete messages from coordinator after they become final #2471

Merged
merged 17 commits into from
Aug 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 108 additions & 22 deletions arbnode/seq_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type SeqCoordinator struct {

redisutil.RedisCoordinator

sync *SyncMonitor
streamer *TransactionStreamer
sequencer execution.ExecutionSequencer
delayedSequencer *DelayedSequencer
Expand Down Expand Up @@ -69,9 +70,10 @@ type SeqCoordinatorConfig struct {
SafeShutdownDelay time.Duration `koanf:"safe-shutdown-delay"`
ReleaseRetries int `koanf:"release-retries"`
// Max message per poll.
MsgPerPoll arbutil.MessageIndex `koanf:"msg-per-poll"`
MyUrl string `koanf:"my-url"`
Signer signature.SignVerifyConfig `koanf:"signer"`
MsgPerPoll arbutil.MessageIndex `koanf:"msg-per-poll"`
MyUrl string `koanf:"my-url"`
DeleteFinalizedMsgs bool `koanf:"delete-finalized-msgs"`
Signer signature.SignVerifyConfig `koanf:"signer"`
}

func (c *SeqCoordinatorConfig) Url() string {
Expand All @@ -95,6 +97,7 @@ func SeqCoordinatorConfigAddOptions(prefix string, f *flag.FlagSet) {
f.Int(prefix+".release-retries", DefaultSeqCoordinatorConfig.ReleaseRetries, "the number of times to retry releasing the wants lockout and chosen one status on shutdown")
f.Uint64(prefix+".msg-per-poll", uint64(DefaultSeqCoordinatorConfig.MsgPerPoll), "will only be marked as wanting the lockout if not too far behind")
f.String(prefix+".my-url", DefaultSeqCoordinatorConfig.MyUrl, "url for this sequencer if it is the chosen")
f.Bool(prefix+".delete-finalized-msgs", DefaultSeqCoordinatorConfig.DeleteFinalizedMsgs, "enable deleting of finalized messages from redis")
signature.SignVerifyConfigAddOptions(prefix+".signer", f)
}

Expand All @@ -104,31 +107,33 @@ var DefaultSeqCoordinatorConfig = SeqCoordinatorConfig{
RedisUrl: "",
LockoutDuration: time.Minute,
LockoutSpare: 30 * time.Second,
SeqNumDuration: 24 * time.Hour,
SeqNumDuration: 10 * 24 * time.Hour,
UpdateInterval: 250 * time.Millisecond,
HandoffTimeout: 30 * time.Second,
SafeShutdownDelay: 5 * time.Second,
ReleaseRetries: 4,
RetryInterval: 50 * time.Millisecond,
MsgPerPoll: 2000,
MyUrl: redisutil.INVALID_URL,
DeleteFinalizedMsgs: true,
Signer: signature.DefaultSignVerifyConfig,
}

var TestSeqCoordinatorConfig = SeqCoordinatorConfig{
Enable: false,
RedisUrl: "",
LockoutDuration: time.Second * 2,
LockoutSpare: time.Millisecond * 10,
SeqNumDuration: time.Minute * 10,
UpdateInterval: time.Millisecond * 10,
HandoffTimeout: time.Millisecond * 200,
SafeShutdownDelay: time.Millisecond * 100,
ReleaseRetries: 4,
RetryInterval: time.Millisecond * 3,
MsgPerPoll: 20,
MyUrl: redisutil.INVALID_URL,
Signer: signature.DefaultSignVerifyConfig,
Enable: false,
RedisUrl: "",
LockoutDuration: time.Second * 2,
LockoutSpare: time.Millisecond * 10,
SeqNumDuration: time.Minute * 10,
UpdateInterval: time.Millisecond * 10,
HandoffTimeout: time.Millisecond * 200,
SafeShutdownDelay: time.Millisecond * 100,
ReleaseRetries: 4,
RetryInterval: time.Millisecond * 3,
MsgPerPoll: 20,
MyUrl: redisutil.INVALID_URL,
DeleteFinalizedMsgs: true,
Signer: signature.DefaultSignVerifyConfig,
}

func NewSeqCoordinator(
Expand All @@ -149,6 +154,7 @@ func NewSeqCoordinator(
}
coordinator := &SeqCoordinator{
RedisCoordinator: *redisCoordinator,
sync: sync,
streamer: streamer,
sequencer: sequencer,
config: config,
Expand Down Expand Up @@ -338,6 +344,14 @@ func (c *SeqCoordinator) acquireLockoutAndWriteMessage(ctx context.Context, msgC
return nil
}

// getRemoteFinalizedMsgCount reads the value stored in redis under
// redisutil.FINALIZED_MSG_COUNT_KEY and decodes it with signedBytesToMsgCount.
// A redis error is returned unwrapped, so callers can match it against
// redis.Nil with errors.Is.
func (c *SeqCoordinator) getRemoteFinalizedMsgCount(ctx context.Context) (arbutil.MessageIndex, error) {
	cmd := c.Client.Get(ctx, redisutil.FINALIZED_MSG_COUNT_KEY)
	if err := cmd.Err(); err != nil {
		return 0, err
	}
	return c.signedBytesToMsgCount(ctx, []byte(cmd.Val()))
}

func (c *SeqCoordinator) getRemoteMsgCountImpl(ctx context.Context, r redis.Cmdable) (arbutil.MessageIndex, error) {
resStr, err := r.Get(ctx, redisutil.MSG_COUNT_KEY).Result()
if errors.Is(err, redis.Nil) {
Expand Down Expand Up @@ -473,6 +487,17 @@ func (c *SeqCoordinator) updateWithLockout(ctx context.Context, nextChosen strin
return c.noRedisError()
}
// Was, and still is, the active sequencer
if c.config.DeleteFinalizedMsgs {
	// Before proceeding, first try deleting finalized messages from redis and setting the finalizedMsgCount key.
	// This is best-effort: every failure below is only logged, and execution continues with the lockout handling that follows.
	finalized, err := c.sync.GetFinalizedMsgCount(ctx)
	if err != nil {
		// log.Warn takes a message followed by key/value pairs, not a printf-style format string.
		log.Warn("Error getting finalizedMessageCount from syncMonitor", "err", err)
	} else if finalized == 0 {
		log.Warn("SyncMonitor returned zero finalizedMessageCount")
	} else if err := c.deleteFinalizedMsgsFromRedis(ctx, finalized); err != nil {
		log.Warn("Coordinator failed to delete finalized messages from redis", "err", err)
	}
}
// We leave a margin of error of either a five times the update interval or a fifth of the lockout duration, whichever is greater.
marginOfError := arbmath.MaxInt(c.config.LockoutDuration/5, c.config.UpdateInterval*5)
if time.Now().Add(marginOfError).Before(atomicTimeRead(&c.lockoutUntil)) {
Expand All @@ -492,6 +517,62 @@ func (c *SeqCoordinator) updateWithLockout(ctx context.Context, nextChosen strin
return c.noRedisError()
}

// deleteFinalizedMsgsFromRedis deletes the redis message and signature keys of
// finalized messages and then stores `finalized` (signed) under
// redisutil.FINALIZED_MSG_COUNT_KEY.
//
// Two cases are handled:
//   - The finalizedMsgCount key is absent (redis.Nil): walk downwards from
//     finalized-1, stopping before message 0, and collect keys until a message
//     is found for which neither the message key nor the signature key exists.
//   - The key is present: delete the range [prevFinalized, min(finalized, remoteMsgCount)).
//     If that range is empty, nothing is deleted or written and nil is returned.
//
// Any redis or signing error is returned to the caller.
func (c *SeqCoordinator) deleteFinalizedMsgsFromRedis(ctx context.Context, finalized arbutil.MessageIndex) error {
// deleteMsgsAndUpdateFinalizedMsgCount deletes `keys` in batches and, only if
// every batch succeeds, writes the new finalized count to redis.
deleteMsgsAndUpdateFinalizedMsgCount := func(keys []string) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you gain something by having this function defined that way and not as a normal functin?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

like we use a bunch of SeqCoordinator methods such as msgCountToSignedBytes, getRemoteMsgCountImpl, c.Client etc.. so thought having a method on SeqCoordinator is a better option over a standalone function

if len(keys) > 0 {
// To support cases during init we delete keys from reverse (i.e lowest seq num first), so that even if deletion fails in one of the iterations
// next time deleteFinalizedMsgsFromRedis is called we dont miss undeleted messages, as exists is checked from higher seqnum to lower.
// In non-init cases it doesn't matter how we delete as we always try to delete from prevFinalized to finalized
batchDeleteCount := 1000
// Walk the slice from its end towards its start, deleting at most batchDeleteCount keys per Del call.
for i := len(keys); i > 0; i -= batchDeleteCount {
if err := c.Client.Del(ctx, keys[max(0, i-batchDeleteCount):i]...).Err(); err != nil {
return fmt.Errorf("error deleting finalized messages and their signatures from redis: %w", err)
}
}
}
// The stored value is always `finalized`, regardless of which keys were deleted above.
finalizedBytes, err := c.msgCountToSignedBytes(finalized)
if err != nil {
return err
}
// The key is written with c.config.SeqNumDuration as its expiration.
if err = c.Client.Set(ctx, redisutil.FINALIZED_MSG_COUNT_KEY, finalizedBytes, c.config.SeqNumDuration).Err(); err != nil {
return fmt.Errorf("couldn't set %s key to current finalizedMsgCount in redis: %w", redisutil.FINALIZED_MSG_COUNT_KEY, err)
}
return nil
}
prevFinalized, err := c.getRemoteFinalizedMsgCount(ctx)
// redis.Nil: no finalizedMsgCount has been stored yet, so run the init path.
if errors.Is(err, redis.Nil) {
ganeshvanahalli marked this conversation as resolved.
Show resolved Hide resolved
var keys []string
// keys is built from the highest message index down to the lowest; message 0 is never inspected (loop condition is msg > 0).
// NOTE(review): if arbutil.MessageIndex is unsigned, finalized == 0 would make finalized-1 wrap around — confirm callers never pass 0.
for msg := finalized - 1; msg > 0; msg-- {
exists, err := c.Client.Exists(ctx, redisutil.MessageKeyFor(msg), redisutil.MessageSigKeyFor(msg)).Result()
if err != nil {
// If there is an error deleting finalized messages during init, we retry later either from this sequencer or from another
return err
}
// Stop at the first message for which neither the message key nor its signature key is present.
if exists == 0 {
break
}
keys = append(keys, redisutil.MessageKeyFor(msg), redisutil.MessageSigKeyFor(msg))
}
log.Info("Initializing finalizedMsgCount and deleting finalized messages from redis", "finalizedMsgCount", finalized)
return deleteMsgsAndUpdateFinalizedMsgCount(keys)
} else if err != nil {
return fmt.Errorf("error getting finalizedMsgCount value from redis: %w", err)
}
remoteMsgCount, err := c.getRemoteMsgCountImpl(ctx, c.Client)
if err != nil {
return fmt.Errorf("cannot get remote message count: %w", err)
}
// Never build keys beyond the message count currently recorded in redis.
msgToDelete := min(finalized, remoteMsgCount)
if prevFinalized < msgToDelete {
var keys []string
for msg := prevFinalized; msg < msgToDelete; msg++ {
keys = append(keys, redisutil.MessageKeyFor(msg), redisutil.MessageSigKeyFor(msg))
}
return deleteMsgsAndUpdateFinalizedMsgCount(keys)
}
// Nothing new to delete: the stored finalizedMsgCount is left as is.
return nil
}

func (c *SeqCoordinator) update(ctx context.Context) time.Duration {
chosenSeq, err := c.RecommendSequencerWantingLockout(ctx)
if err != nil {
Expand Down Expand Up @@ -522,19 +603,24 @@ func (c *SeqCoordinator) update(ctx context.Context) time.Duration {
log.Error("cannot read message count", "err", err)
return c.config.UpdateInterval
}
remoteFinalizedMsgCount, err := c.getRemoteFinalizedMsgCount(ctx)
if err != nil {
loglevel := log.Error
if errors.Is(err, redis.Nil) {
loglevel = log.Debug
}
loglevel("Cannot get remote finalized message count, might encounter failed to read message warnings later", "err", err)
}
remoteMsgCount, err := c.GetRemoteMsgCount()
if err != nil {
log.Warn("cannot get remote message count", "err", err)
return c.retryAfterRedisError()
}
readUntil := remoteMsgCount
if readUntil > localMsgCount+c.config.MsgPerPoll {
readUntil = localMsgCount + c.config.MsgPerPoll
}
readUntil := min(localMsgCount+c.config.MsgPerPoll, remoteMsgCount)
var messages []arbostypes.MessageWithMetadata
msgToRead := localMsgCount
var msgReadErr error
for msgToRead < readUntil {
for msgToRead < readUntil && localMsgCount >= remoteFinalizedMsgCount {
ganeshvanahalli marked this conversation as resolved.
Show resolved Hide resolved
var resString string
resString, msgReadErr = c.Client.Get(ctx, redisutil.MessageKeyFor(msgToRead)).Result()
if msgReadErr != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,3 +156,94 @@ func TestRedisSeqCoordinatorAtomic(t *testing.T) {
}

}

// TestSeqCoordinatorDeletesFinalizedMessages seeds a test redis with message and
// signature keys for messages 1 through 10, then checks that
// deleteFinalizedMsgsFromRedis removes exactly the finalized ones and records
// the finalized count, both when the finalizedMsgCount key is not yet set and
// when it already holds a value.
func TestSeqCoordinatorDeletesFinalizedMessages(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	coordConfig := TestSeqCoordinatorConfig
	coordConfig.LockoutDuration = 100 * time.Millisecond
	coordConfig.LockoutSpare = 10 * time.Millisecond
	coordConfig.Signer.ECDSA.AcceptSequencer = false
	coordConfig.Signer.SymmetricFallback = true
	coordConfig.Signer.SymmetricSign = true
	coordConfig.Signer.Symmetric.Dangerous.DisableSignatureVerification = true
	coordConfig.Signer.Symmetric.SigningKey = ""

	nullSigner, err := signature.NewSignVerify(&coordConfig.Signer, nil, nil)
	Require(t, err)

	coordConfig.RedisUrl = redisutil.CreateTestRedis(ctx, t)

	config := coordConfig
	config.MyUrl = "test"
	redisCoordinator, err := redisutil.NewRedisCoordinator(config.RedisUrl)
	Require(t, err)
	coordinator := &SeqCoordinator{
		RedisCoordinator: *redisCoordinator,
		config:           config,
		signer:           nullSigner,
	}

	// countExisting reports how many of the given keys are currently present in redis.
	countExisting := func(ks ...string) int64 {
		t.Helper()
		found, err := coordinator.Client.Exists(ctx, ks...).Result()
		Require(t, err)
		return found
	}

	// Add messages to redis. keys holds, for each message i in 1..10, its
	// message key followed by its signature key, so message i occupies
	// keys[2*(i-1)] and keys[2*(i-1)+1].
	msgBytes, err := coordinator.msgCountToSignedBytes(0)
	Require(t, err)
	var keys []string
	for i := arbutil.MessageIndex(1); i <= 10; i++ {
		msgKey, sigKey := redisutil.MessageKeyFor(i), redisutil.MessageSigKeyFor(i)
		for _, key := range []string{msgKey, sigKey} {
			Require(t, coordinator.Client.Set(ctx, key, msgBytes, time.Hour).Err())
		}
		keys = append(keys, msgKey, sigKey)
	}

	// Set msgCount key
	msgCountBytes, err := coordinator.msgCountToSignedBytes(11)
	Require(t, err)
	Require(t, coordinator.Client.Set(ctx, redisutil.MSG_COUNT_KEY, msgCountBytes, time.Hour).Err())
	if countExisting(keys...) != 20 {
		t.Fatal("couldn't find all messages and signatures in redis")
	}

	// Set finalizedMsgCount and delete finalized messages
	err = coordinator.deleteFinalizedMsgsFromRedis(ctx, 5)
	Require(t, err)

	// Check if messages and signatures were deleted successfully (messages 1 to 4)
	if countExisting(keys[:8]...) != 0 {
		t.Fatal("finalized messages and signatures in range 1 to 4 were not deleted")
	}

	// Check if finalizedMsgCount was set to correct value
	finalized, err := coordinator.getRemoteFinalizedMsgCount(ctx)
	Require(t, err)
	if finalized != 5 {
		t.Fatalf("incorrect finalizedMsgCount, want: 5, have: %d", finalized)
	}

	// Try deleting finalized messages when there is already a finalizedMsgCount (messages 5 and 6)
	err = coordinator.deleteFinalizedMsgsFromRedis(ctx, 7)
	Require(t, err)
	if countExisting(keys[8:12]...) != 0 {
		t.Fatal("finalized messages and signatures in range 5 to 6 were not deleted")
	}
	finalized, err = coordinator.getRemoteFinalizedMsgCount(ctx)
	Require(t, err)
	if finalized != 7 {
		t.Fatalf("incorrect finalizedMsgCount, want: 7, have: %d", finalized)
	}

	// Check that non-finalized messages (7 to 10) are still available in redis
	if countExisting(keys[12:]...) != 8 {
		t.Fatal("non-finalized messages and signatures in range 7 to 10 are not fully available")
	}
}
7 changes: 7 additions & 0 deletions arbnode/sync_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,13 @@ func (s *SyncMonitor) SyncTargetMessageCount() arbutil.MessageIndex {
return s.syncTarget
}

func (s *SyncMonitor) GetFinalizedMsgCount(ctx context.Context) (arbutil.MessageIndex, error) {
if s.inboxReader != nil && s.inboxReader.l1Reader != nil {
ganeshvanahalli marked this conversation as resolved.
Show resolved Hide resolved
return s.inboxReader.GetFinalizedMsgCount(ctx)
}
return 0, nil
}

func (s *SyncMonitor) maxMessageCount() (arbutil.MessageIndex, error) {
msgCount, err := s.txStreamer.GetMessageCount()
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions cmd/nitro/nitro.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,10 @@ func mainImpl() int {
if nodeConfig.Execution.Sequencer.Enable != nodeConfig.Node.Sequencer {
log.Error("consensus and execution must agree if sequencing is enabled or not", "Execution.Sequencer.Enable", nodeConfig.Execution.Sequencer.Enable, "Node.Sequencer", nodeConfig.Node.Sequencer)
}
if nodeConfig.Node.SeqCoordinator.Enable && !nodeConfig.Node.ParentChainReader.Enable {
log.Error("Sequencer coordinator must be enabled with parent chain reader, try starting node with --parent-chain.connection.url")
return 1
}

var dataSigner signature.DataSignerFunc
var l1TransactionOptsValidator *bind.TransactOpts
Expand Down
13 changes: 7 additions & 6 deletions util/redisutil/redis_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,13 @@ import (
"github.com/offchainlabs/nitro/arbutil"
)

const CHOSENSEQ_KEY string = "coordinator.chosen" // Never overwritten. Expires or released only
const MSG_COUNT_KEY string = "coordinator.msgCount" // Only written by sequencer holding CHOSEN key
const PRIORITIES_KEY string = "coordinator.priorities" // Read only
const WANTS_LOCKOUT_KEY_PREFIX string = "coordinator.liveliness." // Per server. Only written by self
const MESSAGE_KEY_PREFIX string = "coordinator.msg." // Per Message. Only written by sequencer holding CHOSEN
const SIGNATURE_KEY_PREFIX string = "coordinator.msg.sig." // Per Message. Only written by sequencer holding CHOSEN
const CHOSENSEQ_KEY string = "coordinator.chosen" // Never overwritten. Expires or released only
const MSG_COUNT_KEY string = "coordinator.msgCount" // Only written by sequencer holding CHOSEN key
const FINALIZED_MSG_COUNT_KEY string = "coordinator.finalizedMsgCount" // Only written by sequencer holding CHOSEN key
const PRIORITIES_KEY string = "coordinator.priorities" // Read only
const WANTS_LOCKOUT_KEY_PREFIX string = "coordinator.liveliness." // Per server. Only written by self
const MESSAGE_KEY_PREFIX string = "coordinator.msg." // Per Message. Only written by sequencer holding CHOSEN
const SIGNATURE_KEY_PREFIX string = "coordinator.msg.sig." // Per Message. Only written by sequencer holding CHOSEN
const WANTS_LOCKOUT_VAL string = "OK"
const INVALID_VAL string = "INVALID"
const INVALID_URL string = "<?INVALID-URL?>"
Expand Down
Loading