From ef190e39eeb2f0f4c677bca9e850fb7994cf4635 Mon Sep 17 00:00:00 2001 From: Dan Tudor Date: Fri, 3 Sep 2021 09:43:18 +0100 Subject: [PATCH] Feature/address meta (#10) * Add meta to address * Dont cache address in single index mode --- config/mappings/address.json | 4 +++ internal/indexer/indexer.go | 4 +-- internal/service/address/indexer.go | 36 +++++++++++++++----------- internal/service/address/repository.go | 2 ++ pkg/explorer/address.go | 2 ++ 5 files changed, 31 insertions(+), 17 deletions(-) diff --git a/config/mappings/address.json b/config/mappings/address.json index c79bf03..5068dab 100644 --- a/config/mappings/address.json +++ b/config/mappings/address.json @@ -25,6 +25,10 @@ }, "attempt": { "type": "integer" + }, + "meta": { + "type": "nested", + "properties": { } } } }, diff --git a/internal/indexer/indexer.go b/internal/indexer/indexer.go index 364fa2f..a455798 100644 --- a/internal/indexer/indexer.go +++ b/internal/indexer/indexer.go @@ -99,7 +99,7 @@ func (i indexer) index(height, target uint64, option IndexOption.IndexOption) er start := time.Now() if option == IndexOption.SingleIndex { - i.addressIndexer.Index(height, height, txs) + i.addressIndexer.Index(height, height, txs, option) return } @@ -112,7 +112,7 @@ func (i indexer) index(height, target uint64, option IndexOption.IndexOption) er if option == IndexOption.BatchIndex { from = height - i.elastic.GetBulkIndexSize() + 1 } - i.addressIndexer.Index(from, height, txs) + i.addressIndexer.Index(from, height, txs, option) } zap.L().With( zap.Duration("elapsed", time.Since(start)), diff --git a/internal/service/address/indexer.go b/internal/service/address/indexer.go index f93d7bc..4831157 100644 --- a/internal/service/address/indexer.go +++ b/internal/service/address/indexer.go @@ -4,6 +4,7 @@ import ( "context" "github.com/NavExplorer/navcoind-go" "github.com/NavExplorer/navexplorer-indexer-go/v2/internal/elastic_cache" + "github.com/NavExplorer/navexplorer-indexer-go/v2/internal/indexer/IndexOption" "github.com/NavExplorer/navexplorer-indexer-go/v2/internal/service/block" "github.com/NavExplorer/navexplorer-indexer-go/v2/pkg/explorer" "github.com/olivere/elastic/v7" @@ -16,13 +17,13 @@ import ( type Indexer interface { bulkPersist(bulk *elastic.BulkService) - Index(from, to uint64, txs []explorer.BlockTransaction) + Index(from, to uint64, txs []explorer.BlockTransaction, option IndexOption.IndexOption) ClearCache() - indexAddresses(from, to uint64) - indexMultiSigs(txs []explorer.BlockTransaction) - getAddress(hash string) explorer.Address + indexAddresses(from, to uint64, option IndexOption.IndexOption) + indexMultiSigs(txs []explorer.BlockTransaction, option IndexOption.IndexOption) + getAddress(hash string, useCache bool) explorer.Address updateAddress(address explorer.Address, history explorer.AddressHistory) error - getAndUpdateAddress(history explorer.AddressHistory) error + getAndUpdateAddress(history explorer.AddressHistory, useCache bool) error generateAddressHistory(start, end uint64, addresses []string) ([]explorer.AddressHistory, error) } @@ -70,7 +71,7 @@ func (i indexer) bulkPersist(bulk *elastic.BulkService) { } } -func (i indexer) Index(from, to uint64, txs []explorer.BlockTransaction) { +func (i indexer) Index(from, to uint64, txs []explorer.BlockTransaction, option IndexOption.IndexOption) { if len(txs) == 0 { return } @@ -80,12 +81,12 @@ func (i indexer) Index(from, to uint64, txs []explorer.BlockTransaction) { go func() { defer wg.Done() - i.indexAddresses(from, to) + i.indexAddresses(from, to, option) }() go func() { defer wg.Done() - i.indexMultiSigs(txs) + i.indexMultiSigs(txs, option) }() wg.Wait() @@ -95,7 +96,7 @@ func (i indexer) ClearCache() { i.cache.Flush() } -func (i indexer) indexAddresses(from, to uint64) { +func (i indexer) indexAddresses(from, to uint64, option IndexOption.IndexOption) { txs := make([]explorer.BlockTransaction, 0) for _, entity := range i.elastic.GetEntitiesByIndex(elastic_cache.BlockTransactionIndex.Get()) { txs = append(txs, entity.(explorer.BlockTransaction)) @@ -130,7 +131,7 @@ func (i indexer) indexAddresses(from, to uint64) { for _, history := range addressHistorys { i.elastic.AddIndexRequest(elastic_cache.AddressHistoryIndex.Get(), history) - err := i.getAndUpdateAddress(history) + err := i.getAndUpdateAddress(history, option == IndexOption.BatchIndex) if err != nil { zap.L().With( zap.Error(err), @@ -168,10 +169,10 @@ func chunkAddresses(addresses []string, chunks int) [][]string { return chunked } -func (i indexer) indexMultiSigs(txs []explorer.BlockTransaction) { +func (i indexer) indexMultiSigs(txs []explorer.BlockTransaction, option IndexOption.IndexOption) { for _, tx := range txs { for _, multiSig := range tx.GetAllMultiSigs() { - address := i.getAddress(multiSig.Key()) + address := i.getAddress(multiSig.Key(), option == IndexOption.BatchIndex) address.MultiSig = &multiSig if len(tx.GetAllMultiSigs()) == 0 { continue @@ -189,7 +190,12 @@ func (i indexer) indexMultiSigs(txs []explorer.BlockTransaction) { } } -func (i indexer) getAddress(hash string) explorer.Address { +func (i indexer) getAddress(hash string, useCache bool) explorer.Address { + if !useCache { + address := i.addressRepository.GetOrCreateAddress(hash) + return address + } + address, exists := i.cache.Get(hash) if exists == false { address := i.addressRepository.GetOrCreateAddress(hash) @@ -249,9 +255,9 @@ func (i indexer) generateAddressHistory(start, end uint64, addresses []string) ( return addressHistorys, nil } -func (i indexer) getAndUpdateAddress(history explorer.AddressHistory) error { +func (i indexer) getAndUpdateAddress(history explorer.AddressHistory, useCache bool) error { start := time.Now() - err := i.updateAddress(i.getAddress(history.Hash), history) + err := i.updateAddress(i.getAddress(history.Hash, useCache), history) zap.L().With( zap.Duration("elapsed", time.Since(start)), diff --git a/internal/service/address/repository.go b/internal/service/address/repository.go index 351b5dc..90f7199 100644 --- a/internal/service/address/repository.go +++ b/internal/service/address/repository.go @@ -154,6 +154,8 @@ func (r repository) CreateAddress(hash string, createdBlock uint64, createdTime } func (r repository) GetOrCreateAddress(hash string) explorer.Address { + zap.L().With(zap.String("hash", hash)).Info("AddressRepository: GetOrCreateAddress") + result, err := r.elastic.GetClient(). Search(elastic_cache.AddressIndex.Get()). Query(elastic.NewTermQuery("hash.keyword", hash)). diff --git a/pkg/explorer/address.go b/pkg/explorer/address.go index 1ad5297..40011c1 100644 --- a/pkg/explorer/address.go +++ b/pkg/explorer/address.go @@ -20,6 +20,8 @@ type Address struct { MultiSig *MultiSig `json:"multisig,omitempty"` + Meta map[string]string `json:"meta"` + // Transient RichList RichList `json:"rich_list,omitempty"` }