Skip to content
This repository has been archived by the owner on Jan 13, 2023. It is now read-only.

Commit

Permalink
safe concurrent writing to network resource for both transaction & ev…
Browse files Browse the repository at this point in the history
…ent consumers
  • Loading branch information
itzmeanjan committed Jan 11, 2021
1 parent 3a87b18 commit d896591
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 0 deletions.
39 changes: 39 additions & 0 deletions app/pubsub/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,26 +92,42 @@ func (e *EventConsumer) Send(msg string) bool {
user := db.GetUserFromAPIKey(e.DB, e.Request.APIKey)
if user == nil {

// -- Critical section of code begins
//
// Attempting to write to a network resource,
// shared among multiple go routines
e.Lock.Lock()

if err := e.Connection.WriteJSON(&SubscriptionResponse{
Code: 0,
Message: "Bad API Key",
}); err != nil {
log.Printf("[!] Failed to deliver bad API key message to client : %s\n", err.Error())
}

e.Lock.Unlock()
// -- ends here
return false

}

if !user.Enabled {

// -- Critical section of code begins
//
// Attempting to write to a network resource,
// shared among multiple go routines
e.Lock.Lock()

if err := e.Connection.WriteJSON(&SubscriptionResponse{
Code: 0,
Message: "Bad API Key",
}); err != nil {
log.Printf("[!] Failed to deliver bad API key message to client : %s\n", err.Error())
}

e.Lock.Unlock()
// -- ends here
return false

}
Expand All @@ -120,13 +136,21 @@ func (e *EventConsumer) Send(msg string) bool {
// if client has crossed it's allowed data delivery limit
if !db.IsUnderRateLimit(e.DB, e.UserAddress.Hex()) {

// -- Critical section of code begins
//
// Attempting to write to a network resource,
// shared among multiple go routines
e.Lock.Lock()

if err := e.Connection.WriteJSON(&SubscriptionResponse{
Code: 0,
Message: "Crossed Allowed Rate Limit",
}); err != nil {
log.Printf("[!] Failed to deliver rate limit crossed message to client : %s\n", err.Error())
}

e.Lock.Unlock()
// -- ends here
return false

}
Expand Down Expand Up @@ -184,6 +208,14 @@ func (e *EventConsumer) Send(msg string) bool {
// If failed, we're going to remove subscription & close websocket
// connection ( connection might be already closed though )
func (e *EventConsumer) SendData(data interface{}) bool {

// -- Critical section of code begins
//
// Attempting to write to a network resource,
// shared among multiple go routines
e.Lock.Lock()
defer e.Lock.Unlock()

if err := e.Connection.WriteJSON(data); err != nil {
log.Printf("[!] Failed to deliver `event` data to client : %s\n", err.Error())
return false
Expand Down Expand Up @@ -213,6 +245,13 @@ func (e *EventConsumer) Unsubscribe() {
Message: fmt.Sprintf("Unsubscribed from `%s`", e.Request.Topic()),
}

// -- Critical section of code begins
//
// Attempting to write to a network resource,
// shared among multiple go routines
e.Lock.Lock()
defer e.Lock.Unlock()

if err := e.Connection.WriteJSON(resp); err != nil {
log.Printf("[!] Failed to deliver `%s` unsubscription confirmation to client : %s\n", e.Request.Topic(), err.Error())
}
Expand Down
39 changes: 39 additions & 0 deletions app/pubsub/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,26 +87,42 @@ func (t *TransactionConsumer) Send(msg string) bool {
user := db.GetUserFromAPIKey(t.DB, t.Request.APIKey)
if user == nil {

// -- Critical section of code begins
//
// Attempting to write to a network resource,
// shared among multiple go routines
t.Lock.Lock()

if err := t.Connection.WriteJSON(&SubscriptionResponse{
Code: 0,
Message: "Bad API Key",
}); err != nil {
log.Printf("[!] Failed to deliver bad API key message to client : %s\n", err.Error())
}

t.Lock.Unlock()
// -- ends here
return false

}

if !user.Enabled {

// -- Critical section of code begins
//
// Attempting to write to a network resource,
// shared among multiple go routines
t.Lock.Lock()

if err := t.Connection.WriteJSON(&SubscriptionResponse{
Code: 0,
Message: "Bad API Key",
}); err != nil {
log.Printf("[!] Failed to deliver bad API key message to client : %s\n", err.Error())
}

t.Lock.Unlock()
// -- ends here
return false

}
Expand All @@ -115,13 +131,21 @@ func (t *TransactionConsumer) Send(msg string) bool {
// if client has crossed it's allowed data delivery limit
if !db.IsUnderRateLimit(t.DB, t.UserAddress.Hex()) {

// -- Critical section of code begins
//
// Attempting to write to a network resource,
// shared among multiple go routines
t.Lock.Lock()

if err := t.Connection.WriteJSON(&SubscriptionResponse{
Code: 0,
Message: "Crossed Allowed Rate Limit",
}); err != nil {
log.Printf("[!] Failed to deliver rate limit crossed message to client : %s\n", err.Error())
}

t.Lock.Unlock()
// -- ends here
return false

}
Expand Down Expand Up @@ -200,6 +224,14 @@ func (t *TransactionConsumer) Send(msg string) bool {
// If failed, we're going to remove subscription & close websocket
// connection ( connection might be already closed though )
func (t *TransactionConsumer) SendData(data interface{}) bool {

// -- Critical section of code begins
//
// Attempting to write to a network resource,
// shared among multiple go routines
t.Lock.Lock()
defer t.Lock.Unlock()

if err := t.Connection.WriteJSON(data); err != nil {
log.Printf("[!] Failed to deliver `transaction` data to client : %s\n", err.Error())
return false
Expand Down Expand Up @@ -227,6 +259,13 @@ func (t *TransactionConsumer) Unsubscribe() {
Message: fmt.Sprintf("Unsubscribed from `%s`", t.Request.Topic()),
}

// -- Critical section of code begins
//
// Attempting to write to a network resource,
// shared among multiple go routines
t.Lock.Lock()
defer t.Lock.Unlock()

if err := t.Connection.WriteJSON(resp); err != nil {
log.Printf("[!] Failed to deliver `%s` unsubscription confirmation to client : %s\n", t.Request.Topic(), err.Error())
}
Expand Down

0 comments on commit d896591

Please sign in to comment.