Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
colinlyguo committed Jan 15, 2024
1 parent b7af443 commit 45d17e9
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 91 deletions.
18 changes: 9 additions & 9 deletions database/migrate/migrations/00015_transaction.sql
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
-- +goose Up
-- +goose StatementBegin

CREATE TABLE transaction
CREATE TABLE pending_transaction
(
id SERIAL PRIMARY KEY,

Expand All @@ -16,7 +16,7 @@ CREATE TABLE transaction
gas_price NUMERIC NOT NULL,
gas_limit BIGINT NOT NULL,
nonce BIGINT NOT NULL,
submit_at BIGINT NOT NULL,
submit_block_number BIGINT NOT NULL,

sender_name VARCHAR NOT NULL,
sender_service VARCHAR NOT NULL,
Expand All @@ -28,19 +28,19 @@ CREATE TABLE transaction
deleted_at TIMESTAMP(0) DEFAULT NULL
);

CREATE INDEX idx_transaction_on_context_id
ON transaction (context_id);
CREATE INDEX idx_pending_transaction_on_context_id_status
ON pending_transaction (context_id, status);

CREATE INDEX idx_transaction_on_sender_type_status_nonce
ON transaction (sender_type, status, nonce);
CREATE INDEX idx_pending_transaction_on_sender_type_status_nonce
ON pending_transaction (sender_type, status, nonce);

COMMENT ON COLUMN transaction.type IS 'unknown, commit batch, finalize batch, L1 gas oracle, L2 gas oracle';
COMMENT ON COLUMN pending_transaction.type IS 'unknown, commit batch, finalize batch, L1 gas oracle, L2 gas oracle';

COMMENT ON COLUMN transaction.status IS 'unknown, pending, confirmed, failed';
COMMENT ON COLUMN pending_transaction.status IS 'unknown, pending, confirmed, failed';

-- +goose StatementEnd

