Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consumer #11

Merged
merged 34 commits into from
Feb 6, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
dafa328
feat: init consumer
taco-paco Jan 31, 2024
c93e07c
feat: Consumer introduced
taco-paco Jan 31, 2024
c789803
fix: .mod & .sum files
taco-paco Jan 31, 2024
b39515a
feat: return block-stream handle insteat of raw deliveries
taco-paco Feb 1, 2024
9d59b42
feat: Consumer listens to miltiple queues in async and return blockst…
taco-paco Feb 5, 2024
97488d8
fix: blockstream is passed to QueueListener from Consumer. Some logging
taco-paco Feb 5, 2024
1de3de6
Merge pull request #13 from NethermindEth/feat/block-stream
taco-paco Feb 5, 2024
7a89f0e
Merge branch 'main' into feat/consumer
taco-paco Feb 5, 2024
728f19b
Merge branch 'feat/consumer' of github.com:NethermindEth/near-sffl in…
taco-paco Feb 5, 2024
df0535e
fix: resolve conflicts
taco-paco Feb 5, 2024
650d2a2
feat: Add logger to Consumer and QueuesListener
taco-paco Feb 5, 2024
49f3975
fix: renamed NetworkId to RollupId
taco-paco Feb 5, 2024
5f713b2
refactor: Remove unnecessary TODO
Hyodar Feb 5, 2024
aa466ad
feat: Change context.TODO() to context.Background()
Hyodar Feb 5, 2024
35caa06
refactor: Move cli to consumer/cmd package
Hyodar Feb 5, 2024
4359adb
feat: Add rollup IDs to ConsumerConfig & refactor QueuesListener func…
Hyodar Feb 5, 2024
bced24b
refactor: Slightly reword and rename double close error
Hyodar Feb 5, 2024
e607300
refactor: Remove unnecessary format logs
Hyodar Feb 5, 2024
fce20af
refactor: Remove TODO and add rollup ID to invalid block warning log
Hyodar Feb 5, 2024
7542b75
refactor: Remove some unnecessary TODOs
Hyodar Feb 5, 2024
ef4f35f
refactor: Standardize channel names
Hyodar Feb 5, 2024
db973ea
refactor: Use urfave/cli for CLI utils
Hyodar Feb 5, 2024
6a6c9a5
feat: Add "rollup" prefix to queue names
Hyodar Feb 5, 2024
a6c056a
refactor: Standardize channel names
Hyodar Feb 5, 2024
9849012
feat: Add consumer to operator
Hyodar Feb 5, 2024
cc77726
test: Add mock consumer to mock operator
Hyodar Feb 5, 2024
486b745
fix: Disable multiple ack
Hyodar Feb 5, 2024
6e2805e
refactor: Change constants casing to UPPER_CASE
Hyodar Feb 5, 2024
c5500cf
feat: Added back queues_listener. Moved cmd into consumer package
taco-paco Feb 6, 2024
13d4761
fix: operator compilation
taco-paco Feb 6, 2024
deaca80
fix: registration_test compilation
taco-paco Feb 6, 2024
c6d0a47
chore: Remove unnecessary entry from .gitignore
Hyodar Feb 6, 2024
b2d6eea
test: Remove Consumerer mock generation
Hyodar Feb 6, 2024
9314ac2
refactor: Rename queueDeliveries to queueDeliveryCs
Hyodar Feb 6, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ target

# IDE
.idea
.vscode

# just for example
id_rsa
237 changes: 237 additions & 0 deletions consumer/consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
package consumer

import (
"context"
"errors"
"fmt"
"time"

"github.com/ethereum/go-ethereum/core/types"
rmq "github.com/rabbitmq/amqp091-go"
)

const (
reconnectDelay = 3 * time.Second
rechannelDelay = 2 * time.Second
)

var (
// TODO:
QueueNamesToNetworkId = map[string]uint{
"da-mq": 0,
// Add mappings
}
Hyodar marked this conversation as resolved.
Show resolved Hide resolved

errAlreadyClosed = errors.New("already closed: not connected to the server")
)

type ConsumerConfig struct {
Addr string
ConsumerTag string
QueueNames []string
}

