From c44e2e57224ae11417480bab4cf3e05944a708f8 Mon Sep 17 00:00:00 2001 From: gabriel Date: Sat, 10 Feb 2024 01:48:14 +0100 Subject: [PATCH 1/5] Send a keepalive signal to notify that the command is still running --- README.md | 216 +++++++++++++++++++++++---------------------- inputs/sqs/pool.go | 27 ++++++ inputs/sqs/sqs.go | 6 ++ lib/interfaces.go | 18 ++-- outputs/cmd/cmd.go | 4 +- reactor/reactor.go | 76 ++++++++++++---- 6 files changed, 216 insertions(+), 131 deletions(-) diff --git a/README.md b/README.md index 75a9c2b..955832c 100644 --- a/README.md +++ b/README.md @@ -1,13 +1,119 @@ -[![Build Status](https://travis-ci.org/gabrielperezs/goreactor.svg?branch=master)](https://travis-ci.org/gabrielperezs/goreactor) [![Go Report Card](https://goreportcard.com/badge/github.com/gabrielperezs/goreactor)](https://goreportcard.com/report/github.com/gabrielperezs/goreactor) -![Go](https://github.com/gabrielperezs/goreactor/workflows/Go/badge.svg?branch=master) +[![Go Report Card](https://goreportcard.com/badge/github.com/gabrielperezs/goreactor)](https://goreportcard.com/report/github.com/gabrielperezs/goreactor) Goreactor, trigger a message and execute commands ================================================= The current version just support inputs from AWS SQS, in next versions will support also HTTP/S and Redis QUEUES. -Features -======== + +Usage case, message from SQS +=========================== + +In the next example, we will run a command after receiving a message from SQS, the message received will look like the JSON below + +Message received from SQS +------------------------ + +```json +{ + "Progress": 50, + "AccountId": "9999999999", + "Description": "Launching EC2 instance: i-00000009999", + "RequestId": "5f88cdf3-311c-4d61-bdec-0000000000", + "EndTime": "2018-04-18T14:17:22.779Z", + "AutoScalingGroupARN": "arn:aws:autoscaling:eu-west-1:9999999999:autoScalingGroup:0b35d38a-8270-45d0-a0d8-0000000000:autoScalingGroupName/SOMEGROUPNAME", + "ActivityId": "5f88cdf3-311c-4d61-bdec-0000000000", + "StartTime": "2018-04-18T14:16:26.804Z", + "Service": "AWS Auto Scaling", + "Time": "2018-04-18T14:17:22.779Z", + "EC2InstanceId": "i-00000009999", // <--- We read this + "StatusCode": "InProgress", + "StatusMessage": "", + "Details": { + "Subnet ID": "subnet-XXXXXXXX", + "Availability Zone": "eu-west-1a" + }, + "AutoScalingGroupName": "SOMEGROUPNAME", // <--- We read this + "Cause": "At 2018-04-18T14:16:21Z a user request update of AutoScalingGroup constraints to min: 1, max: 5, desired: 3 changing the desired capacity from 2 to 3....", + "Event": "autoscaling:EC2_INSTANCE_LAUNCH" // <--- We read this, as condition +} +``` + +Configuration in go reactor +--------------------------- + +In the goreactor, we are ready to listen to messages and execute the command. + +Please __put attention__ in the "cond" parameter (is a condition, like an "if"). __This means that the command will be executed JUST if we can find "Event = autoscaling:EC2_INSTANCE_LAUNCH" in the JSON__. If this key is not there or the value is not what we defined, then goreactor will ignore the message + + +```toml +[logstream] +logstream = "stdout" + +[[reactor]] +concurrent = 10 +delay = "5s" +keepAliveInterval = "30s" +input = "sqs" +url = "https://sqs.eu-west-1.amazonaws.com/9999999999/testing" +profile = "default" +region = "eu-west-1" +maxNumberOfMessages = 10 +output = "cmd" +cond = [ + { "$.Event" = "autoscaling:EC2_INSTANCE_LAUNCH" } +] +cmd = "/usr/local/bin/do-something-with-the-instance" +args = ["asg=$.AutoScalingGroupName", "instance_id=$.EC2InstanceId"] +workingDirectory = "/path/to/process/wd/" +``` + + +The daemon will execute a command like +-------------------------------------- + +```bash +/usr/local/bin/do-something-with-the instance asg=SOMEGROUPNAME instance_id=i-00000009999 +``` + +Arguments of a reactor +---------------------- + +- **concurrent** - Maximum of commands (cmd) that can run concurrently +- **delay** - Period of time that a new command runs without concurrency, after that period the next message will start. Format https://pkg.go.dev/time#ParseDuration +- **keepAliveInterval** - A signal will be sent to the underline input source to inform that the process is still running. Format https://pkg.go.dev/time#ParseDuration. In SQS input will trigger a request ChangeMessageVisibility (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_ChangeMessageVisibility.html) + +Set working directory +--------------------- + +The working directory of the process can be set with `workingDirectory`. + +Execute as a specific user +-------------------------- + +It may be desired to execute some commands as a specific user. + +NOTE: that this only works in unix or unix like systems and have only been tested in linux. In other OS, this will be ignored. + +In order to do so, each [[reactor]] entry may have a `user` value defined with the desired user. +The env will be empty (only containing the HOME - which can be overridden) unless you specify it in a `env` section +that will only be used if `user` is defined. + +```toml +[[reactor]] +# (...) All the desired values +user = "john" +env = [ +"PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin", +"AWS_PROFILE=stage" +] +``` + +This would run the reactor using john user and 2 environ: HOME set to the home defined for the john user, +PATH defined to "PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin" and AWS_PROFILE set to "stage" + +**DO NOT USE SET THE SETUID BIT FOR GOREACTOR!!!!** Substitutions in args entry in the config file ---------------------------------------------- @@ -85,105 +191,3 @@ The output will be a json per output line with the following format: {"Host":"RUNNER_HOSTNAME","Pid":44274,"RID":1,"TID":1,"Line":3,"Output":"last line","Status":"RUN","Timestamp":1635149955} {"Host":"RUNNER_HOSTNAME","Pid":44274,"RID":1,"TID":1,"Line":4,"Status":"END","Elapse":3.067383247,"Timestamp":1635149955} ``` - - -Set working directory ---------------------- - -The working directory of the process can be set with `workingDirectory`. - -Execute as a specific user --------------------------- - -It may be desired to execute some commands as a specific user. - -NOTE: that this only works in unix or unix like systems and have only been tested in linux. In other OS, this will be ignored. - -In order to do so, each [[reactor]] entry may have a `user` value defined with the desired user. -The env will be empty (only containing the HOME - which can be overridden) unless you specify it in a `env` section -that will only be used if `user` is defined. - -```toml -[[reactor]] -# (...) All the desired values -user = "john" -env = [ -"PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin", -"AWS_PROFILE=stage" -] -``` - -This would run the reactor using john user and 2 environ: HOME set to the home defined for the john user, -PATH defined to "PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin" and AWS_PROFILE set to "stage" - -**DO NOT USE SET THE SETUID BIT FOR GOREACTOR!!!!** - -Usage case, message from SQS -=========================== - -In the next example, we will run a command after receiving a message from SQS, the message received will look like the JSON below - -Message received from SQS ------------------------- - -```json -{ - "Progress": 50, - "AccountId": "9999999999", - "Description": "Launching EC2 instance: i-00000009999", - "RequestId": "5f88cdf3-311c-4d61-bdec-0000000000", - "EndTime": "2018-04-18T14:17:22.779Z", - "AutoScalingGroupARN": "arn:aws:autoscaling:eu-west-1:9999999999:autoScalingGroup:0b35d38a-8270-45d0-a0d8-0000000000:autoScalingGroupName/SOMEGROUPNAME", - "ActivityId": "5f88cdf3-311c-4d61-bdec-0000000000", - "StartTime": "2018-04-18T14:16:26.804Z", - "Service": "AWS Auto Scaling", - "Time": "2018-04-18T14:17:22.779Z", - "EC2InstanceId": "i-00000009999", // <--- We read this - "StatusCode": "InProgress", - "StatusMessage": "", - "Details": { - "Subnet ID": "subnet-XXXXXXXX", - "Availability Zone": "eu-west-1a" - }, - "AutoScalingGroupName": "SOMEGROUPNAME", // <--- We read this - "Cause": "At 2018-04-18T14:16:21Z a user request update of AutoScalingGroup constraints to min: 1, max: 5, desired: 3 changing the desired capacity from 2 to 3....", - "Event": "autoscaling:EC2_INSTANCE_LAUNCH" // <--- We read this, as condition -} -``` - -Configuration in go reactor ---------------------------- - -In the goreactor, we are ready to listen to messages and execute the command. - -Please __put attention__ in the "cond" parameter (is a condition, like an "if"). __This means that the command will be executed JUST if we can find "Event = autoscaling:EC2_INSTANCE_LAUNCH" in the JSON__. If this key is not there or the value is not what we defined, then goreactor will ignore the message - - -```toml -[logstream] -logstream = "stdout" - -[[reactor]] -concurrent = 10 -delay = "5s" -input = "sqs" -url = "https://sqs.eu-west-1.amazonaws.com/9999999999/testing" -profile = "default" -region = "eu-west-1" -maxNumberOfMessages = 10 -output = "cmd" -cond = [ - { "$.Event" = "autoscaling:EC2_INSTANCE_LAUNCH" } -] -cmd = "/usr/local/bin/do-something-with-the-instance" -args = ["asg=$.AutoScalingGroupName", "instance_id=$.EC2InstanceId"] -workingDirectory = "/path/to/process/wd/" -``` - - -The daemon will execute a command like --------------------------------------- - -```bash -/usr/local/bin/do-something-with-the instance asg=SOMEGROUPNAME instance_id=i-00000009999 -``` diff --git a/inputs/sqs/pool.go b/inputs/sqs/pool.go index dc30c9a..4b7f514 100644 --- a/inputs/sqs/pool.go +++ b/inputs/sqs/pool.go @@ -1,8 +1,10 @@ package sqs import ( + "context" "fmt" "log" + "math" "runtime" "strconv" "strings" @@ -257,6 +259,31 @@ func (p *sqsListen) delete(v lib.Msg) (err error) { return } +func (p *sqsListen) KeepAlive(ctx context.Context, t time.Duration, v lib.Msg) (err error) { + msg, ok := v.(*Msg) + if !ok { + log.Printf("ERROR SQS KeepAlive: invalid message %+v", v) + return + } + + if msg.SQS == nil || msg == nil { + return + } + + // increase the visibility timeout by 10% of the original time + t = t + time.Duration(float64(t)*0.1) + sec := int64(math.Ceil(t.Seconds())) + + if _, err = msg.SQS.ChangeMessageVisibilityWithContext(ctx, &sqs.ChangeMessageVisibilityInput{ + QueueUrl: msg.URL, + ReceiptHandle: msg.M.ReceiptHandle, + VisibilityTimeout: aws.Int64(sec), + }); err != nil { + log.Printf("ERROR: %s - %s", *msg.URL, err) + } + return +} + func (p *sqsListen) addPending(m lib.Msg) { p.Lock() defer p.Unlock() diff --git a/inputs/sqs/sqs.go b/inputs/sqs/sqs.go index d814f32..1a95bd9 100644 --- a/inputs/sqs/sqs.go +++ b/inputs/sqs/sqs.go @@ -1,8 +1,10 @@ package sqs import ( + "context" "fmt" "strings" + "time" "github.com/gabrielperezs/goreactor/lib" "github.com/gabrielperezs/goreactor/reactor" @@ -85,3 +87,7 @@ func (p *SQSPlugin) Stop() { func (p *SQSPlugin) Done(v lib.Msg, status bool) { p.l.Done(v, status) } + +func (p *SQSPlugin) KeepAlive(ctx context.Context, t time.Duration, v lib.Msg) (err error) { + return p.l.KeepAlive(ctx, t, v) +} diff --git a/lib/interfaces.go b/lib/interfaces.go index 0f84d33..c02a533 100644 --- a/lib/interfaces.go +++ b/lib/interfaces.go @@ -1,20 +1,24 @@ package lib -import "github.com/gabrielperezs/goreactor/reactorlog" +import ( + "context" + "time" + + "github.com/gabrielperezs/goreactor/reactorlog" +) // Input is the interface for the Input plugins type Input interface { - // Put(m Msg) error - //Delete(m Msg) error - Done(m Msg, status bool) // Input was processed and don't need to keep it as pending - Stop() // Stop accepting input - Exit() // Exit from the loop + KeepAlive(context.Context, time.Duration, Msg) error + Done(Msg, bool) // Input was processed and don't need to keep it as pending + Stop() // Stop accepting input + Exit() // Exit from the loop } // Output is the interface for the Output plugins type Output interface { MatchConditions(a Msg) error - Run(rl reactorlog.ReactorLog, a Msg) error + Run(ctx context.Context, rl reactorlog.ReactorLog, a Msg) error Exit() } diff --git a/outputs/cmd/cmd.go b/outputs/cmd/cmd.go index b222158..fd4ab52 100644 --- a/outputs/cmd/cmd.go +++ b/outputs/cmd/cmd.go @@ -164,7 +164,7 @@ func (o *Cmd) getReplacedArguments(msg lib.Msg) []string { // Run will execute the binary command that was defined in the config. // In this function we also define the OUT and ERR data destination of // the command. -func (o *Cmd) Run(rl reactorlog.ReactorLog, msg lib.Msg) error { +func (o *Cmd) Run(parentCtx context.Context, rl reactorlog.ReactorLog, msg lib.Msg) error { args := o.getReplacedArguments(msg) @@ -172,7 +172,7 @@ func (o *Cmd) Run(rl reactorlog.ReactorLog, msg lib.Msg) error { rl.SetLabel(logLabel) rl.SetHash(msg.GetHash()) - ctx, cancel := context.WithTimeout(context.Background(), o.maximumCmdTimeLive) + ctx, cancel := context.WithTimeout(parentCtx, o.maximumCmdTimeLive) defer cancel() var c *exec.Cmd diff --git a/reactor/reactor.go b/reactor/reactor.go index 595bc39..1841cc9 100644 --- a/reactor/reactor.go +++ b/reactor/reactor.go @@ -1,6 +1,7 @@ package reactor import ( + "context" "fmt" "log" "strings" @@ -19,26 +20,33 @@ var ( counters uint64 // ErrInvalidMsgForPlugin error - ErrInvalidMsgForPlugin = fmt.Errorf("This message is not valid for this output") + ErrInvalidMsgForPlugin = fmt.Errorf("this message is not valid for this output") + + keepAliveLogMessage = []byte("keepalive") +) + +const ( + defaultKeepAliveInterval = 5 * time.Minute ) // Reactor is the struct where we keep the relation betwean Input plugins // and the Output plugins. Also contains the configuration for concurrency... type Reactor struct { - mu sync.Mutex - I lib.Input - O lib.Output - Ch chan lib.Msg - id uint64 - tid uint64 - Concurrent int - Delay time.Duration - Label string - Hostname string - nextDeadline time.Time - done chan bool - logStream lib.LogStream - cc *dynsemaphore.DynSemaphore + mu sync.Mutex + I lib.Input + O lib.Output + Ch chan lib.Msg + id uint64 + tid uint64 + Concurrent int + Delay time.Duration + KeepAliveInterval time.Duration + Label string + Hostname string + nextDeadline time.Time + done chan bool + logStream lib.LogStream + cc *dynsemaphore.DynSemaphore } // NewReactor will create a reactor with the configuration @@ -85,6 +93,12 @@ func (r *Reactor) Reload(icfg interface{}) { if err != nil { r.Delay = 0 } + case "keepaliveinterval": + var err error + r.KeepAliveInterval, err = time.ParseDuration(v.(string)) + if err != nil { + r.KeepAliveInterval = 0 + } } } @@ -188,8 +202,38 @@ func (r *Reactor) run(msg lib.Msg) { rl = jsonreactorlog.NewJSONReactorLog(r.logStream, r.Hostname, r.id, atomic.AddUint64(&r.tid, 1)) } - err = r.O.Run(rl, msg) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // run keep alive go routine if needed + if r.KeepAliveInterval > 0 { + go r.KeepAlive(ctx, rl, msg) + } + + err = r.O.Run(ctx, rl, msg) ok := err == nil || err == ErrInvalidMsgForPlugin r.I.Done(msg, ok) // To remove this message from the pending message queue rl.Done(err) } + +func (r *Reactor) KeepAlive(ctx context.Context, rl reactorlog.ReactorLog, msg lib.Msg) { + t := time.NewTicker(r.KeepAliveInterval) + defer t.Stop() + + for { + select { + case <-t.C: + keepAliveCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + err := r.I.KeepAlive(keepAliveCtx, r.KeepAliveInterval, msg) + cancel() + + if err != nil { + rl.Write([]byte(fmt.Sprintf("keepalive error: %s", err))) + } else { + rl.Write(keepAliveLogMessage) + } + case <-ctx.Done(): + return + } + } +} From 531d0dd9a5f063e77b9a5356b481f9955356f248 Mon Sep 17 00:00:00 2001 From: gabriel Date: Tue, 13 Feb 2024 01:43:12 +0100 Subject: [PATCH 2/5] delivery to reactors in sequence and in parallel with a new config variable called "noblocking" noblocking false: send messages in sequence noblocking true: send in parallel to avoid blocking --- inputs/sqs/pool.go | 90 +++++++++++++++++++++++++++++----------------- 1 file changed, 58 insertions(+), 32 deletions(-) diff --git a/inputs/sqs/pool.go b/inputs/sqs/pool.go index 4b7f514..a7a795c 100644 --- a/inputs/sqs/pool.go +++ b/inputs/sqs/pool.go @@ -27,10 +27,11 @@ var connPool sync.Map type sqsListen struct { sync.Mutex - URL string - Region string - Profile string - MaxNumberOfMessages int64 + url string + region string + profile string + maxNumberOfMessages int64 + noBlocking bool // If true, the input will not block waiting for a reactor to finish svc *sqs.SQS @@ -54,32 +55,34 @@ func newSQSListen(r *reactor.Reactor, c map[string]interface{}) (*sqsListen, err for k, v := range c { switch strings.ToLower(k) { case "url": - p.URL = v.(string) + p.url = v.(string) case "region": - p.Region = v.(string) + p.region = v.(string) case "profile": - p.Profile = v.(string) + p.profile = v.(string) case "maxnumberofmessages": - p.MaxNumberOfMessages, _ = v.(int64) + p.maxNumberOfMessages, _ = v.(int64) + case "noblocking": + p.noBlocking, _ = v.(bool) } } - if p.MaxNumberOfMessages == 0 { - p.MaxNumberOfMessages = defaultMaxNumberOfMessages + if p.maxNumberOfMessages == 0 { + p.maxNumberOfMessages = defaultMaxNumberOfMessages } - if p.URL == "" { + if p.url == "" { return nil, fmt.Errorf("SQS ERROR: URL not found or invalid") } - if p.Region == "" { + if p.region == "" { return nil, fmt.Errorf("SQS ERROR: Region not found or invalid") } - log.Printf("SQS NEW %s", p.URL) + log.Printf("SQS NEW %s", p.url) sess, err := session.NewSessionWithOptions(session.Options{ - Profile: p.Profile, + Profile: p.profile, SharedConfigState: session.SharedConfigEnable, }) if err != nil { @@ -89,7 +92,7 @@ func newSQSListen(r *reactor.Reactor, c map[string]interface{}) (*sqsListen, err p.pendings = make(map[string]int) p.messError = make(map[string]bool) - p.svc = sqs.New(sess, &aws.Config{Region: aws.String(p.Region)}) + p.svc = sqs.New(sess, &aws.Config{Region: aws.String(p.region)}) p.maxQueuedMessages = dynsemaphore.New(0) p.updateConcurrency() @@ -121,12 +124,12 @@ func (p *sqsListen) updateConcurrency() { func (p *sqsListen) listen() { defer func() { p.done <- true - log.Printf("SQS EXIT %s", p.URL) + log.Printf("SQS EXIT %s", p.url) }() for { if atomic.LoadUint32(&p.exiting) > 0 { - log.Printf("SQS Listener Stopped %s", p.URL) + log.Printf("SQS Listener Stopped %s", p.url) tries := 0 for len(p.pendings) > 0 { time.Sleep(time.Second) @@ -140,8 +143,8 @@ func (p *sqsListen) listen() { } params := &sqs.ReceiveMessageInput{ - QueueUrl: aws.String(p.URL), - MaxNumberOfMessages: aws.Int64(p.MaxNumberOfMessages), + QueueUrl: aws.String(p.url), + MaxNumberOfMessages: aws.Int64(p.maxNumberOfMessages), WaitTimeSeconds: aws.Int64(waitTimeSeconds), AttributeNames: []*string{&MessageSystemAttributeNameSentTimestamp}, } @@ -149,7 +152,7 @@ func (p *sqsListen) listen() { resp, err := p.svc.ReceiveMessage(params) if err != nil { - log.Printf("ERROR: AWS session on %s - %s", p.URL, err) + log.Printf("ERROR: AWS session on %s - %s", p.url, err) time.Sleep(15 * time.Second) continue } @@ -177,7 +180,7 @@ func (p *sqsListen) deliver(msg *sqs.Message) { Id: msg.MessageId, ReceiptHandle: msg.ReceiptHandle, }, - URL: aws.String(p.URL), + URL: aws.String(p.url), SentTimestamp: sentTimestamp, Hash: *msg.MessageId, } @@ -190,33 +193,56 @@ func (p *sqsListen) deliver(msg *sqs.Message) { m.B = []byte(s) } + if p.noBlocking { + p.deliverNoBlocking(m) + } else { + p.deliverBlocking(m) + } + + // We delete this message if is invalid for all the reactors + if !atLeastOneValid { + log.Printf("Invalid message from %s, deleted: %s", p.url, *msg.Body) + p.delete(m) + } + +} + +// deliverBlocking send the message to the reactors in sequence +func (p *sqsListen) deliverBlocking(m *Msg) (atLeastOneValid bool) { + p.broadcastCh.Range(func(k, v interface{}) bool { + if err := k.(*reactor.Reactor).MatchConditions(m); err != nil { + return true + } + atLeastOneValid = true + p.addPending(m) + k.(*reactor.Reactor).Ch <- m + return true + }) + return +} + +// deliverNoBlocking send the message in parallel to avoid blocking +// all messages due to a long standing reactor that has its chan full +func (p *sqsListen) deliverNoBlocking(m *Msg) (atLeastOneValid bool) { p.broadcastCh.Range(func(k, v interface{}) bool { if err := k.(*reactor.Reactor).MatchConditions(m); err != nil { return true } atLeastOneValid = true p.addPending(m) - // Send the message in parallel to avoid blocking all messages - // due to a long standing reactor that has its chan full p.maxQueuedMessages.Access() // Check the limit of goroutines go func(m *Msg) { defer func() { + p.maxQueuedMessages.Release() if r := recover(); r != nil { return // Ignore "closed channel" error when the program finishes } }() k.(*reactor.Reactor).Ch <- m - p.maxQueuedMessages.Release() }(m) return true }) - - // We delete this message if is invalid for all the reactors - if !atLeastOneValid { - log.Printf("Invalid message from %s, deleted: %s", p.URL, *msg.Body) - p.delete(m) - } - + return } func (p *sqsListen) Stop() { @@ -224,7 +250,7 @@ func (p *sqsListen) Stop() { return } atomic.AddUint32(&p.exiting, 1) - log.Printf("SQS Input Stopping %s", p.URL) + log.Printf("SQS Input Stopping %s", p.url) } func (p *sqsListen) Exit() error { From dadbf93e7244a9f17dab2f3c7c1f44b91ae143ed Mon Sep 17 00:00:00 2001 From: gabriel Date: Tue, 13 Feb 2024 11:01:39 +0100 Subject: [PATCH 3/5] FIX --- inputs/sqs/pool.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/inputs/sqs/pool.go b/inputs/sqs/pool.go index a7a795c..bf7e490 100644 --- a/inputs/sqs/pool.go +++ b/inputs/sqs/pool.go @@ -194,9 +194,9 @@ func (p *sqsListen) deliver(msg *sqs.Message) { } if p.noBlocking { - p.deliverNoBlocking(m) + atLeastOneValid = p.deliverNoBlocking(m) } else { - p.deliverBlocking(m) + atLeastOneValid = p.deliverBlocking(m) } // We delete this message if is invalid for all the reactors From 048ebf15d2411f430a23ae0a56e639694c459471 Mon Sep 17 00:00:00 2001 From: gabriel Date: Tue, 13 Feb 2024 11:34:01 +0100 Subject: [PATCH 4/5] just for clarity --- inputs/sqs/pool.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/inputs/sqs/pool.go b/inputs/sqs/pool.go index bf7e490..cf92d6a 100644 --- a/inputs/sqs/pool.go +++ b/inputs/sqs/pool.go @@ -164,9 +164,6 @@ func (p *sqsListen) listen() { } func (p *sqsListen) deliver(msg *sqs.Message) { - // Flag to delete the message if don't match with at least one reactor condition - atLeastOneValid := false - timestamp, ok := msg.Attributes[sqs.MessageSystemAttributeNameSentTimestamp] var sentTimestamp int64 if ok && timestamp != nil { @@ -193,6 +190,8 @@ func (p *sqsListen) deliver(msg *sqs.Message) { m.B = []byte(s) } + // Flag to delete the message if don't match with at least one reactor condition + atLeastOneValid := false if p.noBlocking { atLeastOneValid = p.deliverNoBlocking(m) } else { @@ -204,7 +203,6 @@ func (p *sqsListen) deliver(msg *sqs.Message) { log.Printf("Invalid message from %s, deleted: %s", p.url, *msg.Body) p.delete(m) } - } // deliverBlocking send the message to the reactors in sequence From 618ef1b243c20215ff98b37d4e40175dcd01fcd4 Mon Sep 17 00:00:00 2001 From: gabriel Date: Tue, 13 Feb 2024 12:09:05 +0100 Subject: [PATCH 5/5] documentation --- README.md | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/README.md b/README.md index 955832c..e748891 100644 --- a/README.md +++ b/README.md @@ -84,6 +84,18 @@ Arguments of a reactor - **delay** - Period of time that a new command runs without concurrency, after that period the next message will start. Format https://pkg.go.dev/time#ParseDuration - **keepAliveInterval** - A signal will be sent to the underline input source to inform that the process is still running. Format https://pkg.go.dev/time#ParseDuration. In SQS input will trigger a request ChangeMessageVisibility (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_ChangeMessageVisibility.html) +Arguments of a reactor SQS +-------------------------- + +- **url** - AWS SQS URL +- **region** - AWS Region +- **profile** - AWS Profile +- **maxnumberofmessages** - AWS SQS setting: The maximum number of messages to return. Amazon SQS never returns more messages than this value (however, fewer messages might be returned). Valid values: 1 to 10. Default: 1. +- **noblocking** is a boolean value that defines the parallelims of the reactors + - _false_ (default): send the message to the reactors in sequence + - _true_: send the message in parallel to avoid blocking + + Set working directory ---------------------