Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: initial integration test #171

Merged
merged 4 commits into from
Sep 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@ jobs:
runs-on: ubuntu-latest
env:
GOPROXY: "https://proxy.golang.org,direct"
ENABLE_DOCKER_INTEGRATION_TESTS: TRUE

steps:
- name: Set up Go
uses: actions/setup-go@v2
with:
go-version: "1.20"
go-version: "1.22"
id: go

- name: Check out code into the Go module directory
Expand Down
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,12 @@ Close your publishers and consumers when you're done with them and do *not* atte

Note that the API is currently in `v0`. I don't plan on huge changes, but there may be some small breaking changes before we hit `v1`.

## Integration testing

By setting `ENABLE_DOCKER_INTEGRATION_TESTS=TRUE` during `go test -v ./...`, the integration tests will run. These launch a rabbitmq container in the local Docker daemon and test some publish/consume actions.

See [integration_test.go](integration_test.go).

## 💬 Contact

[![Twitter Follow](https://img.shields.io/twitter/follow/wagslane.svg?label=Follow%20Wagslane&style=social)](https://twitter.com/intent/follow?screen_name=wagslane)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
module github.com/wagslane/go-rabbitmq

go 1.20
go 1.22.6

require github.com/rabbitmq/amqp091-go v1.10.0
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
156 changes: 156 additions & 0 deletions integration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
package rabbitmq

import (
"context"
"fmt"
"os"
"os/exec"
"strings"
"testing"
"time"
)

const enableDockerIntegrationTestsFlag = `ENABLE_DOCKER_INTEGRATION_TESTS`

func prepareDockerTest(t *testing.T) (connStr string) {
if v, ok := os.LookupEnv(enableDockerIntegrationTestsFlag); !ok || strings.ToUpper(v) != "TRUE" {
t.Skipf("integration tests are only run if '%s' is TRUE", enableDockerIntegrationTestsFlag)
return
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

out, err := exec.CommandContext(ctx, "docker", "run", "--rm", "--detach", "--publish=5672:5672", "--quiet", "--", "rabbitmq:3-alpine").Output()
if err != nil {
t.Log("container id", string(out))
t.Fatalf("error launching rabbitmq in docker: %v", err)
}
t.Cleanup(func() {
containerId := strings.TrimSpace(string(out))
t.Logf("attempting to shutdown container '%s'", containerId)
if err := exec.Command("docker", "rm", "--force", containerId).Run(); err != nil {
t.Logf("failed to stop: %v", err)
}
})
return "amqp://guest:guest@localhost:5672/"
}

func waitForHealthyAmqp(t *testing.T, connStr string) *Conn {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
tkr := time.NewTicker(time.Second)

// only log connection-level logs when connection has succeeded
muted := true
connLogger := simpleLogF(func(s string, i ...interface{}) {
if !muted {
t.Logf(s, i...)
}
})

var lastErr error
for {
select {
case <-ctx.Done():
t.Fatal("timed out waiting for healthy amqp", lastErr)
return nil
case <-tkr.C:
t.Log("attempting connection")
conn, err := NewConn(connStr, WithConnectionOptionsLogger(connLogger))
if err != nil {
lastErr = err
t.Log("connection attempt failed - retrying")
} else {
if err := func() error {
pub, err := NewPublisher(conn, WithPublisherOptionsLogger(simpleLogF(t.Logf)))
if err != nil {
return fmt.Errorf("failed to setup publisher: %v", err)
}
t.Log("attempting publish")
return pub.PublishWithContext(ctx, []byte{}, []string{"ping"}, WithPublishOptionsExchange(""))
}(); err != nil {
_ = conn.Close()
t.Log("publish ping failed", err.Error())
} else {
t.Log("ping successful")
muted = true
return conn
}
}
}
}
}

// TestSimplePubSub is an integration testing function that validates whether we can reliably connect to a docker-based
// rabbitmq and consumer a message that we publish. This uses the default direct exchange with lots of error checking
// to ensure the result is as expected.
func TestSimplePubSub(t *testing.T) {
connStr := prepareDockerTest(t)
conn := waitForHealthyAmqp(t, connStr)
defer conn.Close()

t.Logf("new consumer")
consumerQueue := "my_queue"
consumer, err := NewConsumer(conn, consumerQueue, WithConsumerOptionsLogger(simpleLogF(t.Logf)))
if err != nil {
t.Fatal("error creating consumer", err)
}
defer consumer.CloseWithContext(context.Background())

// Setup a consumer which pushes each of its consumed messages over the channel. If the channel is closed or full
// it does not block.
consumed := make(chan Delivery)
defer close(consumed)

go func() {
err = consumer.Run(func(d Delivery) Action {
t.Log("consumed")
select {
case consumed <- d:
default:
}
return Ack
})
if err != nil {
t.Log("consumer run failed", err)
}
}()

// Setup a publisher with notifications enabled
t.Logf("new publisher")
publisher, err := NewPublisher(conn, WithPublisherOptionsLogger(simpleLogF(t.Logf)))
if err != nil {
t.Fatal("error creating publisher", err)
}
publisher.NotifyPublish(func(p Confirmation) {
})
defer publisher.Close()

// For test stability we cannot rely on the fact that the consumer go routines are up and running before the
// publisher starts it's first publish attempt. For this reason we run the publisher in a loop every second and
// pass after we see the first message come through.
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
tkr := time.NewTicker(time.Second)
for {
select {
case <-ctx.Done():
t.Fatal("timed out waiting for pub sub", ctx.Err())
case <-tkr.C:
t.Logf("new publish")
confirms, err := publisher.PublishWithDeferredConfirmWithContext(ctx, []byte("example"), []string{consumerQueue})
if err != nil {
// publish should always succeed since we've verified the ping previously
t.Fatal("failed to publish", err)
}
for _, confirm := range confirms {
if _, err := confirm.WaitContext(ctx); err != nil {
t.Fatal("failed to wait for publish", err)
}
}
case d := <-consumed:
t.Logf("successfully saw message round trip: '%s'", string(d.Body))
return
}
}
}
26 changes: 26 additions & 0 deletions logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package rabbitmq
import (
"fmt"
"log"
"os"

"github.com/wagslane/go-rabbitmq/internal/logger"
)
Expand Down Expand Up @@ -39,3 +40,28 @@ func (l stdDebugLogger) Infof(format string, v ...interface{}) {
func (l stdDebugLogger) Debugf(format string, v ...interface{}) {
log.Printf(fmt.Sprintf("%s DEBUG: %s", loggingPrefix, format), v...)
}

// simpleLogF is used to support logging in the test functions.
// This could be exposed publicly for integration in more simple logging interfaces.
type simpleLogF func(string, ...interface{})

func (l simpleLogF) Fatalf(format string, v ...interface{}) {
l(fmt.Sprintf("%s FATAL: %s", loggingPrefix, format), v...)
os.Exit(1)
}

func (l simpleLogF) Errorf(format string, v ...interface{}) {
l(fmt.Sprintf("%s ERROR: %s", loggingPrefix, format), v...)
}

func (l simpleLogF) Warnf(format string, v ...interface{}) {
l(fmt.Sprintf("%s WARN: %s", loggingPrefix, format), v...)
}

func (l simpleLogF) Infof(format string, v ...interface{}) {
l(fmt.Sprintf("%s INFO: %s", loggingPrefix, format), v...)
}

func (l simpleLogF) Debugf(format string, v ...interface{}) {
l(fmt.Sprintf("%s DEBUG: %s", loggingPrefix, format), v...)
}
Loading