diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index baa57c9..8e47620 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -10,12 +10,13 @@ jobs: runs-on: ubuntu-latest env: GOPROXY: "https://proxy.golang.org,direct" + ENABLE_DOCKER_INTEGRATION_TESTS: TRUE steps: - name: Set up Go uses: actions/setup-go@v2 with: - go-version: "1.20" + go-version: "1.22" id: go - name: Check out code into the Go module directory diff --git a/README.md b/README.md index 6e62d94..54230c2 100644 --- a/README.md +++ b/README.md @@ -119,6 +119,12 @@ Close your publishers and consumers when you're done with them and do *not* atte Note that the API is currently in `v0`. I don't plan on huge changes, but there may be some small breaking changes before we hit `v1`. +## Integration testing + +By setting `ENABLE_DOCKER_INTEGRATION_TESTS=TRUE` during `go test -v ./...`, the integration tests will run. These launch a rabbitmq container in the local Docker daemon and test some publish/consume actions. + +See [integration_test.go](integration_test.go). + ## 💬 Contact [![Twitter Follow](https://img.shields.io/twitter/follow/wagslane.svg?label=Follow%20Wagslane&style=social)](https://twitter.com/intent/follow?screen_name=wagslane) diff --git a/go.mod b/go.mod index ec6f231..060bed5 100644 --- a/go.mod +++ b/go.mod @@ -1,5 +1,5 @@ module github.com/wagslane/go-rabbitmq -go 1.20 +go 1.22.6 require github.com/rabbitmq/amqp091-go v1.10.0 diff --git a/go.sum b/go.sum index da51250..024eebe 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,4 @@ github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw= github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= diff --git a/integration_test.go b/integration_test.go new file mode 100644 index 0000000..c6b8014 --- /dev/null +++ b/integration_test.go @@ -0,0 +1,156 @@ +package rabbitmq + +import ( + "context" + "fmt" + "os" + "os/exec" + "strings" + "testing" + "time" +) + +const enableDockerIntegrationTestsFlag = `ENABLE_DOCKER_INTEGRATION_TESTS` + +func prepareDockerTest(t *testing.T) (connStr string) { + if v, ok := os.LookupEnv(enableDockerIntegrationTestsFlag); !ok || strings.ToUpper(v) != "TRUE" { + t.Skipf("integration tests are only run if '%s' is TRUE", enableDockerIntegrationTestsFlag) + return + } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + out, err := exec.CommandContext(ctx, "docker", "run", "--rm", "--detach", "--publish=5672:5672", "--quiet", "--", "rabbitmq:3-alpine").Output() + if err != nil { + t.Log("container id", string(out)) + t.Fatalf("error launching rabbitmq in docker: %v", err) + } + t.Cleanup(func() { + containerId := strings.TrimSpace(string(out)) + t.Logf("attempting to shutdown container '%s'", containerId) + if err := exec.Command("docker", "rm", "--force", containerId).Run(); err != nil { + t.Logf("failed to stop: %v", err) + } + }) + return "amqp://guest:guest@localhost:5672/" +} + +func waitForHealthyAmqp(t *testing.T, connStr string) *Conn { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + tkr := time.NewTicker(time.Second) + + // only log connection-level logs when connection has succeeded + muted := true + connLogger := simpleLogF(func(s string, i ...interface{}) { + if !muted { + t.Logf(s, i...) + } + }) + + var lastErr error + for { + select { + case <-ctx.Done(): + t.Fatal("timed out waiting for healthy amqp", lastErr) + return nil + case <-tkr.C: + t.Log("attempting connection") + conn, err := NewConn(connStr, WithConnectionOptionsLogger(connLogger)) + if err != nil { + lastErr = err + t.Log("connection attempt failed - retrying") + } else { + if err := func() error { + pub, err := NewPublisher(conn, WithPublisherOptionsLogger(simpleLogF(t.Logf))) + if err != nil { + return fmt.Errorf("failed to setup publisher: %v", err) + } + t.Log("attempting publish") + return pub.PublishWithContext(ctx, []byte{}, []string{"ping"}, WithPublishOptionsExchange("")) + }(); err != nil { + _ = conn.Close() + t.Log("publish ping failed", err.Error()) + } else { + t.Log("ping successful") + muted = true + return conn + } + } + } + } +} + +// TestSimplePubSub is an integration testing function that validates whether we can reliably connect to a docker-based +// rabbitmq and consumer a message that we publish. This uses the default direct exchange with lots of error checking +// to ensure the result is as expected. +func TestSimplePubSub(t *testing.T) { + connStr := prepareDockerTest(t) + conn := waitForHealthyAmqp(t, connStr) + defer conn.Close() + + t.Logf("new consumer") + consumerQueue := "my_queue" + consumer, err := NewConsumer(conn, consumerQueue, WithConsumerOptionsLogger(simpleLogF(t.Logf))) + if err != nil { + t.Fatal("error creating consumer", err) + } + defer consumer.CloseWithContext(context.Background()) + + // Setup a consumer which pushes each of its consumed messages over the channel. If the channel is closed or full + // it does not block. + consumed := make(chan Delivery) + defer close(consumed) + + go func() { + err = consumer.Run(func(d Delivery) Action { + t.Log("consumed") + select { + case consumed <- d: + default: + } + return Ack + }) + if err != nil { + t.Log("consumer run failed", err) + } + }() + + // Setup a publisher with notifications enabled + t.Logf("new publisher") + publisher, err := NewPublisher(conn, WithPublisherOptionsLogger(simpleLogF(t.Logf))) + if err != nil { + t.Fatal("error creating publisher", err) + } + publisher.NotifyPublish(func(p Confirmation) { + }) + defer publisher.Close() + + // For test stability we cannot rely on the fact that the consumer go routines are up and running before the + // publisher starts it's first publish attempt. For this reason we run the publisher in a loop every second and + // pass after we see the first message come through. + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + tkr := time.NewTicker(time.Second) + for { + select { + case <-ctx.Done(): + t.Fatal("timed out waiting for pub sub", ctx.Err()) + case <-tkr.C: + t.Logf("new publish") + confirms, err := publisher.PublishWithDeferredConfirmWithContext(ctx, []byte("example"), []string{consumerQueue}) + if err != nil { + // publish should always succeed since we've verified the ping previously + t.Fatal("failed to publish", err) + } + for _, confirm := range confirms { + if _, err := confirm.WaitContext(ctx); err != nil { + t.Fatal("failed to wait for publish", err) + } + } + case d := <-consumed: + t.Logf("successfully saw message round trip: '%s'", string(d.Body)) + return + } + } +} diff --git a/logger.go b/logger.go index 34ef793..5560fa4 100644 --- a/logger.go +++ b/logger.go @@ -3,6 +3,7 @@ package rabbitmq import ( "fmt" "log" + "os" "github.com/wagslane/go-rabbitmq/internal/logger" ) @@ -39,3 +40,28 @@ func (l stdDebugLogger) Infof(format string, v ...interface{}) { func (l stdDebugLogger) Debugf(format string, v ...interface{}) { log.Printf(fmt.Sprintf("%s DEBUG: %s", loggingPrefix, format), v...) } + +// simpleLogF is used to support logging in the test functions. +// This could be exposed publicly for integration in more simple logging interfaces. +type simpleLogF func(string, ...interface{}) + +func (l simpleLogF) Fatalf(format string, v ...interface{}) { + l(fmt.Sprintf("%s FATAL: %s", loggingPrefix, format), v...) + os.Exit(1) +} + +func (l simpleLogF) Errorf(format string, v ...interface{}) { + l(fmt.Sprintf("%s ERROR: %s", loggingPrefix, format), v...) +} + +func (l simpleLogF) Warnf(format string, v ...interface{}) { + l(fmt.Sprintf("%s WARN: %s", loggingPrefix, format), v...) +} + +func (l simpleLogF) Infof(format string, v ...interface{}) { + l(fmt.Sprintf("%s INFO: %s", loggingPrefix, format), v...) +} + +func (l simpleLogF) Debugf(format string, v ...interface{}) { + l(fmt.Sprintf("%s DEBUG: %s", loggingPrefix, format), v...) +}