From 912816f93592fabaebf17272b77b2606d9614d95 Mon Sep 17 00:00:00 2001 From: Sebastian Choren Date: Mon, 4 Mar 2024 16:14:54 -0300 Subject: [PATCH] feat(server): use nats queue subscriptions (#3706) --- docker-compose.nats.yaml | 24 +++++++++++++++++++++--- server/pkg/pipeline/nats_driver.go | 2 +- server/subscription/nats_manager.go | 2 +- 3 files changed, 23 insertions(+), 5 deletions(-) diff --git a/docker-compose.nats.yaml b/docker-compose.nats.yaml index 1a341f1b43..325d749f6a 100644 --- a/docker-compose.nats.yaml +++ b/docker-compose.nats.yaml @@ -1,6 +1,24 @@ -version: "3.2" +version: "3.8" services: + traefik: + image: traefik:v2.4 + command: + - "--api.insecure=true" + - "--providers.docker=true" + - "--providers.docker.exposedbydefault=false" + ports: + - "11633:80" # Changed this line + - "8080:8080" + volumes: + - /var/run/docker.sock:/var/run/docker.sock + tracetest: + labels: + - "traefik.enable=true" + - "traefik.http.routers.tracetest.rule=Host(`localhost`)" + - "traefik.http.services.tracetest.loadbalancer.server.port=11633" + deploy: + replicas: 3 restart: unless-stopped image: kubeshop/tracetest:${TAG:-latest} extra_hosts: @@ -14,8 +32,8 @@ services: - type: bind source: ./local-config/tracetest.provision.yaml target: /app/provisioning.yaml - ports: - - 11633:11633 + expose: + - 11633 command: --provisioning-file /app/provisioning.yaml healthcheck: test: ["CMD", "wget", "--spider", "localhost:11633"] diff --git a/server/pkg/pipeline/nats_driver.go b/server/pkg/pipeline/nats_driver.go index 7ef2663197..98bf0754ed 100644 --- a/server/pkg/pipeline/nats_driver.go +++ b/server/pkg/pipeline/nats_driver.go @@ -49,7 +49,7 @@ func (d *NatsDriver[T]) Enqueue(ctx context.Context, msg T) { // SetListener implements QueueDriver. func (d *NatsDriver[T]) SetListener(listener Listener[T]) { - subscription, err := d.conn.Subscribe(d.topic, func(msg *nats.Msg) { + subscription, err := d.conn.QueueSubscribe(d.topic, "queue_worker", func(msg *nats.Msg) { var target T err := json.Unmarshal(msg.Data, &target) if err != nil { diff --git a/server/subscription/nats_manager.go b/server/subscription/nats_manager.go index b0cf7385cb..2320120c3a 100644 --- a/server/subscription/nats_manager.go +++ b/server/subscription/nats_manager.go @@ -12,7 +12,7 @@ type natsManager struct { } func (m *natsManager) Subscribe(resourceID string, subscriber Subscriber) Subscription { - subscription, err := m.conn.Subscribe(resourceID, func(msg *nats.Msg) { + subscription, err := m.conn.QueueSubscribe(resourceID, "subscriptions", func(msg *nats.Msg) { decoded, err := DecodeMessage(msg.Data) if err != nil { log.Printf("cannot unmarshall incoming nats message: %s", err.Error())