Skip to content

Commit

Permalink
feat: Add logger to Consumer and QueuesListener
Browse files Browse the repository at this point in the history
  • Loading branch information
taco-paco committed Feb 5, 2024
1 parent df0535e commit 650d2a2
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 27 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
*.dll
*.so
*.dylib
consumer/test/cli/main

# Test binary, built with `go test -c`
*.test
Expand Down
43 changes: 24 additions & 19 deletions consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ package consumer
import (
"context"
"errors"
"fmt"
"time"

"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/ethereum/go-ethereum/core/types"
rmq "github.com/rabbitmq/amqp091-go"
)
Expand Down Expand Up @@ -41,26 +41,29 @@ type Consumer struct {
consumerTag string
blockstream chan BlockData

queues []string
queuesListener QueuesListener

isReady bool
contextCancelFunc context.CancelFunc
connection *rmq.Connection
onConnClosed <-chan *rmq.Error
channel *rmq.Channel
onChanClosed <-chan *rmq.Error

queues []string
queuesListener QueuesListener
logger logging.Logger
}

// TODO: Pass default queues in config?
func NewConsumer(config ConsumerConfig) Consumer {
func NewConsumer(config ConsumerConfig, logger logging.Logger) Consumer {
// TODO: context.TODO() or background?
ctx, cancel := context.WithCancel(context.TODO())
consumer := Consumer{
consumerTag: config.ConsumerTag,
queues: config.QueueNames,
blockstream: make(chan BlockData),
contextCancelFunc: cancel,
logger: logger,
}

go consumer.Reconnect(config.Addr, ctx)
Expand All @@ -69,15 +72,16 @@ func NewConsumer(config ConsumerConfig) Consumer {

func (consumer *Consumer) Reconnect(addr string, ctx context.Context) {
for {
fmt.Println("Reconnecting...")
consumer.logger.Infof("Reconnecting...")

consumer.isReady = false
conn, err := consumer.connect(addr)
if err != nil {
fmt.Println(err)
consumer.logger.Warn("Connection setup failed", "err", err)

select {
case <-ctx.Done():
consumer.logger.Info("Consumer context canceled")
return
case <-time.After(reconnectDelay):
}
Expand All @@ -89,30 +93,30 @@ func (consumer *Consumer) Reconnect(addr string, ctx context.Context) {
return
}

fmt.Println("Connected")
consumer.logger.Infof("Connected")

select {
case err := <-ctx.Done():
ctx.Err()
fmt.Println(err)
case <-ctx.Done():
consumer.logger.Info("Consumer context canceled")
// deref cancel smth?
break

case err := <-consumer.onConnClosed:
if !err.Recover {
fmt.Println(err)
consumer.logger.Error("Can't recover connection", "err", err)
break
}

fmt.Println("Connection closed with err:", err, "Reconnecting...")
consumer.logger.Warnf("Recovering connection, closed with:", "err", err)

case err := <-consumer.onChanClosed:
if !err.Recover {
fmt.Println(err)
consumer.logger.Error("Can't recover connection", "err", err)
break
}

// TODO: Reconnect not the whole connection but just a channel?
fmt.Println("Channel closed with err:", err, "Reconnecting...")
consumer.logger.Warnf("Reconnecting channel, closed with:", "err", err)
}
}
}
Expand Down Expand Up @@ -140,20 +144,20 @@ func (consumer *Consumer) ResetChannel(conn *rmq.Connection, ctx context.Context

err := consumer.setupChannel(conn, ctx)
if err != nil {
fmt.Println(err)
consumer.logger.Warn("Channel setup failed", "err", err)

select {
case <-ctx.Done():
fmt.Println("Consumer ctx canceled")
consumer.logger.Info("Consumer context canceled")
return true

case rmqError := <-consumer.onConnClosed:
if rmqError.Recover {
fmt.Println("channel can't recover error")
consumer.logger.Error("Can't recover connection", "err", err)
return true
}

fmt.Println("ResetChannel err:", rmqError)
consumer.logger.Warnf("Recovering connection, closed with:", "err", err)
return false
case <-time.After(rechannelDelay):
}
Expand Down Expand Up @@ -196,7 +200,7 @@ func (consumer *Consumer) setupChannel(conn *rmq.Connection, ctx context.Context
queueDeliveries[queue.Name] = deliveries
}

listener := NewQueuesListener(queueDeliveries, consumer.blockstream, ctx)
listener := NewQueuesListener(queueDeliveries, consumer.blockstream, consumer.logger, ctx)
consumer.queuesListener = listener

consumer.changeChannel(channel)
Expand Down Expand Up @@ -228,6 +232,7 @@ func (consumer *Consumer) Close(ctx context.Context) error {
if err != nil {
return err
}

consumer.isReady = false
return nil
}
Expand Down
16 changes: 9 additions & 7 deletions consumer/queue_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,25 @@ package consumer

import (
"context"
"fmt"

"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rlp"
rmq "github.com/rabbitmq/amqp091-go"
)

// TODO: rename to deliveries smth?
type QueuesListener struct {
blockstream chan<- BlockData
queueDeliveries map[string]<-chan rmq.Delivery

logger logging.Logger
}

func NewQueuesListener(deliveries map[string]<-chan rmq.Delivery, blockstream chan<- BlockData, ctx context.Context) QueuesListener {
func NewQueuesListener(deliveries map[string]<-chan rmq.Delivery, blockstream chan<- BlockData, logger logging.Logger, ctx context.Context) QueuesListener {
listener := QueuesListener{
blockstream: blockstream,
queueDeliveries: deliveries,
logger: logger,
}

go listener.initListeners(ctx)
Expand All @@ -35,16 +37,16 @@ func (listener *QueuesListener) listen(name string, stream <-chan rmq.Delivery,
select {
case d, ok := <-stream:
if !ok {
fmt.Println("deliveries channel close, network id:", id)
listener.logger.Info("Deliveries channel close, network", "id", id)
break
}

fmt.Println("New delivery, network id:", id)
listener.logger.Info("New delivery, network", "id", id)

var block types.Block
if err := rlp.DecodeBytes(d.Body, &block); err != nil {
// TODO: pass error smwr
fmt.Println("invalid block")
listener.logger.Warn("Invalid block")
continue
}

Expand All @@ -53,7 +55,7 @@ func (listener *QueuesListener) listen(name string, stream <-chan rmq.Delivery,
d.Ack(true)

case <-ctx.Done():
fmt.Println("context shutdown")
listener.logger.Info("Consumer context canceled")
// TODO: some closing and canceling here
break
}
Expand Down
8 changes: 7 additions & 1 deletion consumer/test/cli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"flag"
"fmt"

"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/NethermindEth/near-sffl/consumer"
)

Expand Down Expand Up @@ -48,8 +49,13 @@ func parse() consumer.ConsumerConfig {

func main() {
config := parse()
consumer := consumer.NewConsumer(config)
logLevel := logging.Development
logger, err := logging.NewZapLogger(logLevel)
if err != nil {
panic(err)
}

consumer := consumer.NewConsumer(config, logger)
blockStream := consumer.GetBlockStream()

for {
Expand Down

0 comments on commit 650d2a2

Please sign in to comment.