Skip to content

Commit

Permalink
chore(examples): add grpc stream propagation example (#3908)
Browse files Browse the repository at this point in the history
* chore(examples): add context propagation with gRPC streams

* add telemetry to example

* update example and README

* Apply suggestions from code review

Co-authored-by: Julianne Fermi <[email protected]>

* small update on demo code

---------

Co-authored-by: Julianne Fermi <[email protected]>
  • Loading branch information
danielbdias and jfermi authored Jun 13, 2024
1 parent be7c461 commit ef28e16
Show file tree
Hide file tree
Showing 26 changed files with 2,140 additions and 1 deletion.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ dist/tracetest-docker-$(TAG).tar dist/tracetest-agent-docker-$(TAG).tar: $(CLI_S
docker save --output dist/tracetest-agent-docker-$(TAG).tar "kubeshop/tracetest-agent:$(TAG)"

help: Makefile ## show list of commands
@echo "Choose a command run:"
@echo "Choose a command to run:"
@echo ""
@awk 'BEGIN {FS = ":.*?## "} /[a-zA-Z_-]+:.*?## / {sub("\\\\n",sprintf("\n%22c"," "), $$2);printf "\033[36m%-40s\033[0m %s\n", $$1, $$2}' $(MAKEFILE_LIST) | sort

Expand Down
84 changes: 84 additions & 0 deletions examples/quick-start-grpc-stream-propagation/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
## gRPC Stream Propagation

This example shows a system with two components working together to process payment data in a Producer/Consumer fashion. Every component is instrumented with OpenTelemetry and sends trace data to a Jaeger instance. The system is composed of:
- a **Producer API** that receives payment data from customers, enqueues it internally and publishes it through a gRPC stream.
- a **Consumer worker** that reads a gRPC stream of payment data and processes it.
- an **OTel Collector** that receives trace data from both components and forwards it to a Jaeger instance.
- a **Jaeger instance** that stores and displays trace data.

```mermaid
flowchart LR
User
ProducerAPI["Producer API"]
ConsumerWorker["Consumer Worker"]
OTelCollector["OTel Collector"]
Jaeger
User -- send payment --> ProducerAPI
subgraph PaymentSystem
ProducerAPI -- enqueue payment --> ProducerAPI
ProducerAPI -- send notification --> ConsumerWorker
end
PaymentSystem -- send telemetry --> OTelCollector
OTelCollector --> Jaeger
```

### Running the Example

To run this example, you need to have the following tools installed:
- [Docker](https://www.docker.com/)
- [Tracetest CLI](https://docs.tracetest.io/getting-started/installation#install-the-tracetest-cli)
- [grpcurl](https://github.com/fullstorydev/grpcurl?tab=readme-ov-file#installation)

You can run this example using your environment on [Tracetest](https://app.tracetest.io), or using Tracetest Core. When you have a [Tracetest Agent](https://docs.tracetest.io/configuration/agent) configured on Tracetest UI and an API Key, you can run the following commands:

```sh
TRACETEST_API_KEY="your-api-key" docker compose up
```

This command will start the Producer API, Consumer Worker, OTel Collector, Tracetest Agent, and Jaeger instance.

You can execute a gRPC call to the Producer API with the following command:

```sh
grpcurl -plaintext -proto ./proto/paymentreceiver.proto -d '{ "customerId": "1234", "amount": 50000 }' localhost:8080 proto.PaymentReceiver/ReceivePayment

# Expected output
# {
# "received": true
# }
```

This will start the entire process of receiving and notifying a payment, which you can see in the logs of the Producer API and Consumer Worker:
```sh
consumer-worker-1 | 2024/06/12 19:37:03 Received payment notification: payment:{customerId:"1234" amount:50000} highValuePayment:true metadata:{key:"traceparent" value:"00-ac8e8ed08353f23cbf1028ef42e7d10f-a95f55a8f9d9e29c-01"}
```

You can access the Jaeger UI at http://localhost:16686 and see the trace generated by this call.

### Testing the Example

You can also run the tests of this example to guarantee that everything is working as expected. First, configure the Tracetest CLI to connect to your environment:
```sh
tracetest configure
# and follow the instructions shown by the CLI
```

Configure it to read traces from Jaeger with the command:

And finally, run the test:
```sh
tracetest run test -f ./trace-based-test.yaml

# Expected output
# ✔ RunGroup: #ISI8sDUSR (https://app.tracetest.io/organizations/your-org-id/environments/your-env-id/run/ISI8sDUSR)
# Summary: 1 passed, 0 failed, 0 pending
# ✔ Test gRPC Stream Propagation (https://app.tracetest.io/organizations/your-org-id/environments/your-env-id/test/pprDfSUSg/run/1/test) - trace id: 808e8592f1ec08bda8701c3dcea5810c
# ✔ It should call ReceivePayment gRPC endpoint
# ✔ In should enqueue the payment to send it in a stream
# ✔ It should send the a payment notification through a gRPC stream
# ✔ It should receive a PaymentNotification through a stream and process it
# ✔ The trace shape is correct
```

Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
FROM golang:alpine as builder
ENV GO111MODULE=on
RUN apk update && apk add --no-cache git

WORKDIR /app
COPY go.mod ./
COPY go.sum ./
RUN go mod download
COPY . .
RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o ./bin/main .

FROM scratch
COPY --from=builder /app/bin/main .
CMD ["./main"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Dependencies:
# https://grpc.io/docs/protoc-installation/
# go install google.golang.org/protobuf/cmd/[email protected]
# go install google.golang.org/grpc/cmd/[email protected]

REQUIRED_BINS := protoc protoc-gen-go protoc-gen-go-grpc

help: ## show list of commands
@echo "Choose a command to run:"
@echo ""
@awk 'BEGIN {FS = ":.*?## "} /[a-zA-Z_-]+:.*?## / {sub("\\\\n",sprintf("\n%22c"," "), $$2);printf "\033[36m%-40s\033[0m %s\n", $$1, $$2}' $(MAKEFILE_LIST) | sort

build-proto: ensure-dependencies clean-proto ## generate gRPC code from proto files
@protoc \
-I=../proto \
--go_out=./proto \
--go_opt=paths=source_relative \
--go-grpc_out=./proto \
--go-grpc_opt=paths=source_relative \
../proto/paymentreceiver.proto

ensure-dependencies: ## check if required binaries are installed
$(foreach bin,$(REQUIRED_BINS),\
$(if $(shell command -v $(bin) 2> /dev/null),,$(error Please install `$(bin)` or run `make install-grpc-tools`)))

install-grpc-tools: ## install required binaries
go install google.golang.org/protobuf/cmd/[email protected]
go install google.golang.org/grpc/cmd/[email protected]

clean-proto: ## remove generated gRPC code
@rm -f proto/*.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
module github.com/kubeshop/tracetest/quick-start-grpc-stream-propagation/consumer-worker

go 1.21

require (
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.52.0
go.opentelemetry.io/otel v1.27.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.27.0
go.opentelemetry.io/otel/sdk v1.27.0
go.opentelemetry.io/otel/trace v1.27.0
google.golang.org/grpc v1.64.0
google.golang.org/protobuf v1.34.1
)

require (
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.27.0 // indirect
go.opentelemetry.io/otel/metric v1.27.0 // indirect
go.opentelemetry.io/proto/otlp v1.2.0 // indirect
golang.org/x/net v0.25.0 // indirect
golang.org/x/sys v0.20.0 // indirect
golang.org/x/text v0.15.0 // indirect
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ=
github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 h1:bkypFPDjIYGfCYD5mRBvpqxfYX1YCS1PXdKYWi8FsN0=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0/go.mod h1:P+Lt/0by1T8bfcF3z737NnSbmxQAppXMRziHUxPOC8k=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.52.0 h1:vS1Ao/R55RNV4O7TA2Qopok8yN+X0LIP6RVWLFkprck=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.52.0/go.mod h1:BMsdeOxN04K0L5FNUBfjFdvwWGNe/rkmSwH4Aelu/X0=
go.opentelemetry.io/otel v1.27.0 h1:9BZoF3yMK/O1AafMiQTVu0YDj5Ea4hPhxCs7sGva+cg=
go.opentelemetry.io/otel v1.27.0/go.mod h1:DMpAK8fzYRzs+bi3rS5REupisuqTheUlSZJ1WnZaPAQ=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.27.0 h1:R9DE4kQ4k+YtfLI2ULwX82VtNQ2J8yZmA7ZIF/D+7Mc=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.27.0/go.mod h1:OQFyQVrDlbe+R7xrEyDr/2Wr67Ol0hRUgsfA+V5A95s=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.27.0 h1:qFffATk0X+HD+f1Z8lswGiOQYKHRlzfmdJm0wEaVrFA=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.27.0/go.mod h1:MOiCmryaYtc+V0Ei+Tx9o5S1ZjA7kzLucuVuyzBZloQ=
go.opentelemetry.io/otel/metric v1.27.0 h1:hvj3vdEKyeCi4YaYfNjv2NUje8FqKqUY8IlF0FxV/ik=
go.opentelemetry.io/otel/metric v1.27.0/go.mod h1:mVFgmRlhljgBiuk/MP/oKylr4hs85GZAylncepAX/ak=
go.opentelemetry.io/otel/sdk v1.27.0 h1:mlk+/Y1gLPLn84U4tI8d3GNJmGT/eXe3ZuOXN9kTWmI=
go.opentelemetry.io/otel/sdk v1.27.0/go.mod h1:Ha9vbLwJE6W86YstIywK2xFfPjbWlCuwPtMkKdz/Y4A=
go.opentelemetry.io/otel/trace v1.27.0 h1:IqYb813p7cmbHk0a5y6pD5JPakbVfftRXABGt5/Rscw=
go.opentelemetry.io/otel/trace v1.27.0/go.mod h1:6RiD1hkAprV4/q+yd2ln1HG9GoPx39SuvvstaLBl+l4=
go.opentelemetry.io/proto/otlp v1.2.0 h1:pVeZGk7nXDC9O2hncA6nHldxEjm6LByfA2aN8IOkz94=
go.opentelemetry.io/proto/otlp v1.2.0/go.mod h1:gGpR8txAl5M03pDhMC79G6SdqNV26naRm/KDsgaHD8A=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac=
golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM=
golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y=
golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk=
golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 h1:KpwkzHKEF7B9Zxg18WzOa7djJ+Ha5DzthMyZYQfEn2A=
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1/go.mod h1:nKE/iIaLqn2bQwXBg8f1g2Ylh6r5MN5CmZvuzZCgsCU=
google.golang.org/grpc v1.64.0 h1:KH3VH9y/MgNQg1dE7b3XfVK0GsPSIzJwdF617gUSbvY=
google.golang.org/grpc v1.64.0/go.mod h1:oxjF8E3FBnjp+/gVFYdWacaLDx9na1aqy9oovLpxQYg=
google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg=
google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package main

import (
"context"
"io"
"log"

pb "github.com/kubeshop/tracetest/quick-start-grpc-stream-propagation/consumer-worker/proto"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel/trace"
grpc "google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

func main() {
ctx := context.Background()

producerAPIAddress := getEnvVar("PRODUCER_API_ADDRESS", "localhost:8080")
otelExporterEndpoint := getEnvVar("OTEL_EXPORTER_OTLP_ENDPOINT", "localhost:4317")
otelServiceName := getEnvVar("OTEL_SERVICE_NAME", "producer-api")

tracer, err := setupOpenTelemetry(context.Background(), otelExporterEndpoint, otelServiceName)
if err != nil {
log.Fatalf("failed to initialize OpenTelemetry: %v", err)
return
}

grpcClient, err := grpc.NewClient(
producerAPIAddress,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithStatsHandler(otelgrpc.NewClientHandler()),
)
if err != nil {
log.Fatalf("could not connect to producer API: %v", err)
}

log.Printf("Connected to producer API at %s", producerAPIAddress)

client := pb.NewPaymentReceiverClient(grpcClient)

stream, err := client.NotifyPayment(ctx, &pb.Empty{}, grpc.WaitForReady(true))
if err != nil {
log.Fatalf("could not receive payment notifications: %v", err)
}

log.Printf("Listening for payment notifications...")

for {
notification, err := stream.Recv()
if err == io.EOF {
log.Printf("No more payment notifications")
return
}
if err != nil {
log.Fatalf("could not receive payment notification: %v", err)
}

processPaymentNotification(tracer, notification)
}
}

func processPaymentNotification(tracer trace.Tracer, notification *pb.PaymentNotification) {
messageProcessingCtx := injectMetadataIntoContext(context.Background(), notification.Metadata)
_, span := tracer.Start(messageProcessingCtx, "ProcessPaymentNotification")
defer span.End()

log.Printf("Received payment notification: %v", notification)
}
Loading

0 comments on commit ef28e16

Please sign in to comment.