Skip to content

Commit

Permalink
Refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
charlie-wasp committed Aug 14, 2020
1 parent c8d83f2 commit f8a6fce
Show file tree
Hide file tree
Showing 8 changed files with 110 additions and 322 deletions.
2 changes: 1 addition & 1 deletion commands/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (cmd *ExportCommand) Execute() {
log.Fatal(err)
}

es.SerializeLedgerFromHistory(meta, &batchBuffer)
es.SerializeLedgerFromHistory(cmd.Config.NetworkPassphrase, meta, &batchBuffer)

if (ledgerSeq-cmd.firstLedger+1)%uint32(cmd.Config.BatchSize) == 0 || ledgerSeq == cmd.lastLedger {
payload := batchBuffer.String()
Expand Down
12 changes: 2 additions & 10 deletions config/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ var (
createIndexCommand = kingpin.Command("create-index", "Create ES indexes")
exportCommand = kingpin.Command("export", "Run export")
ingestCommand = kingpin.Command("ingest", "Start real time ingestion")
fastReplayCommand = kingpin.Command("fast-replay", "Experiment with using stellar-core fast in-memory replay catchup")
_ = kingpin.Command("stats", "Print database ledger statistics")
_ = kingpin.Command("es-stats", "Print ES ranges stats")

Expand Down Expand Up @@ -95,12 +94,8 @@ var (
Start = exportCommand.Arg("start", "Ledger to start indexing, +100 means offset 100 from the first").Default("0").Int()

// Count ledgers
Count = exportCommand.Arg("count", "Count of ledgers to ingest, should be aliquout batch size").Default("0").Int()
NetworkPassphrase = exportCommand.
Flag("network-passphrase", "Network passphrase to use").
Default("Test SDF Network ; September 2015").
OverrideDefaultFromEnvar("NETWORK_PASSPHRASE").
String()
Count = exportCommand.Arg("count", "Count of ledgers to ingest, should be aliquout batch size").Default("0").Int()
Network = exportCommand.Flag("network", "Stellar network to use").Default("testnet").Enum("public", "test")

// StartIngest ledger to start with ingesting
StartIngest = ingestCommand.Arg("start", "Ledger to start ingesting").Int()
Expand All @@ -113,7 +108,4 @@ var (

// ForceRecreateIndexes Allows indexes to be deleted before creation
ForceRecreateIndexes = createIndexCommand.Flag("force", "Delete indexes before creation").Bool()

FastReplayUpTo = fastReplayCommand.Arg("upto", "Ledger to start indexing").Int()
FastReplayCount = fastReplayCommand.Arg("count", "Ledgers count to catchup").Int()
)
26 changes: 0 additions & 26 deletions es/ledger_header.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
package es

import (
"encoding/hex"
"time"

"github.com/astroband/astrologer/db"
"github.com/stellar/go/xdr"
)

// LedgerHeader represents json-serializable struct for LedgerHeader to index
Expand Down Expand Up @@ -46,30 +44,6 @@ func NewLedgerHeader(row *db.LedgerHeaderRow) *LedgerHeader {
}
}

func NewLedgerHeaderFromHistory(historyEntry xdr.LedgerHeaderHistoryEntry) *LedgerHeader {
header := historyEntry.Header
seq := int(header.LedgerSeq)
pagingToken := PagingToken{LedgerSeq: seq}

return &LedgerHeader{
ID: pagingToken.String(),
Hash: hex.EncodeToString(historyEntry.Hash[:]),
PrevHash: hex.EncodeToString(header.PreviousLedgerHash[:]),
BucketListHash: hex.EncodeToString(header.BucketListHash[:]),
Seq: seq,
PagingToken: pagingToken,
CloseTime: time.Unix(int64(header.ScpValue.CloseTime), 0),
Version: int(header.LedgerVersion),
TotalCoins: int(header.TotalCoins),
FeePool: int(header.FeePool),
InflationSeq: int(header.InflationSeq),
IDPool: int(header.IdPool),
BaseFee: int(header.BaseFee),
BaseReserve: int(header.BaseReserve),
MaxTxSetSize: int(header.MaxTxSetSize),
}
}

// DocID returns es id (seq number in this case)
func (h *LedgerHeader) DocID() *string {
s := h.PagingToken.String()
Expand Down
71 changes: 71 additions & 0 deletions es/ledger_serializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@ package es

import (
"bytes"
// "encoding/base64"
"encoding/hex"
"fmt"
"log"

"github.com/astroband/astrologer/db"
"github.com/astroband/astrologer/util"
"github.com/stellar/go/xdr"
)

Expand Down Expand Up @@ -32,6 +36,73 @@ func SerializeLedger(ledgerRow db.LedgerHeaderRow, transactionRows []db.TxHistor
return serializer.serialize()
}

// SerializeLedger serializes ledger data into ES bulk index data
func SerializeLedgerFromHistory(networkPassphrase string, meta xdr.LedgerCloseMeta, buffer *bytes.Buffer) {
ledgerHeader := meta.V0.LedgerHeader.Header
// b, _ := meta.MarshalBinary()

// log.Println("********************************")
// base := base64.StdEncoding.EncodeToString(b)
// log.Println(base)
// log.Println("********************************")

ledgerRow := db.LedgerHeaderRow{
Hash: hex.EncodeToString(meta.V0.LedgerHeader.Hash[:]),
PrevHash: hex.EncodeToString(ledgerHeader.PreviousLedgerHash[:]),
BucketListHash: hex.EncodeToString(ledgerHeader.BucketListHash[:]),
LedgerSeq: int(ledgerHeader.LedgerSeq),
CloseTime: int64(ledgerHeader.ScpValue.CloseTime),
Data: ledgerHeader,
}

transactionRows := make([]db.TxHistoryRow, len(meta.V0.TxSet.Txs))
feeRows := make([]db.TxFeeHistoryRow, len(meta.V0.TxSet.Txs))

for i, txe := range meta.V0.TxSet.Txs {
txHash, hashErr := util.HashTransactionInEnvelope(txe, networkPassphrase)

if hashErr != nil {
log.Fatalf("Failed to hash transaction #%d in ledger %d\n", i, ledgerRow.LedgerSeq)
}

transactionRows[i] = db.TxHistoryRow{
ID: hex.EncodeToString(txHash[:]),
LedgerSeq: ledgerRow.LedgerSeq,
Index: i + 1,
Envelope: txe,
}

feeRows[i] = db.TxFeeHistoryRow{
TxID: transactionRows[i].ID,
LedgerSeq: ledgerRow.LedgerSeq,
Index: i + 1,
}

log.Println(ledgerRow.LedgerSeq)
log.Println(hex.EncodeToString(txHash[:]))
log.Println(hex.EncodeToString(meta.V0.TxProcessing[i].Result.TransactionHash[:]))
log.Println(i + 1)

for _, txp := range meta.V0.TxProcessing {
if transactionRows[i].ID == hex.EncodeToString(txp.Result.TransactionHash[:]) {
transactionRows[i].Result = txp.Result
transactionRows[i].Meta = txp.TxApplyProcessing
feeRows[i].Changes = txp.FeeProcessing
}
}
}

serializer := &ledgerSerializer{
ledgerRow: ledgerRow,
transactionRows: transactionRows,
feeRows: feeRows,
ledger: NewLedgerHeader(&ledgerRow),
buffer: buffer,
}

serializer.serialize()
}

func (s *ledgerSerializer) serialize() error {
SerializeForBulk(s.ledger, s.buffer)

Expand Down
174 changes: 0 additions & 174 deletions es/ledger_serializer_xdr.go

This file was deleted.

19 changes: 16 additions & 3 deletions es/serialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,22 @@ import (

// SerializeForBulk returns object serialized for elastic bulk indexing
func SerializeForBulk(obj Indexable, b *bytes.Buffer) {
meta := fmt.Sprintf(
`{ "index": { "_index": "%s", "_type": "_doc" } }%s`, obj.IndexName(), "\n",
)
var meta string

id := obj.DocID()

if id != nil {
meta = fmt.Sprintf(
`{ "index": { "_index": "%s", "_id": "%s", "_type": "_doc" } }%s`,
obj.IndexName(),
*id,
"\n",
)
} else {
meta = fmt.Sprintf(
`{ "index": { "_index": "%s", "_type": "_doc" } }%s`, obj.IndexName(), "\n",
)
}

data, err := json.Marshal(obj)
if err != nil {
Expand Down
Loading

0 comments on commit f8a6fce

Please sign in to comment.