Skip to content

Commit

Permalink
Merge pull request #44 from babylonchain/critical-errors-handling
Browse files Browse the repository at this point in the history
Handle error of not getting covenant quorum
  • Loading branch information
KonradStaniec authored Jun 19, 2024
2 parents d3921ef + 38f6ce8 commit 6a4b91f
Show file tree
Hide file tree
Showing 8 changed files with 195 additions and 39 deletions.
10 changes: 10 additions & 0 deletions internal/db/dbclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ func (db *Database) FindFailedUnbodningDocuments(ctx context.Context) ([]model.U
return db.findUnbondingDocumentsWithState(ctx, model.Failed)
}

func (db *Database) FindUnbondingDocumentsWithNoCovenantQuorum(ctx context.Context) ([]model.UnbondingDocument, error) {
return db.findUnbondingDocumentsWithState(ctx, model.FailedToGetCovenantSignatures)
}

func (db *Database) FindSendUnbondingDocuments(ctx context.Context) ([]model.UnbondingDocument, error) {
return db.findUnbondingDocumentsWithState(ctx, model.Send)
}
Expand Down Expand Up @@ -129,3 +133,9 @@ func (db *Database) SetUnbondingDocumentInputAlreadySpent(
unbondingTxHashHex string) error {
return db.updateUnbondingDocumentState(ctx, unbondingTxHashHex, model.InputAlreadySpent)
}

