Skip to content

Commit

Permalink
Merge pull request #13 from NethermindEth/feat/block-stream
Browse files Browse the repository at this point in the history
Block stream
  • Loading branch information
taco-paco authored Feb 5, 2024
2 parents c789803 + 97488d8 commit 1de3de6
Show file tree
Hide file tree
Showing 6 changed files with 412 additions and 203 deletions.
246 changes: 246 additions & 0 deletions consumer/consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,246 @@
package consumer

import (
"context"
"errors"
"fmt"
"time"

"github.com/ethereum/go-ethereum/core/types"
rmq "github.com/rabbitmq/amqp091-go"
)

const (
reconnectDelay = 3 * time.Second
rechannelDelay = 2 * time.Second
)

var (
// TODO:
QueueNamesToNetworkId = map[string]uint{
"da-mq": 0,
// Add mappings
}
defaultQueues = compilerDefaultQueues()

errAlreadyClosed = errors.New("already closed: not connected to the server")
)

func compilerDefaultQueues() []string {
keys := make([]string, 0, len(QueueNamesToNetworkId))
for k := range QueueNamesToNetworkId {
keys = append(keys, k)
}

return keys
}

type ConsumerConfig struct {
Addr string
ConsumerTag string
}

type BlockData struct {
NetworkId uint
Block types.Block
}

// TODO: add logger
type Consumer struct {
consumerTag string
blockstream chan BlockData

isReady bool
contextCancelFunc context.CancelFunc
connection *rmq.Connection
onConnClosed <-chan *rmq.Error
channel *rmq.Channel
onChanClosed <-chan *rmq.Error

queues []string
queuesListener QueuesListener
}

// TODO: Pass default queues in config?
func NewConsumer(config ConsumerConfig) Consumer {
// TODO: context.TODO() or background?
ctx, cancel := context.WithCancel(context.TODO())
consumer := Consumer{
consumerTag: config.ConsumerTag,
queues: defaultQueues,
blockstream: make(chan BlockData),
contextCancelFunc: cancel,
}

go consumer.Reconnect(config.Addr, ctx)
return consumer
}

func (consumer *Consumer) Reconnect(addr string, ctx context.Context) {
for {
fmt.Println("Reconnecting...")

consumer.isReady = false
conn, err := consumer.connect(addr)
if err != nil {
fmt.Println(err)

select {
case <-ctx.Done():
return
case <-time.After(reconnectDelay):
}

continue
}

if done := consumer.ResetChannel(conn, ctx); done {
return
}

fmt.Println("Connected")
select {
case err := <-ctx.Done():
ctx.Err()
fmt.Println(err)
// deref cancel smth?
break

case err := <-consumer.onConnClosed:
if !err.Recover {
fmt.Println(err)
break
}

fmt.Println("Connection closed with err:", err, "Reconnecting...")

case err := <-consumer.onChanClosed:
if !err.Recover {
fmt.Println(err)
break
}

// TODO: Reconnect not the whole connection but just a channel?
fmt.Println("Channel closed with err:", err, "Reconnecting...")
}
}
}

func (consumer *Consumer) connect(addr string) (*rmq.Connection, error) {
conn, err := rmq.Dial(addr)
if err != nil {
return nil, err
}

consumer.changeConnection(conn)
return conn, nil
}

func (consumer *Consumer) changeConnection(conn *rmq.Connection) {
consumer.connection = conn

closeNotifier := make(chan *rmq.Error)
consumer.onConnClosed = conn.NotifyClose(closeNotifier)
}

func (consumer *Consumer) ResetChannel(conn *rmq.Connection, ctx context.Context) bool {
for {
consumer.isReady = false

err := consumer.setupChannel(conn, ctx)
if err != nil {
fmt.Println(err)

select {
case <-ctx.Done():
fmt.Println("Consumer ctx canceled")
return true

case rmqError := <-consumer.onConnClosed:
if rmqError.Recover {
fmt.Println("channel can't recover error")
return true
}

fmt.Println("ResetChannel err:", rmqError)
return false
case <-time.After(rechannelDelay):
}

continue
}

return false
}
}

