From 6dcf127802a4f899c3cb65f46cfef2928311a6cd Mon Sep 17 00:00:00 2001 From: Thomas van Dam Date: Wed, 27 Mar 2024 18:26:03 +0100 Subject: [PATCH] feat(plugin): publish tx data Closes: #223 --- plugins/indexing/base/transaction.go | 56 ++++++++++++++++++++++++++++ plugins/indexing/plugin.go | 12 ++++-- 2 files changed, 65 insertions(+), 3 deletions(-) create mode 100644 plugins/indexing/base/transaction.go diff --git a/plugins/indexing/base/transaction.go b/plugins/indexing/base/transaction.go new file mode 100644 index 00000000..d79f576f --- /dev/null +++ b/plugins/indexing/base/transaction.go @@ -0,0 +1,56 @@ +package base + +import ( + "encoding/hex" + "strings" + "time" + + abci "github.com/cometbft/cometbft/abci/types" + "github.com/cometbft/cometbft/crypto/tmhash" + "github.com/cosmos/cosmos-sdk/codec" + txtype "github.com/cosmos/cosmos-sdk/types/tx" + + types "github.com/sedaprotocol/seda-chain/plugins/indexing/types" +) + +type wrappedTx struct { + cdc codec.Codec + Tx *txtype.Tx +} + +func (s wrappedTx) MarshalJSON() ([]byte, error) { + return s.cdc.MarshalJSON(s.Tx) +} + +func ExtractTransactionUpdates(cdc codec.Codec, req abci.RequestFinalizeBlock, res abci.ResponseFinalizeBlock) ([]*types.Message, error) { + messages := make([]*types.Message, 0, len(req.Txs)) + + timestamp := req.Time + + for index := range req.Txs { + txBytes := req.Txs[index] + txResult := res.TxResults[index] + txHash := strings.ToUpper(hex.EncodeToString(tmhash.Sum(txBytes))) + + var tx txtype.Tx + if err := cdc.Unmarshal(txBytes, &tx); err != nil { + return nil, err + } + + data := struct { + Hash string `json:"hash"` + Time time.Time `json:"time"` + Tx *wrappedTx `json:"tx"` + Result *abci.ExecTxResult `json:"result"` + }{ + Hash: txHash, + Time: timestamp, + Tx: &wrappedTx{cdc: cdc, Tx: &tx}, + Result: txResult, + } + + messages = append(messages, types.NewMessage("tx", data)) + } + + return messages, nil +} diff --git a/plugins/indexing/plugin.go b/plugins/indexing/plugin.go index ae5762a1..3df7a010 100644 --- a/plugins/indexing/plugin.go +++ b/plugins/indexing/plugin.go @@ -48,10 +48,11 @@ func (p *IndexerPlugin) publishToQueue(messages []*types.Message) error { return nil } -func (p *IndexerPlugin) ListenFinalizeBlock(_ context.Context, req abci.RequestFinalizeBlock, _ abci.ResponseFinalizeBlock) error { +func (p *IndexerPlugin) ListenFinalizeBlock(_ context.Context, req abci.RequestFinalizeBlock, res abci.ResponseFinalizeBlock) error { p.logger.Debug(fmt.Sprintf("[%d] Start processing finalize block.", req.Height)) p.blockHeight = req.Height - var messages []*types.Message + // TODO(#229) Change to +2 to account for the votes message + messages := make([]*types.Message, 0, len(req.Txs)+1) blockMessage, err := base.ExtractBlockUpdate(req) if err != nil { @@ -60,7 +61,12 @@ func (p *IndexerPlugin) ListenFinalizeBlock(_ context.Context, req abci.RequestF } messages = append(messages, blockMessage) - // TODO(#223) Extract all transaction data. + txMessages, err := base.ExtractTransactionUpdates(p.cdc, req, res) + if err != nil { + p.logger.Error("Failed to extract Tx updates", "error", err) + return err + } + messages = append(messages, txMessages...) // TODO(#229) Extract all vote data. if err := p.publishToQueue(messages); err != nil {