Skip to content

Commit

Permalink
feat(plugin): publish tx data
Browse files Browse the repository at this point in the history
Closes: #223
  • Loading branch information
Thomasvdam committed Mar 28, 2024
1 parent d18ee12 commit 6dcf127
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 3 deletions.
56 changes: 56 additions & 0 deletions plugins/indexing/base/transaction.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package base

import (
"encoding/hex"
"strings"
"time"

abci "github.com/cometbft/cometbft/abci/types"
"github.com/cometbft/cometbft/crypto/tmhash"
"github.com/cosmos/cosmos-sdk/codec"
txtype "github.com/cosmos/cosmos-sdk/types/tx"

types "github.com/sedaprotocol/seda-chain/plugins/indexing/types"
)

type wrappedTx struct {
cdc codec.Codec
Tx *txtype.Tx
}

func (s wrappedTx) MarshalJSON() ([]byte, error) {
return s.cdc.MarshalJSON(s.Tx)
}

func ExtractTransactionUpdates(cdc codec.Codec, req abci.RequestFinalizeBlock, res abci.ResponseFinalizeBlock) ([]*types.Message, error) {
messages := make([]*types.Message, 0, len(req.Txs))

timestamp := req.Time

for index := range req.Txs {
txBytes := req.Txs[index]
txResult := res.TxResults[index]
txHash := strings.ToUpper(hex.EncodeToString(tmhash.Sum(txBytes)))

var tx txtype.Tx
if err := cdc.Unmarshal(txBytes, &tx); err != nil {
return nil, err
}

data := struct {
Hash string `json:"hash"`
Time time.Time `json:"time"`
Tx *wrappedTx `json:"tx"`
Result *abci.ExecTxResult `json:"result"`
}{
Hash: txHash,
Time: timestamp,
Tx: &wrappedTx{cdc: cdc, Tx: &tx},
Result: txResult,
}

messages = append(messages, types.NewMessage("tx", data))
}

return messages, nil
}
12 changes: 9 additions & 3 deletions plugins/indexing/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,11 @@ func (p *IndexerPlugin) publishToQueue(messages []*types.Message) error {
return nil
}

func (p *IndexerPlugin) ListenFinalizeBlock(_ context.Context, req abci.RequestFinalizeBlock, _ abci.ResponseFinalizeBlock) error {
func (p *IndexerPlugin) ListenFinalizeBlock(_ context.Context, req abci.RequestFinalizeBlock, res abci.ResponseFinalizeBlock) error {
p.logger.Debug(fmt.Sprintf("[%d] Start processing finalize block.", req.Height))
p.blockHeight = req.Height
var messages []*types.Message
// TODO(#229) Change to +2 to account for the votes message
messages := make([]*types.Message, 0, len(req.Txs)+1)

blockMessage, err := base.ExtractBlockUpdate(req)
if err != nil {
Expand All @@ -60,7 +61,12 @@ func (p *IndexerPlugin) ListenFinalizeBlock(_ context.Context, req abci.RequestF
}
messages = append(messages, blockMessage)

// TODO(#223) Extract all transaction data.
txMessages, err := base.ExtractTransactionUpdates(p.cdc, req, res)
if err != nil {
p.logger.Error("Failed to extract Tx updates", "error", err)
return err
}
messages = append(messages, txMessages...)
// TODO(#229) Extract all vote data.

if err := p.publishToQueue(messages); err != nil {
Expand Down

0 comments on commit 6dcf127

Please sign in to comment.