Skip to content

Commit

Permalink
Merge pull request #178 from flow-hydraulics/latenssi/refactor-event-…
Browse files Browse the repository at this point in the history
…polling

Refactor event polling
  • Loading branch information
latenssi authored Sep 3, 2021
2 parents 7c3530b + 6c574df commit 5667f98
Show file tree
Hide file tree
Showing 8 changed files with 168 additions and 67 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
.PHONY: dev
dev:
docker-compose up -d db emulator
docker-compose up -d db pgadmin emulator
docker-compose logs -f

.PHONY: stop
Expand Down
8 changes: 8 additions & 0 deletions chain_events/lib.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package chain_events

func min(x, y uint64) uint64 {
if x > y {
return y
}
return x
}
138 changes: 90 additions & 48 deletions chain_events/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package chain_events

import (
"context"
"fmt"
"log"
"os"
"strings"
"time"

"github.com/onflow/flow-go-sdk"
Expand All @@ -13,13 +15,15 @@ import (
type GetEventTypes func() []string

type Listener struct {
ticker *time.Ticker
done chan bool
fc *client.Client
db Store
maxDiff uint64
interval time.Duration
getTypes GetEventTypes
ticker *time.Ticker
done chan bool
logger *log.Logger
fc *client.Client
db Store
getTypes GetEventTypes
maxBlocks uint64
interval time.Duration
startingHeight uint64
}

type ListenerStatus struct {
Expand All @@ -32,103 +36,141 @@ func (ListenerStatus) TableName() string {
}

func NewListener(
logger *log.Logger,
fc *client.Client,
db Store,
getTypes GetEventTypes,
maxDiff uint64,
interval time.Duration,
getTypes GetEventTypes,
startingHeight uint64,
) *Listener {
return &Listener{nil, make(chan bool), fc, db, maxDiff, interval, getTypes}
if logger == nil {
logger = log.New(os.Stdout, "[EVENT-POLLER] ", log.LstdFlags|log.Lshortfile)
}
return &Listener{
nil, make(chan bool),
logger, fc, db, getTypes,
maxDiff, interval, startingHeight,
}
}

func (l *Listener) run(ctx context.Context, start, end uint64) error {
ee := make([]flow.Event, 0)
events := make([]flow.Event, 0)

types := l.getTypes()
eventTypes := l.getTypes()

for _, t := range types {
for _, t := range eventTypes {
r, err := l.fc.GetEventsForHeightRange(ctx, client.EventRangeQuery{
Type: t,
StartHeight: start,
EndHeight: end,
})
if err != nil {
return fmt.Errorf("error while fetching events: %w", err)
return err
}
for _, b := range r {
ee = append(ee, b.Events...)
events = append(events, b.Events...)
}
}

for _, event := range ee {
for _, event := range events {
Event.Trigger(event)
}

return nil
}

func (l *Listener) handleError(err error) {
fmt.Println(err)
l.logger.Println(err)
if strings.Contains(err.Error(), "key not found") {
l.logger.Println(`"key not found" error indicates data is not available at this height, please manually set correct starting height`)
}
}

func (l *Listener) Start() *Listener {
if l.ticker != nil {
// Already started
return l
}

if err := l.initHeight(); err != nil {
_, ok := err.(*LockError)
if !ok {
panic(err)
}
// Skip LockError as it means another listener is already handling this
}

// TODO (latenssi): should use random intervals instead
l.ticker = time.NewTicker(l.interval)

go func() {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

status, err := l.db.GetListenerStatus()
if err != nil {
panic(err)
}

for {
select {
case <-l.done:
return
case <-l.ticker.C:
cur, err := l.fc.GetLatestBlock(ctx, true)
if err != nil {
l.handleError(err)
continue
}
curHeight := cur.Height
if curHeight > status.LatestHeight {
start := status.LatestHeight + 1 // latestHeight has already been checked, add 1
end := min(curHeight, start+l.maxDiff) // Limit maximum end
if err := l.run(ctx, start, end); err != nil {
l.handleError(err)
continue
}
status.LatestHeight = end
err := l.db.UpdateListenerStatus(status)
lockErr := l.db.LockedStatus(func(status *ListenerStatus) error {
latestBlock, err := l.fc.GetLatestBlock(ctx, true)
if err != nil {
l.handleError(err)
continue
return err
}

if latestBlock.Height > status.LatestHeight {
start := status.LatestHeight + 1 // LatestHeight has already been checked, add 1
end := min(latestBlock.Height, start+l.maxBlocks) // Limit maximum end
if err := l.run(ctx, start, end); err != nil {
return err
}
status.LatestHeight = end
}

return nil
})

if lockErr != nil {
_, ok := lockErr.(*LockError)
if !ok {
l.handleError(lockErr)
}
// Skip on LockError as it means another listener is already handling this round
}
}
}
}()

l.logger.Println("started")

return l
}

func (l *Listener) initHeight() error {
return l.db.LockedStatus(func(status *ListenerStatus) error {
if l.startingHeight > 0 && status.LatestHeight < l.startingHeight-1 {
status.LatestHeight = l.startingHeight - 1
}

if status.LatestHeight == 0 {
// If starting fresh, we need to start from the latest block as we can't
// know what is the root of the current spork.
// Data on Flow is only accessible for the current spork height.
latestBlock, err := l.fc.GetLatestBlock(context.Background(), true)
if err != nil {
return err
}
status.LatestHeight = latestBlock.Height
}

return nil
})
}

func (l *Listener) Stop() {
l.logger.Println("stopping...")
l.ticker.Stop()
l.done <- true
l.ticker = nil
}

func min(x, y uint64) uint64 {
if x > y {
return y
}
return x
}
11 changes: 9 additions & 2 deletions chain_events/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,13 @@ package chain_events

// Store manages data regarding tokens.
type Store interface {
UpdateListenerStatus(s *ListenerStatus) error
GetListenerStatus() (*ListenerStatus, error)
LockedStatus(func(*ListenerStatus) error) error
}

type LockError struct {
Err error
}

func (e *LockError) Error() string {
return e.Err.Error()
}
36 changes: 30 additions & 6 deletions chain_events/store_gorm.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package chain_events

import (
"strings"

"gorm.io/gorm"
"gorm.io/gorm/clause"
)

type GormStore struct {
Expand All @@ -13,11 +16,32 @@ func NewGormStore(db *gorm.DB) *GormStore {
return &GormStore{db}
}

func (s *GormStore) GetListenerStatus() (t *ListenerStatus, err error) {
err = s.db.FirstOrCreate(&t).Error
return
}
// LockedStatus runs a transaction on the database manipulating 'status' of type ListenerStatus.
// It takes a function 'fn' as argument. In the context of 'fn' 'status' is locked.
// Uses NOWAIT modifier on the lock so simultaneous requests can be ignored.
func (s *GormStore) LockedStatus(fn func(status *ListenerStatus) error) error {
txErr := s.db.Transaction(func(tx *gorm.DB) error {
status := ListenerStatus{}

if err := tx.Clauses(clause.Locking{Strength: "UPDATE", Options: "NOWAIT"}).FirstOrCreate(&status).Error; err != nil {
return err // rollback
}

if err := fn(&status); err != nil {
return err // rollback
}

if err := tx.Save(&status).Error; err != nil {
return err // rollback
}

return nil // commit
})

// Need to handle implementation specific error message
if txErr != nil && strings.Contains(txErr.Error(), "could not obtain lock on row") {
return &LockError{Err: txErr}
}

func (s *GormStore) UpdateListenerStatus(t *ListenerStatus) error {
return s.db.Save(&t).Error
return txErr
}
10 changes: 10 additions & 0 deletions configs/configs.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,16 @@ type Config struct {
// Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h".
// For more info: https://pkg.go.dev/time#ParseDuration
TransactionTimeout time.Duration `env:"FLOW_WALLET_TRANSACTION_TIMEOUT" envDefault:"0"`

// Set the starting height for event polling. This won't have any effect if the value in
// database (chain_event_status[0].latest_height) is greater.
// If 0 (default) use latest block height if starting fresh (no previous value in database).
ChainListenerStartingHeight uint64 `env:"FLOW_WALLET_EVENTS_STARTING_HEIGHT" envDefault:"0"`
// Maximum number of blocks to check at once.
ChainListenerMaxBlocks uint64 `env:"FLOW_WALLET_EVENTS_MAX_BLOCKS" envDefault:"100"`
// Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h".
// For more info: https://pkg.go.dev/time#ParseDuration
ChainListenerInterval time.Duration `env:"FLOW_WALLET_EVENTS_INTERVAL" envDefault:"10s"`
}

type Options struct {
Expand Down
13 changes: 12 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,31 @@ version: "3.9"
services:
db:
image: postgres:13-alpine
restart: unless-stopped
ports:
- "5432:5432"
environment:
- POSTGRES_DB=wallet
- POSTGRES_USER=wallet
- POSTGRES_PASSWORD=wallet

pgadmin:
image: dpage/pgadmin4
restart: unless-stopped
environment:
PGADMIN_DEFAULT_EMAIL: [email protected]
PGADMIN_DEFAULT_PASSWORD: admin
ports:
- "5050:80"

wallet:
build:
context: .
dockerfile: ./docker/wallet/Dockerfile
network: host # docker build sometimes has problems fetching from alpine's CDN
restart: unless-stopped
ports:
- "3000:3000"
restart: unless-stopped
env_file:
- ./.env
environment:
Expand All @@ -30,6 +40,7 @@ services:

emulator:
image: gcr.io/flow-container-registry/emulator:v0.23.0
restart: unless-stopped
command: emulator -v
ports:
- "3569:3569"
Expand Down
17 changes: 8 additions & 9 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ func runServer(cfg *configs.Config) {
// Application wide logger
ls := log.New(os.Stdout, "[SERVER] ", log.LstdFlags|log.Lshortfile)
lj := log.New(os.Stdout, "[JOBS] ", log.LstdFlags|log.Lshortfile)
le := log.New(os.Stdout, "[EVENT-POLLER] ", log.LstdFlags|log.Lshortfile)

ls.Printf("Starting server (v%s)...\n", version)

Expand Down Expand Up @@ -235,11 +236,7 @@ func runServer(cfg *configs.Config) {

// Chain event listener
if !cfg.DisableChainEvents {
ls.Println("Starting chain event listener..")

store := chain_events.NewGormStore(db)
maxDiff := uint64(100) // maximum number of blocks to check each iteration, TODO: make this configurable
interval := 10 * time.Second // TODO: make this configurable
getTypes := func() []string {
// Get all enabled tokens
tt, err := templateService.ListTokens(templates.NotSpecified)
Expand All @@ -258,12 +255,14 @@ func runServer(cfg *configs.Config) {
return event_types
}

listener := chain_events.NewListener(fc, store, maxDiff, interval, getTypes)
listener := chain_events.NewListener(
le, fc, store, getTypes,
cfg.ChainListenerMaxBlocks,
cfg.ChainListenerInterval,
cfg.ChainListenerStartingHeight,
)

defer func() {
ls.Println("Stopping event listener..")
listener.Stop()
}()
defer listener.Stop()

// Register a handler for chain events
chain_events.Event.Register(&tokens.ChainEventHandler{
Expand Down

0 comments on commit 5667f98

Please sign in to comment.