Track unbonding transactions that fail to reach covenant quorum.

Summary of what this patch does (all visible in the hunks below):
  * adds the FailedToGetCovenantSignatures unbonding state to the DB model and
    the in-memory store, plus a finder and a setter for it in the DB client,
    the UnbondingStore interface and both store implementations;
  * in processUnbondingTransactions, a failure of signUnbondingTransaction no
    longer aborts the run with ErrCriticalError: the failure is counted in a
    new Prometheus counter, logged, the transaction is marked with the new
    state and the loop continues with the next transaction;
  * wrapCrititical now also takes the staking transaction and its hash and
    embeds both in the error text;
  * ProcessFailedTransactions first retries transactions with no covenant
    quorum, then the other failed transactions.

Reviewer notes:
  * the error format key "staking_tx_haxh" was corrected to "staking_tx_hash";
  * the comment in GetUnbondingTransactionsWithNoQuorum now names the state it
    actually filters on;
  * line breaks, blank lines and indentation were reconstructed from the hunk
    line counts and gofmt conventions - run `git apply --check` before use.
    The "index" blob hashes are those of the original patch.

diff --git a/internal/db/dbclient.go b/internal/db/dbclient.go
index d15b7dd..b2d9a87 100644
--- a/internal/db/dbclient.go
+++ b/internal/db/dbclient.go
@@ -97,6 +97,10 @@ func (db *Database) FindFailedUnbodningDocuments(ctx context.Context) ([]model.U
 	return db.findUnbondingDocumentsWithState(ctx, model.Failed)
 }
 
+func (db *Database) FindUnbondingDocumentsWithNoCovenantQuorum(ctx context.Context) ([]model.UnbondingDocument, error) {
+	return db.findUnbondingDocumentsWithState(ctx, model.FailedToGetCovenantSignatures)
+}
+
 func (db *Database) FindSendUnbondingDocuments(ctx context.Context) ([]model.UnbondingDocument, error) {
 	return db.findUnbondingDocumentsWithState(ctx, model.Send)
 }
@@ -129,3 +133,9 @@ func (db *Database) SetUnbondingDocumentInputAlreadySpent(
 	unbondingTxHashHex string) error {
 	return db.updateUnbondingDocumentState(ctx, unbondingTxHashHex, model.InputAlreadySpent)
 }