type BlockData struct {
NetworkId uint
Hyodar marked this conversation as resolved.
Show resolved Hide resolved
Block types.Block
}

// TODO: add logger
Hyodar marked this conversation as resolved.
Show resolved Hide resolved
type Consumer struct {
consumerTag string
blockstream chan BlockData

isReady bool
contextCancelFunc context.CancelFunc
connection *rmq.Connection
onConnClosed <-chan *rmq.Error
channel *rmq.Channel
onChanClosed <-chan *rmq.Error

queues []string
queuesListener QueuesListener
}

// TODO: Pass default queues in config?
func NewConsumer(config ConsumerConfig) Consumer {
taco-paco marked this conversation as resolved.
Show resolved Hide resolved
// TODO: context.TODO() or background?
ctx, cancel := context.WithCancel(context.TODO())
Hyodar marked this conversation as resolved.
Show resolved Hide resolved
consumer := Consumer{
consumerTag: config.ConsumerTag,
queues: config.QueueNames,
blockstream: make(chan BlockData),
contextCancelFunc: cancel,
}

go consumer.Reconnect(config.Addr, ctx)
return consumer
}

func (consumer *Consumer) Reconnect(addr string, ctx context.Context) {
for {
fmt.Println("Reconnecting...")
Hyodar marked this conversation as resolved.
Show resolved Hide resolved

consumer.isReady = false
conn, err := consumer.connect(addr)
if err != nil {
fmt.Println(err)

select {
case <-ctx.Done():
return
case <-time.After(reconnectDelay):
}

continue
}

if done := consumer.ResetChannel(conn, ctx); done {
return
}

fmt.Println("Connected")
select {
case err := <-ctx.Done():
ctx.Err()
fmt.Println(err)
// deref cancel smth?
break

case err := <-consumer.onConnClosed:
if !err.Recover {
fmt.Println(err)
break
}

fmt.Println("Connection closed with err:", err, "Reconnecting...")

case err := <-consumer.onChanClosed:
if !err.Recover {
fmt.Println(err)
break
}

// TODO: Reconnect not the whole connection but just a channel?
taco-paco marked this conversation as resolved.
Show resolved Hide resolved
fmt.Println("Channel closed with err:", err, "Reconnecting...")
}
}
}

func (consumer *Consumer) connect(addr string) (*rmq.Connection, error) {
conn, err := rmq.Dial(addr)
if err != nil {
return nil, err
}

consumer.changeConnection(conn)
return conn, nil
}

func (consumer *Consumer) changeConnection(conn *rmq.Connection) {
consumer.connection = conn

closeNotifier := make(chan *rmq.Error)
consumer.onConnClosed = conn.NotifyClose(closeNotifier)
}

func (consumer *Consumer) ResetChannel(conn *rmq.Connection, ctx context.Context) bool {
for {
consumer.isReady = false

err := consumer.setupChannel(conn, ctx)
if err != nil {
fmt.Println(err)

select {
case <-ctx.Done():
fmt.Println("Consumer ctx canceled")
return true

case rmqError := <-consumer.onConnClosed:
if rmqError.Recover {
fmt.Println("channel can't recover error")
return true
}

fmt.Println("ResetChannel err:", rmqError)
return false
case <-time.After(rechannelDelay):
}

continue
}

return false
}
}

func (consumer *Consumer) setupChannel(conn *rmq.Connection, ctx context.Context) error {
// TODO: create multiple chanels?
channel, err := conn.Channel()
if err != nil {
return err
}

queueDeliveries := make(map[string]<-chan rmq.Delivery)
for i := range consumer.queues {
queue, err := channel.QueueDeclare(consumer.queues[i], true, false, false, false, nil)
if err != nil {
return err
}

deliveries, err := channel.Consume(
queue.Name,
consumer.consumerTag,
false,
false,
false,
false,
nil,
)

if err != nil {
return err
}

queueDeliveries[queue.Name] = deliveries
}

listener := NewQueuesListener(queueDeliveries, consumer.blockstream, ctx)
consumer.queuesListener = listener

consumer.changeChannel(channel)
consumer.isReady = true
return nil
}

