diff --git a/execution/gethexec/express_lane_service.go b/execution/gethexec/express_lane_service.go index ee8c45d2db..a5618ee719 100644 --- a/execution/gethexec/express_lane_service.go +++ b/execution/gethexec/express_lane_service.go @@ -39,9 +39,14 @@ type expressLaneControl struct { controller common.Address } +type transactionPublisher interface { + PublishTimeboostedTransaction(context.Context, *types.Transaction, *arbitrum_types.ConditionalOptions) error +} + type expressLaneService struct { stopwaiter.StopWaiter sync.RWMutex + transactionPublisher transactionPublisher auctionContractAddr common.Address apiBackend *arbitrum.APIBackend initialTimestamp time.Time @@ -50,7 +55,7 @@ type expressLaneService struct { chainConfig *params.ChainConfig logs chan []*types.Log auctionContract *express_lane_auctiongen.ExpressLaneAuction - roundControl *lru.Cache[uint64, *expressLaneControl] + roundControl *lru.Cache[uint64, *expressLaneControl] // thread safe messagesBySequenceNumber map[uint64]*timeboost.ExpressLaneSubmission } @@ -118,6 +123,7 @@ func (a *contractAdapter) CallContract(ctx context.Context, call ethereum.CallMs } func newExpressLaneService( + transactionPublisher transactionPublisher, apiBackend *arbitrum.APIBackend, filterSystem *filters.FilterSystem, auctionContractAddr common.Address, @@ -155,6 +161,7 @@ pending: roundDuration := arbmath.SaturatingCast[time.Duration](roundTimingInfo.RoundDurationSeconds) * time.Second auctionClosingDuration := arbmath.SaturatingCast[time.Duration](roundTimingInfo.AuctionClosingSeconds) * time.Second return &expressLaneService{ + transactionPublisher: transactionPublisher, auctionContract: auctionContract, apiBackend: apiBackend, chainConfig: chainConfig, @@ -281,8 +288,6 @@ func (es *expressLaneService) Start(ctxIn context.Context) { } func (es *expressLaneService) currentRoundHasController() bool { - es.Lock() - defer es.Unlock() currRound := timeboost.CurrentRound(es.initialTimestamp, es.roundDuration) control, ok := es.roundControl.Get(currRound) if !ok { @@ -307,19 +312,15 @@ func (es *expressLaneService) isWithinAuctionCloseWindow(arrivalTime time.Time) func (es *expressLaneService) sequenceExpressLaneSubmission( ctx context.Context, msg *timeboost.ExpressLaneSubmission, - publishTxFn func( - parentCtx context.Context, - tx *types.Transaction, - options *arbitrum_types.ConditionalOptions, - delay bool, - ) error, ) error { - es.Lock() - defer es.Unlock() + // no service lock needed since roundControl is thread-safe control, ok := es.roundControl.Get(msg.Round) if !ok { return timeboost.ErrNoOnchainController } + + es.Lock() + defer es.Unlock() // Check if the submission nonce is too low. if msg.SequenceNumber < control.sequence { return timeboost.ErrSequenceNumberTooLow @@ -330,7 +331,7 @@ func (es *expressLaneService) sequenceExpressLaneSubmission( } // Log an informational warning if the message's sequence number is in the future. if msg.SequenceNumber > control.sequence { - log.Warn("Received express lane submission with future sequence number", "SequenceNumber", msg.SequenceNumber) + log.Info("Received express lane submission with future sequence number", "SequenceNumber", msg.SequenceNumber) } // Put into the sequence number map. es.messagesBySequenceNumber[msg.SequenceNumber] = msg @@ -341,11 +342,10 @@ func (es *expressLaneService) sequenceExpressLaneSubmission( if !exists { break } - if err := publishTxFn( + if err := es.transactionPublisher.PublishTimeboostedTransaction( ctx, nextMsg.Transaction, msg.Options, - false, /* no delay, as it should go through express lane */ ); err != nil { // If the tx failed, clear it from the sequence map. delete(es.messagesBySequenceNumber, msg.SequenceNumber) diff --git a/execution/gethexec/express_lane_service_test.go b/execution/gethexec/express_lane_service_test.go index 940a668509..bbb2c3c240 100644 --- a/execution/gethexec/express_lane_service_test.go +++ b/execution/gethexec/express_lane_service_test.go @@ -233,6 +233,28 @@ func Test_expressLaneService_validateExpressLaneTx(t *testing.T) { } } +type stubPublisher struct { + els *expressLaneService + publishedTxOrder []uint64 +} + +func makeStubPublisher(els *expressLaneService) *stubPublisher { + return &stubPublisher{ + els: els, + publishedTxOrder: make([]uint64, 0), + } +} + +func (s *stubPublisher) PublishTimeboostedTransaction(parentCtx context.Context, tx *types.Transaction, options *arbitrum_types.ConditionalOptions) error { + if tx == nil { + return errors.New("oops, bad tx") + } + control, _ := s.els.roundControl.Get(0) + s.publishedTxOrder = append(s.publishedTxOrder, control.sequence) + return nil + +} + func Test_expressLaneService_sequenceExpressLaneSubmission_nonceTooLow(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -240,16 +262,16 @@ func Test_expressLaneService_sequenceExpressLaneSubmission_nonceTooLow(t *testin messagesBySequenceNumber: make(map[uint64]*timeboost.ExpressLaneSubmission), roundControl: lru.NewCache[uint64, *expressLaneControl](8), } + stubPublisher := makeStubPublisher(els) + els.transactionPublisher = stubPublisher els.roundControl.Add(0, &expressLaneControl{ sequence: 1, }) msg := &timeboost.ExpressLaneSubmission{ SequenceNumber: 0, } - publishFn := func(parentCtx context.Context, tx *types.Transaction, options *arbitrum_types.ConditionalOptions, delay bool) error { - return nil - } - err := els.sequenceExpressLaneSubmission(ctx, msg, publishFn) + + err := els.sequenceExpressLaneSubmission(ctx, msg) require.ErrorIs(t, err, timeboost.ErrSequenceNumberTooLow) } @@ -260,24 +282,21 @@ func Test_expressLaneService_sequenceExpressLaneSubmission_duplicateNonce(t *tes roundControl: lru.NewCache[uint64, *expressLaneControl](8), messagesBySequenceNumber: make(map[uint64]*timeboost.ExpressLaneSubmission), } + stubPublisher := makeStubPublisher(els) + els.transactionPublisher = stubPublisher els.roundControl.Add(0, &expressLaneControl{ sequence: 1, }) msg := &timeboost.ExpressLaneSubmission{ SequenceNumber: 2, } - numPublished := 0 - publishFn := func(parentCtx context.Context, tx *types.Transaction, options *arbitrum_types.ConditionalOptions, delay bool) error { - numPublished += 1 - return nil - } - err := els.sequenceExpressLaneSubmission(ctx, msg, publishFn) + err := els.sequenceExpressLaneSubmission(ctx, msg) require.NoError(t, err) // Because the message is for a future sequence number, it // should get queued, but not yet published. - require.Equal(t, 0, numPublished) + require.Equal(t, 0, len(stubPublisher.publishedTxOrder)) // Sending it again should give us an error. - err = els.sequenceExpressLaneSubmission(ctx, msg, publishFn) + err = els.sequenceExpressLaneSubmission(ctx, msg) require.ErrorIs(t, err, timeboost.ErrDuplicateSequenceNumber) } @@ -288,45 +307,46 @@ func Test_expressLaneService_sequenceExpressLaneSubmission_outOfOrder(t *testing roundControl: lru.NewCache[uint64, *expressLaneControl](8), messagesBySequenceNumber: make(map[uint64]*timeboost.ExpressLaneSubmission), } + stubPublisher := makeStubPublisher(els) + els.transactionPublisher = stubPublisher + els.roundControl.Add(0, &expressLaneControl{ sequence: 1, }) - numPublished := 0 - publishedTxOrder := make([]uint64, 0) - control, _ := els.roundControl.Get(0) - publishFn := func(parentCtx context.Context, tx *types.Transaction, options *arbitrum_types.ConditionalOptions, delay bool) error { - numPublished += 1 - publishedTxOrder = append(publishedTxOrder, control.sequence) - return nil - } + messages := []*timeboost.ExpressLaneSubmission{ { SequenceNumber: 10, + Transaction: &types.Transaction{}, }, { SequenceNumber: 5, + Transaction: &types.Transaction{}, }, { SequenceNumber: 1, + Transaction: &types.Transaction{}, }, { SequenceNumber: 4, + Transaction: &types.Transaction{}, }, { SequenceNumber: 2, + Transaction: &types.Transaction{}, }, } for _, msg := range messages { - err := els.sequenceExpressLaneSubmission(ctx, msg, publishFn) + err := els.sequenceExpressLaneSubmission(ctx, msg) require.NoError(t, err) } // We should have only published 2, as we are missing sequence number 3. - require.Equal(t, 2, numPublished) + require.Equal(t, 2, len(stubPublisher.publishedTxOrder)) require.Equal(t, len(messages), len(els.messagesBySequenceNumber)) - err := els.sequenceExpressLaneSubmission(ctx, &timeboost.ExpressLaneSubmission{SequenceNumber: 3}, publishFn) + err := els.sequenceExpressLaneSubmission(ctx, &timeboost.ExpressLaneSubmission{SequenceNumber: 3, Transaction: &types.Transaction{}}) require.NoError(t, err) - require.Equal(t, 5, numPublished) + require.Equal(t, 5, len(stubPublisher.publishedTxOrder)) } func Test_expressLaneService_sequenceExpressLaneSubmission_erroredTx(t *testing.T) { @@ -339,17 +359,9 @@ func Test_expressLaneService_sequenceExpressLaneSubmission_erroredTx(t *testing. els.roundControl.Add(0, &expressLaneControl{ sequence: 1, }) - numPublished := 0 - publishedTxOrder := make([]uint64, 0) - control, _ := els.roundControl.Get(0) - publishFn := func(parentCtx context.Context, tx *types.Transaction, options *arbitrum_types.ConditionalOptions, delay bool) error { - if tx == nil { - return errors.New("oops, bad tx") - } - numPublished += 1 - publishedTxOrder = append(publishedTxOrder, control.sequence) - return nil - } + stubPublisher := makeStubPublisher(els) + els.transactionPublisher = stubPublisher + messages := []*timeboost.ExpressLaneSubmission{ { SequenceNumber: 1, @@ -370,16 +382,16 @@ func Test_expressLaneService_sequenceExpressLaneSubmission_erroredTx(t *testing. } for _, msg := range messages { if msg.Transaction == nil { - err := els.sequenceExpressLaneSubmission(ctx, msg, publishFn) + err := els.sequenceExpressLaneSubmission(ctx, msg) require.ErrorContains(t, err, "oops, bad tx") } else { - err := els.sequenceExpressLaneSubmission(ctx, msg, publishFn) + err := els.sequenceExpressLaneSubmission(ctx, msg) require.NoError(t, err) } } // One tx out of the four should have failed, so we should have only published 3. - require.Equal(t, 3, numPublished) - require.Equal(t, []uint64{1, 2, 3}, publishedTxOrder) + require.Equal(t, 3, len(stubPublisher.publishedTxOrder)) + require.Equal(t, []uint64{1, 2, 3}, stubPublisher.publishedTxOrder) } func TestIsWithinAuctionCloseWindow(t *testing.T) { diff --git a/execution/gethexec/sequencer.go b/execution/gethexec/sequencer.go index 6564d46b0c..c49be51ab5 100644 --- a/execution/gethexec/sequencer.go +++ b/execution/gethexec/sequencer.go @@ -455,10 +455,14 @@ func ctxWithTimeout(ctx context.Context, timeout time.Duration) (context.Context } func (s *Sequencer) PublishTransaction(parentCtx context.Context, tx *types.Transaction, options *arbitrum_types.ConditionalOptions) error { - return s.publishTransactionImpl(parentCtx, tx, options, true /* delay tx if express lane is active */) + return s.publishTransactionImpl(parentCtx, tx, options, false /* delay tx if express lane is active */) } -func (s *Sequencer) publishTransactionImpl(parentCtx context.Context, tx *types.Transaction, options *arbitrum_types.ConditionalOptions, delay bool) error { +func (s *Sequencer) PublishTimeboostedTransaction(parentCtx context.Context, tx *types.Transaction, options *arbitrum_types.ConditionalOptions) error { + return s.publishTransactionImpl(parentCtx, tx, options, true) +} + +func (s *Sequencer) publishTransactionImpl(parentCtx context.Context, tx *types.Transaction, options *arbitrum_types.ConditionalOptions, isExpressLaneController bool) error { config := s.config() // Only try to acquire Rlock and check for hard threshold if l1reader is not nil // And hard threshold was enabled, this prevents spamming of read locks when not needed @@ -504,7 +508,7 @@ func (s *Sequencer) publishTransactionImpl(parentCtx context.Context, tx *types. } if s.config().Timeboost.Enable && s.expressLaneService != nil { - if delay && s.expressLaneService.currentRoundHasController() { + if !isExpressLaneController && s.expressLaneService.currentRoundHasController() { time.Sleep(s.config().Timeboost.ExpressLaneAdvantage) } } @@ -558,7 +562,7 @@ func (s *Sequencer) PublishExpressLaneTransaction(ctx context.Context, msg *time if err := s.expressLaneService.validateExpressLaneTx(msg); err != nil { return err } - return s.expressLaneService.sequenceExpressLaneSubmission(ctx, msg, s.publishTransactionImpl) + return s.expressLaneService.sequenceExpressLaneSubmission(ctx, msg) } func (s *Sequencer) PublishAuctionResolutionTransaction(ctx context.Context, tx *types.Transaction) error { @@ -1276,6 +1280,7 @@ func (s *Sequencer) StartExpressLane(ctx context.Context, apiBackend *arbitrum.A } els, err := newExpressLaneService( + s, apiBackend, filterSystem, auctionContractAddr,