Skip to content

Commit

Permalink
Merge branch 'master' of github.com:gabrielperezs/goreactor
Browse files Browse the repository at this point in the history
  • Loading branch information
gabrielperezs committed Aug 16, 2022
2 parents 096d0a0 + 9fd2445 commit 60c2945
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 52 deletions.
14 changes: 14 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
default: build

build: export CGO_ENABLED = 0
build:
go env -w GOPRIVATE="github.com/Webjet/*"
go build -o goreactor -ldflags '-w -s'

arm: export CGO_ENABLED = 0
arm:
go env -w GOPRIVATE="github.com/Webjet/*"
env GOOS=linux GOARCH=arm64 GONOSUMDB=github.com/Webjet/engine go build -o goreactor_arm -ldflags '-w -s'

clean:
go clean
74 changes: 56 additions & 18 deletions inputs/sqs/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package sqs
import (
"fmt"
"log"
"runtime"
"strconv"
"strings"
"sync"
Expand All @@ -15,6 +16,7 @@ import (
"github.com/aws/aws-sdk-go/service/sqs"
"github.com/gabrielperezs/goreactor/lib"
"github.com/gabrielperezs/goreactor/reactor"
"github.com/gallir/dynsemaphore"
)

var MessageSystemAttributeNameSentTimestamp = sqs.MessageSystemAttributeNameSentTimestamp
Expand All @@ -35,8 +37,10 @@ type sqsListen struct {
exitedMu sync.Mutex
done chan bool

broadcastCh sync.Map
pendings map[string]int
broadcastCh sync.Map
pendings map[string]int
messError map[string]bool
maxQueuedMessages *dynsemaphore.DynSemaphore // Max of goroutines wating to send the message
}

func newSQSListen(r *reactor.Reactor, c map[string]interface{}) (*sqsListen, error) {
Expand Down Expand Up @@ -79,18 +83,37 @@ func newSQSListen(r *reactor.Reactor, c map[string]interface{}) (*sqsListen, err
if err != nil {
return nil, err
}
p.broadcastCh.Store(r, true)
p.broadcastCh.Store(r, r.Concurrent)
p.pendings = make(map[string]int)
p.messError = make(map[string]bool)

p.svc = sqs.New(sess, &aws.Config{Region: aws.String(p.Region)})
p.maxQueuedMessages = dynsemaphore.New(0)
p.updateConcurrency()

go p.listen()

return p, nil
}

func (p *sqsListen) AddOrUpdate(r *reactor.Reactor) {
p.broadcastCh.Store(r, true)
p.broadcastCh.Store(r, r.Concurrent)
p.updateConcurrency()
}

func (p *sqsListen) updateConcurrency() {
total := 0
p.broadcastCh.Range(func(k, v interface{}) bool {
total += v.(int)
return true
})
maxPendings := total
if maxPendings < runtime.NumCPU() {
maxPendings = runtime.NumCPU()
}
maxPendings = maxPendings * 2 // Its means x*nreactors max pending messages via goroutines, What's the right number?
log.Printf("SQS: total concurrency: %d, max pending in-flight messages: %d", total, maxPendings)
p.maxQueuedMessages.SetConcurrency(total)
}

func (p *sqsListen) listen() {
Expand All @@ -107,6 +130,7 @@ func (p *sqsListen) listen() {
time.Sleep(time.Second)
tries++
if tries > 120 { // Wait no more than 120 seconds, the usual max
log.Printf("WARNING, timeout waiting for %d pending messages", len(p.pendings))
break
}
}
Expand Down Expand Up @@ -163,28 +187,31 @@ func (p *sqsListen) deliver(msg *sqs.Message) {
m.B = []byte(s)
}

wg := sync.WaitGroup{}
p.broadcastCh.Range(func(k, v interface{}) bool {
if err := k.(*reactor.Reactor).MatchConditions(m); err != nil {
return true
}
atLeastOneValid = true
p.addPending(m)
// Send the message in parallel to avoid blocking all messages
// due to a long standing reactor that has its chan full
wg.Add(1)
p.maxQueuedMessages.Access() // Check the limit of goroutines
go func(m *Msg) {
defer func() {
if r := recover(); r != nil {
return // Ignore "closed channel" error when the program finishes
}
}()
k.(*reactor.Reactor).Ch <- m
wg.Done()
p.maxQueuedMessages.Release()
}(m)
p.addPending(m)
return true
})
wg.Wait()

// We delete this message if is invalid for all the reactors
if !atLeastOneValid {
log.Printf("Invalid message from %s, deleted: %s", p.URL, *msg.Body)
p.Delete(m)
p.delete(m)
}

}
Expand All @@ -210,7 +237,7 @@ func (p *sqsListen) Exit() error {
return nil
}

func (p *sqsListen) Delete(v lib.Msg) (err error) {
func (p *sqsListen) delete(v lib.Msg) (err error) {
msg, ok := v.(*Msg)
if !ok {
log.Printf("ERROR SQS Delete: invalid message %+v", v)
Expand Down Expand Up @@ -247,23 +274,34 @@ func (p *sqsListen) addPending(m lib.Msg) {
}

// Done removes the message from the pending queue.
func (p *sqsListen) Done(m lib.Msg) {
p.Lock()
defer p.Unlock()
func (p *sqsListen) Done(m lib.Msg, statusOk bool) {
msg, ok := m.(*Msg)
if !ok {
return
}
id := *msg.M.ReceiptHandle

p.Lock()
defer p.Unlock()

// If it's not in pending, ignore it
v, ok := p.pendings[id]
if !ok {
return
}
if v <= 1 {
delete(p.pendings, id)
return
}
v -= 1
p.pendings[id] = v
if !statusOk {
p.messError[id] = true
}

// Check if it's the last
if v <= 0 {
delete(p.pendings, id)
_, hadError := p.messError[id]
if !hadError {
// Delete the message if there's no more pending reactors
go p.delete(m) // Execute delete message outside the Lock
}
}
}
9 changes: 2 additions & 7 deletions inputs/sqs/sqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,6 @@ func NewOrGet(r *reactor.Reactor, c map[string]interface{}) (*SQSPlugin, error)
return p, nil
}

// Delete message from the SQS
func (p *SQSPlugin) Delete(v lib.Msg) error {
return p.l.Delete(v)
}

// Put is not needed in SQS
func (p *SQSPlugin) Put(v lib.Msg) error {
return nil
Expand All @@ -87,6 +82,6 @@ func (p *SQSPlugin) Stop() {
p.l.Stop()
}

func (p *SQSPlugin) Done(v lib.Msg) {
p.l.Done(v)
func (p *SQSPlugin) Done(v lib.Msg, status bool) {
p.l.Done(v, status)
}
10 changes: 5 additions & 5 deletions lib/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ import "github.com/gabrielperezs/goreactor/reactorlog"

// Input is the interface for the Input plugins
type Input interface {
Put(m Msg) error
Delete(m Msg) error
Done(m Msg) // Input was processed and don't need to keep it as pending
Stop() // Stop accepting input
Exit() // Exit from the loop
// Put(m Msg) error
//Delete(m Msg) error
Done(m Msg, status bool) // Input was processed and don't need to keep it as pending
Stop() // Stop accepting input
Exit() // Exit from the loop
}

// Output is the interface for the Output plugins
Expand Down
3 changes: 1 addition & 2 deletions outputs/cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,7 @@ func (o *Cmd) getReplacedArguments(msg lib.Msg) []string {
// the command.
func (o *Cmd) Run(rl reactorlog.ReactorLog, msg lib.Msg) error {

var args []string
args = o.getReplacedArguments(msg)
args := o.getReplacedArguments(msg)

logLabel := o.findReplace(msg, o.r.Label)
rl.SetLabel(logLabel)
Expand Down
25 changes: 5 additions & 20 deletions reactor/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ func NewReactor(icfg interface{}) *Reactor {

// There are several listeners for concurrency,
// better not to buffer too much to avoid to many message in travel.
// Leave 1 for better use of CPU but can be zero
r.Ch = make(chan lib.Msg, 1)
// Buffer reasonable values are 0 or 1
r.Ch = make(chan lib.Msg)

log.Printf("Reactor %d concurrent %d, delay %s", r.id, r.Concurrent, r.Delay)

Expand Down Expand Up @@ -174,8 +174,6 @@ func (r *Reactor) deadline() {
}

func (r *Reactor) run(msg lib.Msg) {
defer r.I.Done(msg) // To remove this message from the pending message queue

r.deadline()

cc := r.cc
Expand All @@ -191,20 +189,7 @@ func (r *Reactor) run(msg lib.Msg) {
}

err = r.O.Run(rl, msg)
defer rl.Done(err)

if err == nil {
if err := r.I.Delete(msg); err != nil {
log.Printf("INTERNAL ERROR: %s", err)
}
return
}

if err == ErrInvalidMsgForPlugin {
return
}

if err := r.I.Put(msg); err != nil {
log.Printf("INTERNAL ERROR: %s", err)
}
ok := err == nil || err == ErrInvalidMsgForPlugin
r.I.Done(msg, ok) // To remove this message from the pending message queue
rl.Done(err)
}

0 comments on commit 60c2945

Please sign in to comment.