func (consumer *Consumer) setupChannel(conn *rmq.Connection, ctx context.Context) error {
// TODO: create multiple chanels?
channel, err := conn.Channel()
if err != nil {
return err
}

queueDeliveries := make(map[string]<-chan rmq.Delivery)
for i := range consumer.queues {
queue, err := channel.QueueDeclare(consumer.queues[i], true, false, false, false, nil)
if err != nil {
return err
}

deliveries, err := channel.Consume(
queue.Name,
consumer.consumerTag,
false,
false,
false,
false,
nil,
)

if err != nil {
return err
}

queueDeliveries[queue.Name] = deliveries
}

listener := NewQueuesListener(queueDeliveries, consumer.blockstream, ctx)
consumer.queuesListener = listener

consumer.changeChannel(channel)
consumer.isReady = true
return nil
}

func (consumer *Consumer) changeChannel(channel *rmq.Channel) {
consumer.channel = channel

closeNotifer := make(chan *rmq.Error)
consumer.onChanClosed = channel.NotifyClose(closeNotifer)
}

func (consumer *Consumer) Close(ctx context.Context) error {
if !consumer.isReady {
return errAlreadyClosed
}

// shut down goroutines
consumer.contextCancelFunc()

err := consumer.channel.Close()
if err != nil {
return err
}

err = consumer.connection.Close()
if err != nil {
return err
}
consumer.isReady = false
return nil
}

func (consumer Consumer) GetBlockStream() <-chan BlockData {
return consumer.blockstream
}
20 changes: 19 additions & 1 deletion consumer/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,22 @@ module github.com/NethermindEth/near-sffl/comsumer

go 1.19

