From e424cb0265958a9ebeed73b6a052b6fae561823d Mon Sep 17 00:00:00 2001 From: Dan Tudor Date: Thu, 22 Oct 2020 12:06:40 +0100 Subject: [PATCH] Clear Id (#5) --- internal/elastic_cache/elastic_cache.go | 29 ++++- internal/indexer/indexer.go | 12 +- internal/service/address/factory.go | 2 +- internal/service/address/indexer.go | 18 +-- internal/service/address/repository.go | 123 +++++++++--------- internal/service/address/rewinder.go | 4 +- internal/service/block/repository.go | 46 +++---- internal/service/dao/consensus/consensus.go | 4 +- internal/service/dao/consensus/indexer.go | 22 ++-- internal/service/dao/consensus/repository.go | 3 +- internal/service/dao/consensus/rewinder.go | 10 +- internal/service/dao/consensus/service.go | 6 +- internal/service/dao/consultation/indexer.go | 7 +- .../service/dao/consultation/repository.go | 3 +- .../service/dao/payment_request/indexer.go | 4 +- .../service/dao/payment_request/repository.go | 3 +- internal/service/dao/proposal/indexer.go | 4 +- internal/service/dao/proposal/repository.go | 3 +- internal/service/softfork/repository.go | 7 +- internal/service/softfork/rewinder.go | 6 +- internal/service/softfork/service.go | 44 ++++--- pkg/explorer/address.go | 16 ++- pkg/explorer/address_history.go | 10 ++ pkg/explorer/address_transaction.go | 9 +- pkg/explorer/block.go | 10 ++ pkg/explorer/block_transaction.go | 10 ++ pkg/explorer/cfund_payment_request.go | 13 +- pkg/explorer/cfund_proposal.go | 10 ++ pkg/explorer/dao_consensus_parameters.go | 14 +- pkg/explorer/dao_consultation.go | 14 +- pkg/explorer/entity.go | 2 + pkg/explorer/signal.go | 10 ++ pkg/explorer/soft_fork.go | 18 ++- pkg/explorer/vote.go | 10 ++ 34 files changed, 314 insertions(+), 192 deletions(-) diff --git a/internal/elastic_cache/elastic_cache.go b/internal/elastic_cache/elastic_cache.go index eac392f..d587eab 100644 --- a/internal/elastic_cache/elastic_cache.go +++ b/internal/elastic_cache/elastic_cache.go @@ -109,8 +109,13 @@ func (i *Index) AddRequest(index string, entity explorer.Entity, reqType Request "index": index, "type": reqType, "slug": entity.Slug(), + "id": entity.Id(), }).Debugf("AddRequest") + if reqType == UpdateRequest && entity.Id() == "" { + reqType = IndexRequest + } + request := Request{ Index: index, Entity: entity, @@ -120,12 +125,16 @@ func (i *Index) AddRequest(index string, entity explorer.Entity, reqType Request cached, found := i.cache.Get(entity.Slug()) if found == true { - logrus.WithField("persisted", cached.(Request).Persisted).Debugf("Found in cache %s: %s", cached.(Request).Index, cached.(Request).Entity.Slug()) - if cached.(Request).Persisted == false && reqType == UpdateRequest { + logrus.WithFields(logrus.Fields{ + "persisted": cached.(Request).Persisted, + "slug": cached.(Request).Entity.Slug(), + "id": cached.(Request).Entity.Id(), + }).Debug("Found in cache ", cached.(Request).Index) + + if cached.(Request).Persisted == false && cached.(Request).Entity.Id() == "" && reqType == UpdateRequest { logrus.Debugf("Switch update to index as not previously persisted %s", entity.Slug()) request.Type = IndexRequest } - request.Persisted = false } i.cache.Set(entity.Slug(), request, cache.DefaultExpiration) } @@ -168,6 +177,14 @@ func (i *Index) BatchPersist(height uint64) bool { logrus.Infof("Persisting data at height %d", height) i.Persist() + if config.Get().Reindex == true && height == config.Get().BulkIndexSize { + logrus.Error("Stopping reindex at height:", height) + for { + switch { + } + } + } + return true } @@ -175,9 +192,9 @@ func (i *Index) Persist() int { bulk := i.Client.Bulk() for _, r := range i.GetPendingRequests() { if r.Type == IndexRequest { - bulk.Add(elastic.NewBulkIndexRequest().Index(r.Index).Id(r.Entity.Slug()).Doc(r.Entity)) + bulk.Add(elastic.NewBulkIndexRequest().Index(r.Index).Doc(r.Entity)) } else if r.Type == UpdateRequest { - bulk.Add(elastic.NewBulkUpdateRequest().Index(r.Index).Id(r.Entity.Slug()).Doc(r.Entity)) + bulk.Add(elastic.NewBulkUpdateRequest().Index(r.Index).Id(r.Entity.Id()).Doc(r.Entity)) } r.Persisted = true i.cache.Set(r.Entity.Slug(), r, cache.DefaultExpiration) @@ -192,6 +209,7 @@ func (i *Index) Persist() int { } func (i *Index) persist(bulk *elastic.BulkService) { + actions := bulk.NumberOfActions() response, err := bulk.Do(context.Background()) if err != nil { logrus.WithError(err).Fatal("Failed to persist requests") @@ -206,6 +224,7 @@ func (i *Index) persist(bulk *elastic.BulkService) { } } } + logrus.Infof("Persisted %d actions", actions) } func (i *Index) DeleteHeightGT(height uint64, indices ...string) error { diff --git a/internal/indexer/indexer.go b/internal/indexer/indexer.go index aa4a59d..5b03e15 100644 --- a/internal/indexer/indexer.go +++ b/internal/indexer/indexer.go @@ -51,8 +51,6 @@ func NewIndexer( } func (i *Indexer) BulkIndex() { - log.Debug("Subscribe to 0MQ") - if err := i.Index(IndexOption.BatchIndex); err != nil { if err.Error() == "-8: Block height out of range" { i.elastic.Persist() @@ -103,9 +101,9 @@ func (i *Indexer) index(height uint64, option IndexOption.IndexOption) error { go func() { defer wg.Done() start := time.Now() - i.addressIndexer.Index(txs, b) + i.addressIndexer.Index(b, txs) elapsed := time.Since(start) - log.WithField("time", elapsed).Debugf("Indexed addresses at height %d", height) + log.WithField("time", elapsed).Infof("Indexed addresses at height %d", height) }() go func() { @@ -113,7 +111,7 @@ func (i *Indexer) index(height uint64, option IndexOption.IndexOption) error { start := time.Now() i.softForkIndexer.Index(b) elapsed := time.Since(start) - log.WithField("time", elapsed).Debugf("Indexed softforks at height %d", height) + log.WithField("time", elapsed).Infof("Index softforks: %d", height) }() go func() { @@ -121,13 +119,13 @@ func (i *Indexer) index(height uint64, option IndexOption.IndexOption) error { start := time.Now() i.daoIndexer.Index(b, txs, header) elapsed := time.Since(start) - log.WithField("time", elapsed).Debugf("Indexed dao at height %d", height) + log.WithField("time", elapsed).Infof("Index dao: %d", height) }() wg.Wait() elapsed := time.Since(start) - log.WithField("time", elapsed).Debugf("Indexed block at height %d", height) + log.WithField("time", elapsed).Infof("Index block: %d", height) log.Debugf("") LastBlockIndexed = height diff --git a/internal/service/address/factory.go b/internal/service/address/factory.go index 366dc51..433d83d 100644 --- a/internal/service/address/factory.go +++ b/internal/service/address/factory.go @@ -10,7 +10,7 @@ func CreateAddress(hash string, height uint64, time time.Time) *explorer.Address return &explorer.Address{Hash: hash, CreatedBlock: height, CreatedTime: time} } -func CreateAddressHistory(history *navcoind.AddressHistory, tx *explorer.BlockTransaction, block *explorer.Block) *explorer.AddressHistory { +func CreateAddressHistory(history *navcoind.AddressHistory, tx *explorer.BlockTransaction) *explorer.AddressHistory { h := &explorer.AddressHistory{ Height: history.Block, TxIndex: history.TxIndex, diff --git a/internal/service/address/indexer.go b/internal/service/address/indexer.go index 9007245..64aa532 100644 --- a/internal/service/address/indexer.go +++ b/internal/service/address/indexer.go @@ -17,12 +17,12 @@ func NewIndexer(navcoin *navcoind.Navcoind, elastic *elastic_cache.Index, repo * return &Indexer{navcoin, elastic, repo} } -func (i *Indexer) Index(txs []*explorer.BlockTransaction, block *explorer.Block) { +func (i *Indexer) Index(block *explorer.Block, txs []*explorer.BlockTransaction) { if len(txs) == 0 { return } - for _, addressHistory := range i.generateAddressHistory(block, txs) { + for _, addressHistory := range i.generateAddressHistory(&block.Height, &block.Height, txs) { i.elastic.AddIndexRequest(elastic_cache.AddressHistoryIndex.Get(), addressHistory) err := i.updateAddress(addressHistory, block) @@ -32,18 +32,18 @@ func (i *Indexer) Index(txs []*explorer.BlockTransaction, block *explorer.Block) } } -func (i *Indexer) generateAddressHistory(block *explorer.Block, txs []*explorer.BlockTransaction) []*explorer.AddressHistory { +func (i *Indexer) generateAddressHistory(start, end *uint64, txs []*explorer.BlockTransaction) []*explorer.AddressHistory { addressHistory := make([]*explorer.AddressHistory, 0) addresses := getAddressesForTxs(txs) - history, err := i.navcoin.GetAddressHistory(&block.Height, &block.Height, addresses...) + history, err := i.navcoin.GetAddressHistory(start, end, addresses...) if err != nil { - log.WithError(err).Errorf("Could not get address history for height: %d", block.Height) + log.WithError(err).Errorf("Could not get address history for height: %d-%d", start, end) return addressHistory } for _, h := range history { - addressHistory = append(addressHistory, CreateAddressHistory(h, getTxsById(h.TxId, txs), block)) + addressHistory = append(addressHistory, CreateAddressHistory(h, getTxById(h.TxId, txs))) } return addressHistory @@ -81,12 +81,12 @@ func getAddressesForTxs(txs []*explorer.BlockTransaction) []string { return addresses } -func getTxsById(txid string, txs []*explorer.BlockTransaction) *explorer.BlockTransaction { +func getTxById(id string, txs []*explorer.BlockTransaction) *explorer.BlockTransaction { for _, tx := range txs { - if tx.Txid == txid { + if tx.Txid == id { return tx } } - log.Fatal("Could not match tx") + log.Fatal("Could not match tx: ", id) return nil } diff --git a/internal/service/address/repository.go b/internal/service/address/repository.go index fefaa25..e30a7fe 100644 --- a/internal/service/address/repository.go +++ b/internal/service/address/repository.go @@ -6,7 +6,6 @@ import ( "errors" "github.com/NavExplorer/navexplorer-indexer-go/v2/internal/elastic_cache" "github.com/NavExplorer/navexplorer-indexer-go/v2/pkg/explorer" - "github.com/getsentry/raven-go" "github.com/olivere/elastic/v7" log "github.com/sirupsen/logrus" "strings" @@ -24,110 +23,84 @@ func NewRepo(Client *elastic.Client) *Repository { return &Repository{Client} } -func (r *Repository) GetAddress(hash string) (*explorer.Address, error) { - log.Debugf("GetAddress(hash:%s)", hash) - - results, err := r.Client.Search(elastic_cache.AddressIndex.Get()). - Query(elastic.NewTermQuery("hash.keyword", hash)). +func (r *Repository) GetBestHeight() (uint64, error) { + result, err := r.Client.Search(elastic_cache.AddressIndex.Get()). + Sort("height", false). Size(1). Do(context.Background()) if err != nil { - raven.CaptureError(err, nil) - return nil, err + return 0, err } - if results == nil || len(results.Hits.Hits) != 1 { - return nil, errors.New("Invalid result found") + address, err := r.findOneAddress(result) + if err != nil { + return 0, err } - var address *explorer.Address - if err = json.Unmarshal(results.Hits.Hits[0].Source, &address); err != nil { + return address.Height, nil +} + +func (r *Repository) GetAddress(hash string) (*explorer.Address, error) { + result, err := r.Client.Search(elastic_cache.AddressIndex.Get()). + Query(elastic.NewTermQuery("hash.keyword", hash)). + Size(1). + Do(context.Background()) + if err != nil { return nil, err } - return address, nil + return r.findOneAddress(result) } func (r *Repository) GetAddresses(hashes []string) ([]*explorer.Address, error) { - log.Debugf("GetAddresses([%s])", strings.Join(hashes, ",")) - - results, err := r.Client.Search(elastic_cache.AddressIndex.Get()). + result, err := r.Client.Search(elastic_cache.AddressIndex.Get()). Query(elastic.NewMatchQuery("hash", strings.Join(hashes, " "))). Size(len(hashes)). Do(context.Background()) - if err != nil || results == nil { + if err != nil { return nil, err } - addresses := make([]*explorer.Address, 0) - for _, hit := range results.Hits.Hits { - var address *explorer.Address - if err = json.Unmarshal(hit.Source, &address); err != nil { - return nil, err - } - addresses = append(addresses, address) - } - - return addresses, nil + return r.findManyAddress(result) } func (r *Repository) GetAddressesHeightGt(height uint64) ([]*explorer.Address, error) { - log.Debugf("GetAddressesHeightGt(height:%d)", height) - - results, err := r.Client. + result, err := r.Client. Search(elastic_cache.AddressIndex.Get()). Query(elastic.NewRangeQuery("height").Gt(height)). Size(50000). Do(context.Background()) - if err != nil || results == nil { + if err != nil { return nil, err } - addresses := make([]*explorer.Address, 0) - for _, hit := range results.Hits.Hits { - var address *explorer.Address - if err = json.Unmarshal(hit.Source, &address); err != nil { - return nil, err - } - addresses = append(addresses, address) - } - - return addresses, nil + return r.findManyAddress(result) } func (r *Repository) GetOrCreateAddress(hash string, block *explorer.Block) (*explorer.Address, error) { - log.WithField("address", hash).Debug("GetOrCreateAddress") - - results, err := r.Client. + result, err := r.Client. Search(elastic_cache.AddressIndex.Get()). - Query(elastic.NewMatchQuery("hash", hash)). - Size(1). + Query(elastic.NewTermQuery("hash.keyword", hash)). Do(context.Background()) - if err != nil || results == nil { + if err != nil { return nil, err } - var address *explorer.Address - if results.TotalHits() == 0 { - address = CreateAddress(hash, block.Height, block.MedianTime) - _, err := r.Client. - Index(). + if result.TotalHits() == 0 { + address := CreateAddress(hash, block.Height, block.MedianTime) + result, err := r.Client.Index(). Index(elastic_cache.AddressIndex.Get()). - Id(address.Slug()). BodyJson(address). Do(context.Background()) if err != nil { return nil, err } + address.SetId(result.Id) return address, nil } - if err = json.Unmarshal(results.Hits.Hits[0].Source, &address); err != nil { - return nil, err - } - - return address, nil + return r.findOneAddress(result) } func (r *Repository) GetLatestHistoryByHash(hash string) (*explorer.AddressHistory, error) { @@ -151,6 +124,40 @@ func (r *Repository) GetLatestHistoryByHash(hash string) (*explorer.AddressHisto if err != nil { return nil, err } + history.SetId(hit.Id) return history, err } + +func (r *Repository) findOneAddress(result *elastic.SearchResult) (*explorer.Address, error) { + if result == nil || len(result.Hits.Hits) != 1 { + return nil, errors.New("Invalid result") + } + + var address *explorer.Address + hit := result.Hits.Hits[0] + if err := json.Unmarshal(hit.Source, &address); err != nil { + return nil, err + } + address.SetId(hit.Id) + + return address, nil +} + +func (r *Repository) findManyAddress(result *elastic.SearchResult) ([]*explorer.Address, error) { + if result == nil { + return nil, errors.New("Invalid result") + } + + addresses := make([]*explorer.Address, 0) + for _, hit := range result.Hits.Hits { + var address *explorer.Address + if err := json.Unmarshal(hit.Source, &address); err != nil { + return nil, err + } + address.SetId(hit.Id) + addresses = append(addresses, address) + } + + return addresses, nil +} diff --git a/internal/service/address/rewinder.go b/internal/service/address/rewinder.go index d78bde9..34adb93 100644 --- a/internal/service/address/rewinder.go +++ b/internal/service/address/rewinder.go @@ -63,9 +63,9 @@ func (r *Rewinder) ResetAddress(address *explorer.Address) error { _, err = r.elastic.Client.Index(). Index(elastic_cache.AddressIndex.Get()). + Id(address.Id()). BodyJson(address). - Id(address.Slug()). Do(context.Background()) - return nil + return err } diff --git a/internal/service/block/repository.go b/internal/service/block/repository.go index eb2c8e8..ee13fea 100644 --- a/internal/service/block/repository.go +++ b/internal/service/block/repository.go @@ -21,24 +21,25 @@ func NewRepo(elastic *elastic_cache.Index) *Repository { } func (r *Repository) GetBestBlock() (*explorer.Block, error) { - results, err := r.elastic.Client. + result, err := r.elastic.Client. Search(elastic_cache.BlockIndex.Get()). Sort("height", false). Size(1). Do(context.Background()) - if err != nil || results == nil { + if err != nil || result == nil { return nil, err } - if len(results.Hits.Hits) == 0 { + if len(result.Hits.Hits) == 0 { return nil, ErrBlockNotFound } var block *explorer.Block - if err = json.Unmarshal(results.Hits.Hits[0].Source, &block); err != nil { + hit := result.Hits.Hits[0] + if err = json.Unmarshal(hit.Source, &block); err != nil { return nil, err } - + block.SetId(hit.Id) return block, nil } @@ -58,47 +59,44 @@ func (r *Repository) GetBlockByHash(hash string) (*explorer.Block, error) { Size(1). Do(context.Background()) if err != nil || results == nil { - raven.CaptureError(err, nil) return nil, err } if len(results.Hits.Hits) == 0 { - raven.CaptureError(err, nil) return nil, elastic_cache.ErrRecordNotFound } var block *explorer.Block hit := results.Hits.Hits[0] if err = json.Unmarshal(hit.Source, &block); err != nil { - raven.CaptureError(err, nil) return nil, err } + block.SetId(hit.Id) return block, nil } func (r *Repository) GetTransactionsByBlock(block *explorer.Block) ([]*explorer.BlockTransaction, error) { - results, err := r.elastic.Client. + result, err := r.elastic.Client. Search(elastic_cache.BlockTransactionIndex.Get()). Query(elastic.NewMatchQuery("height", block.Height)). Do(context.Background()) - if err != nil || results == nil { - raven.CaptureError(err, nil) + if err != nil || result == nil { return nil, err } - if len(results.Hits.Hits) == 0 { - raven.CaptureError(err, nil) + if len(result.Hits.Hits) == 0 { return nil, elastic_cache.ErrRecordNotFound } var txs []*explorer.BlockTransaction - for _, hit := range results.Hits.Hits { + for _, hit := range result.Hits.Hits { var tx *explorer.BlockTransaction if err = json.Unmarshal(hit.Source, &tx); err != nil { raven.CaptureError(err, nil) return nil, err } + tx.SetId(hit.Id) txs = append(txs, tx) } @@ -114,14 +112,13 @@ func (r *Repository) GetTransactionByHash(hash string) (*explorer.BlockTransacti getTransactionByHash := func(hash string) (*elastic.SearchResult, error) { return r.elastic.Client. Search(elastic_cache.BlockTransactionIndex.Get()). - Query(elastic.NewMatchQuery("hash", hash)). + Query(elastic.NewTermQuery("hash.keyword", hash)). Size(1). Do(context.Background()) } results, err := getTransactionByHash(hash) if err != nil || results == nil { - raven.CaptureError(err, nil) return nil, err } @@ -130,7 +127,6 @@ func (r *Repository) GetTransactionByHash(hash string) (*explorer.BlockTransacti time.Sleep(5 * time.Second) results, err = getTransactionByHash(hash) if err != nil || results == nil { - raven.CaptureError(err, nil) return nil, err } @@ -142,9 +138,9 @@ func (r *Repository) GetTransactionByHash(hash string) (*explorer.BlockTransacti var tx *explorer.BlockTransaction hit := results.Hits.Hits[0] if err = json.Unmarshal(hit.Source, &tx); err != nil { - raven.CaptureError(err, nil) return nil, err } + tx.SetId(hit.Id) return tx, nil } @@ -156,21 +152,19 @@ func (r *Repository) GetBlockByHeight(height uint64) (*explorer.Block, error) { Size(1). Do(context.Background()) if err != nil || results == nil { - raven.CaptureError(err, nil) return nil, err } if len(results.Hits.Hits) == 0 { - raven.CaptureError(err, nil) return nil, elastic_cache.ErrRecordNotFound } var block *explorer.Block hit := results.Hits.Hits[0] if err = json.Unmarshal(hit.Source, &block); err != nil { - raven.CaptureError(err, nil) return nil, err } + block.SetId(hit.Id) return block, nil } @@ -183,7 +177,6 @@ func (r *Repository) GetBlocksBetweenHeight(start uint64, end uint64) ([]*explor Size(int(end - start)). Do(context.Background()) if err != nil || results == nil { - raven.CaptureError(err, nil) return nil, err } @@ -191,9 +184,9 @@ func (r *Repository) GetBlocksBetweenHeight(start uint64, end uint64) ([]*explor for _, hit := range results.Hits.Hits { var block *explorer.Block if err = json.Unmarshal(hit.Source, &block); err != nil { - raven.CaptureError(err, nil) return nil, err } + block.SetId(hit.Id) blocks = append(blocks, block) } @@ -201,15 +194,11 @@ func (r *Repository) GetBlocksBetweenHeight(start uint64, end uint64) ([]*explor } func (r *Repository) GetTransactionsWithCfundPayment() error { - query := elastic.NewBoolQuery() - query = query.Must(elastic.NewMatchQuery("vout.scriptPubKey.type.keyword", explorer.VoutCfundContribution)) - results, err := r.elastic.Client.Search(elastic_cache.BlockTransactionIndex.Get()). - Query(query). + Query(elastic.NewMatchQuery("vout.scriptPubKey.type.keyword", explorer.VoutCfundContribution)). Do(context.Background()) if err != nil || results == nil { - raven.CaptureError(err, nil) return err } @@ -241,6 +230,7 @@ func (r *Repository) GetAllTransactionsThatIncludeAddress(hash string) ([]*explo raven.CaptureError(err, nil) return nil, err } + tx.SetId(hit.Id) txs = append(txs, tx) } } diff --git a/internal/service/dao/consensus/consensus.go b/internal/service/dao/consensus/consensus.go index 9a511e3..d3b0d41 100644 --- a/internal/service/dao/consensus/consensus.go +++ b/internal/service/dao/consensus/consensus.go @@ -47,7 +47,7 @@ var ( func (p *consensusParameters) Get(parameter Parameter) *explorer.ConsensusParameter { for _, c := range Parameters { - if parameter == Parameter(c.Id) { + if parameter == Parameter(c.Uid) { return c } } @@ -59,7 +59,7 @@ func (p *consensusParameters) Get(parameter Parameter) *explorer.ConsensusParame func (p *consensusParameters) GetById(id int) *explorer.ConsensusParameter { for _, c := range Parameters { - if id == c.Id { + if id == c.Uid { return c } } diff --git a/internal/service/dao/consensus/indexer.go b/internal/service/dao/consensus/indexer.go index f96c7cb..b4b1f13 100644 --- a/internal/service/dao/consensus/indexer.go +++ b/internal/service/dao/consensus/indexer.go @@ -31,7 +31,7 @@ func (i *Indexer) Index() error { for _, initialParameter := range initialParameters { for _, consensusParameter := range consensusParameters { - if initialParameter.Id == consensusParameter.Id { + if initialParameter.Uid == consensusParameter.Uid { i.elastic.AddUpdateRequest(elastic_cache.ConsensusIndex.Get(), initialParameter) c = append(c, consensusParameter) } @@ -45,15 +45,17 @@ func (i *Indexer) Index() error { func (i *Indexer) Update(block *explorer.Block) { for _, p := range Parameters { - if p.UpdatedOnBlock == block.Height { - _, err := i.elastic.Client.Index(). - Index(elastic_cache.ConsensusIndex.Get()). - Id(p.Slug()). - BodyJson(p). - Do(context.Background()) - if err != nil { - log.WithError(err).Fatal("Failed to persist consensus change") - } + if p.UpdatedOnBlock != block.Height { + continue + } + + _, err := i.elastic.Client.Index(). + Index(elastic_cache.ConsensusIndex.Get()). + Id(p.Id()). + BodyJson(p). + Do(context.Background()) + if err != nil { + log.WithError(err).Fatal("Failed to persist consensus change") } } } diff --git a/internal/service/dao/consensus/repository.go b/internal/service/dao/consensus/repository.go index 7a89ba2..4462072 100644 --- a/internal/service/dao/consensus/repository.go +++ b/internal/service/dao/consensus/repository.go @@ -5,7 +5,6 @@ import ( "encoding/json" "github.com/NavExplorer/navexplorer-indexer-go/v2/internal/elastic_cache" "github.com/NavExplorer/navexplorer-indexer-go/v2/pkg/explorer" - "github.com/getsentry/raven-go" "github.com/olivere/elastic/v7" ) @@ -23,7 +22,6 @@ func (r *Repository) GetConsensusParameters() ([]*explorer.ConsensusParameter, e Size(10000). Do(context.Background()) if err != nil || results == nil { - raven.CaptureError(err, nil) return nil, err } @@ -37,6 +35,7 @@ func (r *Repository) GetConsensusParameters() ([]*explorer.ConsensusParameter, e if err = json.Unmarshal(hit.Source, &consensusParameter); err != nil { return nil, err } + consensusParameter.SetId(hit.Id) consensusParameters = append(consensusParameters, consensusParameter) } diff --git a/internal/service/dao/consensus/rewinder.go b/internal/service/dao/consensus/rewinder.go index 4a66048..39f42f0 100644 --- a/internal/service/dao/consensus/rewinder.go +++ b/internal/service/dao/consensus/rewinder.go @@ -31,7 +31,7 @@ func (r *Rewinder) Rewind(consultations []*explorer.Consultation) error { for _, c := range consultations { for _, p := range parameters { - if c.Min == p.Id { + if c.Min == p.Uid { value, _ := strconv.Atoi(c.GetPassedAnswer().Answer) log.WithFields(log.Fields{"old": p.Value, "new": value, "desc": p.Description}).Info("Update consensus parameter") p.Value = value @@ -40,11 +40,11 @@ func (r *Rewinder) Rewind(consultations []*explorer.Consultation) error { } } - for _, p := range parameters { - _, err = r.elastic.Client.Index(). + for idx := range parameters { + _, err := r.elastic.Client.Index(). Index(elastic_cache.ConsensusIndex.Get()). - Id(p.Slug()). - BodyJson(p). + Id(parameters[idx].Id()). + BodyJson(parameters[idx]). Do(context.Background()) if err != nil { log.WithError(err).Fatal("Failed to get consensus parameters from repo") diff --git a/internal/service/dao/consensus/service.go b/internal/service/dao/consensus/service.go index f787f2a..9f67747 100644 --- a/internal/service/dao/consensus/service.go +++ b/internal/service/dao/consensus/service.go @@ -36,15 +36,15 @@ func (s *Service) InitConsensusParameters() { initialParams, _ := s.InitialState() for _, initialParam := range initialParams { - initialParam.UpdatedOnBlock = 0 - _, err := s.elastic.Client.Index(). + result, err := s.elastic.Client.Index(). Index(elastic_cache.ConsensusIndex.Get()). - Id(initialParam.Slug()). BodyJson(initialParam). Do(context.Background()) if err != nil { log.WithError(err).Fatal("Failed to save new softfork") } + initialParam.SetId(result.Id) + initialParam.UpdatedOnBlock = 0 log.Info("Saving new consensus parameter: ", initialParam.Description) } diff --git a/internal/service/dao/consultation/indexer.go b/internal/service/dao/consultation/indexer.go index d3459fa..f6f996e 100644 --- a/internal/service/dao/consultation/indexer.go +++ b/internal/service/dao/consultation/indexer.go @@ -31,16 +31,15 @@ func (i *Indexer) Index(txs []*explorer.BlockTransaction) { if navC, err := i.navcoin.GetConsultation(tx.Hash); err == nil { consultation := CreateConsultation(navC, tx) - index := elastic_cache.DaoConsultationIndex.Get() - _, err := i.elastic.Client.Index(). - Index(index). - Id(consultation.Slug()). + result, err := i.elastic.Client.Index(). + Index(elastic_cache.DaoConsultationIndex.Get()). BodyJson(consultation). Do(context.Background()) if err != nil { raven.CaptureError(err, nil) log.WithError(err).Fatal("Failed to save new consultation") } + consultation.SetId(result.Id) Consultations.Add(consultation) } else { diff --git a/internal/service/dao/consultation/repository.go b/internal/service/dao/consultation/repository.go index 35547d2..ef23892 100644 --- a/internal/service/dao/consultation/repository.go +++ b/internal/service/dao/consultation/repository.go @@ -31,7 +31,6 @@ func (r *Repository) GetOpenConsultations(height uint64) ([]*explorer.Consultati Sort("updatedOnBlock", false). Do(context.Background()) if err != nil { - raven.CaptureError(err, nil) return nil, err } @@ -41,6 +40,7 @@ func (r *Repository) GetOpenConsultations(height uint64) ([]*explorer.Consultati if err := json.Unmarshal(hit.Source, &consultation); err != nil { log.WithError(err).Fatal("Failed to unmarshall consultation") } + consultation.SetId(hit.Id) consultations = append(consultations, consultation) } } @@ -72,6 +72,7 @@ func (r *Repository) GetPassedConsultations(maxHeight uint64) ([]*explorer.Consu log.WithError(err).Fatal("Failed to unmarshall consultation") } if consultation.HasPassedAnswer() { + consultation.SetId(hit.Id) consultations = append(consultations, consultation) } } diff --git a/internal/service/dao/payment_request/indexer.go b/internal/service/dao/payment_request/indexer.go index 905aa9b..e96f51c 100644 --- a/internal/service/dao/payment_request/indexer.go +++ b/internal/service/dao/payment_request/indexer.go @@ -28,15 +28,15 @@ func (i *Indexer) Index(txs []*explorer.BlockTransaction) { paymentRequest := CreatePaymentRequest(navP, tx.Height) index := elastic_cache.PaymentRequestIndex.Get() - _, err := i.elastic.Client.Index(). + result, err := i.elastic.Client.Index(). Index(index). - Id(paymentRequest.Slug()). BodyJson(paymentRequest). Do(context.Background()) if err != nil { raven.CaptureError(err, nil) log.WithError(err).Fatal("Failed to save new payment request") } + paymentRequest.SetId(result.Id) PaymentRequests = append(PaymentRequests, paymentRequest) } } diff --git a/internal/service/dao/payment_request/repository.go b/internal/service/dao/payment_request/repository.go index 38aa03a..f30b991 100644 --- a/internal/service/dao/payment_request/repository.go +++ b/internal/service/dao/payment_request/repository.go @@ -5,7 +5,6 @@ import ( "encoding/json" "github.com/NavExplorer/navexplorer-indexer-go/v2/internal/elastic_cache" "github.com/NavExplorer/navexplorer-indexer-go/v2/pkg/explorer" - "github.com/getsentry/raven-go" "github.com/olivere/elastic/v7" log "github.com/sirupsen/logrus" ) @@ -31,7 +30,6 @@ func (r *Repository) GetPossibleVotingRequests(height uint64) ([]*explorer.Payme Sort("updatedOnBlock", false). Do(context.Background()) if err != nil { - raven.CaptureError(err, nil) return nil, err } @@ -41,6 +39,7 @@ func (r *Repository) GetPossibleVotingRequests(height uint64) ([]*explorer.Payme if err := json.Unmarshal(hit.Source, &paymentRequest); err != nil { log.WithError(err).Fatal("Failed to unmarshall payment request") } + paymentRequest.SetId(hit.Id) paymentRequests = append(paymentRequests, paymentRequest) } } diff --git a/internal/service/dao/proposal/indexer.go b/internal/service/dao/proposal/indexer.go index 23d2632..83740b3 100644 --- a/internal/service/dao/proposal/indexer.go +++ b/internal/service/dao/proposal/indexer.go @@ -28,15 +28,15 @@ func (i *Indexer) Index(txs []*explorer.BlockTransaction) { if navP, err := i.navcoin.GetProposal(tx.Hash); err == nil { proposal := CreateProposal(navP, tx.Height) - _, err := i.elastic.Client.Index(). + result, err := i.elastic.Client.Index(). Index(elastic_cache.ProposalIndex.Get()). - Id(proposal.Slug()). BodyJson(proposal). Do(context.Background()) if err != nil { raven.CaptureError(err, nil) log.WithError(err).Fatal("Failed to save new proposal") } + proposal.SetId(result.Id) Proposals = append(Proposals, proposal) } } diff --git a/internal/service/dao/proposal/repository.go b/internal/service/dao/proposal/repository.go index 23ce0e1..f8c05ab 100644 --- a/internal/service/dao/proposal/repository.go +++ b/internal/service/dao/proposal/repository.go @@ -5,7 +5,6 @@ import ( "encoding/json" "github.com/NavExplorer/navexplorer-indexer-go/v2/internal/elastic_cache" "github.com/NavExplorer/navexplorer-indexer-go/v2/pkg/explorer" - "github.com/getsentry/raven-go" "github.com/olivere/elastic/v7" log "github.com/sirupsen/logrus" ) @@ -31,7 +30,6 @@ func (r *Repository) GetPossibleVotingProposals(height uint64) ([]*explorer.Prop Sort("updatedOnBlock", false). Do(context.Background()) if err != nil { - raven.CaptureError(err, nil) return nil, err } @@ -41,6 +39,7 @@ func (r *Repository) GetPossibleVotingProposals(height uint64) ([]*explorer.Prop if err := json.Unmarshal(hit.Source, &proposal); err != nil { log.WithError(err).Fatal("Failed to unmarshall proposal") } + proposal.SetId(hit.Id) proposals = append(proposals, proposal) } } diff --git a/internal/service/softfork/repository.go b/internal/service/softfork/repository.go index 49eeb1a..918a2ab 100644 --- a/internal/service/softfork/repository.go +++ b/internal/service/softfork/repository.go @@ -18,7 +18,7 @@ func NewRepo(client *elastic.Client) *Repository { return &Repository{client} } -func (r *Repository) GetSoftForks() ([]*explorer.SoftFork, error) { +func (r *Repository) GetSoftForks() (explorer.SoftForks, error) { var softForks []*explorer.SoftFork results, err := r.Client.Search(elastic_cache.SoftForkIndex.Get()). @@ -36,6 +36,7 @@ func (r *Repository) GetSoftForks() ([]*explorer.SoftFork, error) { if err := json.Unmarshal(hit.Source, &softFork); err != nil { log.WithError(err).Fatal("Failed to unmarshall soft fork") } + softFork.SetId(hit.Id) softForks = append(softForks, softFork) } @@ -56,10 +57,12 @@ func (r *Repository) GetSoftFork(name string) (*explorer.SoftFork, error) { return nil, errors.New("Invalid result found") } - err = json.Unmarshal(results.Hits.Hits[0].Source, &softfork) + hit := results.Hits.Hits[0] + err = json.Unmarshal(hit.Source, &softfork) if err != nil { return nil, err } + softfork.SetId(hit.Id) return softfork, nil } diff --git a/internal/service/softfork/rewinder.go b/internal/service/softfork/rewinder.go index 10feede..7020d1d 100644 --- a/internal/service/softfork/rewinder.go +++ b/internal/service/softfork/rewinder.go @@ -2,6 +2,7 @@ package softfork import ( "context" + "encoding/json" "github.com/NavExplorer/navexplorer-indexer-go/v2/internal/config" "github.com/NavExplorer/navexplorer-indexer-go/v2/internal/elastic_cache" "github.com/NavExplorer/navexplorer-indexer-go/v2/internal/service/softfork/signal" @@ -36,6 +37,7 @@ func (r *Rewinder) Rewind(height uint64) error { StartTime: s.StartTime, Timeout: s.Timeout, } + SoftForks[idx].SetId(s.Id()) } start := uint64(1) @@ -92,9 +94,11 @@ func (r *Rewinder) Rewind(height uint64) error { bulk := r.elastic.Client.Bulk() for _, sf := range SoftForks { + sfJson, _ := json.Marshal(sf) + log.WithField("softfork", string(sfJson)).Info("Updating softFork ", sf.Name) bulk.Add(elastic.NewBulkUpdateRequest(). Index(elastic_cache.SoftForkIndex.Get()). - Id(sf.Slug()). + Id(sf.Id()). Doc(sf)) } diff --git a/internal/service/softfork/service.go b/internal/service/softfork/service.go index 783a844..d7b8fa2 100644 --- a/internal/service/softfork/service.go +++ b/internal/service/softfork/service.go @@ -29,28 +29,34 @@ func (i *Service) InitSoftForks() { log.WithError(err).Fatal("Failed to get blockchaininfo") } + SoftForks, err = i.repo.GetSoftForks() + if err != nil && err != elastic_cache.ErrResultsNotFound { + log.WithError(err).Fatal("Failed to get soft forks") + return + } + for name, bip9fork := range info.Bip9SoftForks { - softFork := &explorer.SoftFork{ - Name: name, - SignalBit: bip9fork.Bit, - State: explorer.SoftForkDefined, - StartTime: time.Unix(int64(bip9fork.StartTime), 0), - Timeout: time.Unix(int64(bip9fork.Timeout), 0), - ActivationHeight: 0, - LockedInHeight: 0, - } + if !SoftForks.HasSoftFork(name) { + softFork := &explorer.SoftFork{ + Name: name, + SignalBit: bip9fork.Bit, + State: explorer.SoftForkDefined, + StartTime: time.Unix(int64(bip9fork.StartTime), 0), + Timeout: time.Unix(int64(bip9fork.Timeout), 0), + ActivationHeight: 0, + LockedInHeight: 0, + } - _, err := i.elastic.Client. - Index(). - Index(elastic_cache.SoftForkIndex.Get()). - Id(softFork.Slug()). - BodyJson(softFork). - Do(context.Background()) - if err != nil { - log.WithError(err).Fatal("Failed to save new softfork") + result, err := i.elastic.Client.Index(). + Index(elastic_cache.SoftForkIndex.Get()). + BodyJson(softFork). + Do(context.Background()) + if err != nil { + log.WithError(err).Fatal("Failed to save new softfork") + } + softFork.SetId(result.Id) + SoftForks = append(SoftForks, softFork) } - - SoftForks = append(SoftForks, softFork) } } diff --git a/pkg/explorer/address.go b/pkg/explorer/address.go index 6246e54..f8ce133 100644 --- a/pkg/explorer/address.go +++ b/pkg/explorer/address.go @@ -7,6 +7,8 @@ import ( ) type Address struct { + id string + Hash string `json:"hash"` Height uint64 `json:"height"` @@ -22,9 +24,17 @@ type Address struct { } type RichList struct { - Spending uint64 `json:"spending"` - Staking uint64 `json:"staking"` - Voting uint64 `json:"voting"` + Spendable uint64 `json:"spendable"` + Stakable uint64 `json:"stakable"` + VotingWeight uint64 `json:"voting_weight"` +} + +func (a *Address) Id() string { + return a.id +} + +func (a *Address) SetId(id string) { + a.id = id } func (a *Address) Slug() string { diff --git a/pkg/explorer/address_history.go b/pkg/explorer/address_history.go index 1329768..a685983 100644 --- a/pkg/explorer/address_history.go +++ b/pkg/explorer/address_history.go @@ -7,6 +7,8 @@ import ( ) type AddressHistory struct { + id string + Height uint64 `json:"height"` TxIndex uint `json:"txindex"` Time time.Time `json:"time"` @@ -43,6 +45,14 @@ var ( VotingWeight BalanceType = "voting_weight" ) +func (a *AddressHistory) Id() string { + return a.id +} + +func (a *AddressHistory) SetId(id string) { + a.id = id +} + func (a *AddressHistory) Slug() string { return slug.Make(fmt.Sprintf("addresshistory-%s-%s", a.Hash, a.TxId)) } diff --git a/pkg/explorer/address_transaction.go b/pkg/explorer/address_transaction.go index 2029ab5..b9318fb 100644 --- a/pkg/explorer/address_transaction.go +++ b/pkg/explorer/address_transaction.go @@ -1,17 +1,16 @@ package explorer import ( - "fmt" - "github.com/gosimple/slug" "time" ) +// deprecated type AddressTransaction struct { Hash string `json:"hash"` Txid string `json:"txid"` Height uint64 `json:"height"` Index uint `json:"index"` - Time time.Time `json:"time, omitempty"` + Time time.Time `json:"time,omitempty"` Type TransferType `json:"type"` Input uint64 `json:"input"` Output uint64 `json:"output"` @@ -19,7 +18,3 @@ type AddressTransaction struct { Balance uint64 `json:"balance"` Cold bool `json:"cold"` } - -func (a *AddressTransaction) Slug() string { - return slug.Make(fmt.Sprintf("addresstx-%s-%s-%t", a.Hash, a.Txid, a.Cold)) -} diff --git a/pkg/explorer/block.go b/pkg/explorer/block.go index 161e478..0636099 100644 --- a/pkg/explorer/block.go +++ b/pkg/explorer/block.go @@ -28,6 +28,8 @@ type RawBlock struct { } type Block struct { + id string + RawBlock TxCount uint `json:"tx_count"` @@ -44,6 +46,14 @@ type Block struct { Best bool `json:"best,omitempty"` } +func (b *Block) Id() string { + return b.id +} + +func (b *Block) SetId(id string) { + b.id = id +} + func (b *Block) Slug() string { return slug.Make(fmt.Sprintf("block-%s", b.Hash)) } diff --git a/pkg/explorer/block_transaction.go b/pkg/explorer/block_transaction.go index f0d8c3f..7eca54c 100644 --- a/pkg/explorer/block_transaction.go +++ b/pkg/explorer/block_transaction.go @@ -27,6 +27,8 @@ type RawBlockTransaction struct { } type BlockTransaction struct { + id string + RawBlockTransaction Index uint `json:"index"` Vin Vins `json:"vin"` @@ -38,6 +40,14 @@ type BlockTransaction struct { Fees uint64 `json:"fees"` } +func (b *BlockTransaction) Id() string { + return b.id +} + +func (b *BlockTransaction) SetId(id string) { + b.id = id +} + func (tx *BlockTransaction) Slug() string { return CreateBlockTxSlug(tx.Hash) } diff --git a/pkg/explorer/cfund_payment_request.go b/pkg/explorer/cfund_payment_request.go index dfe2dd6..14a2588 100644 --- a/pkg/explorer/cfund_payment_request.go +++ b/pkg/explorer/cfund_payment_request.go @@ -5,10 +5,9 @@ import ( "github.com/gosimple/slug" ) -type RawPaymentRequest struct { -} - type PaymentRequest struct { + id string + Version uint32 `json:"version"` Hash string `json:"hash"` BlockHash string `json:"blockHash"` @@ -28,6 +27,14 @@ type PaymentRequest struct { VotingCycle uint `json:"votingCycle"` } +func (p *PaymentRequest) Id() string { + return p.id +} + +func (p *PaymentRequest) SetId(id string) { + p.id = id +} + func (p *PaymentRequest) Slug() string { return slug.Make(fmt.Sprintf("paymentrequest-%s", p.Hash)) } diff --git a/pkg/explorer/cfund_proposal.go b/pkg/explorer/cfund_proposal.go index bd43430..ac29a27 100644 --- a/pkg/explorer/cfund_proposal.go +++ b/pkg/explorer/cfund_proposal.go @@ -6,6 +6,8 @@ import ( ) type Proposal struct { + id string + Version uint32 `json:"version"` Hash string `json:"hash"` BlockHash string `json:"blockHash"` @@ -29,6 +31,14 @@ type Proposal struct { VotingCycle uint `json:"votingCycle"` } +func (p *Proposal) Id() string { + return p.id +} + +func (p *Proposal) SetId(id string) { + p.id = id +} + func (p *Proposal) Slug() string { return slug.Make(fmt.Sprintf("proposal-%s", p.Hash)) } diff --git a/pkg/explorer/dao_consensus_parameters.go b/pkg/explorer/dao_consensus_parameters.go index 326dc6f..40a001c 100644 --- a/pkg/explorer/dao_consensus_parameters.go +++ b/pkg/explorer/dao_consensus_parameters.go @@ -24,7 +24,7 @@ func (p *ConsensusParameters) Add(c *ConsensusParameter) { func (p *ConsensusParameters) Get(id int) *ConsensusParameter { for _, p := range p.parameters { - if p.Id == id { + if p.Uid == id { return p } } @@ -37,13 +37,23 @@ func (p *ConsensusParameters) All() []*ConsensusParameter { } type ConsensusParameter struct { - Id int `json:"id"` + id string + + Uid int `json:"id"` Description string `json:"desc"` Type ConsensusParameterType `json:"type"` Value int `json:"value"` UpdatedOnBlock uint64 `json:"updatedOnBlock"` } +func (cp *ConsensusParameter) Id() string { + return cp.id +} + +func (cp *ConsensusParameter) SetId(id string) { + cp.id = id +} + func (cp *ConsensusParameter) Slug() string { return slug.Make(fmt.Sprintf("consensus-%d", cp.Id)) } diff --git a/pkg/explorer/dao_consultation.go b/pkg/explorer/dao_consultation.go index 076026a..d8ee51c 100644 --- a/pkg/explorer/dao_consultation.go +++ b/pkg/explorer/dao_consultation.go @@ -1,10 +1,10 @@ package explorer -import ( - "github.com/gosimple/slug" -) +import "github.com/gosimple/slug" type Consultation struct { + id string + Version uint32 `json:"version"` Hash string `json:"hash"` BlockHash string `json:"blockHash"` @@ -30,6 +30,14 @@ type Consultation struct { ConsensusParameter bool `json:"consensusParameter"` } +func (c *Consultation) Id() string { + return c.id +} + +func (c *Consultation) SetId(id string) { + c.id = id +} + func (c *Consultation) Slug() string { return slug.Make(c.Hash) } diff --git a/pkg/explorer/entity.go b/pkg/explorer/entity.go index ee780e5..8afc295 100644 --- a/pkg/explorer/entity.go +++ b/pkg/explorer/entity.go @@ -1,5 +1,7 @@ package explorer type Entity interface { + Id() string + SetId(id string) Slug() string } diff --git a/pkg/explorer/signal.go b/pkg/explorer/signal.go index 62ce4c7..5e7cb32 100644 --- a/pkg/explorer/signal.go +++ b/pkg/explorer/signal.go @@ -6,11 +6,21 @@ import ( ) type Signal struct { + id string + Address string `json:"address"` Height uint64 `json:"height"` SoftForks []string `json:"softforks"` } +func (s *Signal) Id() string { + return s.id +} + +func (s *Signal) SetId(id string) { + s.id = id +} + func (s *Signal) Slug() string { return slug.Make(fmt.Sprintf("signal-%s-%d", s.Address, s.Height)) } diff --git a/pkg/explorer/soft_fork.go b/pkg/explorer/soft_fork.go index e2f1066..ed9ab22 100644 --- a/pkg/explorer/soft_fork.go +++ b/pkg/explorer/soft_fork.go @@ -10,6 +10,8 @@ import ( type SoftForks []*SoftFork type SoftFork struct { + id string + Name string `json:"name"` SignalBit uint `json:"signalBit"` StartTime time.Time `json:"startTime"` @@ -21,6 +23,14 @@ type SoftFork struct { Cycles SoftForkCycles `json:"cycles,omitempty"` } +func (s *SoftFork) Id() string { + return s.id +} + +func (s *SoftFork) SetId(id string) { + s.id = id +} + func (s *SoftFork) Slug() string { return slug.Make(fmt.Sprintf("softfork-%s", s.Name)) } @@ -74,11 +84,5 @@ func (s SoftForks) GetSoftFork(name string) *SoftFork { } func (s SoftForks) HasSoftFork(name string) bool { - for i, _ := range s { - if s[i].Name == name { - return true - } - } - - return false + return s.GetSoftFork(name) != nil } diff --git a/pkg/explorer/vote.go b/pkg/explorer/vote.go index 5fd0b33..8a70d52 100644 --- a/pkg/explorer/vote.go +++ b/pkg/explorer/vote.go @@ -6,12 +6,22 @@ import ( ) type DaoVotes struct { + id string + Cycle uint `json:"cycle"` Height uint64 `json:"height"` Address string `json:"address"` Votes []Vote `json:"votes"` } +func (v *DaoVotes) Id() string { + return v.id +} + +func (v *DaoVotes) SetId(id string) { + v.id = id +} + func (v *DaoVotes) Slug() string { return slug.Make(fmt.Sprintf("vote-%d-%s", v.Height, v.Address)) }