-- +goose Down
-- +goose StatementBegin
DROP TABLE IF EXISTS transaction;
DROP TABLE IF EXISTS pending_transaction;
-- +goose StatementEnd
40 changes: 20 additions & 20 deletions rollup/internal/controller/sender/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ type Sender struct {
blockNumber uint64 // Current block number on chain
baseFeePerGas uint64 // Current base fee per gas on chain

transactionOrm *orm.Transaction
pendingTransactionOrm *orm.PendingTransaction

confirmCh chan *Confirmation
stopCh chan struct{}
Expand Down Expand Up @@ -124,19 +124,19 @@ func NewSender(ctx context.Context, config *config.SenderConfig, priv *ecdsa.Pri
}

sender := &Sender{
ctx: ctx,
config: config,
client: client,
chainID: chainID,
auth: auth,
blockNumber: header.Number.Uint64(),
baseFeePerGas: baseFeePerGas,
transactionOrm: orm.NewTransaction(db),
confirmCh: make(chan *Confirmation, 128),
stopCh: make(chan struct{}),
name: name,
service: service,
senderType: senderType,
ctx: ctx,
config: config,
client: client,
chainID: chainID,
auth: auth,
blockNumber: header.Number.Uint64(),
baseFeePerGas: baseFeePerGas,
pendingTransactionOrm: orm.NewPendingTransaction(db),
confirmCh: make(chan *Confirmation, 128),
stopCh: make(chan struct{}),
name: name,
service: service,
senderType: senderType,
}
sender.metrics = initSenderMetrics(reg)

Expand Down Expand Up @@ -195,7 +195,7 @@ func (s *Sender) SendTransaction(contextID string, target *common.Address, value
return common.Hash{}, fmt.Errorf("failed to create and send transaction, err: %w", err)
}

if err = s.transactionOrm.InsertTransaction(s.ctx, contextID, s.getSenderMeta(), tx, atomic.LoadUint64(&s.blockNumber)); err != nil {
if err = s.pendingTransactionOrm.InsertPendingTransaction(s.ctx, contextID, s.getSenderMeta(), tx, atomic.LoadUint64(&s.blockNumber)); err != nil {
log.Error("failed to insert transaction", "from", s.auth.From.String(), "nonce", s.auth.Nonce.Uint64(), "err", err)
return common.Hash{}, fmt.Errorf("failed to insert transaction, err: %w", err)
}
Expand Down Expand Up @@ -409,7 +409,7 @@ func (s *Sender) checkPendingTransaction(header *gethTypes.Header, confirmed uin
}
}

pendingTransactions, err := s.transactionOrm.GetPendingTransactionsBySenderType(s.ctx, s.senderType, 100)
pendingTransactions, err := s.pendingTransactionOrm.GetPendingTransactionsBySenderType(s.ctx, s.senderType, 100)
if err != nil {
log.Error("failed to load pending transactions", "sender meta", s.getSenderMeta(), "error", err)
return
Expand All @@ -426,7 +426,7 @@ func (s *Sender) checkPendingTransaction(header *gethTypes.Header, confirmed uin
receipt, err := s.client.TransactionReceipt(s.ctx, tx.Hash())
if (err == nil) && (receipt != nil) {
if receipt.BlockNumber.Uint64() <= confirmed {
if err = s.transactionOrm.UpdateTransactionStatusByContextID(s.ctx, t.ContextID, types.TxStatusConfirmed); err != nil {
if err = s.pendingTransactionOrm.UpdatePendingTransactionStatusByContextID(s.ctx, t.ContextID, types.TxStatusConfirmed); err != nil {
log.Error("failed to update transaction status by context ID", "context ID", t.ContextID, "sender meta", s.getSenderMeta(), "from", s.auth.From.String(), "nonce", tx.Nonce(), "err", err)
return
}
Expand All @@ -438,20 +438,20 @@ func (s *Sender) checkPendingTransaction(header *gethTypes.Header, confirmed uin
SenderType: s.senderType,
}
}
} else if s.config.EscalateBlocks+t.SubmitAt < number {
} else if s.config.EscalateBlocks+t.SubmitBlockNumber < number {
log.Info("resubmit transaction",
"hash", tx.Hash().String(),
"from", s.auth.From.String(),
"nonce", tx.Nonce(),
"submit block number", t.SubmitAt,
"submit block number", t.SubmitBlockNumber,
"current block number", number,
"configured escalateBlocks", s.config.EscalateBlocks)

if newTx, err := s.resubmitTransaction(s.auth, tx); err != nil {
s.metrics.resubmitTransactionFailedTotal.WithLabelValues(s.service, s.name).Inc()
log.Error("failed to resubmit transaction", "context ID", t.ContextID, "sender meta", s.getSenderMeta(), "from", s.auth.From.String(), "nonce", newTx.Nonce(), "err", err)
} else {
if err := s.transactionOrm.InsertTransaction(s.ctx, t.ContextID, s.getSenderMeta(), newTx, number); err != nil {
if err := s.pendingTransactionOrm.InsertPendingTransaction(s.ctx, t.ContextID, s.getSenderMeta(), newTx, number); err != nil {
log.Error("failed to insert transaction", "context ID", t.ContextID, "sender meta", s.getSenderMeta(), "from", s.auth.From.String(), "nonce", newTx.Nonce(), "err", err)
return
}
Expand Down
22 changes: 11 additions & 11 deletions rollup/internal/orm/orm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ import (
var (
base *docker.App

db *gorm.DB
l2BlockOrm *L2Block
chunkOrm *Chunk
batchOrm *Batch
transactionOrm *Transaction
db *gorm.DB
l2BlockOrm *L2Block
chunkOrm *Chunk
batchOrm *Batch
pendingTransactionOrm *PendingTransaction

wrappedBlock1 *types.WrappedBlock
wrappedBlock2 *types.WrappedBlock
Expand Down Expand Up @@ -63,7 +63,7 @@ func setupEnv(t *testing.T) {
batchOrm = NewBatch(db)
chunkOrm = NewChunk(db)
l2BlockOrm = NewL2Block(db)
transactionOrm = NewTransaction(db)
pendingTransactionOrm = NewPendingTransaction(db)

templateBlockTrace, err := os.ReadFile("../../../common/testdata/blockTrace_02.json")
assert.NoError(t, err)
Expand Down Expand Up @@ -359,20 +359,20 @@ func TestTransactionOrm(t *testing.T) {
Type: types.SenderTypeUnknown,
}

err = transactionOrm.InsertTransaction(context.Background(), "context1", senderMeta, tx1, uint64(time.Now().Unix()))
err = pendingTransactionOrm.InsertPendingTransaction(context.Background(), "context1", senderMeta, tx1, uint64(time.Now().Unix()))
assert.NoError(t, err)

err = transactionOrm.InsertTransaction(context.Background(), "context1", senderMeta, tx2, uint64(time.Now().Unix()))
err = pendingTransactionOrm.InsertPendingTransaction(context.Background(), "context1", senderMeta, tx2, uint64(time.Now().Unix()))
assert.NoError(t, err)

txs, err := transactionOrm.GetPendingTransactionsBySenderType(context.Background(), senderMeta.Type, 100)
txs, err := pendingTransactionOrm.GetPendingTransactionsBySenderType(context.Background(), senderMeta.Type, 100)
assert.NoError(t, err)
assert.Len(t, txs, 2)

err = transactionOrm.UpdateTransactionStatusByContextID(context.Background(), "context1", types.TxStatusConfirmed)
err = pendingTransactionOrm.UpdatePendingTransactionStatusByContextID(context.Background(), "context1", types.TxStatusConfirmed)
assert.NoError(t, err)

txs, err = transactionOrm.GetPendingTransactionsBySenderType(context.Background(), senderMeta.Type, 100)
txs, err = pendingTransactionOrm.GetPendingTransactionsBySenderType(context.Background(), senderMeta.Type, 100)
assert.NoError(t, err)
assert.Len(t, txs, 0)
}
103 changes: 52 additions & 51 deletions rollup/internal/orm/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,64 +21,64 @@ type SenderMeta struct {
Type types.SenderType
}

// Transaction represents the structure of a transaction in the database.
type Transaction struct {
// PendingTransaction represents the structure of a transaction in the database.
type PendingTransaction struct {
db *gorm.DB `gorm:"column:-"`

ID uint `json:"id" gorm:"id;primaryKey"`
ContextID string `json:"context_id" gorm:"context_id"`
Hash string `json:"hash" gorm:"hash"`
Type uint8 `json:"type" gorm:"type"`
GasFeeCap string `json:"gas_fee_cap" gorm:"gas_fee_cap"`
GasTipCap string `json:"gas_tip_cap" gorm:"gas_tip_cap"`
GasPrice string `json:"gas_price" gorm:"gas_price"`
GasLimit uint64 `json:"gas_limit" gorm:"gas_limit"`
Nonce uint64 `json:"nonce" gorm:"nonce"`
SubmitAt uint64 `json:"submit_at" gorm:"submit_at"`
Status types.TxStatus `json:"status" gorm:"status"`
RLPEncoding []byte `json:"rlp_encoding" gorm:"rlp_encoding"`
SenderName string `json:"sender_name" gorm:"sender_name"`
SenderService string `json:"sender_service" gorm:"sender_service"`
SenderAddress string `json:"sender_address" gorm:"sender_address"`
SenderType types.SenderType `json:"sender_type" gorm:"sender_type"`
CreatedAt time.Time `json:"created_at" gorm:"column:created_at"`
UpdatedAt time.Time `json:"updated_at" gorm:"column:updated_at"`
DeletedAt gorm.DeletedAt `json:"deleted_at" gorm:"column:deleted_at"`
ID uint `json:"id" gorm:"id;primaryKey"`
ContextID string `json:"context_id" gorm:"context_id"`
Hash string `json:"hash" gorm:"hash"`
Type uint8 `json:"type" gorm:"type"`
GasFeeCap string `json:"gas_fee_cap" gorm:"gas_fee_cap"`
GasTipCap string `json:"gas_tip_cap" gorm:"gas_tip_cap"`
GasPrice string `json:"gas_price" gorm:"gas_price"`
GasLimit uint64 `json:"gas_limit" gorm:"gas_limit"`
Nonce uint64 `json:"nonce" gorm:"nonce"`
SubmitBlockNumber uint64 `json:"submit_block_number" gorm:"submit_block_number"`
Status types.TxStatus `json:"status" gorm:"status"`
RLPEncoding []byte `json:"rlp_encoding" gorm:"rlp_encoding"`
SenderName string `json:"sender_name" gorm:"sender_name"`
SenderService string `json:"sender_service" gorm:"sender_service"`
SenderAddress string `json:"sender_address" gorm:"sender_address"`
SenderType types.SenderType `json:"sender_type" gorm:"sender_type"`
CreatedAt time.Time `json:"created_at" gorm:"column:created_at"`
UpdatedAt time.Time `json:"updated_at" gorm:"column:updated_at"`
DeletedAt gorm.DeletedAt `json:"deleted_at" gorm:"column:deleted_at"`
}

// TableName returns the table name for the Transaction model.
func (*Transaction) TableName() string {
return "transaction"
func (*PendingTransaction) TableName() string {
return "pending_transaction"
}

// NewTransaction returns a new instance of Transaction.
func NewTransaction(db *gorm.DB) *Transaction {
return &Transaction{db: db}
// NewPendignTransaction returns a new instance of PendingTransaction.
func NewPendingTransaction(db *gorm.DB) *PendingTransaction {
return &PendingTransaction{db: db}
}

// InsertTransaction creates a new transaction record and stores it in the database.
func (t *Transaction) InsertTransaction(ctx context.Context, contextID string, senderMeta *SenderMeta, tx *gethTypes.Transaction, submitAt uint64) error {
// InsertPendingTransaction creates a new pending transaction record and stores it in the database.
func (t *PendingTransaction) InsertPendingTransaction(ctx context.Context, contextID string, senderMeta *SenderMeta, tx *gethTypes.Transaction, submitBlockNumber uint64) error {
rlp := new(bytes.Buffer)
if err := tx.EncodeRLP(rlp); err != nil {
return fmt.Errorf("failed to encode rlp, err: %w", err)
}

newTransaction := &Transaction{
ContextID: contextID,
Hash: tx.Hash().String(),
Type: tx.Type(),
GasFeeCap: tx.GasFeeCap().String(),
GasTipCap: tx.GasTipCap().String(),
GasPrice: tx.GasPrice().String(),
GasLimit: tx.Gas(),
Nonce: tx.Nonce(),
SubmitAt: submitAt,
Status: types.TxStatusPending,
RLPEncoding: rlp.Bytes(),
SenderName: senderMeta.Name,
SenderAddress: senderMeta.Address.String(),
SenderService: senderMeta.Service,
SenderType: senderMeta.Type,
newTransaction := &PendingTransaction{
ContextID: contextID,
Hash: tx.Hash().String(),
Type: tx.Type(),
GasFeeCap: tx.GasFeeCap().String(),
GasTipCap: tx.GasTipCap().String(),
GasPrice: tx.GasPrice().String(),
GasLimit: tx.Gas(),
Nonce: tx.Nonce(),
SubmitBlockNumber: submitBlockNumber,
Status: types.TxStatusPending,
RLPEncoding: rlp.Bytes(),
SenderName: senderMeta.Name,
SenderAddress: senderMeta.Address.String(),
SenderService: senderMeta.Service,
SenderType: senderMeta.Type,
}

db := t.db.WithContext(ctx)
Expand All @@ -88,27 +88,28 @@ func (t *Transaction) InsertTransaction(ctx context.Context, contextID string, s
return nil
}

// UpdateTransactionStatusByContextID updates the status of a transaction based on the given context ID.
func (t *Transaction) UpdateTransactionStatusByContextID(ctx context.Context, contextID string, status types.TxStatus) error {
// UpdatePendingTransactionStatusByContextID updates the status of a transaction based on the given context ID.
func (t *PendingTransaction) UpdatePendingTransactionStatusByContextID(ctx context.Context, contextID string, status types.TxStatus) error {
db := t.db.WithContext(ctx)
db = db.Model(&Transaction{})
db = db.Model(&PendingTransaction{})
db = db.Where("context_id = ?", contextID)
db = db.Where("status = ?", types.TxStatusPending)
if err := db.Update("status", status).Error; err != nil {
return fmt.Errorf("failed to UpdateTransactionStatus, error: %w", err)
}
return nil
}

// GetPendingTransactionsBySenderType retrieves pending transactions filtered by sender type, ordered by nonce, and limited to a specified count.
func (t *Transaction) GetPendingTransactionsBySenderType(ctx context.Context, senderType types.SenderType, limit int) ([]Transaction, error) {
var transactions []Transaction
func (t *PendingTransaction) GetPendingTransactionsBySenderType(ctx context.Context, senderType types.SenderType, limit int) ([]PendingTransaction, error) {
var pendingTransactions []PendingTransaction
db := t.db.WithContext(ctx)
db = db.Where("sender_type = ?", senderType)
db = db.Where("status = ?", types.TxStatusPending)
db = db.Order("nonce asc")
db = db.Limit(limit)
if err := db.Find(&transactions).Error; err != nil {
if err := db.Find(&pendingTransactions).Error; err != nil {
return nil, fmt.Errorf("failed to get pending transactions by sender type, error: %w", err)
}
return transactions, nil
return pendingTransactions, nil
}

0 comments on commit 45d17e9

Please sign in to comment.