diff --git a/app/pubsub/block.go b/app/pubsub/block.go index f2d9be75..4529875f 100644 --- a/app/pubsub/block.go +++ b/app/pubsub/block.go @@ -3,7 +3,9 @@ package pubsub import ( "context" "encoding/json" + "fmt" "log" + "sync" "time" "github.com/ethereum/go-ethereum/common" @@ -24,6 +26,7 @@ type BlockConsumer struct { Connection *websocket.Conn PubSub *redis.PubSub DB *gorm.DB + Lock *sync.Mutex } // Subscribe - Subscribe to `block` channel @@ -35,6 +38,11 @@ func (b *BlockConsumer) Subscribe() { // and reads data from subcribed channel, which also gets delivered to client application func (b *BlockConsumer) Listen() { + // When ever returning from this function's + // execution context, client will be unsubscribed from + // pubsub topic i.e. `block` topic in this case + defer b.Unsubscribe() + for { // Checking if client is still subscribed to this topic @@ -42,19 +50,7 @@ func (b *BlockConsumer) Listen() { // // If not, we're cancelling this subscription if b.Request.Type == "unsubscribe" { - - if err := b.Connection.WriteJSON(&SubscriptionResponse{ - Code: 1, - Message: "Unsubscribed from `block`", - }); err != nil { - log.Printf("[!] Failed to deliver block unsubscription confirmation to client : %s\n", err.Error()) - } - - if err := b.PubSub.Unsubscribe(context.Background(), b.Request.Topic()); err != nil { - log.Printf("[!] Failed to unsubscribe from `block` topic : %s\n", err.Error()) - } break - } msg, err := b.PubSub.ReceiveTimeout(context.Background(), time.Second) @@ -89,6 +85,12 @@ func (b *BlockConsumer) Send(msg string) bool { user := db.GetUserFromAPIKey(b.DB, b.Request.APIKey) if user == nil { + // -- Critical section of code begins + // + // Attempting to write to a network resource, + // shared among multiple go routines + b.Lock.Lock() + if err := b.Connection.WriteJSON(&SubscriptionResponse{ Code: 0, Message: "Bad API Key", @@ -96,16 +98,20 @@ func (b *BlockConsumer) Send(msg string) bool { log.Printf("[!] Failed to deliver bad API key message to client : %s\n", err.Error()) } - if err := b.PubSub.Unsubscribe(context.Background(), b.Request.Topic()); err != nil { - log.Printf("[!] Failed to unsubscribe from `block` topic : %s\n", err.Error()) - } - + b.Lock.Unlock() + // -- ends here return false } if !user.Enabled { + // -- Critical section of code begins + // + // Attempting to write to a network resource, + // shared among multiple go routines + b.Lock.Lock() + if err := b.Connection.WriteJSON(&SubscriptionResponse{ Code: 0, Message: "Bad API Key", @@ -113,10 +119,8 @@ func (b *BlockConsumer) Send(msg string) bool { log.Printf("[!] Failed to deliver bad API key message to client : %s\n", err.Error()) } - if err := b.PubSub.Unsubscribe(context.Background(), b.Request.Topic()); err != nil { - log.Printf("[!] Failed to unsubscribe from `block` topic : %s\n", err.Error()) - } - + b.Lock.Unlock() + // -- ends here return false } @@ -125,6 +129,12 @@ func (b *BlockConsumer) Send(msg string) bool { // if client has crossed it's allowed data delivery limit if !db.IsUnderRateLimit(b.DB, b.UserAddress.Hex()) { + // -- Critical section of code begins + // + // Attempting to write to a network resource, + // shared among multiple go routines + b.Lock.Lock() + if err := b.Connection.WriteJSON(&SubscriptionResponse{ Code: 0, Message: "Crossed Allowed Rate Limit", @@ -132,10 +142,8 @@ func (b *BlockConsumer) Send(msg string) bool { log.Printf("[!] Failed to deliver rate limit crossed message to client : %s\n", err.Error()) } - if err := b.PubSub.Unsubscribe(context.Background(), b.Request.Topic()); err != nil { - log.Printf("[!] Failed to unsubscribe from `block` topic : %s\n", err.Error()) - } - + b.Lock.Unlock() + // -- ends here return false } @@ -156,6 +164,7 @@ func (b *BlockConsumer) Send(msg string) bool { } return false + } // SendData - Sending message to client application, connected over websocket @@ -164,20 +173,50 @@ func (b *BlockConsumer) Send(msg string) bool { // connection ( connection might be already closed though ) func (b *BlockConsumer) SendData(data interface{}) bool { + // -- Critical section of code begins + // + // Attempting to write to a network resource, + // shared among multiple go routines + b.Lock.Lock() + defer b.Lock.Unlock() + if err := b.Connection.WriteJSON(data); err != nil { log.Printf("[!] Failed to deliver `block` data to client : %s\n", err.Error()) - - if err = b.PubSub.Unsubscribe(context.Background(), b.Request.Topic()); err != nil { - log.Printf("[!] Failed to unsubscribe from `block` topic : %s\n", err.Error()) - } - - if err = b.Connection.Close(); err != nil { - log.Printf("[!] Failed to close websocket connection : %s\n", err.Error()) - } - return false } log.Printf("[+] Delivered `block` data to client\n") return true + +} + +// Unsubscribe - Unsubscribe from block data publishing event this client has subscribed to +func (b *BlockConsumer) Unsubscribe() { + + if b.PubSub == nil { + log.Printf("[!] Bad attempt to unsubscribe from `%s` topic\n", b.Request.Topic()) + return + } + + if err := b.PubSub.Unsubscribe(context.Background(), b.Request.Topic()); err != nil { + log.Printf("[!] Failed to unsubscribe from `%s` topic : %s\n", b.Request.Topic(), err.Error()) + return + } + + resp := &SubscriptionResponse{ + Code: 1, + Message: fmt.Sprintf("Unsubscribed from `%s`", b.Request.Topic()), + } + + // -- Critical section of code begins + // + // Attempting to write to a network resource, + // shared among multiple go routines + b.Lock.Lock() + defer b.Lock.Unlock() + + if err := b.Connection.WriteJSON(resp); err != nil { + log.Printf("[!] Failed to deliver `%s` unsubscription confirmation to client : %s\n", b.Request.Topic(), err.Error()) + } + } diff --git a/app/pubsub/consumption.go b/app/pubsub/consumption.go index 4a9fd851..d6898c09 100644 --- a/app/pubsub/consumption.go +++ b/app/pubsub/consumption.go @@ -1,6 +1,8 @@ package pubsub import ( + "sync" + "github.com/ethereum/go-ethereum/common" "github.com/go-redis/redis/v8" "github.com/gorilla/websocket" @@ -13,18 +15,20 @@ type Consumer interface { Listen() Send(msg string) bool SendData(data interface{}) bool + Unsubscribe() } // NewBlockConsumer - Creating one new block data consumer, which will subscribe to block // topic & listen for data being published on this channel, which will eventually be // delivered to client application over websocket connection -func NewBlockConsumer(client *redis.Client, conn *websocket.Conn, req *SubscriptionRequest, db *gorm.DB, address common.Address) *BlockConsumer { +func NewBlockConsumer(client *redis.Client, conn *websocket.Conn, req *SubscriptionRequest, db *gorm.DB, address common.Address, lock *sync.Mutex) *BlockConsumer { consumer := BlockConsumer{ Client: client, Request: req, UserAddress: address, Connection: conn, DB: db, + Lock: lock, } consumer.Subscribe() @@ -37,13 +41,14 @@ func NewBlockConsumer(client *redis.Client, conn *websocket.Conn, req *Subscript // topic & listen for data being published on this channel & check whether received data // is what, client is interested in or not, which will eventually be // delivered to client application over websocket connection -func NewTransactionConsumer(client *redis.Client, conn *websocket.Conn, req *SubscriptionRequest, db *gorm.DB, address common.Address) *TransactionConsumer { +func NewTransactionConsumer(client *redis.Client, conn *websocket.Conn, req *SubscriptionRequest, db *gorm.DB, address common.Address, lock *sync.Mutex) *TransactionConsumer { consumer := TransactionConsumer{ Client: client, Request: req, UserAddress: address, Connection: conn, DB: db, + Lock: lock, } consumer.Subscribe() @@ -56,13 +61,14 @@ func NewTransactionConsumer(client *redis.Client, conn *websocket.Conn, req *Sub // topic & listen for data being published on this channel & check whether received data // is what, client is interested in or not, which will eventually be // delivered to client application over websocket connection -func NewEventConsumer(client *redis.Client, conn *websocket.Conn, req *SubscriptionRequest, db *gorm.DB, address common.Address) *EventConsumer { +func NewEventConsumer(client *redis.Client, conn *websocket.Conn, req *SubscriptionRequest, db *gorm.DB, address common.Address, lock *sync.Mutex) *EventConsumer { consumer := EventConsumer{ Client: client, Request: req, UserAddress: address, Connection: conn, DB: db, + Lock: lock, } consumer.Subscribe() diff --git a/app/pubsub/event.go b/app/pubsub/event.go index 794a0286..d3052e2b 100644 --- a/app/pubsub/event.go +++ b/app/pubsub/event.go @@ -4,7 +4,9 @@ import ( "context" "encoding/hex" "encoding/json" + "fmt" "log" + "sync" "time" "github.com/ethereum/go-ethereum/common" @@ -28,6 +30,7 @@ type EventConsumer struct { Connection *websocket.Conn PubSub *redis.PubSub DB *gorm.DB + Lock *sync.Mutex } // Subscribe - Event consumer is subscribing to `event` topic, @@ -41,6 +44,12 @@ func (e *EventConsumer) Subscribe() { // if client has subscribed to get notified on occurrence of this event func (e *EventConsumer) Listen() { + // Before leaving this execution context, attempting to + // unsubscribe client from `event` pubsub topic + // + // One attempt to unsubscribe gracefully + defer e.Unsubscribe() + for { // Checking if client is still subscribed to this topic @@ -48,19 +57,7 @@ func (e *EventConsumer) Listen() { // // If not, we're cancelling this subscription if e.Request.Type == "unsubscribe" { - - if err := e.Connection.WriteJSON(&SubscriptionResponse{ - Code: 1, - Message: "Unsubscribed from `event`", - }); err != nil { - log.Printf("[!] Failed to deliver event unsubscription confirmation to client : %s\n", err.Error()) - } - - if err := e.PubSub.Unsubscribe(context.Background(), e.Request.Topic()); err != nil { - log.Printf("[!] Failed to unsubscribe from `event` topic : %s\n", err.Error()) - } break - } msg, err := e.PubSub.ReceiveTimeout(context.Background(), time.Second) @@ -95,6 +92,12 @@ func (e *EventConsumer) Send(msg string) bool { user := db.GetUserFromAPIKey(e.DB, e.Request.APIKey) if user == nil { + // -- Critical section of code begins + // + // Attempting to write to a network resource, + // shared among multiple go routines + e.Lock.Lock() + if err := e.Connection.WriteJSON(&SubscriptionResponse{ Code: 0, Message: "Bad API Key", @@ -102,16 +105,20 @@ func (e *EventConsumer) Send(msg string) bool { log.Printf("[!] Failed to deliver bad API key message to client : %s\n", err.Error()) } - if err := e.PubSub.Unsubscribe(context.Background(), e.Request.Topic()); err != nil { - log.Printf("[!] Failed to unsubscribe from `event` topic : %s\n", err.Error()) - } - + e.Lock.Unlock() + // -- ends here return false } if !user.Enabled { + // -- Critical section of code begins + // + // Attempting to write to a network resource, + // shared among multiple go routines + e.Lock.Lock() + if err := e.Connection.WriteJSON(&SubscriptionResponse{ Code: 0, Message: "Bad API Key", @@ -119,10 +126,8 @@ func (e *EventConsumer) Send(msg string) bool { log.Printf("[!] Failed to deliver bad API key message to client : %s\n", err.Error()) } - if err := e.PubSub.Unsubscribe(context.Background(), e.Request.Topic()); err != nil { - log.Printf("[!] Failed to unsubscribe from `event` topic : %s\n", err.Error()) - } - + e.Lock.Unlock() + // -- ends here return false } @@ -131,6 +136,12 @@ func (e *EventConsumer) Send(msg string) bool { // if client has crossed it's allowed data delivery limit if !db.IsUnderRateLimit(e.DB, e.UserAddress.Hex()) { + // -- Critical section of code begins + // + // Attempting to write to a network resource, + // shared among multiple go routines + e.Lock.Lock() + if err := e.Connection.WriteJSON(&SubscriptionResponse{ Code: 0, Message: "Crossed Allowed Rate Limit", @@ -138,10 +149,8 @@ func (e *EventConsumer) Send(msg string) bool { log.Printf("[!] Failed to deliver rate limit crossed message to client : %s\n", err.Error()) } - if err := e.PubSub.Unsubscribe(context.Background(), e.Request.Topic()); err != nil { - log.Printf("[!] Failed to unsubscribe from `event` topic : %s\n", err.Error()) - } - + e.Lock.Unlock() + // -- ends here return false } @@ -199,20 +208,52 @@ func (e *EventConsumer) Send(msg string) bool { // If failed, we're going to remove subscription & close websocket // connection ( connection might be already closed though ) func (e *EventConsumer) SendData(data interface{}) bool { - if err := e.Connection.WriteJSON(data); err != nil { - log.Printf("[!] Failed to deliver `event` data to client : %s\n", err.Error()) - if err = e.PubSub.Unsubscribe(context.Background(), e.Request.Topic()); err != nil { - log.Printf("[!] Failed to unsubscribe from `event` topic : %s\n", err.Error()) - } - - if err = e.Connection.Close(); err != nil { - log.Printf("[!] Failed to close websocket connection : %s\n", err.Error()) - } + // -- Critical section of code begins + // + // Attempting to write to a network resource, + // shared among multiple go routines + e.Lock.Lock() + defer e.Lock.Unlock() + if err := e.Connection.WriteJSON(data); err != nil { + log.Printf("[!] Failed to deliver `event` data to client : %s\n", err.Error()) return false } log.Printf("[+] Delivered `event` data to client\n") return true } + +// Unsubscribe - Unsubscribe from event data publishing topic, to be called +// when stopping to listen data being published on this pubsub channel +// due to client has requested a unsubscription/ network connection got hampered +func (e *EventConsumer) Unsubscribe() { + + if e.PubSub == nil { + log.Printf("[!] Bad attempt to unsubscribe from `%s` topic\n", e.Request.Topic()) + return + } + + if err := e.PubSub.Unsubscribe(context.Background(), e.Request.Topic()); err != nil { + log.Printf("[!] Failed to unsubscribe from `%s` topic : %s\n", e.Request.Topic(), err.Error()) + return + } + + resp := &SubscriptionResponse{ + Code: 1, + Message: fmt.Sprintf("Unsubscribed from `%s`", e.Request.Topic()), + } + + // -- Critical section of code begins + // + // Attempting to write to a network resource, + // shared among multiple go routines + e.Lock.Lock() + defer e.Lock.Unlock() + + if err := e.Connection.WriteJSON(resp); err != nil { + log.Printf("[!] Failed to deliver `%s` unsubscription confirmation to client : %s\n", e.Request.Topic(), err.Error()) + } + +} diff --git a/app/pubsub/subscription.go b/app/pubsub/subscription.go index 0402f93a..cfa8d567 100644 --- a/app/pubsub/subscription.go +++ b/app/pubsub/subscription.go @@ -48,7 +48,7 @@ func (s *SubscriptionRequest) GetRegex() *regexp.Regexp { } // Topic - Get main topic name to which this client is subscribing to -// i.e. {block, transaction} +// i.e. {block, transaction, event} func (s *SubscriptionRequest) Topic() string { if strings.HasPrefix(s.Name, "block") { return "block" diff --git a/app/pubsub/transaction.go b/app/pubsub/transaction.go index 907e1320..f0712d84 100644 --- a/app/pubsub/transaction.go +++ b/app/pubsub/transaction.go @@ -4,7 +4,9 @@ import ( "context" "encoding/hex" "encoding/json" + "fmt" "log" + "sync" "time" "github.com/ethereum/go-ethereum/common" @@ -27,6 +29,7 @@ type TransactionConsumer struct { Connection *websocket.Conn PubSub *redis.PubSub DB *gorm.DB + Lock *sync.Mutex } // Subscribe - Subscribe to `transaction` topic, under which all transaction related data to be published @@ -38,6 +41,10 @@ func (t *TransactionConsumer) Subscribe() { // and reads data from subcribed channel, which also gets delivered to client application func (t *TransactionConsumer) Listen() { + // When leaving this execution scope, attempting to unsubscribe + // client from pubsub topic, it was listening to, in a graceful fashion + defer t.Unsubscribe() + for { // Checking if client is still subscribed to this topic @@ -45,19 +52,7 @@ func (t *TransactionConsumer) Listen() { // // If not, we're cancelling this subscription if t.Request.Type == "unsubscribe" { - - if err := t.Connection.WriteJSON(&SubscriptionResponse{ - Code: 1, - Message: "Unsubscribed from `transaction`", - }); err != nil { - log.Printf("[!] Failed to deliver transaction unsubscription confirmation to client : %s\n", err.Error()) - } - - if err := t.PubSub.Unsubscribe(context.Background(), t.Request.Topic()); err != nil { - log.Printf("[!] Failed to unsubscribe from `transaction` topic : %s\n", err.Error()) - } break - } msg, err := t.PubSub.ReceiveTimeout(context.Background(), time.Second) @@ -92,6 +87,12 @@ func (t *TransactionConsumer) Send(msg string) bool { user := db.GetUserFromAPIKey(t.DB, t.Request.APIKey) if user == nil { + // -- Critical section of code begins + // + // Attempting to write to a network resource, + // shared among multiple go routines + t.Lock.Lock() + if err := t.Connection.WriteJSON(&SubscriptionResponse{ Code: 0, Message: "Bad API Key", @@ -99,16 +100,20 @@ func (t *TransactionConsumer) Send(msg string) bool { log.Printf("[!] Failed to deliver bad API key message to client : %s\n", err.Error()) } - if err := t.PubSub.Unsubscribe(context.Background(), t.Request.Topic()); err != nil { - log.Printf("[!] Failed to unsubscribe from `transaction` topic : %s\n", err.Error()) - } - + t.Lock.Unlock() + // -- ends here return false } if !user.Enabled { + // -- Critical section of code begins + // + // Attempting to write to a network resource, + // shared among multiple go routines + t.Lock.Lock() + if err := t.Connection.WriteJSON(&SubscriptionResponse{ Code: 0, Message: "Bad API Key", @@ -116,10 +121,8 @@ func (t *TransactionConsumer) Send(msg string) bool { log.Printf("[!] Failed to deliver bad API key message to client : %s\n", err.Error()) } - if err := t.PubSub.Unsubscribe(context.Background(), t.Request.Topic()); err != nil { - log.Printf("[!] Failed to unsubscribe from `transaction` topic : %s\n", err.Error()) - } - + t.Lock.Unlock() + // -- ends here return false } @@ -128,6 +131,12 @@ func (t *TransactionConsumer) Send(msg string) bool { // if client has crossed it's allowed data delivery limit if !db.IsUnderRateLimit(t.DB, t.UserAddress.Hex()) { + // -- Critical section of code begins + // + // Attempting to write to a network resource, + // shared among multiple go routines + t.Lock.Lock() + if err := t.Connection.WriteJSON(&SubscriptionResponse{ Code: 0, Message: "Crossed Allowed Rate Limit", @@ -135,10 +144,8 @@ func (t *TransactionConsumer) Send(msg string) bool { log.Printf("[!] Failed to deliver rate limit crossed message to client : %s\n", err.Error()) } - if err := t.PubSub.Unsubscribe(context.Background(), t.Request.Topic()); err != nil { - log.Printf("[!] Failed to unsubscribe from `transaction` topic : %s\n", err.Error()) - } - + t.Lock.Unlock() + // -- ends here return false } @@ -217,20 +224,50 @@ func (t *TransactionConsumer) Send(msg string) bool { // If failed, we're going to remove subscription & close websocket // connection ( connection might be already closed though ) func (t *TransactionConsumer) SendData(data interface{}) bool { - if err := t.Connection.WriteJSON(data); err != nil { - log.Printf("[!] Failed to deliver `transaction` data to client : %s\n", err.Error()) - if err = t.PubSub.Unsubscribe(context.Background(), t.Request.Topic()); err != nil { - log.Printf("[!] Failed to unsubscribe from `transaction` topic : %s\n", err.Error()) - } - - if err = t.Connection.Close(); err != nil { - log.Printf("[!] Failed to close websocket connection : %s\n", err.Error()) - } + // -- Critical section of code begins + // + // Attempting to write to a network resource, + // shared among multiple go routines + t.Lock.Lock() + defer t.Lock.Unlock() + if err := t.Connection.WriteJSON(data); err != nil { + log.Printf("[!] Failed to deliver `transaction` data to client : %s\n", err.Error()) return false } log.Printf("[+] Delivered `transaction` data to client\n") return true } + +// Unsubscribe - Unsubscribe from transactions pubsub topic, which client has subscribed to +func (t *TransactionConsumer) Unsubscribe() { + + if t.PubSub == nil { + log.Printf("[!] Bad attempt to unsubscribe from `%s` topic\n", t.Request.Topic()) + return + } + + if err := t.PubSub.Unsubscribe(context.Background(), t.Request.Topic()); err != nil { + log.Printf("[!] Failed to unsubscribe from `%s` topic : %s\n", t.Request.Topic(), err.Error()) + return + } + + resp := &SubscriptionResponse{ + Code: 1, + Message: fmt.Sprintf("Unsubscribed from `%s`", t.Request.Topic()), + } + + // -- Critical section of code begins + // + // Attempting to write to a network resource, + // shared among multiple go routines + t.Lock.Lock() + defer t.Lock.Unlock() + + if err := t.Connection.WriteJSON(resp); err != nil { + log.Printf("[!] Failed to deliver `%s` unsubscription confirmation to client : %s\n", t.Request.Topic(), err.Error()) + } + +} diff --git a/app/rest/rest.go b/app/rest/rest.go index 8bb123d1..6163bcf5 100644 --- a/app/rest/rest.go +++ b/app/rest/rest.go @@ -9,6 +9,7 @@ import ( "net/http" "strconv" "strings" + "sync" "time" "github.com/foolin/goview" @@ -1095,11 +1096,38 @@ func RunHTTPServer(_db *gorm.DB, _status *d.StatusHolder, _redisClient *redis.Cl return } - // keeping track of which topics this client has already subscribed to - // or unsubscrribed from + // Keeping track of which topics this client has subscribed to topics := make(map[string]ps.Consumer) + lock := sync.Mutex{} - // Communication with client handling logic + // When returning from this execution scope, unsubscribing client + // from all topics it might have subscribed to during it's life time + // + // Just attempting to do a graceful unsubscription + defer func() { + + for _, v := range topics { + + if v, ok := v.(*ps.BlockConsumer); ok { + v.Request.Type = "unsubscribe" + continue + } + + if v, ok := v.(*ps.TransactionConsumer); ok { + v.Request.Type = "unsubscribe" + continue + } + + if v, ok := v.(*ps.EventConsumer); ok { + v.Request.Type = "unsubscribe" + continue + } + + } + + }() + + // Client communication handling logic for { var req ps.SubscriptionRequest @@ -1112,17 +1140,33 @@ func RunHTTPServer(_db *gorm.DB, _status *d.StatusHolder, _redisClient *redis.Cl // failure message to client & close connection user := req.GetUserFromAPIKey(_db) if user == nil { + // -- Critical section of code begins + // + // Attempting to write to shared network connection + lock.Lock() + if err := conn.WriteJSON(&ps.SubscriptionResponse{Code: 0, Message: "Bad API Key"}); err != nil { log.Printf("[!] Failed to write message : %s\n", err.Error()) } + + lock.Unlock() + // -- ends here break } // Checking if user has kept this APIKey enabled or not if !user.Enabled { + // -- Critical section of code begins + // + // Attempting to write to shared network connection + lock.Lock() + if err := conn.WriteJSON(&ps.SubscriptionResponse{Code: 0, Message: "Bad API Key"}); err != nil { log.Printf("[!] Failed to write message : %s\n", err.Error()) } + + lock.Unlock() + // -- ends here break } @@ -1130,47 +1174,71 @@ func RunHTTPServer(_db *gorm.DB, _status *d.StatusHolder, _redisClient *redis.Cl // Checking if client is under allowed rate limit or not if !req.IsUnderRateLimit(_db, userAddress) { + // -- Critical section of code begins + // + // Attempting to write to shared network connection + lock.Lock() + if err := conn.WriteJSON(&ps.SubscriptionResponse{Code: 0, Message: "Crossed Allowed Rate Limit"}); err != nil { log.Printf("[!] Failed to write message : %s\n", err.Error()) } + + lock.Unlock() + // -- ends here break } // Validating incoming request on websocket subscription channel if !req.Validate(topics) { + // -- Critical section of code begins + // + // Attempting to write to shared network connection + lock.Lock() + if err := conn.WriteJSON(&ps.SubscriptionResponse{Code: 0, Message: "Bad Payload"}); err != nil { log.Printf("[!] Failed to write message : %s\n", err.Error()) } + + lock.Unlock() + // -- ends here break } switch req.Type { case "subscribe": switch req.Topic() { + case "block": - topics[req.Name] = ps.NewBlockConsumer(_redisClient, conn, &req, _db, userAddress) + topics[req.Name] = ps.NewBlockConsumer(_redisClient, conn, &req, _db, userAddress, &lock) + case "transaction": - topics[req.Name] = ps.NewTransactionConsumer(_redisClient, conn, &req, _db, userAddress) + topics[req.Name] = ps.NewTransactionConsumer(_redisClient, conn, &req, _db, userAddress, &lock) + case "event": - topics[req.Name] = ps.NewEventConsumer(_redisClient, conn, &req, _db, userAddress) + topics[req.Name] = ps.NewEventConsumer(_redisClient, conn, &req, _db, userAddress, &lock) + } case "unsubscribe": switch req.Topic() { + case "block": if v, ok := topics[req.Name].(*ps.BlockConsumer); ok { v.Request.Type = req.Type } + case "transaction": if v, ok := topics[req.Name].(*ps.TransactionConsumer); ok { v.Request.Type = req.Type } + case "event": if v, ok := topics[req.Name].(*ps.EventConsumer); ok { v.Request.Type = req.Type } + } - topics[req.Name] = nil + delete(topics, req.Name) } } })