From 5388cd249dea67c67406768a469cbe9b866ebb40 Mon Sep 17 00:00:00 2001 From: Anjan Roy Date: Mon, 11 Jan 2021 21:34:44 +0530 Subject: [PATCH 01/10] graceful unsubscription when client gets disconnected from real-time websocket connection --- app/rest/rest.go | 29 ++++++++++++++++++++++++++++- 1 file changed, 28 insertions(+), 1 deletion(-) diff --git a/app/rest/rest.go b/app/rest/rest.go index 8bb123d1..245412a1 100644 --- a/app/rest/rest.go +++ b/app/rest/rest.go @@ -1099,6 +1099,33 @@ func RunHTTPServer(_db *gorm.DB, _status *d.StatusHolder, _redisClient *redis.Cl // or unsubscrribed from topics := make(map[string]ps.Consumer) + // When returning from this execution scope, unsubscribing client + // from all topics it might have subscribed to during it's life time + // + // Just attempting to do a graceful unsubscription + defer func() { + + for _, v := range topics { + + if v, ok := v.(*ps.BlockConsumer); ok { + v.Request.Type = "unsubscribe" + continue + } + + if v, ok := v.(*ps.TransactionConsumer); ok { + v.Request.Type = "unsubscribe" + continue + } + + if v, ok := v.(*ps.EventConsumer); ok { + v.Request.Type = "unsubscribe" + continue + } + + } + + }() + // Communication with client handling logic for { var req ps.SubscriptionRequest @@ -1170,7 +1197,7 @@ func RunHTTPServer(_db *gorm.DB, _status *d.StatusHolder, _redisClient *redis.Cl } } - topics[req.Name] = nil + delete(topics, req.Name) } } }) From ecbf6146c54ee1a22be1dd926f5083db81ceb164 Mon Sep 17 00:00:00 2001 From: Anjan Roy Date: Mon, 11 Jan 2021 21:56:35 +0530 Subject: [PATCH 02/10] new function all consumers need to implement for going through a graceful unsubscription process --- app/pubsub/consumption.go | 1 + 1 file changed, 1 insertion(+) diff --git a/app/pubsub/consumption.go b/app/pubsub/consumption.go index 4a9fd851..ab229905 100644 --- a/app/pubsub/consumption.go +++ b/app/pubsub/consumption.go @@ -13,6 +13,7 @@ type Consumer interface { Listen() Send(msg string) bool SendData(data interface{}) bool + Unsubscribe() } // NewBlockConsumer - Creating one new block data consumer, which will subscribe to block From 59d3bcbec0607b4ac99f8340838a27b428fc9a26 Mon Sep 17 00:00:00 2001 From: Anjan Roy Date: Mon, 11 Jan 2021 21:57:59 +0530 Subject: [PATCH 03/10] implemented graceful unsubscription from pubsub topic for block consumers --- app/pubsub/block.go | 65 ++++++++++++++++++++++----------------------- 1 file changed, 32 insertions(+), 33 deletions(-) diff --git a/app/pubsub/block.go b/app/pubsub/block.go index f2d9be75..0efa2393 100644 --- a/app/pubsub/block.go +++ b/app/pubsub/block.go @@ -3,6 +3,7 @@ package pubsub import ( "context" "encoding/json" + "fmt" "log" "time" @@ -35,6 +36,11 @@ func (b *BlockConsumer) Subscribe() { // and reads data from subcribed channel, which also gets delivered to client application func (b *BlockConsumer) Listen() { + // When ever returning from this function's + // execution context, client will be unsubscribed from + // pubsub topic i.e. `block` topic in this case + defer b.Unsubscribe() + for { // Checking if client is still subscribed to this topic @@ -42,19 +48,7 @@ func (b *BlockConsumer) Listen() { // // If not, we're cancelling this subscription if b.Request.Type == "unsubscribe" { - - if err := b.Connection.WriteJSON(&SubscriptionResponse{ - Code: 1, - Message: "Unsubscribed from `block`", - }); err != nil { - log.Printf("[!] Failed to deliver block unsubscription confirmation to client : %s\n", err.Error()) - } - - if err := b.PubSub.Unsubscribe(context.Background(), b.Request.Topic()); err != nil { - log.Printf("[!] Failed to unsubscribe from `block` topic : %s\n", err.Error()) - } break - } msg, err := b.PubSub.ReceiveTimeout(context.Background(), time.Second) @@ -96,10 +90,6 @@ func (b *BlockConsumer) Send(msg string) bool { log.Printf("[!] Failed to deliver bad API key message to client : %s\n", err.Error()) } - if err := b.PubSub.Unsubscribe(context.Background(), b.Request.Topic()); err != nil { - log.Printf("[!] Failed to unsubscribe from `block` topic : %s\n", err.Error()) - } - return false } @@ -113,10 +103,6 @@ func (b *BlockConsumer) Send(msg string) bool { log.Printf("[!] Failed to deliver bad API key message to client : %s\n", err.Error()) } - if err := b.PubSub.Unsubscribe(context.Background(), b.Request.Topic()); err != nil { - log.Printf("[!] Failed to unsubscribe from `block` topic : %s\n", err.Error()) - } - return false } @@ -132,10 +118,6 @@ func (b *BlockConsumer) Send(msg string) bool { log.Printf("[!] Failed to deliver rate limit crossed message to client : %s\n", err.Error()) } - if err := b.PubSub.Unsubscribe(context.Background(), b.Request.Topic()); err != nil { - log.Printf("[!] Failed to unsubscribe from `block` topic : %s\n", err.Error()) - } - return false } @@ -156,6 +138,7 @@ func (b *BlockConsumer) Send(msg string) bool { } return false + } // SendData - Sending message to client application, connected over websocket @@ -166,18 +149,34 @@ func (b *BlockConsumer) SendData(data interface{}) bool { if err := b.Connection.WriteJSON(data); err != nil { log.Printf("[!] Failed to deliver `block` data to client : %s\n", err.Error()) - - if err = b.PubSub.Unsubscribe(context.Background(), b.Request.Topic()); err != nil { - log.Printf("[!] Failed to unsubscribe from `block` topic : %s\n", err.Error()) - } - - if err = b.Connection.Close(); err != nil { - log.Printf("[!] Failed to close websocket connection : %s\n", err.Error()) - } - return false } log.Printf("[+] Delivered `block` data to client\n") return true + +} + +// Unsubscribe - Unsubscribe from block data publishing event this client has subscribed to +func (b *BlockConsumer) Unsubscribe() { + + if b.PubSub == nil { + log.Printf("[!] Bad attempt to unsubscribe from `%s` topic\n", b.Request.Topic()) + return + } + + if err := b.PubSub.Unsubscribe(context.Background(), b.Request.Topic()); err != nil { + log.Printf("[!] Failed to unsubscribe from `%s` topic : %s\n", b.Request.Topic(), err.Error()) + return + } + + resp := &SubscriptionResponse{ + Code: 1, + Message: fmt.Sprintf("Unsubscribed from `%s`", b.Request.Topic()), + } + + if err := b.Connection.WriteJSON(resp); err != nil { + log.Printf("[!] Failed to deliver `%s` unsubscription confirmation to client : %s\n", b.Request.Topic(), err.Error()) + } + } From 3b32d51a46f1334701384fb1a4728881c828e64d Mon Sep 17 00:00:00 2001 From: Anjan Roy Date: Mon, 11 Jan 2021 22:08:02 +0530 Subject: [PATCH 04/10] graceful subscription management of pubsub topic for transaction data consumer --- app/pubsub/transaction.go | 62 ++++++++++++++++++--------------------- 1 file changed, 29 insertions(+), 33 deletions(-) diff --git a/app/pubsub/transaction.go b/app/pubsub/transaction.go index 907e1320..b660ea46 100644 --- a/app/pubsub/transaction.go +++ b/app/pubsub/transaction.go @@ -4,6 +4,7 @@ import ( "context" "encoding/hex" "encoding/json" + "fmt" "log" "time" @@ -38,6 +39,10 @@ func (t *TransactionConsumer) Subscribe() { // and reads data from subcribed channel, which also gets delivered to client application func (t *TransactionConsumer) Listen() { + // When leaving this execution scope, attempting to unsubscribe + // client from pubsub topic, it was listening to, in a graceful fashion + defer t.Unsubscribe() + for { // Checking if client is still subscribed to this topic @@ -45,19 +50,7 @@ func (t *TransactionConsumer) Listen() { // // If not, we're cancelling this subscription if t.Request.Type == "unsubscribe" { - - if err := t.Connection.WriteJSON(&SubscriptionResponse{ - Code: 1, - Message: "Unsubscribed from `transaction`", - }); err != nil { - log.Printf("[!] Failed to deliver transaction unsubscription confirmation to client : %s\n", err.Error()) - } - - if err := t.PubSub.Unsubscribe(context.Background(), t.Request.Topic()); err != nil { - log.Printf("[!] Failed to unsubscribe from `transaction` topic : %s\n", err.Error()) - } break - } msg, err := t.PubSub.ReceiveTimeout(context.Background(), time.Second) @@ -99,10 +92,6 @@ func (t *TransactionConsumer) Send(msg string) bool { log.Printf("[!] Failed to deliver bad API key message to client : %s\n", err.Error()) } - if err := t.PubSub.Unsubscribe(context.Background(), t.Request.Topic()); err != nil { - log.Printf("[!] Failed to unsubscribe from `transaction` topic : %s\n", err.Error()) - } - return false } @@ -116,10 +105,6 @@ func (t *TransactionConsumer) Send(msg string) bool { log.Printf("[!] Failed to deliver bad API key message to client : %s\n", err.Error()) } - if err := t.PubSub.Unsubscribe(context.Background(), t.Request.Topic()); err != nil { - log.Printf("[!] Failed to unsubscribe from `transaction` topic : %s\n", err.Error()) - } - return false } @@ -135,10 +120,6 @@ func (t *TransactionConsumer) Send(msg string) bool { log.Printf("[!] Failed to deliver rate limit crossed message to client : %s\n", err.Error()) } - if err := t.PubSub.Unsubscribe(context.Background(), t.Request.Topic()); err != nil { - log.Printf("[!] Failed to unsubscribe from `transaction` topic : %s\n", err.Error()) - } - return false } @@ -219,18 +200,33 @@ func (t *TransactionConsumer) Send(msg string) bool { func (t *TransactionConsumer) SendData(data interface{}) bool { if err := t.Connection.WriteJSON(data); err != nil { log.Printf("[!] Failed to deliver `transaction` data to client : %s\n", err.Error()) - - if err = t.PubSub.Unsubscribe(context.Background(), t.Request.Topic()); err != nil { - log.Printf("[!] Failed to unsubscribe from `transaction` topic : %s\n", err.Error()) - } - - if err = t.Connection.Close(); err != nil { - log.Printf("[!] Failed to close websocket connection : %s\n", err.Error()) - } - return false } log.Printf("[+] Delivered `transaction` data to client\n") return true } + +// Unsubscribe - Unsubscribe from transactions pubsub topic, which client has subscribed to +func (t *TransactionConsumer) Unsubscribe() { + + if t.PubSub == nil { + log.Printf("[!] Bad attempt to unsubscribe from `%s` topic\n", t.Request.Topic()) + return + } + + if err := t.PubSub.Unsubscribe(context.Background(), t.Request.Topic()); err != nil { + log.Printf("[!] Failed to unsubscribe from `%s` topic : %s\n", t.Request.Topic(), err.Error()) + return + } + + resp := &SubscriptionResponse{ + Code: 1, + Message: fmt.Sprintf("Unsubscribed from `%s`", t.Request.Topic()), + } + + if err := t.Connection.WriteJSON(resp); err != nil { + log.Printf("[!] Failed to deliver `%s` unsubscription confirmation to client : %s\n", t.Request.Topic(), err.Error()) + } + +} From ccfbc23358b4bc719dd69dc232f7369e86cafd6c Mon Sep 17 00:00:00 2001 From: Anjan Roy Date: Mon, 11 Jan 2021 22:14:52 +0530 Subject: [PATCH 05/10] better subscription management while handling events --- app/pubsub/event.go | 66 ++++++++++++++++++++++----------------------- 1 file changed, 33 insertions(+), 33 deletions(-) diff --git a/app/pubsub/event.go b/app/pubsub/event.go index 794a0286..4d1ce1db 100644 --- a/app/pubsub/event.go +++ b/app/pubsub/event.go @@ -4,6 +4,7 @@ import ( "context" "encoding/hex" "encoding/json" + "fmt" "log" "time" @@ -41,6 +42,12 @@ func (e *EventConsumer) Subscribe() { // if client has subscribed to get notified on occurrence of this event func (e *EventConsumer) Listen() { + // Before leaving this execution context, attempting to + // unsubscribe client from `event` pubsub topic + // + // One attempt to unsubscribe gracefully + defer e.Unsubscribe() + for { // Checking if client is still subscribed to this topic @@ -48,19 +55,7 @@ func (e *EventConsumer) Listen() { // // If not, we're cancelling this subscription if e.Request.Type == "unsubscribe" { - - if err := e.Connection.WriteJSON(&SubscriptionResponse{ - Code: 1, - Message: "Unsubscribed from `event`", - }); err != nil { - log.Printf("[!] Failed to deliver event unsubscription confirmation to client : %s\n", err.Error()) - } - - if err := e.PubSub.Unsubscribe(context.Background(), e.Request.Topic()); err != nil { - log.Printf("[!] Failed to unsubscribe from `event` topic : %s\n", err.Error()) - } break - } msg, err := e.PubSub.ReceiveTimeout(context.Background(), time.Second) @@ -102,10 +97,6 @@ func (e *EventConsumer) Send(msg string) bool { log.Printf("[!] Failed to deliver bad API key message to client : %s\n", err.Error()) } - if err := e.PubSub.Unsubscribe(context.Background(), e.Request.Topic()); err != nil { - log.Printf("[!] Failed to unsubscribe from `event` topic : %s\n", err.Error()) - } - return false } @@ -119,10 +110,6 @@ func (e *EventConsumer) Send(msg string) bool { log.Printf("[!] Failed to deliver bad API key message to client : %s\n", err.Error()) } - if err := e.PubSub.Unsubscribe(context.Background(), e.Request.Topic()); err != nil { - log.Printf("[!] Failed to unsubscribe from `event` topic : %s\n", err.Error()) - } - return false } @@ -138,10 +125,6 @@ func (e *EventConsumer) Send(msg string) bool { log.Printf("[!] Failed to deliver rate limit crossed message to client : %s\n", err.Error()) } - if err := e.PubSub.Unsubscribe(context.Background(), e.Request.Topic()); err != nil { - log.Printf("[!] Failed to unsubscribe from `event` topic : %s\n", err.Error()) - } - return false } @@ -201,18 +184,35 @@ func (e *EventConsumer) Send(msg string) bool { func (e *EventConsumer) SendData(data interface{}) bool { if err := e.Connection.WriteJSON(data); err != nil { log.Printf("[!] Failed to deliver `event` data to client : %s\n", err.Error()) - - if err = e.PubSub.Unsubscribe(context.Background(), e.Request.Topic()); err != nil { - log.Printf("[!] Failed to unsubscribe from `event` topic : %s\n", err.Error()) - } - - if err = e.Connection.Close(); err != nil { - log.Printf("[!] Failed to close websocket connection : %s\n", err.Error()) - } - return false } log.Printf("[+] Delivered `event` data to client\n") return true } + +// Unsubscribe - Unsubscribe from event data publishing topic, to be called +// when stopping to listen data being published on this pubsub channel +// due to client has requested a unsubscription/ network connection got hampered +func (e *EventConsumer) Unsubscribe() { + + if e.PubSub == nil { + log.Printf("[!] Bad attempt to unsubscribe from `%s` topic\n", e.Request.Topic()) + return + } + + if err := e.PubSub.Unsubscribe(context.Background(), e.Request.Topic()); err != nil { + log.Printf("[!] Failed to unsubscribe from `%s` topic : %s\n", e.Request.Topic(), err.Error()) + return + } + + resp := &SubscriptionResponse{ + Code: 1, + Message: fmt.Sprintf("Unsubscribed from `%s`", e.Request.Topic()), + } + + if err := e.Connection.WriteJSON(resp); err != nil { + log.Printf("[!] Failed to deliver `%s` unsubscription confirmation to client : %s\n", e.Request.Topic(), err.Error()) + } + +} From 994c574d68e04989c4f7adc0ca27e740d882e60a Mon Sep 17 00:00:00 2001 From: Anjan Roy Date: Mon, 11 Jan 2021 22:15:59 +0530 Subject: [PATCH 06/10] updated comment --- app/pubsub/subscription.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/pubsub/subscription.go b/app/pubsub/subscription.go index 0402f93a..cfa8d567 100644 --- a/app/pubsub/subscription.go +++ b/app/pubsub/subscription.go @@ -48,7 +48,7 @@ func (s *SubscriptionRequest) GetRegex() *regexp.Regexp { } // Topic - Get main topic name to which this client is subscribing to -// i.e. {block, transaction} +// i.e. {block, transaction, event} func (s *SubscriptionRequest) Topic() string { if strings.HasPrefix(s.Name, "block") { return "block" From e5dd05335c1c5d0f943c87cc3cf97e491822ba77 Mon Sep 17 00:00:00 2001 From: Anjan Roy Date: Mon, 11 Jan 2021 22:47:26 +0530 Subject: [PATCH 07/10] updated pubsub consumer structs with mutex lock, to be used for protecting critical section of code --- app/pubsub/block.go | 2 ++ app/pubsub/consumption.go | 11 ++++++++--- app/pubsub/event.go | 2 ++ app/pubsub/transaction.go | 2 ++ app/rest/rest.go | 21 +++++++++++++++------ 5 files changed, 29 insertions(+), 9 deletions(-) diff --git a/app/pubsub/block.go b/app/pubsub/block.go index 0efa2393..d28601ea 100644 --- a/app/pubsub/block.go +++ b/app/pubsub/block.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "log" + "sync" "time" "github.com/ethereum/go-ethereum/common" @@ -25,6 +26,7 @@ type BlockConsumer struct { Connection *websocket.Conn PubSub *redis.PubSub DB *gorm.DB + Lock *sync.Mutex } // Subscribe - Subscribe to `block` channel diff --git a/app/pubsub/consumption.go b/app/pubsub/consumption.go index ab229905..d6898c09 100644 --- a/app/pubsub/consumption.go +++ b/app/pubsub/consumption.go @@ -1,6 +1,8 @@ package pubsub import ( + "sync" + "github.com/ethereum/go-ethereum/common" "github.com/go-redis/redis/v8" "github.com/gorilla/websocket" @@ -19,13 +21,14 @@ type Consumer interface { // NewBlockConsumer - Creating one new block data consumer, which will subscribe to block // topic & listen for data being published on this channel, which will eventually be // delivered to client application over websocket connection -func NewBlockConsumer(client *redis.Client, conn *websocket.Conn, req *SubscriptionRequest, db *gorm.DB, address common.Address) *BlockConsumer { +func NewBlockConsumer(client *redis.Client, conn *websocket.Conn, req *SubscriptionRequest, db *gorm.DB, address common.Address, lock *sync.Mutex) *BlockConsumer { consumer := BlockConsumer{ Client: client, Request: req, UserAddress: address, Connection: conn, DB: db, + Lock: lock, } consumer.Subscribe() @@ -38,13 +41,14 @@ func NewBlockConsumer(client *redis.Client, conn *websocket.Conn, req *Subscript // topic & listen for data being published on this channel & check whether received data // is what, client is interested in or not, which will eventually be // delivered to client application over websocket connection -func NewTransactionConsumer(client *redis.Client, conn *websocket.Conn, req *SubscriptionRequest, db *gorm.DB, address common.Address) *TransactionConsumer { +func NewTransactionConsumer(client *redis.Client, conn *websocket.Conn, req *SubscriptionRequest, db *gorm.DB, address common.Address, lock *sync.Mutex) *TransactionConsumer { consumer := TransactionConsumer{ Client: client, Request: req, UserAddress: address, Connection: conn, DB: db, + Lock: lock, } consumer.Subscribe() @@ -57,13 +61,14 @@ func NewTransactionConsumer(client *redis.Client, conn *websocket.Conn, req *Sub // topic & listen for data being published on this channel & check whether received data // is what, client is interested in or not, which will eventually be // delivered to client application over websocket connection -func NewEventConsumer(client *redis.Client, conn *websocket.Conn, req *SubscriptionRequest, db *gorm.DB, address common.Address) *EventConsumer { +func NewEventConsumer(client *redis.Client, conn *websocket.Conn, req *SubscriptionRequest, db *gorm.DB, address common.Address, lock *sync.Mutex) *EventConsumer { consumer := EventConsumer{ Client: client, Request: req, UserAddress: address, Connection: conn, DB: db, + Lock: lock, } consumer.Subscribe() diff --git a/app/pubsub/event.go b/app/pubsub/event.go index 4d1ce1db..61c9d70c 100644 --- a/app/pubsub/event.go +++ b/app/pubsub/event.go @@ -6,6 +6,7 @@ import ( "encoding/json" "fmt" "log" + "sync" "time" "github.com/ethereum/go-ethereum/common" @@ -29,6 +30,7 @@ type EventConsumer struct { Connection *websocket.Conn PubSub *redis.PubSub DB *gorm.DB + Lock *sync.Mutex } // Subscribe - Event consumer is subscribing to `event` topic, diff --git a/app/pubsub/transaction.go b/app/pubsub/transaction.go index b660ea46..e7200eef 100644 --- a/app/pubsub/transaction.go +++ b/app/pubsub/transaction.go @@ -6,6 +6,7 @@ import ( "encoding/json" "fmt" "log" + "sync" "time" "github.com/ethereum/go-ethereum/common" @@ -28,6 +29,7 @@ type TransactionConsumer struct { Connection *websocket.Conn PubSub *redis.PubSub DB *gorm.DB + Lock *sync.Mutex } // Subscribe - Subscribe to `transaction` topic, under which all transaction related data to be published diff --git a/app/rest/rest.go b/app/rest/rest.go index 245412a1..628ce20f 100644 --- a/app/rest/rest.go +++ b/app/rest/rest.go @@ -9,6 +9,7 @@ import ( "net/http" "strconv" "strings" + "sync" "time" "github.com/foolin/goview" @@ -1095,9 +1096,9 @@ func RunHTTPServer(_db *gorm.DB, _status *d.StatusHolder, _redisClient *redis.Cl return } - // keeping track of which topics this client has already subscribed to - // or unsubscrribed from + // Keeping track of which topics this client has subscribed to topics := make(map[string]ps.Consumer) + lock := sync.Mutex{} // When returning from this execution scope, unsubscribing client // from all topics it might have subscribed to during it's life time @@ -1126,7 +1127,7 @@ func RunHTTPServer(_db *gorm.DB, _status *d.StatusHolder, _redisClient *redis.Cl }() - // Communication with client handling logic + // Client communication handling logic for { var req ps.SubscriptionRequest @@ -1174,27 +1175,35 @@ func RunHTTPServer(_db *gorm.DB, _status *d.StatusHolder, _redisClient *redis.Cl switch req.Type { case "subscribe": switch req.Topic() { + case "block": - topics[req.Name] = ps.NewBlockConsumer(_redisClient, conn, &req, _db, userAddress) + topics[req.Name] = ps.NewBlockConsumer(_redisClient, conn, &req, _db, userAddress, &lock) + case "transaction": - topics[req.Name] = ps.NewTransactionConsumer(_redisClient, conn, &req, _db, userAddress) + topics[req.Name] = ps.NewTransactionConsumer(_redisClient, conn, &req, _db, userAddress, &lock) + case "event": - topics[req.Name] = ps.NewEventConsumer(_redisClient, conn, &req, _db, userAddress) + topics[req.Name] = ps.NewEventConsumer(_redisClient, conn, &req, _db, userAddress, &lock) + } case "unsubscribe": switch req.Topic() { + case "block": if v, ok := topics[req.Name].(*ps.BlockConsumer); ok { v.Request.Type = req.Type } + case "transaction": if v, ok := topics[req.Name].(*ps.TransactionConsumer); ok { v.Request.Type = req.Type } + case "event": if v, ok := topics[req.Name].(*ps.EventConsumer); ok { v.Request.Type = req.Type } + } delete(topics, req.Name) From d058c85eb3adba82ea5d776107905a2c5ce6eb8b Mon Sep 17 00:00:00 2001 From: Anjan Roy Date: Mon, 11 Jan 2021 22:54:01 +0530 Subject: [PATCH 08/10] used mutex lock based network connection access [ only exclusive access allowed from now ] --- app/rest/rest.go | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/app/rest/rest.go b/app/rest/rest.go index 628ce20f..6163bcf5 100644 --- a/app/rest/rest.go +++ b/app/rest/rest.go @@ -1140,17 +1140,33 @@ func RunHTTPServer(_db *gorm.DB, _status *d.StatusHolder, _redisClient *redis.Cl // failure message to client & close connection user := req.GetUserFromAPIKey(_db) if user == nil { + // -- Critical section of code begins + // + // Attempting to write to shared network connection + lock.Lock() + if err := conn.WriteJSON(&ps.SubscriptionResponse{Code: 0, Message: "Bad API Key"}); err != nil { log.Printf("[!] Failed to write message : %s\n", err.Error()) } + + lock.Unlock() + // -- ends here break } // Checking if user has kept this APIKey enabled or not if !user.Enabled { + // -- Critical section of code begins + // + // Attempting to write to shared network connection + lock.Lock() + if err := conn.WriteJSON(&ps.SubscriptionResponse{Code: 0, Message: "Bad API Key"}); err != nil { log.Printf("[!] Failed to write message : %s\n", err.Error()) } + + lock.Unlock() + // -- ends here break } @@ -1158,17 +1174,33 @@ func RunHTTPServer(_db *gorm.DB, _status *d.StatusHolder, _redisClient *redis.Cl // Checking if client is under allowed rate limit or not if !req.IsUnderRateLimit(_db, userAddress) { + // -- Critical section of code begins + // + // Attempting to write to shared network connection + lock.Lock() + if err := conn.WriteJSON(&ps.SubscriptionResponse{Code: 0, Message: "Crossed Allowed Rate Limit"}); err != nil { log.Printf("[!] Failed to write message : %s\n", err.Error()) } + + lock.Unlock() + // -- ends here break } // Validating incoming request on websocket subscription channel if !req.Validate(topics) { + // -- Critical section of code begins + // + // Attempting to write to shared network connection + lock.Lock() + if err := conn.WriteJSON(&ps.SubscriptionResponse{Code: 0, Message: "Bad Payload"}); err != nil { log.Printf("[!] Failed to write message : %s\n", err.Error()) } + + lock.Unlock() + // -- ends here break } From 3a87b18d4f9f2470be4d51024bc3d07e8a36e29d Mon Sep 17 00:00:00 2001 From: Anjan Roy Date: Mon, 11 Jan 2021 23:00:56 +0530 Subject: [PATCH 09/10] safe write to network resource ops when consuming block data --- app/pubsub/block.go | 38 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/app/pubsub/block.go b/app/pubsub/block.go index d28601ea..4529875f 100644 --- a/app/pubsub/block.go +++ b/app/pubsub/block.go @@ -85,6 +85,12 @@ func (b *BlockConsumer) Send(msg string) bool { user := db.GetUserFromAPIKey(b.DB, b.Request.APIKey) if user == nil { + // -- Critical section of code begins + // + // Attempting to write to a network resource, + // shared among multiple go routines + b.Lock.Lock() + if err := b.Connection.WriteJSON(&SubscriptionResponse{ Code: 0, Message: "Bad API Key", @@ -92,12 +98,20 @@ func (b *BlockConsumer) Send(msg string) bool { log.Printf("[!] Failed to deliver bad API key message to client : %s\n", err.Error()) } + b.Lock.Unlock() + // -- ends here return false } if !user.Enabled { + // -- Critical section of code begins + // + // Attempting to write to a network resource, + // shared among multiple go routines + b.Lock.Lock() + if err := b.Connection.WriteJSON(&SubscriptionResponse{ Code: 0, Message: "Bad API Key", @@ -105,6 +119,8 @@ func (b *BlockConsumer) Send(msg string) bool { log.Printf("[!] Failed to deliver bad API key message to client : %s\n", err.Error()) } + b.Lock.Unlock() + // -- ends here return false } @@ -113,6 +129,12 @@ func (b *BlockConsumer) Send(msg string) bool { // if client has crossed it's allowed data delivery limit if !db.IsUnderRateLimit(b.DB, b.UserAddress.Hex()) { + // -- Critical section of code begins + // + // Attempting to write to a network resource, + // shared among multiple go routines + b.Lock.Lock() + if err := b.Connection.WriteJSON(&SubscriptionResponse{ Code: 0, Message: "Crossed Allowed Rate Limit", @@ -120,6 +142,8 @@ func (b *BlockConsumer) Send(msg string) bool { log.Printf("[!] Failed to deliver rate limit crossed message to client : %s\n", err.Error()) } + b.Lock.Unlock() + // -- ends here return false } @@ -149,6 +173,13 @@ func (b *BlockConsumer) Send(msg string) bool { // connection ( connection might be already closed though ) func (b *BlockConsumer) SendData(data interface{}) bool { + // -- Critical section of code begins + // + // Attempting to write to a network resource, + // shared among multiple go routines + b.Lock.Lock() + defer b.Lock.Unlock() + if err := b.Connection.WriteJSON(data); err != nil { log.Printf("[!] Failed to deliver `block` data to client : %s\n", err.Error()) return false @@ -177,6 +208,13 @@ func (b *BlockConsumer) Unsubscribe() { Message: fmt.Sprintf("Unsubscribed from `%s`", b.Request.Topic()), } + // -- Critical section of code begins + // + // Attempting to write to a network resource, + // shared among multiple go routines + b.Lock.Lock() + defer b.Lock.Unlock() + if err := b.Connection.WriteJSON(resp); err != nil { log.Printf("[!] Failed to deliver `%s` unsubscription confirmation to client : %s\n", b.Request.Topic(), err.Error()) } From d896591118d63e80608993348622219198cfc0da Mon Sep 17 00:00:00 2001 From: Anjan Roy Date: Mon, 11 Jan 2021 23:09:46 +0530 Subject: [PATCH 10/10] safe concurrent writing to network resource for both transaction & event consumers --- app/pubsub/event.go | 39 +++++++++++++++++++++++++++++++++++++++ app/pubsub/transaction.go | 39 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 78 insertions(+) diff --git a/app/pubsub/event.go b/app/pubsub/event.go index 61c9d70c..d3052e2b 100644 --- a/app/pubsub/event.go +++ b/app/pubsub/event.go @@ -92,6 +92,12 @@ func (e *EventConsumer) Send(msg string) bool { user := db.GetUserFromAPIKey(e.DB, e.Request.APIKey) if user == nil { + // -- Critical section of code begins + // + // Attempting to write to a network resource, + // shared among multiple go routines + e.Lock.Lock() + if err := e.Connection.WriteJSON(&SubscriptionResponse{ Code: 0, Message: "Bad API Key", @@ -99,12 +105,20 @@ func (e *EventConsumer) Send(msg string) bool { log.Printf("[!] Failed to deliver bad API key message to client : %s\n", err.Error()) } + e.Lock.Unlock() + // -- ends here return false } if !user.Enabled { + // -- Critical section of code begins + // + // Attempting to write to a network resource, + // shared among multiple go routines + e.Lock.Lock() + if err := e.Connection.WriteJSON(&SubscriptionResponse{ Code: 0, Message: "Bad API Key", @@ -112,6 +126,8 @@ func (e *EventConsumer) Send(msg string) bool { log.Printf("[!] Failed to deliver bad API key message to client : %s\n", err.Error()) } + e.Lock.Unlock() + // -- ends here return false } @@ -120,6 +136,12 @@ func (e *EventConsumer) Send(msg string) bool { // if client has crossed it's allowed data delivery limit if !db.IsUnderRateLimit(e.DB, e.UserAddress.Hex()) { + // -- Critical section of code begins + // + // Attempting to write to a network resource, + // shared among multiple go routines + e.Lock.Lock() + if err := e.Connection.WriteJSON(&SubscriptionResponse{ Code: 0, Message: "Crossed Allowed Rate Limit", @@ -127,6 +149,8 @@ func (e *EventConsumer) Send(msg string) bool { log.Printf("[!] Failed to deliver rate limit crossed message to client : %s\n", err.Error()) } + e.Lock.Unlock() + // -- ends here return false } @@ -184,6 +208,14 @@ func (e *EventConsumer) Send(msg string) bool { // If failed, we're going to remove subscription & close websocket // connection ( connection might be already closed though ) func (e *EventConsumer) SendData(data interface{}) bool { + + // -- Critical section of code begins + // + // Attempting to write to a network resource, + // shared among multiple go routines + e.Lock.Lock() + defer e.Lock.Unlock() + if err := e.Connection.WriteJSON(data); err != nil { log.Printf("[!] Failed to deliver `event` data to client : %s\n", err.Error()) return false @@ -213,6 +245,13 @@ func (e *EventConsumer) Unsubscribe() { Message: fmt.Sprintf("Unsubscribed from `%s`", e.Request.Topic()), } + // -- Critical section of code begins + // + // Attempting to write to a network resource, + // shared among multiple go routines + e.Lock.Lock() + defer e.Lock.Unlock() + if err := e.Connection.WriteJSON(resp); err != nil { log.Printf("[!] Failed to deliver `%s` unsubscription confirmation to client : %s\n", e.Request.Topic(), err.Error()) } diff --git a/app/pubsub/transaction.go b/app/pubsub/transaction.go index e7200eef..f0712d84 100644 --- a/app/pubsub/transaction.go +++ b/app/pubsub/transaction.go @@ -87,6 +87,12 @@ func (t *TransactionConsumer) Send(msg string) bool { user := db.GetUserFromAPIKey(t.DB, t.Request.APIKey) if user == nil { + // -- Critical section of code begins + // + // Attempting to write to a network resource, + // shared among multiple go routines + t.Lock.Lock() + if err := t.Connection.WriteJSON(&SubscriptionResponse{ Code: 0, Message: "Bad API Key", @@ -94,12 +100,20 @@ func (t *TransactionConsumer) Send(msg string) bool { log.Printf("[!] Failed to deliver bad API key message to client : %s\n", err.Error()) } + t.Lock.Unlock() + // -- ends here return false } if !user.Enabled { + // -- Critical section of code begins + // + // Attempting to write to a network resource, + // shared among multiple go routines + t.Lock.Lock() + if err := t.Connection.WriteJSON(&SubscriptionResponse{ Code: 0, Message: "Bad API Key", @@ -107,6 +121,8 @@ func (t *TransactionConsumer) Send(msg string) bool { log.Printf("[!] Failed to deliver bad API key message to client : %s\n", err.Error()) } + t.Lock.Unlock() + // -- ends here return false } @@ -115,6 +131,12 @@ func (t *TransactionConsumer) Send(msg string) bool { // if client has crossed it's allowed data delivery limit if !db.IsUnderRateLimit(t.DB, t.UserAddress.Hex()) { + // -- Critical section of code begins + // + // Attempting to write to a network resource, + // shared among multiple go routines + t.Lock.Lock() + if err := t.Connection.WriteJSON(&SubscriptionResponse{ Code: 0, Message: "Crossed Allowed Rate Limit", @@ -122,6 +144,8 @@ func (t *TransactionConsumer) Send(msg string) bool { log.Printf("[!] Failed to deliver rate limit crossed message to client : %s\n", err.Error()) } + t.Lock.Unlock() + // -- ends here return false } @@ -200,6 +224,14 @@ func (t *TransactionConsumer) Send(msg string) bool { // If failed, we're going to remove subscription & close websocket // connection ( connection might be already closed though ) func (t *TransactionConsumer) SendData(data interface{}) bool { + + // -- Critical section of code begins + // + // Attempting to write to a network resource, + // shared among multiple go routines + t.Lock.Lock() + defer t.Lock.Unlock() + if err := t.Connection.WriteJSON(data); err != nil { log.Printf("[!] Failed to deliver `transaction` data to client : %s\n", err.Error()) return false @@ -227,6 +259,13 @@ func (t *TransactionConsumer) Unsubscribe() { Message: fmt.Sprintf("Unsubscribed from `%s`", t.Request.Topic()), } + // -- Critical section of code begins + // + // Attempting to write to a network resource, + // shared among multiple go routines + t.Lock.Lock() + defer t.Lock.Unlock() + if err := t.Connection.WriteJSON(resp); err != nil { log.Printf("[!] Failed to deliver `%s` unsubscription confirmation to client : %s\n", t.Request.Topic(), err.Error()) }