From b39515a49942bcf57bfe7fefc03e04fe698e7c5c Mon Sep 17 00:00:00 2001 From: edwin Date: Thu, 1 Feb 2024 22:47:11 +0700 Subject: [PATCH] feat: return block-stream handle insteat of raw deliveries --- consumer/go.mod | 20 ++++++++++- consumer/go.sum | 35 ++++++++++++++++++ consumer/main.go | 92 +++++++++++++++++++++--------------------------- 3 files changed, 95 insertions(+), 52 deletions(-) diff --git a/consumer/go.mod b/consumer/go.mod index cbe1ac2b..7f8c40ba 100644 --- a/consumer/go.mod +++ b/consumer/go.mod @@ -2,4 +2,22 @@ module github.com/NethermindEth/near-sffl/comsumer go 1.19 -require github.com/rabbitmq/amqp091-go v1.9.0 // indirect +require ( + github.com/bits-and-blooms/bitset v1.10.0 // indirect + github.com/btcsuite/btcd/btcec/v2 v2.2.0 // indirect + github.com/consensys/bavard v0.1.13 // indirect + github.com/consensys/gnark-crypto v0.12.1 // indirect + github.com/crate-crypto/go-kzg-4844 v0.7.0 // indirect + github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect + github.com/ethereum/c-kzg-4844 v0.4.0 // indirect + github.com/ethereum/go-ethereum v1.13.11 // indirect + github.com/holiman/uint256 v1.2.4 // indirect + github.com/mmcloughlin/addchain v0.4.0 // indirect + github.com/rabbitmq/amqp091-go v1.9.0 // indirect + github.com/supranational/blst v0.3.11 // indirect + golang.org/x/crypto v0.17.0 // indirect + golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa // indirect + golang.org/x/sync v0.5.0 // indirect + golang.org/x/sys v0.16.0 // indirect + rsc.io/tmplfunc v0.0.3 // indirect +) diff --git a/consumer/go.sum b/consumer/go.sum index 00c627da..cb847a33 100644 --- a/consumer/go.sum +++ b/consumer/go.sum @@ -1,8 +1,31 @@ +github.com/bits-and-blooms/bitset v1.10.0 h1:ePXTeiPEazB5+opbv5fr8umg2R/1NlzgDsyepwsSr88= +github.com/bits-and-blooms/bitset v1.10.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8= +github.com/btcsuite/btcd/btcec/v2 v2.2.0 h1:fzn1qaOt32TuLjFlkzYSsBC35Q3KUjT1SwPxiMSCF5k= +github.com/btcsuite/btcd/btcec/v2 v2.2.0/go.mod h1:U7MHm051Al6XmscBQ0BoNydpOTsFAn707034b5nY8zU= +github.com/consensys/bavard v0.1.13 h1:oLhMLOFGTLdlda/kma4VOJazblc7IM5y5QPd2A/YjhQ= +github.com/consensys/bavard v0.1.13/go.mod h1:9ItSMtA/dXMAiL7BG6bqW2m3NdSEObYWoH223nGHukI= +github.com/consensys/gnark-crypto v0.12.1 h1:lHH39WuuFgVHONRl3J0LRBtuYdQTumFSDtJF7HpyG8M= +github.com/consensys/gnark-crypto v0.12.1/go.mod h1:v2Gy7L/4ZRosZ7Ivs+9SfUDr0f5UlG+EM5t7MPHiLuY= +github.com/crate-crypto/go-kzg-4844 v0.7.0 h1:C0vgZRk4q4EZ/JgPfzuSoxdCq3C3mOZMBShovmncxvA= +github.com/crate-crypto/go-kzg-4844 v0.7.0/go.mod h1:1kMhvPgI0Ky3yIa+9lFySEBUBXkYxeOi8ZF1sYioxhc= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/decred/dcrd/crypto/blake256 v1.0.0/go.mod h1:sQl2p6Y26YV+ZOcSTP6thNdn47hh8kt6rqSlvmrXFAc= +github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 h1:YLtO71vCjJRCBcrPMtQ9nqBsqpA1m5sE92cU+pd5Mcc= +github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1/go.mod h1:hyedUtir6IdtD/7lIxGeCxkaw7y45JueMRL4DIyJDKs= +github.com/ethereum/c-kzg-4844 v0.4.0 h1:3MS1s4JtA868KpJxroZoepdV0ZKBp3u/O5HcZ7R3nlY= +github.com/ethereum/c-kzg-4844 v0.4.0/go.mod h1:VewdlzQmpT5QSrVhbBuGoCdFJkpaJlO1aQputP83wc0= +github.com/ethereum/go-ethereum v1.13.11 h1:b51Dsm+rEg7anFRUMGB8hODXHvNfcRKzz9vcj8wSdUs= +github.com/ethereum/go-ethereum v1.13.11/go.mod h1:gFtlVORuUcT+UUIcJ/veCNjkuOSujCi338uSHJrYAew= +github.com/google/subcommands v1.2.0/go.mod h1:ZjhPrFU+Olkh9WazFPsl27BQ4UPiG37m3yTrtFlrHVk= +github.com/holiman/uint256 v1.2.4 h1:jUc4Nk8fm9jZabQuqr2JzednajVmBpC+oiTiXZJEApU= +github.com/holiman/uint256 v1.2.4/go.mod h1:EOMSn4q6Nyt9P6efbI3bueV4e1b3dGlUCXeiRV4ng7E= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/mmcloughlin/addchain v0.4.0 h1:SobOdjm2xLj1KkXN5/n0xTIWyZA2+s99UCY1iPfkHRY= +github.com/mmcloughlin/addchain v0.4.0/go.mod h1:A86O+tHqZLMNO4w6ZZ4FlVQEadcoqkyU72HC5wJ4RlU= +github.com/mmcloughlin/profile v0.1.1/go.mod h1:IhHD7q1ooxgwTgjxQYkACGA77oFTDdFVejUS1/tS/qU= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rabbitmq/amqp091-go v1.9.0 h1:qrQtyzB4H8BQgEuJwhmVQqVHB9O4+MNDJCCAcpc3Aoo= github.com/rabbitmq/amqp091-go v1.9.0/go.mod h1:+jPrT9iY2eLjRaMSRHUhc3z14E/l85kv/f+6luSD3pc= @@ -10,8 +33,20 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/supranational/blst v0.3.11 h1:LyU6FolezeWAhvQk0k6O/d49jqgO52MSDDfYgbeoEm4= +github.com/supranational/blst v0.3.11/go.mod h1:jZJtfjgudtNl4en1tzwPIV3KjUnQUvG3/j+w+fVonLw= go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4= +golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k= +golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= +golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa h1:FRnLl4eNAQl8hwxVVC17teOw8kdjVDVAiFMtgUdTSRQ= +golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa/go.mod h1:zk2irFbV9DP96SEBUUAy67IdHUaZuSnrz1n472HUCLE= +golang.org/x/sync v0.5.0 h1:60k92dhOjHxJkrqnwsfl8KuaHbn/5dl0lUPUklKo3qE= +golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU= +golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +rsc.io/tmplfunc v0.0.3 h1:53XFQh69AfOa8Tw0Jm7t+GV7KZhOi6jzsCzTtKbMvzU= +rsc.io/tmplfunc v0.0.3/go.mod h1:AG3sTPzElb1Io3Yg4voV9AGZJuleGAwaVRxL9M49PhA= diff --git a/consumer/main.go b/consumer/main.go index b110c21c..67e0a463 100644 --- a/consumer/main.go +++ b/consumer/main.go @@ -1,11 +1,13 @@ package main import ( - "context" + // "context" "flag" "fmt" "time" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/rlp" rmq "github.com/rabbitmq/amqp091-go" ) @@ -37,7 +39,7 @@ type Consumer struct { onChanClosed <-chan *rmq.Error } -func (consumer *Consumer) Reconnect(addr string) <-chan rmq.Delivery { +func (consumer *Consumer) Reconnect(addr string) <-chan types.Block { for { conn, err := consumer.connect(addr) if err != nil { @@ -46,13 +48,13 @@ func (consumer *Consumer) Reconnect(addr string) <-chan rmq.Delivery { continue } - deliveries, rmqError := consumer.ResetChannel(conn) + blockStream, rmqError := consumer.ResetChannel(conn) if rmqError != nil { fmt.Println(rmqError) continue } - return deliveries + return blockStream } } @@ -73,9 +75,9 @@ func (consumer *Consumer) changeConnection(conn *rmq.Connection) { consumer.onConnClosed = conn.NotifyClose(closeNotifier) } -func (consumer *Consumer) ResetChannel(conn *rmq.Connection) (<-chan rmq.Delivery, *rmq.Error) { +func (consumer *Consumer) ResetChannel(conn *rmq.Connection) (<-chan types.Block, *rmq.Error) { for { - deliveries, err := consumer.setupChannel(conn) + blockStream, err := consumer.setupChannel(conn) if err != nil { fmt.Println(err) @@ -88,11 +90,11 @@ func (consumer *Consumer) ResetChannel(conn *rmq.Connection) (<-chan rmq.Deliver continue } - return deliveries, nil + return blockStream, nil } } -func (consumer *Consumer) setupChannel(conn *rmq.Connection) (<-chan rmq.Delivery, error) { +func (consumer *Consumer) setupChannel(conn *rmq.Connection) (<-chan types.Block, error) { channel, err := conn.Channel() if err != nil { return nil, err @@ -116,8 +118,16 @@ func (consumer *Consumer) setupChannel(conn *rmq.Connection) (<-chan rmq.Deliver return nil, err } + blockStream := make(chan types.Block) + + // TODO: improve logic + // client on start right away returns a channel and + // calls reconnect in backgroudn + // Also fix situation with onConnClosed(not used anymore) + go porcessDeliveries(blockStream, deliveries) + consumer.changeChannel(channel) - return deliveries, nil + return blockStream, nil } func (consumer *Consumer) changeChannel(channel *rmq.Channel) { @@ -127,6 +137,25 @@ func (consumer *Consumer) changeChannel(channel *rmq.Channel) { consumer.onChanClosed = channel.NotifyClose(closeNotifer) } +func porcessDeliveries(blocksCh chan<- types.Block, deliveries <-chan rmq.Delivery) { + defer close(blocksCh) + + for { + d, ok := <-deliveries + if !ok { + break + } + var block types.Block + + // Decode block + if err := rlp.DecodeBytes(d.Body, &block); err != nil { + break + } + + blocksCh <- block + } +} + func parse() consumerData { addr := flag.String(rmqAddressF, defaultRmqAddress, "RMQ address(required)") consumerTag := flag.String(rmqConsumerTagF, defaultConsumerTag, "Consumer tag") @@ -144,19 +173,6 @@ func parse() consumerData { } } -// TODO -func porcessDeliveries(stream <-chan []byte, ctx context.Context) { - for { - select { - case data := <-stream: - fmt.Println("accepted data", data) - case <-ctx.Done(): - fmt.Println("done") - break - } - } -} - func main() { consumerData := parse() consumer := Consumer{ @@ -167,36 +183,10 @@ func main() { channel: nil, onChanClosed: nil, } - deliveries := consumer.Reconnect(consumerData.addr) - - ctx, cancel := context.WithCancel(context.TODO()) - - dataStream := make(chan []byte, 10) - go porcessDeliveries(dataStream, ctx) + blockStream := consumer.Reconnect(consumerData.addr) for { - select { - case d := <-deliveries: - dataStream <- d.Body - d.Ack(true) - - case rmqError := <-consumer.onChanClosed: - fmt.Println(rmqError) - if !rmqError.Recover { - defer cancel() - break - } - - deliveries = consumer.Reconnect(consumerData.addr) - - case rmqError := <-consumer.onConnClosed: - fmt.Println(rmqError) - if !rmqError.Recover { - defer cancel() - break - } - - deliveries = consumer.Reconnect(consumerData.addr) - } + block := <-blockStream + fmt.Println(block) } }