Skip to content

Commit

Permalink
Merge pull request #20 from gabrielperezs/feature/store-sqs-MessageId…
Browse files Browse the repository at this point in the history
…-and-ReceiptHandle

Feature/store sqs message id and receipt handle
  • Loading branch information
gabrielperezs authored Feb 8, 2024
2 parents 58bd85d + a59e82e commit 455b587
Show file tree
Hide file tree
Showing 9 changed files with 28 additions and 6 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/release.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ jobs:
run: go test -v ./...

- name: Build linux amd64
run: GOOS=linux GOARCH=amd64 go build -o goreactor_amd64 -v .
run: CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o goreactor_amd64 -v .

- name: Build linux arm64
run: GOOS=linux GOARCH=arm64 go build -o goreactor_arm64 -v .
run: CGO_ENABLED=0 GOOS=linux GOARCH=arm64 go build -o goreactor_arm64 -v .

- name: Create Release
id: create_release
Expand Down
7 changes: 6 additions & 1 deletion inputs/sqs/msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ type Msg struct {
M *sqs.DeleteMessageBatchRequestEntry
B []byte
SentTimestamp int64
Hash string
}

// Body will return the bytes of the SQS message
Expand All @@ -21,4 +22,8 @@ func (m *Msg) Body() []byte {
// CreationTimestampMilliseconds will return the creation timestamp for the message
func (m *Msg) CreationTimestampMilliseconds() int64 {
return m.SentTimestamp
}
}

func (m *Msg) GetHash() string {
return m.Hash
}
1 change: 1 addition & 0 deletions inputs/sqs/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ func (p *sqsListen) deliver(msg *sqs.Message) {
},
URL: aws.String(p.URL),
SentTimestamp: sentTimestamp,
Hash: *msg.MessageId,
}

jsonParsed, err := gabs.ParseJSON(m.B)
Expand Down
1 change: 1 addition & 0 deletions lib/msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ package lib
type Msg interface {
Body() []byte
CreationTimestampMilliseconds() int64
GetHash() string
}
1 change: 1 addition & 0 deletions outputs/cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ func (o *Cmd) Run(rl reactorlog.ReactorLog, msg lib.Msg) error {

logLabel := o.findReplace(msg, o.r.Label)
rl.SetLabel(logLabel)
rl.SetHash(msg.GetHash())

ctx, cancel := context.WithTimeout(context.Background(), o.maximumCmdTimeLive)
defer cancel()
Expand Down
9 changes: 7 additions & 2 deletions outputs/cmd/cmd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ import (
)

type Msg struct {
B []byte
ts int64
B []byte
ts int64
hash string
}

func (m *Msg) Body() []byte {
Expand All @@ -21,6 +22,10 @@ func (m *Msg) CreationTimestampMilliseconds() int64 {
return m.ts
}

func (m *Msg) GetHash() string {
return m.hash
}

func TestJqReplaceActuallyReplacing(t *testing.T) {
var r *reactor.Reactor = nil

Expand Down
9 changes: 8 additions & 1 deletion reactorlog/jsonreactorlog/jsonreactorlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,14 @@ type ReactorLog interface {
Write(b []byte) (int, error)
Start(pid int, s string)
SetLabel(string)
SetHash(string)
}

// JSONReactorLog lets you log as json lines.
type JSONReactorLog struct {
Host string `json:",omitempty"`
Label string `json:",omitempty"`
Hash string `json:",omitempty"`
Pid int `json:",omitempty"`
RID uint64 `json:",omitempty"`
TID uint64 `json:",omitempty"`
Expand All @@ -69,6 +71,10 @@ func (rl *JSONReactorLog) SetLabel(value string) {
rl.Label = value
}

func (rl *JSONReactorLog) SetHash(value string) {
rl.Hash = value
}

// Write will be called by the reactor and this bytes will be sent to the general log channel
func (rl *JSONReactorLog) Write(b []byte) (int, error) {
rl.Lock()
Expand Down Expand Up @@ -121,7 +127,7 @@ func (rl *JSONReactorLog) Done(err error) {
rl.Error = err.Error()
}
// https://github.com/golang/go/issues/5491#issuecomment-66079585
rl.Elapse = time.Now().Sub(rl.st).Seconds()
rl.Elapse = time.Since(rl.st).Seconds()
rl.printJSON()
rl.reset()
}
Expand All @@ -146,6 +152,7 @@ func (rl *JSONReactorLog) printJSON() {
func (rl *JSONReactorLog) reset() {
rl.Host = ""
rl.Label = ""
rl.Hash = ""
rl.Pid = 0
rl.RID = 0
rl.TID = 0
Expand Down
1 change: 1 addition & 0 deletions reactorlog/noopreactorlog/noopreactorlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ type NoopReactorLog struct{}

func (NoopReactorLog) Start(pid int, s string) {}
func (NoopReactorLog) SetLabel(string) {}
func (NoopReactorLog) SetHash(string) {}
func (NoopReactorLog) Done(error) {}
func (NoopReactorLog) Write(b []byte) (int, error) {
return len(b), nil
Expand Down
1 change: 1 addition & 0 deletions reactorlog/reactorlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,6 @@ type ReactorLog interface {
Write(b []byte) (int, error)
Start(pid int, s string)
SetLabel(string)
SetHash(string)
Done(error)
}

0 comments on commit 455b587

Please sign in to comment.