Skip to content

Commit

Permalink
create export-tx
Browse files Browse the repository at this point in the history
  • Loading branch information
cayod committed Sep 26, 2023
1 parent 4053d24 commit 9013d60
Show file tree
Hide file tree
Showing 3 changed files with 151 additions and 0 deletions.
83 changes: 83 additions & 0 deletions cmd/export_tx.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package cmd

import (
"fmt"

"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/stellar/stellar-etl/internal/input"
"github.com/stellar/stellar-etl/internal/transform"
"github.com/stellar/stellar-etl/internal/utils"
)

var txCmd = &cobra.Command{
Use: "export_tx",
Short: "Exports the tx transaction data over a specified range.",
Long: `Exports the tx transaction data over a specified range to an output file.`,
Run: func(cmd *cobra.Command, args []string) {
cmdLogger.SetLevel(logrus.InfoLevel)
endNum, strictExport, isTest, isFuture, extra := utils.MustCommonFlags(cmd.Flags(), cmdLogger)
cmdLogger.StrictExport = strictExport
startNum, path, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger)
gcsBucket, gcpCredentials := utils.MustGcsFlags(cmd.Flags(), cmdLogger)
env := utils.GetEnvironmentDetails(isTest, isFuture)

tx, err := input.GetTransactions(startNum, endNum, limit, env)
if err != nil {
cmdLogger.Fatal("could not read tx: ", err)
}

outFile := mustOutFile(path)
numFailures := 0
totalNumBytes := 0
for _, transformInput := range tx {
transformed, err := transform.TransformTx(transformInput.Transaction, transformInput.LedgerHistory)
if err != nil {
ledgerSeq := transformInput.LedgerHistory.Header.LedgerSeq
cmdLogger.LogError(fmt.Errorf("could not transform tx transaction %d in ledger %d: ", transformInput.Transaction.Index, ledgerSeq))
numFailures += 1
continue
}

numBytes, err := exportEntry(transformed, outFile, extra)
if err != nil {
cmdLogger.LogError(fmt.Errorf("could not export transaction: %v", err))
numFailures += 1
continue
}
totalNumBytes += numBytes
}

outFile.Close()
cmdLogger.Info("Number of bytes written: ", totalNumBytes)

printTransformStats(len(tx), numFailures)

maybeUpload(gcpCredentials, gcsBucket, path)
},
}

func init() {
rootCmd.AddCommand(txCmd)
utils.AddCommonFlags(txCmd.Flags())
utils.AddArchiveFlags("tx", txCmd.Flags())
utils.AddGcsFlags(txCmd.Flags())
txCmd.MarkFlagRequired("end-ledger")

/*
Current flags:
start-ledger: the ledger sequence number for the beginning of the export period
end-ledger: the ledger sequence number for the end of the export range (*required)
limit: maximum number of tx to export
TODO: measure a good default value that ensures all tx within a 5 minute period will be exported with a single call
The current max_tx_set_size is 1000 and there are 60 new ledgers in a 5 minute period:
1000*60 = 60000
output-file: filename of the output file
TODO: implement extra flags if possible
serialize-method: the method for serialization of the output data (JSON, XDR, etc)
start and end time as a replacement for start and end sequence numbers
*/
}
10 changes: 10 additions & 0 deletions internal/transform/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,16 @@ type TransactionOutput struct {
SorobanResourcesWriteBytes uint32 `json:"soroban_resources_write_bytes"`
}

type TxOutput struct {
LedgerSequence uint32 `json:"ledger_sequence"`
TxEnvelope string `json:"tx_envelope"`
TxResult string `json:"tx_result"`
TxMeta string `json:"tx_meta"`
TxFeeMeta string `json:"tx_fee_meta"`
TxLedgerHistory string `json:"tx_ledger_history"`
ClosedAt time.Time `json:"closed_at"`
}

// AccountOutput is a representation of an account that aligns with the BigQuery table accounts
type AccountOutput struct {
AccountID string `json:"account_id"` // account address
Expand Down
58 changes: 58 additions & 0 deletions internal/transform/tx.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package transform

import (
"fmt"

"github.com/stellar/stellar-etl/internal/utils"

"github.com/stellar/go/ingest"
"github.com/stellar/go/xdr"
)

// TransformTransaction converts a transaction from the history archive ingestion system into a form suitable for BigQuery
func TransformTx(transaction ingest.LedgerTransaction, lhe xdr.LedgerHeaderHistoryEntry) (TxOutput, error) {
ledgerHeader := lhe.Header
outputLedgerSequence := uint32(ledgerHeader.LedgerSeq)

outputTxEnvelope, err := xdr.MarshalBase64(transaction.Envelope)
if err != nil {
return TxOutput{}, err
}

outputTxResult, err := xdr.MarshalBase64(&transaction.Result)
if err != nil {
return TxOutput{}, err
}

outputTxMeta, err := xdr.MarshalBase64(transaction.UnsafeMeta)
if err != nil {
return TxOutput{}, err
}

outputTxFeeMeta, err := xdr.MarshalBase64(transaction.FeeChanges)
if err != nil {
return TxOutput{}, err
}

outputTxLedgerHistory, err := xdr.MarshalBase64(lhe)
if err != nil {
return TxOutput{}, err
}

outputCloseTime, err := utils.TimePointToUTCTimeStamp(ledgerHeader.ScpValue.CloseTime)
if err != nil {
return TxOutput{}, fmt.Errorf("could not convert close time to UTC timestamp: %v", err)
}

transformedTx := TxOutput{
LedgerSequence: outputLedgerSequence,
TxEnvelope: outputTxEnvelope,
TxResult: outputTxResult,
TxMeta: outputTxMeta,
TxFeeMeta: outputTxFeeMeta,
TxLedgerHistory: outputTxLedgerHistory,
ClosedAt: outputCloseTime,
}

return transformedTx, nil
}

0 comments on commit 9013d60

Please sign in to comment.