Skip to content
This repository has been archived by the owner on Oct 18, 2023. It is now read-only.

Commit

Permalink
rpcdaemon: subscriptions, newHeads (erigontech#1359)
Browse files Browse the repository at this point in the history
* fix `make grpc` on new checkouts

* update proto files

* add some stub

* prototype with fake events

* notifying about events

* pass events

* events are being sent

* transfer headers to filters

* create the “filters” struct

* implement new heads

* PoC of New Heads subscription

* fix keep alive

* fixups for the client

* add “type” to the event

* support header event type on client

* better stage refactor

* fixup for the eth backend

* fixups

* fix tests

* fix tests

* fix linters

* address comments

* remove unused log
  • Loading branch information
mandrigin authored Nov 17, 2020
1 parent 583fada commit 393c996
Show file tree
Hide file tree
Showing 24 changed files with 563 additions and 61 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,4 @@ root_*.txt

__pycache__
docker-compose.dev.yml
/build
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ bindings:

grpc:
# See also: ./cmd/hack/binary-deps/main.go
mkdir -p ./build/bin/
rm -f ./build/bin/protoc*
rm -rf ./build/include*

Expand Down
6 changes: 3 additions & 3 deletions cmd/rpcdaemon/cli/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func RootCommand() (*cobra.Command, *Flags) {

func OpenDB(cfg Flags) (ethdb.KV, ethdb.Backend, error) {
var db ethdb.KV
var txPool ethdb.Backend
var ethBackend ethdb.Backend
var err error
// Do not change the order of these checks. Chaindata needs to be checked first, because PrivateApiAddr has default value which is not ""
// If PrivateApiAddr is checked first, the Chaindata option will never work
Expand All @@ -109,7 +109,7 @@ func OpenDB(cfg Flags) (ethdb.KV, ethdb.Backend, error) {
db = kv
}
} else if cfg.PrivateApiAddr != "" {
db, txPool, err = ethdb.NewRemote().Path(cfg.PrivateApiAddr).Open(cfg.TLSCertfile, cfg.TLSKeyFile, cfg.TLSCACert)
db, ethBackend, err = ethdb.NewRemote().Path(cfg.PrivateApiAddr).Open(cfg.TLSCertfile, cfg.TLSKeyFile, cfg.TLSCACert)
if err != nil {
return nil, nil, fmt.Errorf("could not connect to remoteDb: %w", err)
}
Expand All @@ -121,7 +121,7 @@ func OpenDB(cfg Flags) (ethdb.KV, ethdb.Backend, error) {
return nil, nil, fmt.Errorf("could not connect to remoteDb: %w", err)
}

return db, txPool, err
return db, ethBackend, err
}

func StartRpcServer(ctx context.Context, cfg Flags, rpcAPI []rpc.API) error {
Expand Down
5 changes: 3 additions & 2 deletions cmd/rpcdaemon/commands/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,18 @@ package commands

import (
"github.com/ledgerwatch/turbo-geth/cmd/rpcdaemon/cli"
"github.com/ledgerwatch/turbo-geth/cmd/rpcdaemon/filters"
"github.com/ledgerwatch/turbo-geth/ethdb"
"github.com/ledgerwatch/turbo-geth/rpc"
)

// APIList describes the list of available RPC apis
func APIList(db ethdb.KV, eth ethdb.Backend, cfg cli.Flags, customAPIList []rpc.API) []rpc.API {
func APIList(db ethdb.KV, eth ethdb.Backend, filters *filters.Filters, cfg cli.Flags, customAPIList []rpc.API) []rpc.API {
var defaultAPIList []rpc.API

dbReader := ethdb.NewObjectDatabase(db)

ethImpl := NewEthAPI(db, dbReader, eth, cfg.Gascap)
ethImpl := NewEthAPI(db, dbReader, eth, cfg.Gascap, filters)
tgImpl := NewTgAPI(db, dbReader)
netImpl := NewNetAPIImpl(eth)
debugImpl := NewPrivateDebugAPI(db, dbReader)
Expand Down
8 changes: 5 additions & 3 deletions cmd/rpcdaemon/commands/eth_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ import (
"context"
"math/big"

"github.com/ledgerwatch/turbo-geth/eth/filters"

rpcfilters "github.com/ledgerwatch/turbo-geth/cmd/rpcdaemon/filters"
"github.com/ledgerwatch/turbo-geth/common"
"github.com/ledgerwatch/turbo-geth/common/hexutil"
"github.com/ledgerwatch/turbo-geth/core"
"github.com/ledgerwatch/turbo-geth/core/types"
"github.com/ledgerwatch/turbo-geth/eth/filters"
"github.com/ledgerwatch/turbo-geth/ethdb"
"github.com/ledgerwatch/turbo-geth/internal/ethapi"
"github.com/ledgerwatch/turbo-geth/rpc"
Expand Down Expand Up @@ -90,15 +90,17 @@ type APIImpl struct {
dbReader ethdb.Database
chainContext core.ChainContext
GasCap uint64
filters *rpcfilters.Filters
}

// NewEthAPI returns APIImpl instance
func NewEthAPI(db ethdb.KV, dbReader ethdb.Database, eth ethdb.Backend, gascap uint64) *APIImpl {
func NewEthAPI(db ethdb.KV, dbReader ethdb.Database, eth ethdb.Backend, gascap uint64, filters *rpcfilters.Filters) *APIImpl {
return &APIImpl{
db: db,
dbReader: dbReader,
ethBackend: eth,
GasCap: gascap,
filters: filters,
}
}

Expand Down
36 changes: 36 additions & 0 deletions cmd/rpcdaemon/commands/eth_filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import (
"fmt"

"github.com/ledgerwatch/turbo-geth/common/hexutil"
"github.com/ledgerwatch/turbo-geth/core/types"
"github.com/ledgerwatch/turbo-geth/log"
"github.com/ledgerwatch/turbo-geth/rpc"
)

// NewPendingTransactionFilter new transaction filter
Expand Down Expand Up @@ -32,3 +35,36 @@ func (api *APIImpl) GetFilterChanges(_ context.Context, index hexutil.Uint64) ([
var stub []interface{}
return stub, fmt.Errorf(NotImplemented, "eth_getFilterChanges")
}

// NewHeads send a notification each time a new (header) block is appended to the chain.
func (api *APIImpl) NewHeads(ctx context.Context) (*rpc.Subscription, error) {
notifier, supported := rpc.NotifierFromContext(ctx)
if !supported {
return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
}

rpcSub := notifier.CreateSubscription()

go func() {
headers := make(chan *types.Header)
id := api.filters.SubscribeNewHeads(headers)

for {
select {
case h := <-headers:
err := notifier.Notify(rpcSub.ID, h)
if err != nil {
log.Warn("error while notifying subscription", "err", err)
}
case <-rpcSub.Err():
api.filters.Unsubscribe(id)
return
case <-notifier.Closed():
api.filters.Unsubscribe(id)
return
}
}
}()

return rpcSub, nil
}
87 changes: 87 additions & 0 deletions cmd/rpcdaemon/filters/filters.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package filters

import (
"crypto/rand"
"encoding/json"
"fmt"
"sync"
"time"

"github.com/ledgerwatch/turbo-geth/core/types"
"github.com/ledgerwatch/turbo-geth/ethdb"
"github.com/ledgerwatch/turbo-geth/ethdb/remote"
"github.com/ledgerwatch/turbo-geth/ethdb/remote/remotedbserver"
"github.com/ledgerwatch/turbo-geth/log"
)

type Filters struct {
mu sync.RWMutex

headsSubs map[string]chan *types.Header
}

func New(ethBackend ethdb.Backend) *Filters {
log.Info("rpc filters: subscribing to tg events")

ff := &Filters{headsSubs: make(map[string]chan *types.Header)}

go func() {
var err error
for i := 0; i < 10; i++ {
err = ethBackend.Subscribe(ff.OnNewEvent)
if err != nil {
log.Warn("rpc filters: error subscribing to events", "err", err)
time.Sleep(time.Second)
}
}
}()

return ff
}

func (ff *Filters) SubscribeNewHeads(out chan *types.Header) string {
ff.mu.Lock()
defer ff.mu.Unlock()
id := generateSubscriptionID()
ff.headsSubs[id] = out
return id
}

func (ff *Filters) Unsubscribe(id string) {
ff.mu.Lock()
defer ff.mu.Unlock()
delete(ff.headsSubs, id)
}

func (ff *Filters) OnNewEvent(event *remote.SubscribeReply) {
ff.mu.RLock()
defer ff.mu.RUnlock()

if remotedbserver.RpcEventType(event.Type) != remotedbserver.EventTypeHeader {
log.Warn("rpc filters: unsupported event type", "type", event.Type)
return
}

payload := event.Data
var header types.Header
err := json.Unmarshal(payload, &header)
if err != nil {
// ignoring what we can't unmarshal
log.Warn("rpc filters, unprocessable payload", "err", err)
} else {
for _, v := range ff.headsSubs {
v <- &header
}
}
}

func generateSubscriptionID() string {
var id [32]byte

_, err := rand.Read(id[:])
if err != nil {
log.Crit("rpc filters: error creating random id", "err", err)
}

return fmt.Sprintf("%x", id)
}
11 changes: 9 additions & 2 deletions cmd/rpcdaemon/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/ledgerwatch/turbo-geth/cmd/rpcdaemon/cli"
"github.com/ledgerwatch/turbo-geth/cmd/rpcdaemon/commands"
"github.com/ledgerwatch/turbo-geth/cmd/rpcdaemon/filters"
"github.com/ledgerwatch/turbo-geth/cmd/utils"
"github.com/ledgerwatch/turbo-geth/log"
"github.com/spf13/cobra"
Expand All @@ -20,8 +21,14 @@ func main() {
}
defer db.Close()

var apiList = commands.APIList(db, backend, *cfg, nil)
return cli.StartRpcServer(cmd.Context(), *cfg, apiList)
var ff *filters.Filters
if backend != nil {
ff = filters.New(backend)
} else {
log.Info("filters are not supported in chaindata mode")
}

return cli.StartRpcServer(cmd.Context(), *cfg, commands.APIList(db, backend, ff, *cfg, nil))
}

if err := cmd.ExecuteContext(utils.RootContext()); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion cmd/rpcdaemon/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
)

func New(db ethdb.HasKV, ethereum core.Backend, stack *node.Node) {
apis := commands.APIList(db.KV(), core.NewEthBackend(ethereum), cli.Flags{API: []string{"eth", "debug"}}, nil)
apis := commands.APIList(db.KV(), core.NewEthBackend(ethereum), nil, cli.Flags{API: []string{"eth", "debug"}}, nil)

stack.RegisterAPIs(apis)
}
6 changes: 6 additions & 0 deletions core/eth_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package core
import (
"github.com/ledgerwatch/turbo-geth/common"
"github.com/ledgerwatch/turbo-geth/core/types"
"github.com/ledgerwatch/turbo-geth/ethdb/remote"
"github.com/ledgerwatch/turbo-geth/rlp"
)

Expand All @@ -28,3 +29,8 @@ func (back *EthBackend) AddLocal(signedtx []byte) ([]byte, error) {

return tx.Hash().Bytes(), back.TxPool().AddLocal(tx)
}

func (back *EthBackend) Subscribe(func(*remote.SubscribeReply)) error {
// do nothing
return nil
}
27 changes: 22 additions & 5 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ import (
"crypto/x509"
"errors"
"fmt"
"github.com/ledgerwatch/turbo-geth/turbo/snapshotsync"
"github.com/ledgerwatch/turbo-geth/turbo/snapshotsync/bittorrent"
"io/ioutil"
"math/big"
"os"
Expand All @@ -35,6 +33,9 @@ import (
"sync/atomic"
"time"

"github.com/ledgerwatch/turbo-geth/turbo/snapshotsync"
"github.com/ledgerwatch/turbo-geth/turbo/snapshotsync/bittorrent"

ethereum "github.com/ledgerwatch/turbo-geth"
"github.com/ledgerwatch/turbo-geth/common/etl"
"google.golang.org/grpc"
Expand All @@ -54,6 +55,7 @@ import (
"github.com/ledgerwatch/turbo-geth/eth/downloader"
"github.com/ledgerwatch/turbo-geth/eth/filters"
"github.com/ledgerwatch/turbo-geth/eth/gasprice"
"github.com/ledgerwatch/turbo-geth/eth/stagedsync"
"github.com/ledgerwatch/turbo-geth/ethdb"
"github.com/ledgerwatch/turbo-geth/ethdb/remote/remotedbserver"
"github.com/ledgerwatch/turbo-geth/event"
Expand Down Expand Up @@ -355,6 +357,20 @@ func New(stack *node.Node, config *Config) (*Ethereum, error) {

eth.txPool = core.NewTxPool(config.TxPool, chainConfig, chainDb, txCacher)

stagedSync := config.StagedSync

// setting notifier to support streaming events to rpc daemon
remoteEvents := remotedbserver.NewEvents()
if stagedSync == nil {
// if there is not stagedsync, we create one with the custom notifier
stagedSync = stagedsync.New(stagedsync.DefaultStages(), stagedsync.DefaultUnwindOrder(), stagedsync.OptionalParameters{Notifier: remoteEvents})
} else {
// otherwise we add one if needed
if stagedSync.Notifier == nil {
stagedSync.Notifier = remoteEvents
}
}

if stack.Config().PrivateApiAddr != "" {
if stack.Config().TLSConnection {
// load peer cert/key, ca cert
Expand Down Expand Up @@ -387,12 +403,12 @@ func New(stack *node.Node, config *Config) (*Ethereum, error) {
if err != nil {
return nil, err
}
eth.privateAPI, err = remotedbserver.StartGrpc(chainDb.KV(), eth, stack.Config().PrivateApiAddr, &creds)
eth.privateAPI, err = remotedbserver.StartGrpc(chainDb.KV(), eth, stack.Config().PrivateApiAddr, &creds, remoteEvents)
if err != nil {
return nil, err
}
} else {
eth.privateAPI, err = remotedbserver.StartGrpc(chainDb.KV(), eth, stack.Config().PrivateApiAddr, nil)
eth.privateAPI, err = remotedbserver.StartGrpc(chainDb.KV(), eth, stack.Config().PrivateApiAddr, nil, remoteEvents)
if err != nil {
return nil, err
}
Expand All @@ -403,7 +419,8 @@ func New(stack *node.Node, config *Config) (*Ethereum, error) {
if checkpoint == nil {
//checkpoint = params.TrustedCheckpoints[genesisHash]
}
if eth.protocolManager, err = NewProtocolManager(chainConfig, checkpoint, config.SyncMode, config.NetworkID, eth.eventMux, eth.txPool, eth.engine, eth.blockchain, chainDb, config.Whitelist, config.StagedSync); err != nil {

if eth.protocolManager, err = NewProtocolManager(chainConfig, checkpoint, config.SyncMode, config.NetworkID, eth.eventMux, eth.txPool, eth.engine, eth.blockchain, chainDb, config.Whitelist, stagedSync); err != nil {
return nil, err
}
eth.miner = miner.New(eth, &config.Miner, chainConfig, eth.EventMux(), eth.engine, eth.isLocalBlock)
Expand Down
27 changes: 27 additions & 0 deletions eth/stagedsync/stage_finish.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package stagedsync

import (
"fmt"

"github.com/ledgerwatch/turbo-geth/core/rawdb"
"github.com/ledgerwatch/turbo-geth/log"
)

func NotifyRpcDaemon(from, to uint64, notifier ChainEventNotifier, db rawdb.DatabaseReader) error {
if notifier == nil {
log.Warn("rpc notifier is not set, rpc daemon won't be updated about headers")
return nil
}
for i := from; i <= to; i++ {
hash, err := rawdb.ReadCanonicalHash(db, i)
if err != nil {
return err
}
header := rawdb.ReadHeader(db, hash, i)
if header == nil {
return fmt.Errorf("could not find canonical header for hash: %x number: %d", hash, i)
}
notifier.OnNewHeader(header)
}
return nil
}
Loading

0 comments on commit 393c996

Please sign in to comment.