Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
joshklop committed Aug 17, 2023
1 parent 3c121ac commit c11af72
Show file tree
Hide file tree
Showing 6 changed files with 212 additions and 37 deletions.
3 changes: 1 addition & 2 deletions .github/codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@ coverage:
only_pulls: true
ignore:
- mocks
- l1/internal/contract/starknet.go
- grpc/gen/*
- vm
- p2p/starknet/spec
- docs
- docs
2 changes: 1 addition & 1 deletion ethereum/ethereum_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ type unreliableEthHandler struct {
h *ethHandler
}

func (e *unreliableEthHandler) ChainId() *hexutil.Big { //nolint:stylecheck
func (e *unreliableEthHandler) ChainId() *hexutil.Big {
return e.h.ChainId()
}

Expand Down
52 changes: 52 additions & 0 deletions mocks/mock_l1.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

42 changes: 21 additions & 21 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"github.com/NethermindEth/juno/db"
"github.com/NethermindEth/juno/db/pebble"
"github.com/NethermindEth/juno/ethereum"
"github.com/NethermindEth/juno/l1"
"github.com/NethermindEth/juno/metrics"
"github.com/NethermindEth/juno/migration"
"github.com/NethermindEth/juno/node/http"
Expand Down Expand Up @@ -98,6 +97,27 @@ func New(cfg *Config, version string) (*Node, error) { //nolint:gocyclo

chain := blockchain.New(database, cfg.Network, log)
client := feeder.NewClient(cfg.Network.FeederURL()).WithUserAgent(ua)

if cfg.EthNode == "" {
log.Warnw("Ethereum node address not found; will not verify against L1")
} else {
var ethNodeURL *url.URL
ethNodeURL, err = url.Parse(cfg.EthNode)
if err != nil {
return nil, fmt.Errorf("parse Ethereum node URL: %w", err)
}
if ethNodeURL.Scheme != "wss" && ethNodeURL.Scheme != "ws" {
return nil, errors.New("non-websocket Ethereum node URL (need wss://... or ws://...): " + cfg.EthNode)
}
var l1Client *l1.Client
l1Client, err = newL1Client(n.cfg.EthNode, n.blockchain, n.log)
if err != nil {
return nil, fmt.Errorf("create L1 client: %w", err)
}

n.services = append(n.services, l1Client)
}

synchronizer := sync.New(chain, adaptfeeder.New(client), log, cfg.PendingPollInterval)
gatewayClient := gateway.NewClient(cfg.Network.GatewayURL(), log).WithUserAgent(ua)

Expand All @@ -120,26 +140,6 @@ func New(cfg *Config, version string) (*Node, error) { //nolint:gocyclo
services: []service.Service{rpcSrv, synchronizer},
}

if n.cfg.EthNode == "" {
n.log.Warnw("Ethereum node address not found; will not verify against L1")
} else {
var ethNodeURL *url.URL
ethNodeURL, err = url.Parse(n.cfg.EthNode)
if err != nil {
return nil, fmt.Errorf("parse Ethereum node URL: %w", err)
}
if ethNodeURL.Scheme != "wss" && ethNodeURL.Scheme != "ws" {
return nil, errors.New("non-websocket Ethereum node URL (need wss://... or ws://...): " + n.cfg.EthNode)
}
var l1Client *l1.Client
l1Client, err = newL1Client(n.cfg.EthNode, n.blockchain, n.log)
if err != nil {
return nil, fmt.Errorf("create L1 client: %w", err)
}

n.services = append(n.services, l1Client)
}

if cfg.P2P {
var privKeyStr string
privKeyStr, _ = os.LookupEnv("P2P_PRIVATE_KEY")
Expand Down
45 changes: 38 additions & 7 deletions sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package sync
import (
"context"
"errors"
"fmt"
"runtime"
"time"

Expand All @@ -11,14 +12,20 @@ import (
"github.com/NethermindEth/juno/core/felt"
"github.com/NethermindEth/juno/db"
"github.com/NethermindEth/juno/metrics"
"github.com/NethermindEth/juno/service"
"github.com/NethermindEth/juno/starknetdata"
"github.com/NethermindEth/juno/utils"
"github.com/ethereum/go-ethereum/event"
"github.com/prometheus/client_golang/prometheus"
"github.com/sourcegraph/conc/stream"
)

var _ service.Service = (*Synchronizer)(nil)
//go:generate mockgen -destination=../mocks/mock_l1.go -package=mocks github.com/NethermindEth/juno/sync L1
type L1 interface {
// WatchL1Heads sends on sink whenever the Starknet block number
// in the core contract is incremented in a finalized Ethereum block.
// All errors received on the subscription's error channel are treated as fatal.
WatchL1Heads(context.Context, chan<- *core.L1Head) (event.Subscription, error)
}

const (
opVerifyLabel = "verify"
Expand All @@ -39,12 +46,14 @@ type Synchronizer struct {

catchUpMode bool

l1 L1

// metrics
opTimers *prometheus.HistogramVec
totalBlocks prometheus.Counter
}

func New(bc *blockchain.Blockchain, starkNetData starknetdata.StarknetData,
func New(bc *blockchain.Blockchain, l1 L1, starkNetData starknetdata.StarknetData,
log utils.SimpleLogger, pendingPollInterval time.Duration,
) *Synchronizer {
s := &Synchronizer{
Expand All @@ -61,15 +70,15 @@ func New(bc *blockchain.Blockchain, starkNetData starknetdata.StarknetData,
Namespace: "sync",
Name: "blocks",
}),
l1: l1,
}
metrics.MustRegister(s.opTimers, s.totalBlocks)
return s
}

// Run starts the Synchronizer, returns an error if the loop is already running
func (s *Synchronizer) Run(ctx context.Context) error {
s.syncBlocks(ctx)
return nil
return s.syncBlocks(ctx)
}

func (s *Synchronizer) fetcherTask(ctx context.Context, height uint64, verifiers *stream.Stream,
Expand Down Expand Up @@ -219,12 +228,19 @@ func (s *Synchronizer) nextHeight() uint64 {
return nextHeight
}

func (s *Synchronizer) syncBlocks(syncCtx context.Context) {
func (s *Synchronizer) syncBlocks(syncCtx context.Context) error {
defer func() {
s.StartingBlockNumber = nil
s.HighestBlockHeader = nil
}()

l1Heads := make(chan *core.L1Head, 2048)
sub, err := s.l1.WatchL1Heads(syncCtx, l1Heads)
if err != nil {
return fmt.Errorf("watch L1 heads: %w", err)
}
defer sub.Unsubscribe()

fetchers, verifiers := s.setupWorkers()
streamCtx, streamCancel := context.WithCancel(syncCtx)

Expand All @@ -245,13 +261,28 @@ func (s *Synchronizer) syncBlocks(syncCtx context.Context) {
select {
case <-syncCtx.Done():
pendingSem <- struct{}{}
return
return nil
default:
streamCtx, streamCancel = context.WithCancel(syncCtx)
nextHeight = s.nextHeight()
fetchers, verifiers = s.setupWorkers()
s.log.Warnw("Restarting sync process", "height", nextHeight, "catchUpMode", s.catchUpMode)
}
case head := <-l1Heads:
if err = s.Blockchain.SetL1Head(head); err == nil {
s.log.Infow("Updated l1 head",
"blockNumber", head.BlockNumber,
"blockHash", head.BlockHash.ShortString(),
"stateRoot", head.StateRoot.ShortString())
} else {
err = fmt.Errorf("l1 head for block %d and state root %s: %w", head.BlockNumber, head.StateRoot.String(), err)
s.log.Errorw("Failed to set L1 head", "err", err)
}
case err = <-sub.Err():
streamCancel()
fetchers.Wait()
verifiers.Wait()
return fmt.Errorf("sync: L1 heads subscription: %w", err)
default:
curHeight, curStreamCtx, curCancel := nextHeight, streamCtx, streamCancel
fetchers.Go(func() stream.Callback {
Expand Down
Loading

0 comments on commit c11af72

Please sign in to comment.