func (consumer *Consumer) changeChannel(channel *rmq.Channel) {
consumer.channel = channel

closeNotifer := make(chan *rmq.Error)
consumer.onChanClosed = channel.NotifyClose(closeNotifer)
}

func (consumer *Consumer) Close(ctx context.Context) error {
if !consumer.isReady {
return errAlreadyClosed
}

// shut down goroutines
consumer.contextCancelFunc()

err := consumer.channel.Close()
if err != nil {
return err
}

err = consumer.connection.Close()
if err != nil {
return err
}
consumer.isReady = false
return nil
}

func (consumer Consumer) GetBlockStream() <-chan BlockData {
return consumer.blockstream
}
67 changes: 67 additions & 0 deletions consumer/queue_listener.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package consumer

import (
"context"
"fmt"

"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rlp"
rmq "github.com/rabbitmq/amqp091-go"
)

// TODO: rename to deliveries smth?
type QueuesListener struct {
blockstream chan<- BlockData
queueDeliveries map[string]<-chan rmq.Delivery
}

func NewQueuesListener(deliveries map[string]<-chan rmq.Delivery, blockstream chan<- BlockData, ctx context.Context) QueuesListener {
taco-paco marked this conversation as resolved.
Show resolved Hide resolved
listener := QueuesListener{
blockstream: blockstream,
queueDeliveries: deliveries,
}

go listener.initListeners(ctx)
return listener
}

func (listener *QueuesListener) Add(name string, stream <-chan rmq.Delivery, ctx context.Context) {
go listener.listen(name, stream, ctx)
}

func (listener *QueuesListener) listen(name string, stream <-chan rmq.Delivery, ctx context.Context) {
id := QueueNamesToNetworkId[name]
for {
select {
case d, ok := <-stream:
if !ok {
fmt.Println("deliveries channel close, network id:", id)
break
}

fmt.Println("New delivery, network id:", id)

var block types.Block
if err := rlp.DecodeBytes(d.Body, &block); err != nil {
// TODO: pass error smwr
fmt.Println("invalid block")
continue
}

// TODO: case with multiple consumers from same queue
listener.blockstream <- BlockData{NetworkId: id, Block: block}
d.Ack(true)

case <-ctx.Done():
fmt.Println("context shutdown")
// TODO: some closing and canceling here
break
}
}
}

func (listener *QueuesListener) initListeners(ctx context.Context) {
for name, ch := range listener.queueDeliveries {
listener.Add(name, ch, ctx)
}
}
59 changes: 59 additions & 0 deletions consumer/test/cli/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package main

import (
"flag"
"fmt"

"github.com/NethermindEth/near-sffl/consumer"
)

const (
rmqAddressF = "rmq-address"
rmqConsumerTagF = "consumer-tag"

defaultRmqAddress = ""
defaultConsumerTag = "da-consumer"
)

var (
defaultQueues = compilerDefaultQueues()
)

func compilerDefaultQueues() []string {
keys := make([]string, 0, len(consumer.QueueNamesToNetworkId))
for k := range consumer.QueueNamesToNetworkId {
keys = append(keys, k)
}

return keys
}

func parse() consumer.ConsumerConfig {
addr := flag.String(rmqAddressF, defaultRmqAddress, "RMQ address(required)")
consumerTag := flag.String(rmqConsumerTagF, defaultConsumerTag, "Consumer tag")

flag.Parse()

if *addr == "" {
flag.Usage()
panic("rmq-address is required")
}

return consumer.ConsumerConfig{
Addr: *addr,
ConsumerTag: *consumerTag,
QueueNames: defaultQueues,
}
}

func main() {
config := parse()
consumer := consumer.NewConsumer(config)

blockStream := consumer.GetBlockStream()

for {
block := <-blockStream
fmt.Println(block)
}
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ require (
github.com/prometheus/client_model v0.5.0 // indirect
github.com/prometheus/common v0.46.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
github.com/rabbitmq/amqp091-go v1.9.0 // indirect
github.com/rivo/uniseg v0.2.0 // indirect
github.com/rogpeppe/go-internal v1.10.0 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
Expand Down
Loading
Loading