func (db *Database) SetUnbondingDocumentFailedToGetCovenantSignatures(
ctx context.Context,
unbondingTxHashHex string) error {
return db.updateUnbondingDocumentState(ctx, unbondingTxHashHex, model.FailedToGetCovenantSignatures)
}
12 changes: 5 additions & 7 deletions internal/db/model/unbonding.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,11 @@ const (
type UnbondingState string

const (
Inserted UnbondingState = "INSERTED"
Send UnbondingState = "SEND"
// TODO: This is not used now, but it will be necessary to cover the case when
// we try to send unbonding transaction but someone already withdrew the staking
// output
InputAlreadySpent UnbondingState = "INPUT_ALREADY_SPENT"
Failed UnbondingState = "FAILED"
Inserted UnbondingState = "INSERTED"
Send UnbondingState = "SEND"
InputAlreadySpent UnbondingState = "INPUT_ALREADY_SPENT"
Failed UnbondingState = "FAILED"
FailedToGetCovenantSignatures UnbondingState = "FAILED_TO_GET_COVENANT_SIGNATURES"
)

type UnbondingDocument struct {
Expand Down
4 changes: 4 additions & 0 deletions internal/services/expected_interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,13 @@ type UnbondingStore interface {

GetFailedUnbondingTransactions(ctx context.Context) ([]*UnbondingTxData, error)

GetUnbondingTransactionsWithNoQuorum(ctx context.Context) ([]*UnbondingTxData, error)

SetUnbondingTransactionProcessed(ctx context.Context, utx *UnbondingTxData) error

SetUnbondingTransactionProcessingFailed(ctx context.Context, utx *UnbondingTxData) error

SetUnbondingTransactionInputAlreadySpent(ctx context.Context, utx *UnbondingTxData) error

SetUnbondingTransactionFailedToGetCovenantSignatures(ctx context.Context, utx *UnbondingTxData) error
}
53 changes: 49 additions & 4 deletions internal/services/in_mem_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@ import (
type state string

const (
inserted state = "inserted"
send state = "send"
inputAlreadySpent state = "input_already_spent"
failed state = "failed"
inserted state = "inserted"
send state = "send"
inputAlreadySpent state = "input_already_spent"
failed state = "failed"
failedToGetSignatues state = "failed_to_get_covenant_signatures"
)

type unbondingTxDataWithCounter struct {
Expand Down Expand Up @@ -136,6 +137,35 @@ func (s *InMemoryUnbondingStore) GetFailedUnbondingTransactions(ctx context.Cont
return resUnbondingTxData, nil
}

func (s *InMemoryUnbondingStore) GetUnbondingTransactionsWithNoQuorum(ctx context.Context) ([]*UnbondingTxData, error) {
s.mu.Lock()
defer s.mu.Unlock()

var res []*unbondingTxDataWithCounter

for _, tx := range s.mapping {
txCopy := tx
// get only failed transactions
if tx.state == failedToGetSignatues {
res = append(res, txCopy)
}
}

// sort by counter
sort.SliceStable(res, func(i, j int) bool {
return res[i].Counter < res[j].Counter
})

// convert to UnbondingTxData
var resUnbondingTxData []*UnbondingTxData
for _, tx := range res {
txCopy := tx
resUnbondingTxData = append(resUnbondingTxData, &txCopy.UnbondingTxData)
}

return resUnbondingTxData, nil
}

func (s *InMemoryUnbondingStore) SetUnbondingTransactionProcessed(_ context.Context, utx *UnbondingTxData) error {
s.mu.Lock()
defer s.mu.Unlock()
Expand Down Expand Up @@ -180,3 +210,18 @@ func (s *InMemoryUnbondingStore) SetUnbondingTransactionInputAlreadySpent(_ cont

return nil
}

func (s *InMemoryUnbondingStore) SetUnbondingTransactionFailedToGetCovenantSignatures(_ context.Context, utx *UnbondingTxData) error {
s.mu.Lock()
defer s.mu.Unlock()

tx, exists := s.mapping[*utx.UnbondingTransactionHash]

if !exists {
return fmt.Errorf("tx with hash %s does not exist", *utx.UnbondingTransactionHash)
}

tx.state = failedToGetSignatues

return nil
}
12 changes: 12 additions & 0 deletions internal/services/persistent_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,13 @@ func (s *PersistentUnbondingStorage) GetFailedUnbondingTransactions(ctx context.
)
}

func (s *PersistentUnbondingStorage) GetUnbondingTransactionsWithNoQuorum(ctx context.Context) ([]*UnbondingTxData, error) {
return transformDocuments(
ctx,
s.client.FindUnbondingDocumentsWithNoCovenantQuorum,
)
}

func (s *PersistentUnbondingStorage) GetNotProcessedUnbondingTransactions(ctx context.Context) ([]*UnbondingTxData, error) {
return transformDocuments(
ctx,
Expand All @@ -247,3 +254,8 @@ func (s *PersistentUnbondingStorage) SetUnbondingTransactionInputAlreadySpent(ct
txHash := utx.UnbondingTransactionHash.String()
return s.client.SetUnbondingDocumentFailed(ctx, txHash)
}

func (s *PersistentUnbondingStorage) SetUnbondingTransactionFailedToGetCovenantSignatures(ctx context.Context, utx *UnbondingTxData) error {
txHash := utx.UnbondingTransactionHash.String()
return s.client.SetUnbondingDocumentFailedToGetCovenantSignatures(ctx, txHash)
}
118 changes: 97 additions & 21 deletions internal/services/unbonding_pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@ import (
"fmt"
"log/slog"

"github.com/babylonchain/babylon/types"
"github.com/babylonchain/cli-tools/internal/btcclient"
"github.com/babylonchain/cli-tools/internal/config"
"github.com/babylonchain/cli-tools/internal/db"
"github.com/btcsuite/btcd/btcec/v2"
"github.com/btcsuite/btcd/btcec/v2/schnorr"
"github.com/btcsuite/btcd/chaincfg"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/prometheus/client_golang/prometheus/push"
)
Expand All @@ -22,11 +24,26 @@ var (
// code, or we allowed some invalid data into database.
// When this happend we stop processing pipeline and return immediately, without
// changing status of any unbonding transaction.
ErrCriticalError = fmt.Errorf("critical error encountered")
ErrCriticalError = fmt.Errorf("critical error")
)

func wrapCrititical(err error) error {
return fmt.Errorf("%s:%w", err.Error(), ErrCriticalError)
func mustBtcTxToHex(tx *wire.MsgTx) string {
bytes, err := types.SerializeBTCTx(tx)
if err != nil {
panic(err)
}
return hex.EncodeToString(bytes)
}

func wrapCrititical(
stakingTx *wire.MsgTx,
stakingTxHash *chainhash.Hash,
err error,
) error {
stakingTxHex := mustBtcTxToHex(stakingTx)
return fmt.Errorf(
"err: %s, staking_tx_haxh:%s, staking_tx:%s: %w", err.Error(), stakingTxHash.String(), stakingTxHex, ErrCriticalError,
)
}

func pubKeyToStringSchnorr(pubKey *btcec.PublicKey) string {
Expand Down Expand Up @@ -247,6 +264,7 @@ func (up *UnbondingPipeline) processUnbondingTransactions(
utx := tx

stakingOutputFromDb := utx.StakingOutput()

stakingTxHash := utx.UnbondingTransaction.TxIn[0].PreviousOutPoint.Hash

stakingTxInfo, err := up.sender.TxByHash(
Expand All @@ -257,14 +275,22 @@ func (up *UnbondingPipeline) processUnbondingTransactions(
// if the staking transaction is not found in btc chain, it means something is wrong
// as staking service should not allow to create unbonding transaction without staking transaction
if err != nil {
return wrapCrititical(err)
return wrapCrititical(
utx.StakingTransactionData.StakingTransaction,
&stakingTxHash,
err,
)
}

params, err := up.retriever.ParamsByHeight(ctx, uint64(stakingTxInfo.TxInclusionHeight))

// we should always be able to retrieve params for the height of the staking transaction
if err != nil {
return wrapCrititical(err)
return wrapCrititical(
utx.StakingTransactionData.StakingTransaction,
&stakingTxHash,
err,
)
}

stakingOutputRecovered, unbondingPathSpendInfo, err := CreateUnbondingPathSpendInfo(
Expand All @@ -274,7 +300,11 @@ func (up *UnbondingPipeline) processUnbondingTransactions(
)

if err != nil {
return wrapCrititical(err)
return wrapCrititical(
utx.StakingTransactionData.StakingTransaction,
&stakingTxHash,
err,
)
}

// This the last line check before sending unbonding transaction for signing. It checks
Expand All @@ -285,7 +315,11 @@ func (up *UnbondingPipeline) processUnbondingTransactions(
// - pipeline is run on bad BTC network
// - stakingApi service has a bug
if !outputsAreEqual(stakingOutputRecovered, stakingOutputFromDb) {
return wrapCrititical(fmt.Errorf("staking output from staking tx and staking output re-build from params are different"))
return wrapCrititical(
utx.StakingTransactionData.StakingTransaction,
&stakingTxHash,
fmt.Errorf("staking output from staking tx and staking output re-build from params are different"),
)
}

sigs, err := up.signUnbondingTransaction(
Expand All @@ -297,7 +331,26 @@ func (up *UnbondingPipeline) processUnbondingTransactions(
)

if err != nil {
return wrapCrititical(err)
up.Metrics.RecordFailedCovenantQuorum()

unbondingTxHex := mustBtcTxToHex(utx.UnbondingTransaction)

up.logger.Error("Failed to get quorum of covenant signatures to unbond",
"staking_tx_hash", tx.StakingTransactionData.StakingTransaction.TxHash().String(),
"unbonding_tx_hash", tx.UnbondingTransactionHash.String(),
"unbonding_tx", unbondingTxHex,
)

// Note that we failed to get signatures from covenant members
if err := up.store.SetUnbondingTransactionFailedToGetCovenantSignatures(ctx, utx); err != nil {
return wrapCrititical(
utx.StakingTransactionData.StakingTransaction,
&stakingTxHash,
err,
)
}

continue
}

up.logger.Info("Successfully collected quorum of covenant signatures to unbond",
Expand All @@ -315,7 +368,11 @@ func (up *UnbondingPipeline) processUnbondingTransactions(
)

if err != nil {
return wrapCrititical(err)
return wrapCrititical(
utx.StakingTransactionData.StakingTransaction,
&stakingTxHash,
err,
)
}

// We assume that this is valid unbodning transaction, with 1 input
Expand All @@ -331,7 +388,11 @@ func (up *UnbondingPipeline) processUnbondingTransactions(
up.logger.Info("The input of the unbonding transaction has already been spent",
slog.String("staking_tx_hash", stakingTxHash.String()))
if err := up.store.SetUnbondingTransactionInputAlreadySpent(ctx, utx); err != nil {
return wrapCrititical(err)
return wrapCrititical(
utx.StakingTransactionData.StakingTransaction,
&stakingTxHash,
err,
)
}
continue
}
Expand All @@ -341,7 +402,11 @@ func (up *UnbondingPipeline) processUnbondingTransactions(
if err != nil {
up.logger.Error("Failed to send unbonding transaction", "error", err)
if err := up.store.SetUnbondingTransactionProcessingFailed(ctx, utx); err != nil {
return wrapCrititical(err)
return wrapCrititical(
utx.StakingTransactionData.StakingTransaction,
&stakingTxHash,
err,
)
}
up.Metrics.RecordFailedUnbodingTransaction()
} else {
Expand All @@ -350,7 +415,11 @@ func (up *UnbondingPipeline) processUnbondingTransactions(
slog.String("tx_hash", hash.String()),
)
if err := up.store.SetUnbondingTransactionProcessed(ctx, utx); err != nil {
return wrapCrititical(err)
return wrapCrititical(
utx.StakingTransactionData.StakingTransaction,
&stakingTxHash,
err,
)
}
up.Metrics.RecordSentUnbondingTransaction()
}
Expand Down Expand Up @@ -398,12 +467,6 @@ func (up *UnbondingPipeline) ProcessNewTransactions(ctx context.Context) error {
func (up *UnbondingPipeline) ProcessFailedTransactions(ctx context.Context) error {
up.logger.Info("Running unbonding pipeline for failed transactions")

unbondingTransactions, err := up.store.GetFailedUnbondingTransactions(ctx)

if err != nil {
return err
}

defer func() {
if up.Metrics.Config.Enabled {
if err := up.pushMetrics(); err != nil {
Expand All @@ -412,9 +475,22 @@ func (up *UnbondingPipeline) ProcessFailedTransactions(ctx context.Context) erro
}
}()

if len(unbondingTransactions) == 0 {
up.logger.Info("No failed unbonding transactions to process")
return nil
// 1. First process transactions that failed to get quorum of covenant signatures
unbondingTxWithNoQuorum, err := up.store.GetUnbondingTransactionsWithNoQuorum(ctx)

if err != nil {
return err
}

if err := up.processUnbondingTransactions(ctx, unbondingTxWithNoQuorum); err != nil {
return err
}

// 2. Second process other failed unbonding transactions
unbondingTransactions, err := up.store.GetFailedUnbondingTransactions(ctx)

if err != nil {
return err
}

if err := up.processUnbondingTransactions(ctx, unbondingTransactions); err != nil {
Expand Down
11 changes: 11 additions & 0 deletions internal/services/unbonding_pipeline_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ type PipelineMetrics struct {
FailedSigningReqs *prometheus.CounterVec
SuccessfulSentTransactions prometheus.Counter
FailureSentTransactions prometheus.Counter
FailureToGetCovenantQuorum prometheus.Counter
Config *config.MetricsConfig
}

Expand Down Expand Up @@ -41,6 +42,12 @@ func NewPipelineMetrics(cfg *config.MetricsConfig) *PipelineMetrics {
Help: "How many transactions failed to be sent to the network",
},
),
FailureToGetCovenantQuorum: prometheus.NewCounter(
prometheus.CounterOpts{
Name: "number_of_failed_covenant_quorums",
Help: "How many times we failed to get covenant quorum for signing request",
},
),
Config: cfg,
}
}
Expand All @@ -60,3 +67,7 @@ func (pm *PipelineMetrics) RecordSentUnbondingTransaction() {
func (pm *PipelineMetrics) RecordFailedUnbodingTransaction() {
pm.FailureSentTransactions.Inc()
}

func (pm *PipelineMetrics) RecordFailedCovenantQuorum() {
pm.FailureToGetCovenantQuorum.Inc()
}
Loading

0 comments on commit 6a4b91f

Please sign in to comment.