diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..37e2e0d --- /dev/null +++ b/Makefile @@ -0,0 +1,14 @@ +default: build + +build: export CGO_ENABLED = 0 +build: + go env -w GOPRIVATE="github.com/Webjet/*" + go build -o goreactor -ldflags '-w -s' + +arm: export CGO_ENABLED = 0 +arm: + go env -w GOPRIVATE="github.com/Webjet/*" + env GOOS=linux GOARCH=arm64 GONOSUMDB=github.com/Webjet/engine go build -o goreactor_arm -ldflags '-w -s' + +clean: + go clean diff --git a/inputs/sqs/pool.go b/inputs/sqs/pool.go index 3fb8776..8e6672a 100644 --- a/inputs/sqs/pool.go +++ b/inputs/sqs/pool.go @@ -3,6 +3,7 @@ package sqs import ( "fmt" "log" + "runtime" "strconv" "strings" "sync" @@ -15,6 +16,7 @@ import ( "github.com/aws/aws-sdk-go/service/sqs" "github.com/gabrielperezs/goreactor/lib" "github.com/gabrielperezs/goreactor/reactor" + "github.com/gallir/dynsemaphore" ) var MessageSystemAttributeNameSentTimestamp = sqs.MessageSystemAttributeNameSentTimestamp @@ -35,8 +37,10 @@ type sqsListen struct { exitedMu sync.Mutex done chan bool - broadcastCh sync.Map - pendings map[string]int + broadcastCh sync.Map + pendings map[string]int + messError map[string]bool + maxQueuedMessages *dynsemaphore.DynSemaphore // Max of goroutines wating to send the message } func newSQSListen(r *reactor.Reactor, c map[string]interface{}) (*sqsListen, error) { @@ -79,10 +83,13 @@ func newSQSListen(r *reactor.Reactor, c map[string]interface{}) (*sqsListen, err if err != nil { return nil, err } - p.broadcastCh.Store(r, true) + p.broadcastCh.Store(r, r.Concurrent) p.pendings = make(map[string]int) + p.messError = make(map[string]bool) p.svc = sqs.New(sess, &aws.Config{Region: aws.String(p.Region)}) + p.maxQueuedMessages = dynsemaphore.New(0) + p.updateConcurrency() go p.listen() @@ -90,7 +97,23 @@ func newSQSListen(r *reactor.Reactor, c map[string]interface{}) (*sqsListen, err } func (p *sqsListen) AddOrUpdate(r *reactor.Reactor) { - p.broadcastCh.Store(r, true) + p.broadcastCh.Store(r, r.Concurrent) + p.updateConcurrency() +} + +func (p *sqsListen) updateConcurrency() { + total := 0 + p.broadcastCh.Range(func(k, v interface{}) bool { + total += v.(int) + return true + }) + maxPendings := total + if maxPendings < runtime.NumCPU() { + maxPendings = runtime.NumCPU() + } + maxPendings = maxPendings * 2 // Its means x*nreactors max pending messages via goroutines, What's the right number? + log.Printf("SQS: total concurrency: %d, max pending in-flight messages: %d", total, maxPendings) + p.maxQueuedMessages.SetConcurrency(total) } func (p *sqsListen) listen() { @@ -107,6 +130,7 @@ func (p *sqsListen) listen() { time.Sleep(time.Second) tries++ if tries > 120 { // Wait no more than 120 seconds, the usual max + log.Printf("WARNING, timeout waiting for %d pending messages", len(p.pendings)) break } } @@ -163,28 +187,31 @@ func (p *sqsListen) deliver(msg *sqs.Message) { m.B = []byte(s) } - wg := sync.WaitGroup{} p.broadcastCh.Range(func(k, v interface{}) bool { if err := k.(*reactor.Reactor).MatchConditions(m); err != nil { return true } atLeastOneValid = true + p.addPending(m) // Send the message in parallel to avoid blocking all messages // due to a long standing reactor that has its chan full - wg.Add(1) + p.maxQueuedMessages.Access() // Check the limit of goroutines go func(m *Msg) { + defer func() { + if r := recover(); r != nil { + return // Ignore "closed channel" error when the program finishes + } + }() k.(*reactor.Reactor).Ch <- m - wg.Done() + p.maxQueuedMessages.Release() }(m) - p.addPending(m) return true }) - wg.Wait() // We delete this message if is invalid for all the reactors if !atLeastOneValid { log.Printf("Invalid message from %s, deleted: %s", p.URL, *msg.Body) - p.Delete(m) + p.delete(m) } } @@ -210,7 +237,7 @@ func (p *sqsListen) Exit() error { return nil } -func (p *sqsListen) Delete(v lib.Msg) (err error) { +func (p *sqsListen) delete(v lib.Msg) (err error) { msg, ok := v.(*Msg) if !ok { log.Printf("ERROR SQS Delete: invalid message %+v", v) @@ -247,23 +274,34 @@ func (p *sqsListen) addPending(m lib.Msg) { } // Done removes the message from the pending queue. -func (p *sqsListen) Done(m lib.Msg) { - p.Lock() - defer p.Unlock() +func (p *sqsListen) Done(m lib.Msg, statusOk bool) { msg, ok := m.(*Msg) if !ok { return } id := *msg.M.ReceiptHandle + p.Lock() + defer p.Unlock() + + // If it's not in pending, ignore it v, ok := p.pendings[id] if !ok { return } - if v <= 1 { - delete(p.pendings, id) - return - } v -= 1 p.pendings[id] = v + if !statusOk { + p.messError[id] = true + } + + // Check if it's the last + if v <= 0 { + delete(p.pendings, id) + _, hadError := p.messError[id] + if !hadError { + // Delete the message if there's no more pending reactors + go p.delete(m) // Execute delete message outside the Lock + } + } } diff --git a/inputs/sqs/sqs.go b/inputs/sqs/sqs.go index da48ab1..d814f32 100644 --- a/inputs/sqs/sqs.go +++ b/inputs/sqs/sqs.go @@ -66,11 +66,6 @@ func NewOrGet(r *reactor.Reactor, c map[string]interface{}) (*SQSPlugin, error) return p, nil } -// Delete message from the SQS -func (p *SQSPlugin) Delete(v lib.Msg) error { - return p.l.Delete(v) -} - // Put is not needed in SQS func (p *SQSPlugin) Put(v lib.Msg) error { return nil @@ -87,6 +82,6 @@ func (p *SQSPlugin) Stop() { p.l.Stop() } -func (p *SQSPlugin) Done(v lib.Msg) { - p.l.Done(v) +func (p *SQSPlugin) Done(v lib.Msg, status bool) { + p.l.Done(v, status) } diff --git a/lib/interfaces.go b/lib/interfaces.go index 75d2034..0f84d33 100644 --- a/lib/interfaces.go +++ b/lib/interfaces.go @@ -4,11 +4,11 @@ import "github.com/gabrielperezs/goreactor/reactorlog" // Input is the interface for the Input plugins type Input interface { - Put(m Msg) error - Delete(m Msg) error - Done(m Msg) // Input was processed and don't need to keep it as pending - Stop() // Stop accepting input - Exit() // Exit from the loop + // Put(m Msg) error + //Delete(m Msg) error + Done(m Msg, status bool) // Input was processed and don't need to keep it as pending + Stop() // Stop accepting input + Exit() // Exit from the loop } // Output is the interface for the Output plugins diff --git a/outputs/cmd/cmd.go b/outputs/cmd/cmd.go index 2ac216e..bf62ef2 100644 --- a/outputs/cmd/cmd.go +++ b/outputs/cmd/cmd.go @@ -166,8 +166,7 @@ func (o *Cmd) getReplacedArguments(msg lib.Msg) []string { // the command. func (o *Cmd) Run(rl reactorlog.ReactorLog, msg lib.Msg) error { - var args []string - args = o.getReplacedArguments(msg) + args := o.getReplacedArguments(msg) logLabel := o.findReplace(msg, o.r.Label) rl.SetLabel(logLabel) diff --git a/reactor/reactor.go b/reactor/reactor.go index 9db111c..595bc39 100644 --- a/reactor/reactor.go +++ b/reactor/reactor.go @@ -54,8 +54,8 @@ func NewReactor(icfg interface{}) *Reactor { // There are several listeners for concurrency, // better not to buffer too much to avoid to many message in travel. - // Leave 1 for better use of CPU but can be zero - r.Ch = make(chan lib.Msg, 1) + // Buffer reasonable values are 0 or 1 + r.Ch = make(chan lib.Msg) log.Printf("Reactor %d concurrent %d, delay %s", r.id, r.Concurrent, r.Delay) @@ -174,8 +174,6 @@ func (r *Reactor) deadline() { } func (r *Reactor) run(msg lib.Msg) { - defer r.I.Done(msg) // To remove this message from the pending message queue - r.deadline() cc := r.cc @@ -191,20 +189,7 @@ func (r *Reactor) run(msg lib.Msg) { } err = r.O.Run(rl, msg) - defer rl.Done(err) - - if err == nil { - if err := r.I.Delete(msg); err != nil { - log.Printf("INTERNAL ERROR: %s", err) - } - return - } - - if err == ErrInvalidMsgForPlugin { - return - } - - if err := r.I.Put(msg); err != nil { - log.Printf("INTERNAL ERROR: %s", err) - } + ok := err == nil || err == ErrInvalidMsgForPlugin + r.I.Done(msg, ok) // To remove this message from the pending message queue + rl.Done(err) }