+
+func (db *Database) SetUnbondingDocumentFailedToGetCovenantSignatures(
+	ctx context.Context,
+	unbondingTxHashHex string) error {
+	return db.updateUnbondingDocumentState(ctx, unbondingTxHashHex, model.FailedToGetCovenantSignatures)
+}
diff --git a/internal/db/model/unbonding.go b/internal/db/model/unbonding.go
index 4da2246..3ed81f3 100644
--- a/internal/db/model/unbonding.go
+++ b/internal/db/model/unbonding.go
@@ -9,13 +9,11 @@ const (
 type UnbondingState string
 
 const (
-	Inserted UnbondingState = "INSERTED"
-	Send     UnbondingState = "SEND"
-	// TODO: This is not used now, but it will be necessary to cover the case when
-	// we try to send unbonding transaction but someone already withdrew the staking
-	// output
-	InputAlreadySpent UnbondingState = "INPUT_ALREADY_SPENT"
-	Failed            UnbondingState = "FAILED"
+	Inserted                      UnbondingState = "INSERTED"
+	Send                          UnbondingState = "SEND"
+	InputAlreadySpent             UnbondingState = "INPUT_ALREADY_SPENT"
+	Failed                        UnbondingState = "FAILED"
+	FailedToGetCovenantSignatures UnbondingState = "FAILED_TO_GET_COVENANT_SIGNATURES"
 )
 
 type UnbondingDocument struct {
diff --git a/internal/services/expected_interfaces.go b/internal/services/expected_interfaces.go
index 6d274f9..236275b 100644
--- a/internal/services/expected_interfaces.go
+++ b/internal/services/expected_interfaces.go
@@ -125,9 +125,13 @@ type UnbondingStore interface {
 
 	GetFailedUnbondingTransactions(ctx context.Context) ([]*UnbondingTxData, error)
 
+	GetUnbondingTransactionsWithNoQuorum(ctx context.Context) ([]*UnbondingTxData, error)
+
 	SetUnbondingTransactionProcessed(ctx context.Context, utx *UnbondingTxData) error
 
 	SetUnbondingTransactionProcessingFailed(ctx context.Context, utx *UnbondingTxData) error
 
 	SetUnbondingTransactionInputAlreadySpent(ctx context.Context, utx *UnbondingTxData) error
+
+	SetUnbondingTransactionFailedToGetCovenantSignatures(ctx context.Context, utx *UnbondingTxData) error
 }
diff --git a/internal/services/in_mem_store.go b/internal/services/in_mem_store.go
index 53bf489..4d94444 100644
--- a/internal/services/in_mem_store.go
+++ b/internal/services/in_mem_store.go
@@ -14,10 +14,11 @@ import (
 type state string
 
 const (
-	inserted          state = "inserted"
-	send              state = "send"
-	inputAlreadySpent state = "input_already_spent"
-	failed            state = "failed"
+	inserted             state = "inserted"
+	send                 state = "send"
+	inputAlreadySpent    state = "input_already_spent"
+	failed               state = "failed"
+	failedToGetSignatues state = "failed_to_get_covenant_signatures"
 )
 
 type unbondingTxDataWithCounter struct {
@@ -136,6 +137,35 @@ func (s *InMemoryUnbondingStore) GetFailedUnbondingTransactions(ctx context.Cont
 	return resUnbondingTxData, nil
 }
 
+func (s *InMemoryUnbondingStore) GetUnbondingTransactionsWithNoQuorum(ctx context.Context) ([]*UnbondingTxData, error) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	var res []*unbondingTxDataWithCounter
+
+	for _, tx := range s.mapping {
+		txCopy := tx
+		// get only transactions that failed to get covenant signatures
+		if tx.state == failedToGetSignatues {
+			res = append(res, txCopy)
+		}
+	}
+
+	// sort by counter
+	sort.SliceStable(res, func(i, j int) bool {
+		return res[i].Counter < res[j].Counter
+	})
+
+	// convert to UnbondingTxData
+	var resUnbondingTxData []*UnbondingTxData
+	for _, tx := range res {
+		txCopy := tx
+		resUnbondingTxData = append(resUnbondingTxData, &txCopy.UnbondingTxData)
+	}
+
+	return resUnbondingTxData, nil
+}
+
 func (s *InMemoryUnbondingStore) SetUnbondingTransactionProcessed(_ context.Context, utx *UnbondingTxData) error {
 	s.mu.Lock()
 	defer s.mu.Unlock()
@@ -180,3 +210,18 @@ func (s *InMemoryUnbondingStore) SetUnbondingTransactionInputAlreadySpent(_ cont
 
 	return nil
 }
+
+func (s *InMemoryUnbondingStore) SetUnbondingTransactionFailedToGetCovenantSignatures(_ context.Context, utx *UnbondingTxData) error {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	tx, exists := s.mapping[*utx.UnbondingTransactionHash]
+
+	if !exists {
+		return fmt.Errorf("tx with hash %s does not exist", *utx.UnbondingTransactionHash)
+	}
+
+	tx.state = failedToGetSignatues
+
+	return nil
+}
diff --git a/internal/services/persistent_store.go b/internal/services/persistent_store.go
index 0501aa1..c48731d 100644
--- a/internal/services/persistent_store.go
+++ b/internal/services/persistent_store.go
@@ -226,6 +226,13 @@ func (s *PersistentUnbondingStorage) GetFailedUnbondingTransactions(ctx context.
 	)
 }
 
+func (s *PersistentUnbondingStorage) GetUnbondingTransactionsWithNoQuorum(ctx context.Context) ([]*UnbondingTxData, error) {
+	return transformDocuments(
+		ctx,
+		s.client.FindUnbondingDocumentsWithNoCovenantQuorum,
+	)
+}
+
 func (s *PersistentUnbondingStorage) GetNotProcessedUnbondingTransactions(ctx context.Context) ([]*UnbondingTxData, error) {
 	return transformDocuments(
 		ctx,
@@ -247,3 +254,8 @@ func (s *PersistentUnbondingStorage) SetUnbondingTransactionInputAlreadySpent(ct
 	txHash := utx.UnbondingTransactionHash.String()
 	return s.client.SetUnbondingDocumentFailed(ctx, txHash)
 }
+
+func (s *PersistentUnbondingStorage) SetUnbondingTransactionFailedToGetCovenantSignatures(ctx context.Context, utx *UnbondingTxData) error {
+	txHash := utx.UnbondingTransactionHash.String()
+	return s.client.SetUnbondingDocumentFailedToGetCovenantSignatures(ctx, txHash)
+}
diff --git a/internal/services/unbonding_pipeline.go b/internal/services/unbonding_pipeline.go
index acbaf8f..b0f4f05 100644
--- a/internal/services/unbonding_pipeline.go
+++ b/internal/services/unbonding_pipeline.go
@@ -7,12 +7,14 @@ import (
 	"fmt"
 	"log/slog"
 
+	"github.com/babylonchain/babylon/types"
 	"github.com/babylonchain/cli-tools/internal/btcclient"
 	"github.com/babylonchain/cli-tools/internal/config"
 	"github.com/babylonchain/cli-tools/internal/db"
 	"github.com/btcsuite/btcd/btcec/v2"
 	"github.com/btcsuite/btcd/btcec/v2/schnorr"
 	"github.com/btcsuite/btcd/chaincfg"
+	"github.com/btcsuite/btcd/chaincfg/chainhash"
 	"github.com/btcsuite/btcd/wire"
 	"github.com/prometheus/client_golang/prometheus/push"
 )
@@ -22,11 +24,26 @@ var (
 	// code, or we allowed some invalid data into database.
 	// When this happend we stop processing pipeline and return immediately, without
 	// changing status of any unbonding transaction.
-	ErrCriticalError = fmt.Errorf("critical error encountered")
+	ErrCriticalError = fmt.Errorf("critical error")
 )
 
-func wrapCrititical(err error) error {
-	return fmt.Errorf("%s:%w", err.Error(), ErrCriticalError)
+func mustBtcTxToHex(tx *wire.MsgTx) string {
+	bytes, err := types.SerializeBTCTx(tx)
+	if err != nil {
+		panic(err)
+	}
+	return hex.EncodeToString(bytes)
+}
+
+func wrapCrititical(
+	stakingTx *wire.MsgTx,
+	stakingTxHash *chainhash.Hash,
+	err error,
+) error {
+	stakingTxHex := mustBtcTxToHex(stakingTx)
+	return fmt.Errorf(
+		"err: %s, staking_tx_hash:%s, staking_tx:%s: %w", err.Error(), stakingTxHash.String(), stakingTxHex, ErrCriticalError,
+	)
 }
 
 func pubKeyToStringSchnorr(pubKey *btcec.PublicKey) string {
@@ -247,6 +264,7 @@ func (up *UnbondingPipeline) processUnbondingTransactions(
 		utx := tx
 
 		stakingOutputFromDb := utx.StakingOutput()
+
 		stakingTxHash := utx.UnbondingTransaction.TxIn[0].PreviousOutPoint.Hash
 
 		stakingTxInfo, err := up.sender.TxByHash(
@@ -257,14 +275,22 @@ func (up *UnbondingPipeline) processUnbondingTransactions(
 		// if the staking transaction is not found in btc chain, it means something is wrong
 		// as staking service should not allow to create unbonding transaction without staking transaction
 		if err != nil {
-			return wrapCrititical(err)
+			return wrapCrititical(
+				utx.StakingTransactionData.StakingTransaction,
+				&stakingTxHash,
+				err,
+			)
 		}
 
 		params, err := up.retriever.ParamsByHeight(ctx, uint64(stakingTxInfo.TxInclusionHeight))
 
 		// we should always be able to retrieve params for the height of the staking transaction
 		if err != nil {
-			return wrapCrititical(err)
+			return wrapCrititical(
+				utx.StakingTransactionData.StakingTransaction,
+				&stakingTxHash,
+				err,
+			)
 		}
 
 		stakingOutputRecovered, unbondingPathSpendInfo, err := CreateUnbondingPathSpendInfo(
@@ -274,7 +300,11 @@ func (up *UnbondingPipeline) processUnbondingTransactions(
 		)
 
 		if err != nil {
-			return wrapCrititical(err)
+			return wrapCrititical(
+				utx.StakingTransactionData.StakingTransaction,
+				&stakingTxHash,
+				err,
+			)
 		}
 
 		// This the last line check before sending unbonding transaction for signing. It checks
@@ -285,7 +315,11 @@ func (up *UnbondingPipeline) processUnbondingTransactions(
 		// - pipeline is run on bad BTC network
 		// - stakingApi service has a bug
 		if !outputsAreEqual(stakingOutputRecovered, stakingOutputFromDb) {
-			return wrapCrititical(fmt.Errorf("staking output from staking tx and staking output re-build from params are different"))
+			return wrapCrititical(
+				utx.StakingTransactionData.StakingTransaction,
+				&stakingTxHash,
+				fmt.Errorf("staking output from staking tx and staking output re-build from params are different"),
+			)
 		}
 
 		sigs, err := up.signUnbondingTransaction(
@@ -297,7 +331,26 @@ func (up *UnbondingPipeline) processUnbondingTransactions(
 		)
 
 		if err != nil {
-			return wrapCrititical(err)
+			up.Metrics.RecordFailedCovenantQuorum()
+
+			unbondingTxHex := mustBtcTxToHex(utx.UnbondingTransaction)
+
+			up.logger.Error("Failed to get quorum of covenant signatures to unbond",
+				"staking_tx_hash", tx.StakingTransactionData.StakingTransaction.TxHash().String(),
+				"unbonding_tx_hash", tx.UnbondingTransactionHash.String(),
+				"unbonding_tx", unbondingTxHex,
+			)
+
+			// Note that we failed to get signatures from covenant members
+			if err := up.store.SetUnbondingTransactionFailedToGetCovenantSignatures(ctx, utx); err != nil {
+				return wrapCrititical(
+					utx.StakingTransactionData.StakingTransaction,
+					&stakingTxHash,
+					err,
+				)
+			}
+
+			continue
 		}
 
 		up.logger.Info("Successfully collected quorum of covenant signatures to unbond",
@@ -315,7 +368,11 @@ func (up *UnbondingPipeline) processUnbondingTransactions(
 		)
 
 		if err != nil {
-			return wrapCrititical(err)
+			return wrapCrititical(
+				utx.StakingTransactionData.StakingTransaction,
+				&stakingTxHash,
+				err,
+			)
 		}
 
 		// We assume that this is valid unbodning transaction, with 1 input
@@ -331,7 +388,11 @@ func (up *UnbondingPipeline) processUnbondingTransactions(
 			up.logger.Info("The input of the unbonding transaction has already been spent",
 				slog.String("staking_tx_hash", stakingTxHash.String()))
 			if err := up.store.SetUnbondingTransactionInputAlreadySpent(ctx, utx); err != nil {
-				return wrapCrititical(err)
+				return wrapCrititical(
+					utx.StakingTransactionData.StakingTransaction,
+					&stakingTxHash,
+					err,
+				)
 			}
 			continue
 		}
@@ -341,7 +402,11 @@ func (up *UnbondingPipeline) processUnbondingTransactions(
 		if err != nil {
 			up.logger.Error("Failed to send unbonding transaction", "error", err)
 			if err := up.store.SetUnbondingTransactionProcessingFailed(ctx, utx); err != nil {
-				return wrapCrititical(err)
+				return wrapCrititical(
+					utx.StakingTransactionData.StakingTransaction,
+					&stakingTxHash,
+					err,
+				)
 			}
 			up.Metrics.RecordFailedUnbodingTransaction()
 		} else {
@@ -350,7 +415,11 @@ func (up *UnbondingPipeline) processUnbondingTransactions(
 				slog.String("tx_hash", hash.String()),
 			)
 			if err := up.store.SetUnbondingTransactionProcessed(ctx, utx); err != nil {
-				return wrapCrititical(err)
+				return wrapCrititical(
+					utx.StakingTransactionData.StakingTransaction,
+					&stakingTxHash,
+					err,
+				)
 			}
 			up.Metrics.RecordSentUnbondingTransaction()
 		}
@@ -398,12 +467,6 @@ func (up *UnbondingPipeline) ProcessNewTransactions(ctx context.Context) error {
 func (up *UnbondingPipeline) ProcessFailedTransactions(ctx context.Context) error {
 	up.logger.Info("Running unbonding pipeline for failed transactions")
 
-	unbondingTransactions, err := up.store.GetFailedUnbondingTransactions(ctx)
-
-	if err != nil {
-		return err
-	}
-
 	defer func() {
 		if up.Metrics.Config.Enabled {
 			if err := up.pushMetrics(); err != nil {
@@ -412,9 +475,22 @@ func (up *UnbondingPipeline) ProcessFailedTransactions(ctx context.Context) erro
 		}
 	}()
 
-	if len(unbondingTransactions) == 0 {
-		up.logger.Info("No failed unbonding transactions to process")
-		return nil
+	// 1. First process transactions that failed to get quorum of covenant signatures
+	unbondingTxWithNoQuorum, err := up.store.GetUnbondingTransactionsWithNoQuorum(ctx)
+
+	if err != nil {
+		return err
+	}
+
+	if err := up.processUnbondingTransactions(ctx, unbondingTxWithNoQuorum); err != nil {
+		return err
+	}
+
+	// 2. Second process other failed unbonding transactions
+	unbondingTransactions, err := up.store.GetFailedUnbondingTransactions(ctx)
+
+	if err != nil {
+		return err
 	}
 
 	if err := up.processUnbondingTransactions(ctx, unbondingTransactions); err != nil {
diff --git a/internal/services/unbonding_pipeline_metrics.go b/internal/services/unbonding_pipeline_metrics.go
index ca611e2..dc9cee6 100644
--- a/internal/services/unbonding_pipeline_metrics.go
+++ b/internal/services/unbonding_pipeline_metrics.go
@@ -10,6 +10,7 @@ type PipelineMetrics struct {
 	FailedSigningReqs          *prometheus.CounterVec
 	SuccessfulSentTransactions prometheus.Counter
 	FailureSentTransactions    prometheus.Counter
+	FailureToGetCovenantQuorum prometheus.Counter
 	Config                     *config.MetricsConfig
 }
 
@@ -41,6 +42,12 @@ func NewPipelineMetrics(cfg *config.MetricsConfig) *PipelineMetrics {
 				Help: "How many transactions failed to be sent to the network",
 			},
 		),
+		FailureToGetCovenantQuorum: prometheus.NewCounter(
+			prometheus.CounterOpts{
+				Name: "number_of_failed_covenant_quorums",
+				Help: "How many times we failed to get covenant quorum for signing request",
+			},
+		),
 		Config: cfg,
 	}
 }
@@ -60,3 +67,7 @@ func (pm *PipelineMetrics) RecordSentUnbondingTransaction() {
 func (pm *PipelineMetrics) RecordFailedUnbodingTransaction() {
 	pm.FailureSentTransactions.Inc()
 }
+
+func (pm *PipelineMetrics) RecordFailedCovenantQuorum() {
+	pm.FailureToGetCovenantQuorum.Inc()
+}
diff --git a/itest/e2e_test.go b/itest/e2e_test.go
index 2892499..364aeae 100644
--- a/itest/e2e_test.go
+++ b/itest/e2e_test.go
@@ -6,7 +6,6 @@ package e2etest
 import (
 	"context"
 	"encoding/hex"
-	"errors"
 	"fmt"
 	"math"
 	"os"
@@ -634,7 +633,7 @@ func (tm *TestManager) updateSchnorSigInDb(newSig *schnorr.Signature, txHash *ch
 	require.NoError(tm.t, err)
 }
 
-func TestHandlingCriticalError(t *testing.T) {
+func TestHandlingCriticalSigningError(t *testing.T) {
 	m := StartManager(t, 10, true)
 	d := defaultStakingData()
 
@@ -669,10 +668,11 @@ func TestHandlingCriticalError(t *testing.T) {
 
 	// 2. Run pipeline
 	err = m.pipeLine.ProcessNewTransactions(context.Background())
-	require.Error(t, err)
-	// With invalid signature in db, signers will refuse to sign it, which should end
-	// with critical error
-	require.True(t, errors.Is(err, services.ErrCriticalError))
+	require.NoError(t, err)
 
-	// TODO:Find a way to simulate bitcoind not accepting transaction
+	failedQuorumTx, err := m.testStoreController.GetUnbondingTransactionsWithNoQuorum(context.TODO())
+	require.NoError(t, err)
+	require.Len(t, failedQuorumTx, 1)
+	// check that it is in fact our tx with invalid schnorr signature
+	require.Equal(t, invalidSchnorrSigBytes, failedQuorumTx[0].UnbondingTransactionSig.Serialize())
 }