require github.com/rabbitmq/amqp091-go v1.9.0 // indirect
require (
github.com/bits-and-blooms/bitset v1.10.0 // indirect
github.com/btcsuite/btcd/btcec/v2 v2.2.0 // indirect
github.com/consensys/bavard v0.1.13 // indirect
github.com/consensys/gnark-crypto v0.12.1 // indirect
github.com/crate-crypto/go-kzg-4844 v0.7.0 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect
github.com/ethereum/c-kzg-4844 v0.4.0 // indirect
github.com/ethereum/go-ethereum v1.13.11 // indirect
github.com/holiman/uint256 v1.2.4 // indirect
github.com/mmcloughlin/addchain v0.4.0 // indirect
github.com/rabbitmq/amqp091-go v1.9.0 // indirect
github.com/supranational/blst v0.3.11 // indirect
golang.org/x/crypto v0.17.0 // indirect
golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa // indirect
golang.org/x/sync v0.5.0 // indirect
golang.org/x/sys v0.16.0 // indirect
rsc.io/tmplfunc v0.0.3 // indirect
)
35 changes: 35 additions & 0 deletions consumer/go.sum
Original file line number Diff line number Diff line change
@@ -1,17 +1,52 @@
github.com/bits-and-blooms/bitset v1.10.0 h1:ePXTeiPEazB5+opbv5fr8umg2R/1NlzgDsyepwsSr88=
github.com/bits-and-blooms/bitset v1.10.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8=
github.com/btcsuite/btcd/btcec/v2 v2.2.0 h1:fzn1qaOt32TuLjFlkzYSsBC35Q3KUjT1SwPxiMSCF5k=
github.com/btcsuite/btcd/btcec/v2 v2.2.0/go.mod h1:U7MHm051Al6XmscBQ0BoNydpOTsFAn707034b5nY8zU=
github.com/consensys/bavard v0.1.13 h1:oLhMLOFGTLdlda/kma4VOJazblc7IM5y5QPd2A/YjhQ=
github.com/consensys/bavard v0.1.13/go.mod h1:9ItSMtA/dXMAiL7BG6bqW2m3NdSEObYWoH223nGHukI=
github.com/consensys/gnark-crypto v0.12.1 h1:lHH39WuuFgVHONRl3J0LRBtuYdQTumFSDtJF7HpyG8M=
github.com/consensys/gnark-crypto v0.12.1/go.mod h1:v2Gy7L/4ZRosZ7Ivs+9SfUDr0f5UlG+EM5t7MPHiLuY=
github.com/crate-crypto/go-kzg-4844 v0.7.0 h1:C0vgZRk4q4EZ/JgPfzuSoxdCq3C3mOZMBShovmncxvA=
github.com/crate-crypto/go-kzg-4844 v0.7.0/go.mod h1:1kMhvPgI0Ky3yIa+9lFySEBUBXkYxeOi8ZF1sYioxhc=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/decred/dcrd/crypto/blake256 v1.0.0/go.mod h1:sQl2p6Y26YV+ZOcSTP6thNdn47hh8kt6rqSlvmrXFAc=
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 h1:YLtO71vCjJRCBcrPMtQ9nqBsqpA1m5sE92cU+pd5Mcc=
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1/go.mod h1:hyedUtir6IdtD/7lIxGeCxkaw7y45JueMRL4DIyJDKs=
github.com/ethereum/c-kzg-4844 v0.4.0 h1:3MS1s4JtA868KpJxroZoepdV0ZKBp3u/O5HcZ7R3nlY=
github.com/ethereum/c-kzg-4844 v0.4.0/go.mod h1:VewdlzQmpT5QSrVhbBuGoCdFJkpaJlO1aQputP83wc0=
github.com/ethereum/go-ethereum v1.13.11 h1:b51Dsm+rEg7anFRUMGB8hODXHvNfcRKzz9vcj8wSdUs=
github.com/ethereum/go-ethereum v1.13.11/go.mod h1:gFtlVORuUcT+UUIcJ/veCNjkuOSujCi338uSHJrYAew=
github.com/google/subcommands v1.2.0/go.mod h1:ZjhPrFU+Olkh9WazFPsl27BQ4UPiG37m3yTrtFlrHVk=
github.com/holiman/uint256 v1.2.4 h1:jUc4Nk8fm9jZabQuqr2JzednajVmBpC+oiTiXZJEApU=
github.com/holiman/uint256 v1.2.4/go.mod h1:EOMSn4q6Nyt9P6efbI3bueV4e1b3dGlUCXeiRV4ng7E=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/mmcloughlin/addchain v0.4.0 h1:SobOdjm2xLj1KkXN5/n0xTIWyZA2+s99UCY1iPfkHRY=
github.com/mmcloughlin/addchain v0.4.0/go.mod h1:A86O+tHqZLMNO4w6ZZ4FlVQEadcoqkyU72HC5wJ4RlU=
github.com/mmcloughlin/profile v0.1.1/go.mod h1:IhHD7q1ooxgwTgjxQYkACGA77oFTDdFVejUS1/tS/qU=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rabbitmq/amqp091-go v1.9.0 h1:qrQtyzB4H8BQgEuJwhmVQqVHB9O4+MNDJCCAcpc3Aoo=
github.com/rabbitmq/amqp091-go v1.9.0/go.mod h1:+jPrT9iY2eLjRaMSRHUhc3z14E/l85kv/f+6luSD3pc=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/supranational/blst v0.3.11 h1:LyU6FolezeWAhvQk0k6O/d49jqgO52MSDDfYgbeoEm4=
github.com/supranational/blst v0.3.11/go.mod h1:jZJtfjgudtNl4en1tzwPIV3KjUnQUvG3/j+w+fVonLw=
go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4=
golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k=
golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4=
golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa h1:FRnLl4eNAQl8hwxVVC17teOw8kdjVDVAiFMtgUdTSRQ=
golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa/go.mod h1:zk2irFbV9DP96SEBUUAy67IdHUaZuSnrz1n472HUCLE=
golang.org/x/sync v0.5.0 h1:60k92dhOjHxJkrqnwsfl8KuaHbn/5dl0lUPUklKo3qE=
golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU=
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
rsc.io/tmplfunc v0.0.3 h1:53XFQh69AfOa8Tw0Jm7t+GV7KZhOi6jzsCzTtKbMvzU=
rsc.io/tmplfunc v0.0.3/go.mod h1:AG3sTPzElb1Io3Yg4voV9AGZJuleGAwaVRxL9M49PhA=
Loading

0 comments on commit 1de3de6

Please sign in to comment.