From ef28e16e4c02b865a24f93fbe146d489764701ae Mon Sep 17 00:00:00 2001 From: Daniel Baptista Dias Date: Thu, 13 Jun 2024 06:36:20 -0300 Subject: [PATCH] chore(examples): add grpc stream propagation example (#3908) * chore(examples): add context propagation with gRPC streams * add telemetry to example * update example and README * Apply suggestions from code review Co-authored-by: Julianne Fermi * small update on demo code --------- Co-authored-by: Julianne Fermi --- Makefile | 2 +- .../README.md | 84 ++++ .../consumer-worker/Dockerfile | 14 + .../consumer-worker/Makefile | 31 ++ .../consumer-worker/go.mod | 27 ++ .../consumer-worker/go.sum | 49 +++ .../consumer-worker/main.go | 68 ++++ .../proto/paymentreceiver.pb.go | 378 ++++++++++++++++++ .../proto/paymentreceiver_grpc.pb.go | 174 ++++++++ .../consumer-worker/telemetry.go | 118 ++++++ .../docker-compose.core.yaml | 101 +++++ .../docker-compose.yaml | 67 ++++ .../producer-api/Dockerfile | 14 + .../producer-api/Makefile | 31 ++ .../producer-api/go.mod | 27 ++ .../producer-api/go.sum | 49 +++ .../producer-api/main.go | 111 +++++ .../producer-api/proto/paymentreceiver.pb.go | 378 ++++++++++++++++++ .../proto/paymentreceiver_grpc.pb.go | 174 ++++++++ .../producer-api/telemetry.go | 118 ++++++ .../proto/paymentreceiver.proto | 27 ++ .../trace-based-test.yaml | 37 ++ .../tracetest/collector.config.yaml | 24 ++ .../tracetest/tracetest-config.yaml | 7 + .../tracetest/tracetest-provision.yaml | 20 + .../tracetest/tracetest-tracing-backend.yaml | 11 + 26 files changed, 2140 insertions(+), 1 deletion(-) create mode 100644 examples/quick-start-grpc-stream-propagation/README.md create mode 100644 examples/quick-start-grpc-stream-propagation/consumer-worker/Dockerfile create mode 100644 examples/quick-start-grpc-stream-propagation/consumer-worker/Makefile create mode 100644 examples/quick-start-grpc-stream-propagation/consumer-worker/go.mod create mode 100644 examples/quick-start-grpc-stream-propagation/consumer-worker/go.sum create mode 100644 examples/quick-start-grpc-stream-propagation/consumer-worker/main.go create mode 100644 examples/quick-start-grpc-stream-propagation/consumer-worker/proto/paymentreceiver.pb.go create mode 100644 examples/quick-start-grpc-stream-propagation/consumer-worker/proto/paymentreceiver_grpc.pb.go create mode 100644 examples/quick-start-grpc-stream-propagation/consumer-worker/telemetry.go create mode 100644 examples/quick-start-grpc-stream-propagation/docker-compose.core.yaml create mode 100644 examples/quick-start-grpc-stream-propagation/docker-compose.yaml create mode 100644 examples/quick-start-grpc-stream-propagation/producer-api/Dockerfile create mode 100644 examples/quick-start-grpc-stream-propagation/producer-api/Makefile create mode 100644 examples/quick-start-grpc-stream-propagation/producer-api/go.mod create mode 100644 examples/quick-start-grpc-stream-propagation/producer-api/go.sum create mode 100644 examples/quick-start-grpc-stream-propagation/producer-api/main.go create mode 100644 examples/quick-start-grpc-stream-propagation/producer-api/proto/paymentreceiver.pb.go create mode 100644 examples/quick-start-grpc-stream-propagation/producer-api/proto/paymentreceiver_grpc.pb.go create mode 100644 examples/quick-start-grpc-stream-propagation/producer-api/telemetry.go create mode 100644 examples/quick-start-grpc-stream-propagation/proto/paymentreceiver.proto create mode 100644 examples/quick-start-grpc-stream-propagation/trace-based-test.yaml create mode 100644 examples/quick-start-grpc-stream-propagation/tracetest/collector.config.yaml create mode 100644 examples/quick-start-grpc-stream-propagation/tracetest/tracetest-config.yaml create mode 100644 examples/quick-start-grpc-stream-propagation/tracetest/tracetest-provision.yaml create mode 100644 examples/quick-start-grpc-stream-propagation/tracetest/tracetest-tracing-backend.yaml diff --git a/Makefile b/Makefile index 242e67e4f1..809fc06971 100644 --- a/Makefile +++ b/Makefile @@ -37,7 +37,7 @@ dist/tracetest-docker-$(TAG).tar dist/tracetest-agent-docker-$(TAG).tar: $(CLI_S docker save --output dist/tracetest-agent-docker-$(TAG).tar "kubeshop/tracetest-agent:$(TAG)" help: Makefile ## show list of commands - @echo "Choose a command run:" + @echo "Choose a command to run:" @echo "" @awk 'BEGIN {FS = ":.*?## "} /[a-zA-Z_-]+:.*?## / {sub("\\\\n",sprintf("\n%22c"," "), $$2);printf "\033[36m%-40s\033[0m %s\n", $$1, $$2}' $(MAKEFILE_LIST) | sort diff --git a/examples/quick-start-grpc-stream-propagation/README.md b/examples/quick-start-grpc-stream-propagation/README.md new file mode 100644 index 0000000000..1b3037a95e --- /dev/null +++ b/examples/quick-start-grpc-stream-propagation/README.md @@ -0,0 +1,84 @@ +## gRPC Stream Propagation + +This example shows a system with two components working together to process payment data in a Producer/Consumer fashion. Every component is instrumented with OpenTelemetry and sends trace data to a Jaeger instance. The system is composed of: +- a **Producer API** that receives payment data from customers, enqueues it internally and publishes it through a gRPC stream. +- a **Consumer worker** that reads a gRPC stream of payment data and processes it. +- an **OTel Collector** that receives trace data from both components and forwards it to a Jaeger instance. +- a **Jaeger instance** that stores and displays trace data. + +```mermaid +flowchart LR + User + ProducerAPI["Producer API"] + ConsumerWorker["Consumer Worker"] + OTelCollector["OTel Collector"] + Jaeger + + User -- send payment --> ProducerAPI + subgraph PaymentSystem + ProducerAPI -- enqueue payment --> ProducerAPI + ProducerAPI -- send notification --> ConsumerWorker + + end + PaymentSystem -- send telemetry --> OTelCollector + OTelCollector --> Jaeger +``` + +### Running the Example + +To run this example, you need to have the following tools installed: +- [Docker](https://www.docker.com/) +- [Tracetest CLI](https://docs.tracetest.io/getting-started/installation#install-the-tracetest-cli) +- [grpcurl](https://github.com/fullstorydev/grpcurl?tab=readme-ov-file#installation) + +You can run this example using your environment on [Tracetest](https://app.tracetest.io), or using Tracetest Core. When you have a [Tracetest Agent](https://docs.tracetest.io/configuration/agent) configured on Tracetest UI and an API Key, you can run the following commands: + +```sh +TRACETEST_API_KEY="your-api-key" docker compose up +``` + +This command will start the Producer API, Consumer Worker, OTel Collector, Tracetest Agent, and Jaeger instance. + +You can execute a gRPC call to the Producer API with the following command: + +```sh +grpcurl -plaintext -proto ./proto/paymentreceiver.proto -d '{ "customerId": "1234", "amount": 50000 }' localhost:8080 proto.PaymentReceiver/ReceivePayment + +# Expected output +# { +# "received": true +# } +``` + +This will start the entire process of receiving and notifying a payment, which you can see in the logs of the Producer API and Consumer Worker: +```sh +consumer-worker-1 | 2024/06/12 19:37:03 Received payment notification: payment:{customerId:"1234" amount:50000} highValuePayment:true metadata:{key:"traceparent" value:"00-ac8e8ed08353f23cbf1028ef42e7d10f-a95f55a8f9d9e29c-01"} +``` + +You can access the Jaeger UI at http://localhost:16686 and see the trace generated by this call. + +### Testing the Example + +You can also run the tests of this example to guarantee that everything is working as expected. First, configure the Tracetest CLI to connect to your environment: +```sh +tracetest configure +# and follow the instructions shown by the CLI +``` + +Configure it to read traces from Jaeger with the command: + +And finally, run the test: +```sh +tracetest run test -f ./trace-based-test.yaml + +# Expected output +# ✔ RunGroup: #ISI8sDUSR (https://app.tracetest.io/organizations/your-org-id/environments/your-env-id/run/ISI8sDUSR) +# Summary: 1 passed, 0 failed, 0 pending +# ✔ Test gRPC Stream Propagation (https://app.tracetest.io/organizations/your-org-id/environments/your-env-id/test/pprDfSUSg/run/1/test) - trace id: 808e8592f1ec08bda8701c3dcea5810c +# ✔ It should call ReceivePayment gRPC endpoint +# ✔ In should enqueue the payment to send it in a stream +# ✔ It should send the a payment notification through a gRPC stream +# ✔ It should receive a PaymentNotification through a stream and process it +# ✔ The trace shape is correct +``` + diff --git a/examples/quick-start-grpc-stream-propagation/consumer-worker/Dockerfile b/examples/quick-start-grpc-stream-propagation/consumer-worker/Dockerfile new file mode 100644 index 0000000000..bbabcc4ffc --- /dev/null +++ b/examples/quick-start-grpc-stream-propagation/consumer-worker/Dockerfile @@ -0,0 +1,14 @@ +FROM golang:alpine as builder +ENV GO111MODULE=on +RUN apk update && apk add --no-cache git + +WORKDIR /app +COPY go.mod ./ +COPY go.sum ./ +RUN go mod download +COPY . . +RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o ./bin/main . + +FROM scratch +COPY --from=builder /app/bin/main . +CMD ["./main"] diff --git a/examples/quick-start-grpc-stream-propagation/consumer-worker/Makefile b/examples/quick-start-grpc-stream-propagation/consumer-worker/Makefile new file mode 100644 index 0000000000..087d005038 --- /dev/null +++ b/examples/quick-start-grpc-stream-propagation/consumer-worker/Makefile @@ -0,0 +1,31 @@ +# Dependencies: +# https://grpc.io/docs/protoc-installation/ +# go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.31 +# go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.3 + +REQUIRED_BINS := protoc protoc-gen-go protoc-gen-go-grpc + +help: ## show list of commands + @echo "Choose a command to run:" + @echo "" + @awk 'BEGIN {FS = ":.*?## "} /[a-zA-Z_-]+:.*?## / {sub("\\\\n",sprintf("\n%22c"," "), $$2);printf "\033[36m%-40s\033[0m %s\n", $$1, $$2}' $(MAKEFILE_LIST) | sort + +build-proto: ensure-dependencies clean-proto ## generate gRPC code from proto files + @protoc \ + -I=../proto \ + --go_out=./proto \ + --go_opt=paths=source_relative \ + --go-grpc_out=./proto \ + --go-grpc_opt=paths=source_relative \ + ../proto/paymentreceiver.proto + +ensure-dependencies: ## check if required binaries are installed + $(foreach bin,$(REQUIRED_BINS),\ + $(if $(shell command -v $(bin) 2> /dev/null),,$(error Please install `$(bin)` or run `make install-grpc-tools`))) + +install-grpc-tools: ## install required binaries + go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.31 + go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.3 + +clean-proto: ## remove generated gRPC code + @rm -f proto/*.go diff --git a/examples/quick-start-grpc-stream-propagation/consumer-worker/go.mod b/examples/quick-start-grpc-stream-propagation/consumer-worker/go.mod new file mode 100644 index 0000000000..728357c19a --- /dev/null +++ b/examples/quick-start-grpc-stream-propagation/consumer-worker/go.mod @@ -0,0 +1,27 @@ +module github.com/kubeshop/tracetest/quick-start-grpc-stream-propagation/consumer-worker + +go 1.21 + +require ( + go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.52.0 + go.opentelemetry.io/otel v1.27.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.27.0 + go.opentelemetry.io/otel/sdk v1.27.0 + go.opentelemetry.io/otel/trace v1.27.0 + google.golang.org/grpc v1.64.0 + google.golang.org/protobuf v1.34.1 +) + +require ( + github.com/cenkalti/backoff/v4 v4.3.0 // indirect + github.com/go-logr/logr v1.4.1 // indirect + github.com/go-logr/stdr v1.2.2 // indirect + github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.27.0 // indirect + go.opentelemetry.io/otel/metric v1.27.0 // indirect + go.opentelemetry.io/proto/otlp v1.2.0 // indirect + golang.org/x/net v0.25.0 // indirect + golang.org/x/sys v0.20.0 // indirect + golang.org/x/text v0.15.0 // indirect + google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect +) diff --git a/examples/quick-start-grpc-stream-propagation/consumer-worker/go.sum b/examples/quick-start-grpc-stream-propagation/consumer-worker/go.sum new file mode 100644 index 0000000000..08f8245d22 --- /dev/null +++ b/examples/quick-start-grpc-stream-propagation/consumer-worker/go.sum @@ -0,0 +1,49 @@ +github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= +github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= +github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 h1:bkypFPDjIYGfCYD5mRBvpqxfYX1YCS1PXdKYWi8FsN0= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0/go.mod h1:P+Lt/0by1T8bfcF3z737NnSbmxQAppXMRziHUxPOC8k= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.52.0 h1:vS1Ao/R55RNV4O7TA2Qopok8yN+X0LIP6RVWLFkprck= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.52.0/go.mod h1:BMsdeOxN04K0L5FNUBfjFdvwWGNe/rkmSwH4Aelu/X0= +go.opentelemetry.io/otel v1.27.0 h1:9BZoF3yMK/O1AafMiQTVu0YDj5Ea4hPhxCs7sGva+cg= +go.opentelemetry.io/otel v1.27.0/go.mod h1:DMpAK8fzYRzs+bi3rS5REupisuqTheUlSZJ1WnZaPAQ= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.27.0 h1:R9DE4kQ4k+YtfLI2ULwX82VtNQ2J8yZmA7ZIF/D+7Mc= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.27.0/go.mod h1:OQFyQVrDlbe+R7xrEyDr/2Wr67Ol0hRUgsfA+V5A95s= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.27.0 h1:qFffATk0X+HD+f1Z8lswGiOQYKHRlzfmdJm0wEaVrFA= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.27.0/go.mod h1:MOiCmryaYtc+V0Ei+Tx9o5S1ZjA7kzLucuVuyzBZloQ= +go.opentelemetry.io/otel/metric v1.27.0 h1:hvj3vdEKyeCi4YaYfNjv2NUje8FqKqUY8IlF0FxV/ik= +go.opentelemetry.io/otel/metric v1.27.0/go.mod h1:mVFgmRlhljgBiuk/MP/oKylr4hs85GZAylncepAX/ak= +go.opentelemetry.io/otel/sdk v1.27.0 h1:mlk+/Y1gLPLn84U4tI8d3GNJmGT/eXe3ZuOXN9kTWmI= +go.opentelemetry.io/otel/sdk v1.27.0/go.mod h1:Ha9vbLwJE6W86YstIywK2xFfPjbWlCuwPtMkKdz/Y4A= +go.opentelemetry.io/otel/trace v1.27.0 h1:IqYb813p7cmbHk0a5y6pD5JPakbVfftRXABGt5/Rscw= +go.opentelemetry.io/otel/trace v1.27.0/go.mod h1:6RiD1hkAprV4/q+yd2ln1HG9GoPx39SuvvstaLBl+l4= +go.opentelemetry.io/proto/otlp v1.2.0 h1:pVeZGk7nXDC9O2hncA6nHldxEjm6LByfA2aN8IOkz94= +go.opentelemetry.io/proto/otlp v1.2.0/go.mod h1:gGpR8txAl5M03pDhMC79G6SdqNV26naRm/KDsgaHD8A= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac= +golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM= +golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= +golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk= +golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 h1:KpwkzHKEF7B9Zxg18WzOa7djJ+Ha5DzthMyZYQfEn2A= +google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1/go.mod h1:nKE/iIaLqn2bQwXBg8f1g2Ylh6r5MN5CmZvuzZCgsCU= +google.golang.org/grpc v1.64.0 h1:KH3VH9y/MgNQg1dE7b3XfVK0GsPSIzJwdF617gUSbvY= +google.golang.org/grpc v1.64.0/go.mod h1:oxjF8E3FBnjp+/gVFYdWacaLDx9na1aqy9oovLpxQYg= +google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg= +google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/examples/quick-start-grpc-stream-propagation/consumer-worker/main.go b/examples/quick-start-grpc-stream-propagation/consumer-worker/main.go new file mode 100644 index 0000000000..3d1de1f7f8 --- /dev/null +++ b/examples/quick-start-grpc-stream-propagation/consumer-worker/main.go @@ -0,0 +1,68 @@ +package main + +import ( + "context" + "io" + "log" + + pb "github.com/kubeshop/tracetest/quick-start-grpc-stream-propagation/consumer-worker/proto" + "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" + "go.opentelemetry.io/otel/trace" + grpc "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +func main() { + ctx := context.Background() + + producerAPIAddress := getEnvVar("PRODUCER_API_ADDRESS", "localhost:8080") + otelExporterEndpoint := getEnvVar("OTEL_EXPORTER_OTLP_ENDPOINT", "localhost:4317") + otelServiceName := getEnvVar("OTEL_SERVICE_NAME", "producer-api") + + tracer, err := setupOpenTelemetry(context.Background(), otelExporterEndpoint, otelServiceName) + if err != nil { + log.Fatalf("failed to initialize OpenTelemetry: %v", err) + return + } + + grpcClient, err := grpc.NewClient( + producerAPIAddress, + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithStatsHandler(otelgrpc.NewClientHandler()), + ) + if err != nil { + log.Fatalf("could not connect to producer API: %v", err) + } + + log.Printf("Connected to producer API at %s", producerAPIAddress) + + client := pb.NewPaymentReceiverClient(grpcClient) + + stream, err := client.NotifyPayment(ctx, &pb.Empty{}, grpc.WaitForReady(true)) + if err != nil { + log.Fatalf("could not receive payment notifications: %v", err) + } + + log.Printf("Listening for payment notifications...") + + for { + notification, err := stream.Recv() + if err == io.EOF { + log.Printf("No more payment notifications") + return + } + if err != nil { + log.Fatalf("could not receive payment notification: %v", err) + } + + processPaymentNotification(tracer, notification) + } +} + +func processPaymentNotification(tracer trace.Tracer, notification *pb.PaymentNotification) { + messageProcessingCtx := injectMetadataIntoContext(context.Background(), notification.Metadata) + _, span := tracer.Start(messageProcessingCtx, "ProcessPaymentNotification") + defer span.End() + + log.Printf("Received payment notification: %v", notification) +} diff --git a/examples/quick-start-grpc-stream-propagation/consumer-worker/proto/paymentreceiver.pb.go b/examples/quick-start-grpc-stream-propagation/consumer-worker/proto/paymentreceiver.pb.go new file mode 100644 index 0000000000..ccf7be030e --- /dev/null +++ b/examples/quick-start-grpc-stream-propagation/consumer-worker/proto/paymentreceiver.pb.go @@ -0,0 +1,378 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.31.0 +// protoc v5.27.0 +// source: paymentreceiver.proto + +package proto + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type Empty struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *Empty) Reset() { + *x = Empty{} + if protoimpl.UnsafeEnabled { + mi := &file_paymentreceiver_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Empty) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Empty) ProtoMessage() {} + +func (x *Empty) ProtoReflect() protoreflect.Message { + mi := &file_paymentreceiver_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Empty.ProtoReflect.Descriptor instead. +func (*Empty) Descriptor() ([]byte, []int) { + return file_paymentreceiver_proto_rawDescGZIP(), []int{0} +} + +type Payment struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + CustomerId string `protobuf:"bytes,1,opt,name=customerId,proto3" json:"customerId,omitempty"` + Amount float32 `protobuf:"fixed32,2,opt,name=amount,proto3" json:"amount,omitempty"` +} + +func (x *Payment) Reset() { + *x = Payment{} + if protoimpl.UnsafeEnabled { + mi := &file_paymentreceiver_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Payment) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Payment) ProtoMessage() {} + +func (x *Payment) ProtoReflect() protoreflect.Message { + mi := &file_paymentreceiver_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Payment.ProtoReflect.Descriptor instead. +func (*Payment) Descriptor() ([]byte, []int) { + return file_paymentreceiver_proto_rawDescGZIP(), []int{1} +} + +func (x *Payment) GetCustomerId() string { + if x != nil { + return x.CustomerId + } + return "" +} + +func (x *Payment) GetAmount() float32 { + if x != nil { + return x.Amount + } + return 0 +} + +type ReceivePaymentResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Received bool `protobuf:"varint,1,opt,name=received,proto3" json:"received,omitempty"` +} + +func (x *ReceivePaymentResponse) Reset() { + *x = ReceivePaymentResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_paymentreceiver_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ReceivePaymentResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ReceivePaymentResponse) ProtoMessage() {} + +func (x *ReceivePaymentResponse) ProtoReflect() protoreflect.Message { + mi := &file_paymentreceiver_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ReceivePaymentResponse.ProtoReflect.Descriptor instead. +func (*ReceivePaymentResponse) Descriptor() ([]byte, []int) { + return file_paymentreceiver_proto_rawDescGZIP(), []int{2} +} + +func (x *ReceivePaymentResponse) GetReceived() bool { + if x != nil { + return x.Received + } + return false +} + +type PaymentNotification struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Payment *Payment `protobuf:"bytes,1,opt,name=payment,proto3" json:"payment,omitempty"` + HighValuePayment bool `protobuf:"varint,2,opt,name=highValuePayment,proto3" json:"highValuePayment,omitempty"` + Metadata map[string]string `protobuf:"bytes,3,rep,name=metadata,proto3" json:"metadata,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` +} + +func (x *PaymentNotification) Reset() { + *x = PaymentNotification{} + if protoimpl.UnsafeEnabled { + mi := &file_paymentreceiver_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *PaymentNotification) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*PaymentNotification) ProtoMessage() {} + +func (x *PaymentNotification) ProtoReflect() protoreflect.Message { + mi := &file_paymentreceiver_proto_msgTypes[3] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use PaymentNotification.ProtoReflect.Descriptor instead. +func (*PaymentNotification) Descriptor() ([]byte, []int) { + return file_paymentreceiver_proto_rawDescGZIP(), []int{3} +} + +func (x *PaymentNotification) GetPayment() *Payment { + if x != nil { + return x.Payment + } + return nil +} + +func (x *PaymentNotification) GetHighValuePayment() bool { + if x != nil { + return x.HighValuePayment + } + return false +} + +func (x *PaymentNotification) GetMetadata() map[string]string { + if x != nil { + return x.Metadata + } + return nil +} + +var File_paymentreceiver_proto protoreflect.FileDescriptor + +var file_paymentreceiver_proto_rawDesc = []byte{ + 0x0a, 0x15, 0x70, 0x61, 0x79, 0x6d, 0x65, 0x6e, 0x74, 0x72, 0x65, 0x63, 0x65, 0x69, 0x76, 0x65, + 0x72, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x05, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x07, + 0x0a, 0x05, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x41, 0x0a, 0x07, 0x50, 0x61, 0x79, 0x6d, 0x65, + 0x6e, 0x74, 0x12, 0x1e, 0x0a, 0x0a, 0x63, 0x75, 0x73, 0x74, 0x6f, 0x6d, 0x65, 0x72, 0x49, 0x64, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x63, 0x75, 0x73, 0x74, 0x6f, 0x6d, 0x65, 0x72, + 0x49, 0x64, 0x12, 0x16, 0x0a, 0x06, 0x61, 0x6d, 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x02, 0x20, 0x01, + 0x28, 0x02, 0x52, 0x06, 0x61, 0x6d, 0x6f, 0x75, 0x6e, 0x74, 0x22, 0x34, 0x0a, 0x16, 0x52, 0x65, + 0x63, 0x65, 0x69, 0x76, 0x65, 0x50, 0x61, 0x79, 0x6d, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1a, 0x0a, 0x08, 0x72, 0x65, 0x63, 0x65, 0x69, 0x76, 0x65, 0x64, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x08, 0x72, 0x65, 0x63, 0x65, 0x69, 0x76, 0x65, 0x64, + 0x22, 0xee, 0x01, 0x0a, 0x13, 0x50, 0x61, 0x79, 0x6d, 0x65, 0x6e, 0x74, 0x4e, 0x6f, 0x74, 0x69, + 0x66, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x28, 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6d, + 0x65, 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0e, 0x2e, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x2e, 0x50, 0x61, 0x79, 0x6d, 0x65, 0x6e, 0x74, 0x52, 0x07, 0x70, 0x61, 0x79, 0x6d, 0x65, + 0x6e, 0x74, 0x12, 0x2a, 0x0a, 0x10, 0x68, 0x69, 0x67, 0x68, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x50, + 0x61, 0x79, 0x6d, 0x65, 0x6e, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x10, 0x68, 0x69, + 0x67, 0x68, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x50, 0x61, 0x79, 0x6d, 0x65, 0x6e, 0x74, 0x12, 0x44, + 0x0a, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, + 0x32, 0x28, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x50, 0x61, 0x79, 0x6d, 0x65, 0x6e, 0x74, + 0x4e, 0x6f, 0x74, 0x69, 0x66, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x4d, 0x65, 0x74, + 0x61, 0x64, 0x61, 0x74, 0x61, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x08, 0x6d, 0x65, 0x74, 0x61, + 0x64, 0x61, 0x74, 0x61, 0x1a, 0x3b, 0x0a, 0x0d, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, + 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, + 0x01, 0x32, 0x93, 0x01, 0x0a, 0x0f, 0x50, 0x61, 0x79, 0x6d, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x63, + 0x65, 0x69, 0x76, 0x65, 0x72, 0x12, 0x41, 0x0a, 0x0e, 0x52, 0x65, 0x63, 0x65, 0x69, 0x76, 0x65, + 0x50, 0x61, 0x79, 0x6d, 0x65, 0x6e, 0x74, 0x12, 0x0e, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, + 0x50, 0x61, 0x79, 0x6d, 0x65, 0x6e, 0x74, 0x1a, 0x1d, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, + 0x52, 0x65, 0x63, 0x65, 0x69, 0x76, 0x65, 0x50, 0x61, 0x79, 0x6d, 0x65, 0x6e, 0x74, 0x52, 0x65, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x3d, 0x0a, 0x0d, 0x4e, 0x6f, 0x74, 0x69, + 0x66, 0x79, 0x50, 0x61, 0x79, 0x6d, 0x65, 0x6e, 0x74, 0x12, 0x0c, 0x2e, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x1a, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, + 0x50, 0x61, 0x79, 0x6d, 0x65, 0x6e, 0x74, 0x4e, 0x6f, 0x74, 0x69, 0x66, 0x69, 0x63, 0x61, 0x74, + 0x69, 0x6f, 0x6e, 0x22, 0x00, 0x30, 0x01, 0x42, 0x49, 0x5a, 0x47, 0x67, 0x69, 0x74, 0x68, 0x75, + 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6b, 0x75, 0x62, 0x65, 0x73, 0x68, 0x6f, 0x70, 0x2f, 0x74, + 0x72, 0x61, 0x63, 0x65, 0x74, 0x65, 0x73, 0x74, 0x2f, 0x71, 0x75, 0x69, 0x63, 0x6b, 0x2d, 0x73, + 0x74, 0x61, 0x72, 0x74, 0x2d, 0x67, 0x72, 0x70, 0x63, 0x2d, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, + 0x2d, 0x70, 0x72, 0x6f, 0x70, 0x61, 0x67, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2f, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_paymentreceiver_proto_rawDescOnce sync.Once + file_paymentreceiver_proto_rawDescData = file_paymentreceiver_proto_rawDesc +) + +func file_paymentreceiver_proto_rawDescGZIP() []byte { + file_paymentreceiver_proto_rawDescOnce.Do(func() { + file_paymentreceiver_proto_rawDescData = protoimpl.X.CompressGZIP(file_paymentreceiver_proto_rawDescData) + }) + return file_paymentreceiver_proto_rawDescData +} + +var file_paymentreceiver_proto_msgTypes = make([]protoimpl.MessageInfo, 5) +var file_paymentreceiver_proto_goTypes = []interface{}{ + (*Empty)(nil), // 0: proto.Empty + (*Payment)(nil), // 1: proto.Payment + (*ReceivePaymentResponse)(nil), // 2: proto.ReceivePaymentResponse + (*PaymentNotification)(nil), // 3: proto.PaymentNotification + nil, // 4: proto.PaymentNotification.MetadataEntry +} +var file_paymentreceiver_proto_depIdxs = []int32{ + 1, // 0: proto.PaymentNotification.payment:type_name -> proto.Payment + 4, // 1: proto.PaymentNotification.metadata:type_name -> proto.PaymentNotification.MetadataEntry + 1, // 2: proto.PaymentReceiver.ReceivePayment:input_type -> proto.Payment + 0, // 3: proto.PaymentReceiver.NotifyPayment:input_type -> proto.Empty + 2, // 4: proto.PaymentReceiver.ReceivePayment:output_type -> proto.ReceivePaymentResponse + 3, // 5: proto.PaymentReceiver.NotifyPayment:output_type -> proto.PaymentNotification + 4, // [4:6] is the sub-list for method output_type + 2, // [2:4] is the sub-list for method input_type + 2, // [2:2] is the sub-list for extension type_name + 2, // [2:2] is the sub-list for extension extendee + 0, // [0:2] is the sub-list for field type_name +} + +func init() { file_paymentreceiver_proto_init() } +func file_paymentreceiver_proto_init() { + if File_paymentreceiver_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_paymentreceiver_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Empty); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_paymentreceiver_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Payment); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_paymentreceiver_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ReceivePaymentResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_paymentreceiver_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*PaymentNotification); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_paymentreceiver_proto_rawDesc, + NumEnums: 0, + NumMessages: 5, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_paymentreceiver_proto_goTypes, + DependencyIndexes: file_paymentreceiver_proto_depIdxs, + MessageInfos: file_paymentreceiver_proto_msgTypes, + }.Build() + File_paymentreceiver_proto = out.File + file_paymentreceiver_proto_rawDesc = nil + file_paymentreceiver_proto_goTypes = nil + file_paymentreceiver_proto_depIdxs = nil +} diff --git a/examples/quick-start-grpc-stream-propagation/consumer-worker/proto/paymentreceiver_grpc.pb.go b/examples/quick-start-grpc-stream-propagation/consumer-worker/proto/paymentreceiver_grpc.pb.go new file mode 100644 index 0000000000..747090ada7 --- /dev/null +++ b/examples/quick-start-grpc-stream-propagation/consumer-worker/proto/paymentreceiver_grpc.pb.go @@ -0,0 +1,174 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.3.0 +// - protoc v5.27.0 +// source: paymentreceiver.proto + +package proto + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +const ( + PaymentReceiver_ReceivePayment_FullMethodName = "/proto.PaymentReceiver/ReceivePayment" + PaymentReceiver_NotifyPayment_FullMethodName = "/proto.PaymentReceiver/NotifyPayment" +) + +// PaymentReceiverClient is the client API for PaymentReceiver service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type PaymentReceiverClient interface { + ReceivePayment(ctx context.Context, in *Payment, opts ...grpc.CallOption) (*ReceivePaymentResponse, error) + NotifyPayment(ctx context.Context, in *Empty, opts ...grpc.CallOption) (PaymentReceiver_NotifyPaymentClient, error) +} + +type paymentReceiverClient struct { + cc grpc.ClientConnInterface +} + +func NewPaymentReceiverClient(cc grpc.ClientConnInterface) PaymentReceiverClient { + return &paymentReceiverClient{cc} +} + +func (c *paymentReceiverClient) ReceivePayment(ctx context.Context, in *Payment, opts ...grpc.CallOption) (*ReceivePaymentResponse, error) { + out := new(ReceivePaymentResponse) + err := c.cc.Invoke(ctx, PaymentReceiver_ReceivePayment_FullMethodName, in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *paymentReceiverClient) NotifyPayment(ctx context.Context, in *Empty, opts ...grpc.CallOption) (PaymentReceiver_NotifyPaymentClient, error) { + stream, err := c.cc.NewStream(ctx, &PaymentReceiver_ServiceDesc.Streams[0], PaymentReceiver_NotifyPayment_FullMethodName, opts...) + if err != nil { + return nil, err + } + x := &paymentReceiverNotifyPaymentClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type PaymentReceiver_NotifyPaymentClient interface { + Recv() (*PaymentNotification, error) + grpc.ClientStream +} + +type paymentReceiverNotifyPaymentClient struct { + grpc.ClientStream +} + +func (x *paymentReceiverNotifyPaymentClient) Recv() (*PaymentNotification, error) { + m := new(PaymentNotification) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// PaymentReceiverServer is the server API for PaymentReceiver service. +// All implementations must embed UnimplementedPaymentReceiverServer +// for forward compatibility +type PaymentReceiverServer interface { + ReceivePayment(context.Context, *Payment) (*ReceivePaymentResponse, error) + NotifyPayment(*Empty, PaymentReceiver_NotifyPaymentServer) error + mustEmbedUnimplementedPaymentReceiverServer() +} + +// UnimplementedPaymentReceiverServer must be embedded to have forward compatible implementations. +type UnimplementedPaymentReceiverServer struct { +} + +func (UnimplementedPaymentReceiverServer) ReceivePayment(context.Context, *Payment) (*ReceivePaymentResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method ReceivePayment not implemented") +} +func (UnimplementedPaymentReceiverServer) NotifyPayment(*Empty, PaymentReceiver_NotifyPaymentServer) error { + return status.Errorf(codes.Unimplemented, "method NotifyPayment not implemented") +} +func (UnimplementedPaymentReceiverServer) mustEmbedUnimplementedPaymentReceiverServer() {} + +// UnsafePaymentReceiverServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to PaymentReceiverServer will +// result in compilation errors. +type UnsafePaymentReceiverServer interface { + mustEmbedUnimplementedPaymentReceiverServer() +} + +func RegisterPaymentReceiverServer(s grpc.ServiceRegistrar, srv PaymentReceiverServer) { + s.RegisterService(&PaymentReceiver_ServiceDesc, srv) +} + +func _PaymentReceiver_ReceivePayment_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(Payment) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(PaymentReceiverServer).ReceivePayment(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: PaymentReceiver_ReceivePayment_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(PaymentReceiverServer).ReceivePayment(ctx, req.(*Payment)) + } + return interceptor(ctx, in, info, handler) +} + +func _PaymentReceiver_NotifyPayment_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(Empty) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(PaymentReceiverServer).NotifyPayment(m, &paymentReceiverNotifyPaymentServer{stream}) +} + +type PaymentReceiver_NotifyPaymentServer interface { + Send(*PaymentNotification) error + grpc.ServerStream +} + +type paymentReceiverNotifyPaymentServer struct { + grpc.ServerStream +} + +func (x *paymentReceiverNotifyPaymentServer) Send(m *PaymentNotification) error { + return x.ServerStream.SendMsg(m) +} + +// PaymentReceiver_ServiceDesc is the grpc.ServiceDesc for PaymentReceiver service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var PaymentReceiver_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "proto.PaymentReceiver", + HandlerType: (*PaymentReceiverServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "ReceivePayment", + Handler: _PaymentReceiver_ReceivePayment_Handler, + }, + }, + Streams: []grpc.StreamDesc{ + { + StreamName: "NotifyPayment", + Handler: _PaymentReceiver_NotifyPayment_Handler, + ServerStreams: true, + }, + }, + Metadata: "paymentreceiver.proto", +} diff --git a/examples/quick-start-grpc-stream-propagation/consumer-worker/telemetry.go b/examples/quick-start-grpc-stream-propagation/consumer-worker/telemetry.go new file mode 100644 index 0000000000..3936fe939b --- /dev/null +++ b/examples/quick-start-grpc-stream-propagation/consumer-worker/telemetry.go @@ -0,0 +1,118 @@ +package main + +import ( + "context" + "fmt" + "log" + "os" + "time" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" + "go.opentelemetry.io/otel/propagation" + "go.opentelemetry.io/otel/sdk/resource" + sdkTrace "go.opentelemetry.io/otel/sdk/trace" + semconv "go.opentelemetry.io/otel/semconv/v1.4.0" + "go.opentelemetry.io/otel/trace" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +const spanExporterTimeout = 1 * time.Minute + +func getEnvVar(envVarName, defaultValue string) string { + envVarValue := os.Getenv(envVarName) + if envVarValue == "" { + return defaultValue + } + + return envVarValue +} + +func setupOpenTelemetry(ctx context.Context, otelExporterEndpoint, serviceName string) (trace.Tracer, error) { + log.Printf("Setting up OpenTelemetry with exporter endpoint %s and service name %s", otelExporterEndpoint, serviceName) + + spanExporter, err := getSpanExporter(ctx, otelExporterEndpoint) + if err != nil { + return nil, fmt.Errorf("failed to setup span exporter: %w", err) + } + + traceProvider, err := getTraceProvider(spanExporter, serviceName) + if err != nil { + return nil, fmt.Errorf("failed to setup trace provider: %w", err) + } + + return traceProvider.Tracer(serviceName), nil +} + +func getSpanExporter(ctx context.Context, otelExporterEndpoint string) (sdkTrace.SpanExporter, error) { + ctx, cancel := context.WithTimeout(ctx, spanExporterTimeout) + defer cancel() + + conn, err := grpc.NewClient( + otelExporterEndpoint, + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + if err != nil { + return nil, fmt.Errorf("failed to create gRPC connection to collector: %w", err) + } + + traceExporter, err := otlptracegrpc.New(ctx, otlptracegrpc.WithGRPCConn(conn)) + if err != nil { + return nil, fmt.Errorf("failed to create trace exporter: %w", err) + } + + return traceExporter, nil +} + +func getTraceProvider(spanExporter sdkTrace.SpanExporter, serviceName string) (*sdkTrace.TracerProvider, error) { + defaultResource := resource.Default() + + mergedResource, err := resource.Merge( + defaultResource, + resource.NewWithAttributes( + defaultResource.SchemaURL(), + semconv.ServiceNameKey.String(serviceName), + ), + ) + if err != nil { + return nil, fmt.Errorf("failed to create otel resource: %w", err) + } + + tp := sdkTrace.NewTracerProvider( + sdkTrace.WithBatcher(spanExporter), + sdkTrace.WithResource(mergedResource), + ) + + otel.SetTracerProvider(tp) + + otel.SetTextMapPropagator( + propagation.NewCompositeTextMapPropagator( + propagation.TraceContext{}, + propagation.Baggage{}, + ), + ) + + return tp, nil +} + +func injectMetadataIntoContext(ctx context.Context, metadata map[string]string) context.Context { + propagator := otel.GetTextMapPropagator() + + return propagator.Extract( + ctx, + propagation.MapCarrier(metadata), + ) +} + +func extractMetadataFromContext(ctx context.Context) map[string]string { + propagator := otel.GetTextMapPropagator() + + metadata := map[string]string{} + propagator.Inject( + ctx, + propagation.MapCarrier(metadata), + ) + + return metadata +} diff --git a/examples/quick-start-grpc-stream-propagation/docker-compose.core.yaml b/examples/quick-start-grpc-stream-propagation/docker-compose.core.yaml new file mode 100644 index 0000000000..db5957fe27 --- /dev/null +++ b/examples/quick-start-grpc-stream-propagation/docker-compose.core.yaml @@ -0,0 +1,101 @@ +version: '3' +services: + producer-api: + image: quick-start-grpc-stream-propagation-producer-api + platform: linux/amd64 + build: ./producer-api + extra_hosts: + - "host.docker.internal:host-gateway" + ports: + - 8080:8080 + environment: + OTEL_EXPORTER_OTLP_ENDPOINT: otel-collector:4317 + OTEL_SERVICE_NAME: producer-api + PORT: 8080 + depends_on: + otel-collector: + condition: service_started + + consumer-worker: + image: quick-start-grpc-stream-propagation-consumer-worker + platform: linux/amd64 + build: ./consumer-worker + extra_hosts: + - "host.docker.internal:host-gateway" + environment: + OTEL_EXPORTER_OTLP_ENDPOINT: otel-collector:4317 + OTEL_SERVICE_NAME: consumer-worker + PRODUCER_API_ADDRESS: producer-api:8080 + depends_on: + otel-collector: + condition: service_started + producer-api: + condition: service_started + + tracetest: + image: kubeshop/tracetest:${TAG:-latest} + platform: linux/amd64 + volumes: + - type: bind + source: ./tracetest/tracetest-config.yaml + target: /app/tracetest.yaml + - type: bind + source: ./tracetest/tracetest-provision.yaml + target: /app/provisioning.yaml + ports: + - 11633:11633 + command: --provisioning-file /app/provisioning.yaml + depends_on: + postgres: + condition: service_healthy + otel-collector: + condition: service_started + jaeger: + condition: service_started + healthcheck: + test: ["CMD", "wget", "--spider", "localhost:11633"] + interval: 1s + timeout: 3s + retries: 60 + environment: + TRACETEST_DEV: ${TRACETEST_DEV} + + postgres: + image: postgres:14 + environment: + POSTGRES_PASSWORD: postgres + POSTGRES_USER: postgres + healthcheck: + test: pg_isready -U "$$POSTGRES_USER" -d "$$POSTGRES_DB" + interval: 1s + timeout: 5s + retries: 60 + ports: + - 5432:5432 + + otel-collector: + image: otel/opentelemetry-collector-contrib:0.101.0 + command: + - "--config" + - "/otel-local-config.yaml" + volumes: + - ./tracetest/collector.config.yaml:/otel-local-config.yaml + ports: + - 4317:4317 + depends_on: + jaeger: + condition: service_started + + jaeger: + image: jaegertracing/all-in-one:latest + restart: unless-stopped + ports: + - 16686:16686 + - 16685:16685 + environment: + - COLLECTOR_OTLP_ENABLED=true + healthcheck: + test: ["CMD", "wget", "--spider", "localhost:16686"] + interval: 1s + timeout: 3s + retries: 60 diff --git a/examples/quick-start-grpc-stream-propagation/docker-compose.yaml b/examples/quick-start-grpc-stream-propagation/docker-compose.yaml new file mode 100644 index 0000000000..f2da79e149 --- /dev/null +++ b/examples/quick-start-grpc-stream-propagation/docker-compose.yaml @@ -0,0 +1,67 @@ +services: + producer-api: + image: quick-start-grpc-stream-propagation-producer-api + platform: linux/amd64 + build: ./producer-api + extra_hosts: + - "host.docker.internal:host-gateway" + ports: + - 8080:8080 + environment: + OTEL_EXPORTER_OTLP_ENDPOINT: otel-collector:4317 + OTEL_SERVICE_NAME: producer-api + PORT: 8080 + depends_on: + otel-collector: + condition: service_started + + consumer-worker: + image: quick-start-grpc-stream-propagation-consumer-worker + platform: linux/amd64 + build: ./consumer-worker + extra_hosts: + - "host.docker.internal:host-gateway" + environment: + OTEL_EXPORTER_OTLP_ENDPOINT: otel-collector:4317 + OTEL_SERVICE_NAME: consumer-worker + PRODUCER_API_ADDRESS: producer-api:8080 + depends_on: + otel-collector: + condition: service_started + producer-api: + condition: service_started + + # Cloud-based Managed Tracetest + tracetest-agent: + image: kubeshop/tracetest-agent:latest + environment: + # Find the Agent API Key here: https://docs.tracetest.io/configuration/agent + TRACETEST_API_KEY: ${TRACETEST_API_KEY} + TRACETEST_MODE: verbose + + otel-collector: + image: otel/opentelemetry-collector-contrib:0.101.0 + command: + - "--config" + - "/otel-local-config.yaml" + volumes: + - ./tracetest/collector.config.yaml:/otel-local-config.yaml + ports: + - 4317:4317 + depends_on: + jaeger: + condition: service_started + + jaeger: + image: jaegertracing/all-in-one:latest + restart: unless-stopped + ports: + - 16686:16686 + - 16685:16685 + environment: + - COLLECTOR_OTLP_ENABLED=true + healthcheck: + test: ["CMD", "wget", "--spider", "localhost:16686"] + interval: 1s + timeout: 3s + retries: 60 diff --git a/examples/quick-start-grpc-stream-propagation/producer-api/Dockerfile b/examples/quick-start-grpc-stream-propagation/producer-api/Dockerfile new file mode 100644 index 0000000000..bbabcc4ffc --- /dev/null +++ b/examples/quick-start-grpc-stream-propagation/producer-api/Dockerfile @@ -0,0 +1,14 @@ +FROM golang:alpine as builder +ENV GO111MODULE=on +RUN apk update && apk add --no-cache git + +WORKDIR /app +COPY go.mod ./ +COPY go.sum ./ +RUN go mod download +COPY . . +RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o ./bin/main . + +FROM scratch +COPY --from=builder /app/bin/main . +CMD ["./main"] diff --git a/examples/quick-start-grpc-stream-propagation/producer-api/Makefile b/examples/quick-start-grpc-stream-propagation/producer-api/Makefile new file mode 100644 index 0000000000..087d005038 --- /dev/null +++ b/examples/quick-start-grpc-stream-propagation/producer-api/Makefile @@ -0,0 +1,31 @@ +# Dependencies: +# https://grpc.io/docs/protoc-installation/ +# go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.31 +# go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.3 + +REQUIRED_BINS := protoc protoc-gen-go protoc-gen-go-grpc + +help: ## show list of commands + @echo "Choose a command to run:" + @echo "" + @awk 'BEGIN {FS = ":.*?## "} /[a-zA-Z_-]+:.*?## / {sub("\\\\n",sprintf("\n%22c"," "), $$2);printf "\033[36m%-40s\033[0m %s\n", $$1, $$2}' $(MAKEFILE_LIST) | sort + +build-proto: ensure-dependencies clean-proto ## generate gRPC code from proto files + @protoc \ + -I=../proto \ + --go_out=./proto \ + --go_opt=paths=source_relative \ + --go-grpc_out=./proto \ + --go-grpc_opt=paths=source_relative \ + ../proto/paymentreceiver.proto + +ensure-dependencies: ## check if required binaries are installed + $(foreach bin,$(REQUIRED_BINS),\ + $(if $(shell command -v $(bin) 2> /dev/null),,$(error Please install `$(bin)` or run `make install-grpc-tools`))) + +install-grpc-tools: ## install required binaries + go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.31 + go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.3 + +clean-proto: ## remove generated gRPC code + @rm -f proto/*.go diff --git a/examples/quick-start-grpc-stream-propagation/producer-api/go.mod b/examples/quick-start-grpc-stream-propagation/producer-api/go.mod new file mode 100644 index 0000000000..16e082d98e --- /dev/null +++ b/examples/quick-start-grpc-stream-propagation/producer-api/go.mod @@ -0,0 +1,27 @@ +module github.com/kubeshop/tracetest/quick-start-grpc-stream-propagation/producer-api + +go 1.21 + +require ( + go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.52.0 + go.opentelemetry.io/otel v1.27.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.27.0 + go.opentelemetry.io/otel/sdk v1.27.0 + go.opentelemetry.io/otel/trace v1.27.0 + google.golang.org/grpc v1.64.0 + google.golang.org/protobuf v1.34.1 +) + +require ( + github.com/cenkalti/backoff/v4 v4.3.0 // indirect + github.com/go-logr/logr v1.4.1 // indirect + github.com/go-logr/stdr v1.2.2 // indirect + github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.27.0 // indirect + go.opentelemetry.io/otel/metric v1.27.0 // indirect + go.opentelemetry.io/proto/otlp v1.2.0 // indirect + golang.org/x/net v0.25.0 // indirect + golang.org/x/sys v0.20.0 // indirect + golang.org/x/text v0.15.0 // indirect + google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect +) diff --git a/examples/quick-start-grpc-stream-propagation/producer-api/go.sum b/examples/quick-start-grpc-stream-propagation/producer-api/go.sum new file mode 100644 index 0000000000..08f8245d22 --- /dev/null +++ b/examples/quick-start-grpc-stream-propagation/producer-api/go.sum @@ -0,0 +1,49 @@ +github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= +github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= +github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 h1:bkypFPDjIYGfCYD5mRBvpqxfYX1YCS1PXdKYWi8FsN0= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0/go.mod h1:P+Lt/0by1T8bfcF3z737NnSbmxQAppXMRziHUxPOC8k= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.52.0 h1:vS1Ao/R55RNV4O7TA2Qopok8yN+X0LIP6RVWLFkprck= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.52.0/go.mod h1:BMsdeOxN04K0L5FNUBfjFdvwWGNe/rkmSwH4Aelu/X0= +go.opentelemetry.io/otel v1.27.0 h1:9BZoF3yMK/O1AafMiQTVu0YDj5Ea4hPhxCs7sGva+cg= +go.opentelemetry.io/otel v1.27.0/go.mod h1:DMpAK8fzYRzs+bi3rS5REupisuqTheUlSZJ1WnZaPAQ= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.27.0 h1:R9DE4kQ4k+YtfLI2ULwX82VtNQ2J8yZmA7ZIF/D+7Mc= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.27.0/go.mod h1:OQFyQVrDlbe+R7xrEyDr/2Wr67Ol0hRUgsfA+V5A95s= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.27.0 h1:qFffATk0X+HD+f1Z8lswGiOQYKHRlzfmdJm0wEaVrFA= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.27.0/go.mod h1:MOiCmryaYtc+V0Ei+Tx9o5S1ZjA7kzLucuVuyzBZloQ= +go.opentelemetry.io/otel/metric v1.27.0 h1:hvj3vdEKyeCi4YaYfNjv2NUje8FqKqUY8IlF0FxV/ik= +go.opentelemetry.io/otel/metric v1.27.0/go.mod h1:mVFgmRlhljgBiuk/MP/oKylr4hs85GZAylncepAX/ak= +go.opentelemetry.io/otel/sdk v1.27.0 h1:mlk+/Y1gLPLn84U4tI8d3GNJmGT/eXe3ZuOXN9kTWmI= +go.opentelemetry.io/otel/sdk v1.27.0/go.mod h1:Ha9vbLwJE6W86YstIywK2xFfPjbWlCuwPtMkKdz/Y4A= +go.opentelemetry.io/otel/trace v1.27.0 h1:IqYb813p7cmbHk0a5y6pD5JPakbVfftRXABGt5/Rscw= +go.opentelemetry.io/otel/trace v1.27.0/go.mod h1:6RiD1hkAprV4/q+yd2ln1HG9GoPx39SuvvstaLBl+l4= +go.opentelemetry.io/proto/otlp v1.2.0 h1:pVeZGk7nXDC9O2hncA6nHldxEjm6LByfA2aN8IOkz94= +go.opentelemetry.io/proto/otlp v1.2.0/go.mod h1:gGpR8txAl5M03pDhMC79G6SdqNV26naRm/KDsgaHD8A= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac= +golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM= +golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= +golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk= +golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 h1:KpwkzHKEF7B9Zxg18WzOa7djJ+Ha5DzthMyZYQfEn2A= +google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1/go.mod h1:nKE/iIaLqn2bQwXBg8f1g2Ylh6r5MN5CmZvuzZCgsCU= +google.golang.org/grpc v1.64.0 h1:KH3VH9y/MgNQg1dE7b3XfVK0GsPSIzJwdF617gUSbvY= +google.golang.org/grpc v1.64.0/go.mod h1:oxjF8E3FBnjp+/gVFYdWacaLDx9na1aqy9oovLpxQYg= +google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg= +google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/examples/quick-start-grpc-stream-propagation/producer-api/main.go b/examples/quick-start-grpc-stream-propagation/producer-api/main.go new file mode 100644 index 0000000000..65d100b6cd --- /dev/null +++ b/examples/quick-start-grpc-stream-propagation/producer-api/main.go @@ -0,0 +1,111 @@ +package main + +import ( + "context" + "fmt" + "log" + "net" + + pb "github.com/kubeshop/tracetest/quick-start-grpc-stream-propagation/producer-api/proto" + "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" + "go.opentelemetry.io/otel/trace" + grpc "google.golang.org/grpc" +) + +// Implement the PaymentReceiverServer interface +type serverImpl struct { + pb.PaymentReceiverServer + tracer trace.Tracer +} + +type paymentWithMetadata struct { + payment *pb.Payment + metadata map[string]string +} + +// Guarantee that the serverImpl implements the PaymentReceiverServer interface +var _ pb.PaymentReceiverServer = &serverImpl{} + +// Channel to store payments and used as a "in-memory" queue +var paymentChannel = make(chan *paymentWithMetadata) + +func (s *serverImpl) ReceivePayment(ctx context.Context, payment *pb.Payment) (*pb.ReceivePaymentResponse, error) { + go func() { + ctx, span := s.tracer.Start(ctx, "EnqueuePayment") + defer span.End() + + message := &paymentWithMetadata{ + payment: payment, + metadata: extractMetadataFromContext(ctx), + } + + // handle channel as in-memory queue + paymentChannel <- message + }() + + return &pb.ReceivePaymentResponse{Received: true}, nil +} + +func (s *serverImpl) NotifyPayment(_ *pb.Empty, stream pb.PaymentReceiver_NotifyPaymentServer) error { + for { + message, ok := <-paymentChannel + if !ok { + return nil + } + + ctx := injectMetadataIntoContext(context.Background(), message.metadata) + ctx, span := s.tracer.Start(ctx, "SendPaymentNotification") + + payment := message.payment + highValuePayment := payment.Amount > 10_000 + + notification := &pb.PaymentNotification{ + Payment: payment, + HighValuePayment: highValuePayment, + } + + // extract OTel data from context and add it to the notification + notification.Metadata = extractMetadataFromContext(ctx) + + if err := stream.Send(notification); err != nil { + return err + } + + span.End() + } +} + +func main() { + port := getEnvVar("PORT", "8080") + otelExporterEndpoint := getEnvVar("OTEL_EXPORTER_OTLP_ENDPOINT", "localhost:4317") + otelServiceName := getEnvVar("OTEL_SERVICE_NAME", "producer-api") + + tracer, err := setupOpenTelemetry(context.Background(), otelExporterEndpoint, otelServiceName) + if err != nil { + log.Fatalf("failed to initialize OpenTelemetry: %v", err) + return + } + + address := fmt.Sprintf(":%s", port) + + lis, err := net.Listen("tcp", address) + if err != nil { + log.Fatalf("failed to listen: %v", err) + return + } + + log.Printf("server listening at %s", lis.Addr()) + + grpcServer := grpc.NewServer( + grpc.StatsHandler(otelgrpc.NewServerHandler()), + ) + + paymentReceiverServer := &serverImpl{ + tracer: tracer, + } + + pb.RegisterPaymentReceiverServer(grpcServer, paymentReceiverServer) + if err := grpcServer.Serve(lis); err != nil { + log.Fatalf("failed to serve: %v", err) + } +} diff --git a/examples/quick-start-grpc-stream-propagation/producer-api/proto/paymentreceiver.pb.go b/examples/quick-start-grpc-stream-propagation/producer-api/proto/paymentreceiver.pb.go new file mode 100644 index 0000000000..ccf7be030e --- /dev/null +++ b/examples/quick-start-grpc-stream-propagation/producer-api/proto/paymentreceiver.pb.go @@ -0,0 +1,378 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.31.0 +// protoc v5.27.0 +// source: paymentreceiver.proto + +package proto + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type Empty struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *Empty) Reset() { + *x = Empty{} + if protoimpl.UnsafeEnabled { + mi := &file_paymentreceiver_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Empty) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Empty) ProtoMessage() {} + +func (x *Empty) ProtoReflect() protoreflect.Message { + mi := &file_paymentreceiver_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Empty.ProtoReflect.Descriptor instead. +func (*Empty) Descriptor() ([]byte, []int) { + return file_paymentreceiver_proto_rawDescGZIP(), []int{0} +} + +type Payment struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + CustomerId string `protobuf:"bytes,1,opt,name=customerId,proto3" json:"customerId,omitempty"` + Amount float32 `protobuf:"fixed32,2,opt,name=amount,proto3" json:"amount,omitempty"` +} + +func (x *Payment) Reset() { + *x = Payment{} + if protoimpl.UnsafeEnabled { + mi := &file_paymentreceiver_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Payment) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Payment) ProtoMessage() {} + +func (x *Payment) ProtoReflect() protoreflect.Message { + mi := &file_paymentreceiver_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Payment.ProtoReflect.Descriptor instead. +func (*Payment) Descriptor() ([]byte, []int) { + return file_paymentreceiver_proto_rawDescGZIP(), []int{1} +} + +func (x *Payment) GetCustomerId() string { + if x != nil { + return x.CustomerId + } + return "" +} + +func (x *Payment) GetAmount() float32 { + if x != nil { + return x.Amount + } + return 0 +} + +type ReceivePaymentResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Received bool `protobuf:"varint,1,opt,name=received,proto3" json:"received,omitempty"` +} + +func (x *ReceivePaymentResponse) Reset() { + *x = ReceivePaymentResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_paymentreceiver_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ReceivePaymentResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ReceivePaymentResponse) ProtoMessage() {} + +func (x *ReceivePaymentResponse) ProtoReflect() protoreflect.Message { + mi := &file_paymentreceiver_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ReceivePaymentResponse.ProtoReflect.Descriptor instead. +func (*ReceivePaymentResponse) Descriptor() ([]byte, []int) { + return file_paymentreceiver_proto_rawDescGZIP(), []int{2} +} + +func (x *ReceivePaymentResponse) GetReceived() bool { + if x != nil { + return x.Received + } + return false +} + +type PaymentNotification struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Payment *Payment `protobuf:"bytes,1,opt,name=payment,proto3" json:"payment,omitempty"` + HighValuePayment bool `protobuf:"varint,2,opt,name=highValuePayment,proto3" json:"highValuePayment,omitempty"` + Metadata map[string]string `protobuf:"bytes,3,rep,name=metadata,proto3" json:"metadata,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` +} + +func (x *PaymentNotification) Reset() { + *x = PaymentNotification{} + if protoimpl.UnsafeEnabled { + mi := &file_paymentreceiver_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *PaymentNotification) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*PaymentNotification) ProtoMessage() {} + +func (x *PaymentNotification) ProtoReflect() protoreflect.Message { + mi := &file_paymentreceiver_proto_msgTypes[3] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use PaymentNotification.ProtoReflect.Descriptor instead. +func (*PaymentNotification) Descriptor() ([]byte, []int) { + return file_paymentreceiver_proto_rawDescGZIP(), []int{3} +} + +func (x *PaymentNotification) GetPayment() *Payment { + if x != nil { + return x.Payment + } + return nil +} + +func (x *PaymentNotification) GetHighValuePayment() bool { + if x != nil { + return x.HighValuePayment + } + return false +} + +func (x *PaymentNotification) GetMetadata() map[string]string { + if x != nil { + return x.Metadata + } + return nil +} + +var File_paymentreceiver_proto protoreflect.FileDescriptor + +var file_paymentreceiver_proto_rawDesc = []byte{ + 0x0a, 0x15, 0x70, 0x61, 0x79, 0x6d, 0x65, 0x6e, 0x74, 0x72, 0x65, 0x63, 0x65, 0x69, 0x76, 0x65, + 0x72, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x05, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x07, + 0x0a, 0x05, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x41, 0x0a, 0x07, 0x50, 0x61, 0x79, 0x6d, 0x65, + 0x6e, 0x74, 0x12, 0x1e, 0x0a, 0x0a, 0x63, 0x75, 0x73, 0x74, 0x6f, 0x6d, 0x65, 0x72, 0x49, 0x64, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x63, 0x75, 0x73, 0x74, 0x6f, 0x6d, 0x65, 0x72, + 0x49, 0x64, 0x12, 0x16, 0x0a, 0x06, 0x61, 0x6d, 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x02, 0x20, 0x01, + 0x28, 0x02, 0x52, 0x06, 0x61, 0x6d, 0x6f, 0x75, 0x6e, 0x74, 0x22, 0x34, 0x0a, 0x16, 0x52, 0x65, + 0x63, 0x65, 0x69, 0x76, 0x65, 0x50, 0x61, 0x79, 0x6d, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1a, 0x0a, 0x08, 0x72, 0x65, 0x63, 0x65, 0x69, 0x76, 0x65, 0x64, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x08, 0x72, 0x65, 0x63, 0x65, 0x69, 0x76, 0x65, 0x64, + 0x22, 0xee, 0x01, 0x0a, 0x13, 0x50, 0x61, 0x79, 0x6d, 0x65, 0x6e, 0x74, 0x4e, 0x6f, 0x74, 0x69, + 0x66, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x28, 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6d, + 0x65, 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0e, 0x2e, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x2e, 0x50, 0x61, 0x79, 0x6d, 0x65, 0x6e, 0x74, 0x52, 0x07, 0x70, 0x61, 0x79, 0x6d, 0x65, + 0x6e, 0x74, 0x12, 0x2a, 0x0a, 0x10, 0x68, 0x69, 0x67, 0x68, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x50, + 0x61, 0x79, 0x6d, 0x65, 0x6e, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x10, 0x68, 0x69, + 0x67, 0x68, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x50, 0x61, 0x79, 0x6d, 0x65, 0x6e, 0x74, 0x12, 0x44, + 0x0a, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, + 0x32, 0x28, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x50, 0x61, 0x79, 0x6d, 0x65, 0x6e, 0x74, + 0x4e, 0x6f, 0x74, 0x69, 0x66, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x4d, 0x65, 0x74, + 0x61, 0x64, 0x61, 0x74, 0x61, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x08, 0x6d, 0x65, 0x74, 0x61, + 0x64, 0x61, 0x74, 0x61, 0x1a, 0x3b, 0x0a, 0x0d, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, + 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, + 0x01, 0x32, 0x93, 0x01, 0x0a, 0x0f, 0x50, 0x61, 0x79, 0x6d, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x63, + 0x65, 0x69, 0x76, 0x65, 0x72, 0x12, 0x41, 0x0a, 0x0e, 0x52, 0x65, 0x63, 0x65, 0x69, 0x76, 0x65, + 0x50, 0x61, 0x79, 0x6d, 0x65, 0x6e, 0x74, 0x12, 0x0e, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, + 0x50, 0x61, 0x79, 0x6d, 0x65, 0x6e, 0x74, 0x1a, 0x1d, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, + 0x52, 0x65, 0x63, 0x65, 0x69, 0x76, 0x65, 0x50, 0x61, 0x79, 0x6d, 0x65, 0x6e, 0x74, 0x52, 0x65, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x3d, 0x0a, 0x0d, 0x4e, 0x6f, 0x74, 0x69, + 0x66, 0x79, 0x50, 0x61, 0x79, 0x6d, 0x65, 0x6e, 0x74, 0x12, 0x0c, 0x2e, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x1a, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, + 0x50, 0x61, 0x79, 0x6d, 0x65, 0x6e, 0x74, 0x4e, 0x6f, 0x74, 0x69, 0x66, 0x69, 0x63, 0x61, 0x74, + 0x69, 0x6f, 0x6e, 0x22, 0x00, 0x30, 0x01, 0x42, 0x49, 0x5a, 0x47, 0x67, 0x69, 0x74, 0x68, 0x75, + 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6b, 0x75, 0x62, 0x65, 0x73, 0x68, 0x6f, 0x70, 0x2f, 0x74, + 0x72, 0x61, 0x63, 0x65, 0x74, 0x65, 0x73, 0x74, 0x2f, 0x71, 0x75, 0x69, 0x63, 0x6b, 0x2d, 0x73, + 0x74, 0x61, 0x72, 0x74, 0x2d, 0x67, 0x72, 0x70, 0x63, 0x2d, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, + 0x2d, 0x70, 0x72, 0x6f, 0x70, 0x61, 0x67, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2f, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_paymentreceiver_proto_rawDescOnce sync.Once + file_paymentreceiver_proto_rawDescData = file_paymentreceiver_proto_rawDesc +) + +func file_paymentreceiver_proto_rawDescGZIP() []byte { + file_paymentreceiver_proto_rawDescOnce.Do(func() { + file_paymentreceiver_proto_rawDescData = protoimpl.X.CompressGZIP(file_paymentreceiver_proto_rawDescData) + }) + return file_paymentreceiver_proto_rawDescData +} + +var file_paymentreceiver_proto_msgTypes = make([]protoimpl.MessageInfo, 5) +var file_paymentreceiver_proto_goTypes = []interface{}{ + (*Empty)(nil), // 0: proto.Empty + (*Payment)(nil), // 1: proto.Payment + (*ReceivePaymentResponse)(nil), // 2: proto.ReceivePaymentResponse + (*PaymentNotification)(nil), // 3: proto.PaymentNotification + nil, // 4: proto.PaymentNotification.MetadataEntry +} +var file_paymentreceiver_proto_depIdxs = []int32{ + 1, // 0: proto.PaymentNotification.payment:type_name -> proto.Payment + 4, // 1: proto.PaymentNotification.metadata:type_name -> proto.PaymentNotification.MetadataEntry + 1, // 2: proto.PaymentReceiver.ReceivePayment:input_type -> proto.Payment + 0, // 3: proto.PaymentReceiver.NotifyPayment:input_type -> proto.Empty + 2, // 4: proto.PaymentReceiver.ReceivePayment:output_type -> proto.ReceivePaymentResponse + 3, // 5: proto.PaymentReceiver.NotifyPayment:output_type -> proto.PaymentNotification + 4, // [4:6] is the sub-list for method output_type + 2, // [2:4] is the sub-list for method input_type + 2, // [2:2] is the sub-list for extension type_name + 2, // [2:2] is the sub-list for extension extendee + 0, // [0:2] is the sub-list for field type_name +} + +func init() { file_paymentreceiver_proto_init() } +func file_paymentreceiver_proto_init() { + if File_paymentreceiver_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_paymentreceiver_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Empty); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_paymentreceiver_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Payment); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_paymentreceiver_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ReceivePaymentResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_paymentreceiver_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*PaymentNotification); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_paymentreceiver_proto_rawDesc, + NumEnums: 0, + NumMessages: 5, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_paymentreceiver_proto_goTypes, + DependencyIndexes: file_paymentreceiver_proto_depIdxs, + MessageInfos: file_paymentreceiver_proto_msgTypes, + }.Build() + File_paymentreceiver_proto = out.File + file_paymentreceiver_proto_rawDesc = nil + file_paymentreceiver_proto_goTypes = nil + file_paymentreceiver_proto_depIdxs = nil +} diff --git a/examples/quick-start-grpc-stream-propagation/producer-api/proto/paymentreceiver_grpc.pb.go b/examples/quick-start-grpc-stream-propagation/producer-api/proto/paymentreceiver_grpc.pb.go new file mode 100644 index 0000000000..747090ada7 --- /dev/null +++ b/examples/quick-start-grpc-stream-propagation/producer-api/proto/paymentreceiver_grpc.pb.go @@ -0,0 +1,174 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.3.0 +// - protoc v5.27.0 +// source: paymentreceiver.proto + +package proto + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +const ( + PaymentReceiver_ReceivePayment_FullMethodName = "/proto.PaymentReceiver/ReceivePayment" + PaymentReceiver_NotifyPayment_FullMethodName = "/proto.PaymentReceiver/NotifyPayment" +) + +// PaymentReceiverClient is the client API for PaymentReceiver service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type PaymentReceiverClient interface { + ReceivePayment(ctx context.Context, in *Payment, opts ...grpc.CallOption) (*ReceivePaymentResponse, error) + NotifyPayment(ctx context.Context, in *Empty, opts ...grpc.CallOption) (PaymentReceiver_NotifyPaymentClient, error) +} + +type paymentReceiverClient struct { + cc grpc.ClientConnInterface +} + +func NewPaymentReceiverClient(cc grpc.ClientConnInterface) PaymentReceiverClient { + return &paymentReceiverClient{cc} +} + +func (c *paymentReceiverClient) ReceivePayment(ctx context.Context, in *Payment, opts ...grpc.CallOption) (*ReceivePaymentResponse, error) { + out := new(ReceivePaymentResponse) + err := c.cc.Invoke(ctx, PaymentReceiver_ReceivePayment_FullMethodName, in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *paymentReceiverClient) NotifyPayment(ctx context.Context, in *Empty, opts ...grpc.CallOption) (PaymentReceiver_NotifyPaymentClient, error) { + stream, err := c.cc.NewStream(ctx, &PaymentReceiver_ServiceDesc.Streams[0], PaymentReceiver_NotifyPayment_FullMethodName, opts...) + if err != nil { + return nil, err + } + x := &paymentReceiverNotifyPaymentClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type PaymentReceiver_NotifyPaymentClient interface { + Recv() (*PaymentNotification, error) + grpc.ClientStream +} + +type paymentReceiverNotifyPaymentClient struct { + grpc.ClientStream +} + +func (x *paymentReceiverNotifyPaymentClient) Recv() (*PaymentNotification, error) { + m := new(PaymentNotification) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// PaymentReceiverServer is the server API for PaymentReceiver service. +// All implementations must embed UnimplementedPaymentReceiverServer +// for forward compatibility +type PaymentReceiverServer interface { + ReceivePayment(context.Context, *Payment) (*ReceivePaymentResponse, error) + NotifyPayment(*Empty, PaymentReceiver_NotifyPaymentServer) error + mustEmbedUnimplementedPaymentReceiverServer() +} + +// UnimplementedPaymentReceiverServer must be embedded to have forward compatible implementations. +type UnimplementedPaymentReceiverServer struct { +} + +func (UnimplementedPaymentReceiverServer) ReceivePayment(context.Context, *Payment) (*ReceivePaymentResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method ReceivePayment not implemented") +} +func (UnimplementedPaymentReceiverServer) NotifyPayment(*Empty, PaymentReceiver_NotifyPaymentServer) error { + return status.Errorf(codes.Unimplemented, "method NotifyPayment not implemented") +} +func (UnimplementedPaymentReceiverServer) mustEmbedUnimplementedPaymentReceiverServer() {} + +// UnsafePaymentReceiverServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to PaymentReceiverServer will +// result in compilation errors. +type UnsafePaymentReceiverServer interface { + mustEmbedUnimplementedPaymentReceiverServer() +} + +func RegisterPaymentReceiverServer(s grpc.ServiceRegistrar, srv PaymentReceiverServer) { + s.RegisterService(&PaymentReceiver_ServiceDesc, srv) +} + +func _PaymentReceiver_ReceivePayment_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(Payment) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(PaymentReceiverServer).ReceivePayment(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: PaymentReceiver_ReceivePayment_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(PaymentReceiverServer).ReceivePayment(ctx, req.(*Payment)) + } + return interceptor(ctx, in, info, handler) +} + +func _PaymentReceiver_NotifyPayment_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(Empty) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(PaymentReceiverServer).NotifyPayment(m, &paymentReceiverNotifyPaymentServer{stream}) +} + +type PaymentReceiver_NotifyPaymentServer interface { + Send(*PaymentNotification) error + grpc.ServerStream +} + +type paymentReceiverNotifyPaymentServer struct { + grpc.ServerStream +} + +func (x *paymentReceiverNotifyPaymentServer) Send(m *PaymentNotification) error { + return x.ServerStream.SendMsg(m) +} + +// PaymentReceiver_ServiceDesc is the grpc.ServiceDesc for PaymentReceiver service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var PaymentReceiver_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "proto.PaymentReceiver", + HandlerType: (*PaymentReceiverServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "ReceivePayment", + Handler: _PaymentReceiver_ReceivePayment_Handler, + }, + }, + Streams: []grpc.StreamDesc{ + { + StreamName: "NotifyPayment", + Handler: _PaymentReceiver_NotifyPayment_Handler, + ServerStreams: true, + }, + }, + Metadata: "paymentreceiver.proto", +} diff --git a/examples/quick-start-grpc-stream-propagation/producer-api/telemetry.go b/examples/quick-start-grpc-stream-propagation/producer-api/telemetry.go new file mode 100644 index 0000000000..3936fe939b --- /dev/null +++ b/examples/quick-start-grpc-stream-propagation/producer-api/telemetry.go @@ -0,0 +1,118 @@ +package main + +import ( + "context" + "fmt" + "log" + "os" + "time" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" + "go.opentelemetry.io/otel/propagation" + "go.opentelemetry.io/otel/sdk/resource" + sdkTrace "go.opentelemetry.io/otel/sdk/trace" + semconv "go.opentelemetry.io/otel/semconv/v1.4.0" + "go.opentelemetry.io/otel/trace" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +const spanExporterTimeout = 1 * time.Minute + +func getEnvVar(envVarName, defaultValue string) string { + envVarValue := os.Getenv(envVarName) + if envVarValue == "" { + return defaultValue + } + + return envVarValue +} + +func setupOpenTelemetry(ctx context.Context, otelExporterEndpoint, serviceName string) (trace.Tracer, error) { + log.Printf("Setting up OpenTelemetry with exporter endpoint %s and service name %s", otelExporterEndpoint, serviceName) + + spanExporter, err := getSpanExporter(ctx, otelExporterEndpoint) + if err != nil { + return nil, fmt.Errorf("failed to setup span exporter: %w", err) + } + + traceProvider, err := getTraceProvider(spanExporter, serviceName) + if err != nil { + return nil, fmt.Errorf("failed to setup trace provider: %w", err) + } + + return traceProvider.Tracer(serviceName), nil +} + +func getSpanExporter(ctx context.Context, otelExporterEndpoint string) (sdkTrace.SpanExporter, error) { + ctx, cancel := context.WithTimeout(ctx, spanExporterTimeout) + defer cancel() + + conn, err := grpc.NewClient( + otelExporterEndpoint, + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + if err != nil { + return nil, fmt.Errorf("failed to create gRPC connection to collector: %w", err) + } + + traceExporter, err := otlptracegrpc.New(ctx, otlptracegrpc.WithGRPCConn(conn)) + if err != nil { + return nil, fmt.Errorf("failed to create trace exporter: %w", err) + } + + return traceExporter, nil +} + +func getTraceProvider(spanExporter sdkTrace.SpanExporter, serviceName string) (*sdkTrace.TracerProvider, error) { + defaultResource := resource.Default() + + mergedResource, err := resource.Merge( + defaultResource, + resource.NewWithAttributes( + defaultResource.SchemaURL(), + semconv.ServiceNameKey.String(serviceName), + ), + ) + if err != nil { + return nil, fmt.Errorf("failed to create otel resource: %w", err) + } + + tp := sdkTrace.NewTracerProvider( + sdkTrace.WithBatcher(spanExporter), + sdkTrace.WithResource(mergedResource), + ) + + otel.SetTracerProvider(tp) + + otel.SetTextMapPropagator( + propagation.NewCompositeTextMapPropagator( + propagation.TraceContext{}, + propagation.Baggage{}, + ), + ) + + return tp, nil +} + +func injectMetadataIntoContext(ctx context.Context, metadata map[string]string) context.Context { + propagator := otel.GetTextMapPropagator() + + return propagator.Extract( + ctx, + propagation.MapCarrier(metadata), + ) +} + +func extractMetadataFromContext(ctx context.Context) map[string]string { + propagator := otel.GetTextMapPropagator() + + metadata := map[string]string{} + propagator.Inject( + ctx, + propagation.MapCarrier(metadata), + ) + + return metadata +} diff --git a/examples/quick-start-grpc-stream-propagation/proto/paymentreceiver.proto b/examples/quick-start-grpc-stream-propagation/proto/paymentreceiver.proto new file mode 100644 index 0000000000..1f15645383 --- /dev/null +++ b/examples/quick-start-grpc-stream-propagation/proto/paymentreceiver.proto @@ -0,0 +1,27 @@ +syntax = "proto3"; + +package proto; +option go_package = "github.com/kubeshop/tracetest/quick-start-grpc-stream-propagation/proto"; + +service PaymentReceiver { + rpc ReceivePayment(Payment) returns (ReceivePaymentResponse) {} + + rpc NotifyPayment(Empty) returns (stream PaymentNotification) {} +} + +message Empty {} + +message Payment { + string customerId = 1; + float amount = 2; +} + +message ReceivePaymentResponse { + bool received = 1; +} + +message PaymentNotification { + Payment payment = 1; + bool highValuePayment = 2; + map metadata = 3; +} diff --git a/examples/quick-start-grpc-stream-propagation/trace-based-test.yaml b/examples/quick-start-grpc-stream-propagation/trace-based-test.yaml new file mode 100644 index 0000000000..37a61e6f6a --- /dev/null +++ b/examples/quick-start-grpc-stream-propagation/trace-based-test.yaml @@ -0,0 +1,37 @@ +type: Test +spec: + id: pprDfSUSg + name: Test gRPC Stream Propagation + trigger: + type: grpc + grpc: + address: producer-api:8080 + method: proto.PaymentReceiver.ReceivePayment + protobufFile: ./proto/paymentreceiver.proto + request: | + { + "customerId": "1234", + "amount": 50000 + } + specs: + - selector: span[name="proto.PaymentReceiver/ReceivePayment"] + name: It should call ReceivePayment gRPC endpoint + assertions: + - attr:tracetest.selected_spans.count = 1 + - selector: span[name="EnqueuePayment"] + name: In should enqueue the payment to send it in a stream + assertions: + - attr:tracetest.selected_spans.count = 1 + - selector: span[name="SendPaymentNotification"] + name: It should send the a payment notification through a gRPC stream + assertions: + - attr:tracetest.selected_spans.count = 1 + - selector: span[name="ProcessPaymentNotification"] + name: It should receive a PaymentNotification through a stream and process it + assertions: + - attr:tracetest.selected_spans.count = 1 + - selector: span[name="proto.PaymentReceiver/ReceivePayment"] span[name="EnqueuePayment"] span[name="SendPaymentNotification"] span[name="ProcessPaymentNotification"] + name: The trace shape is correct + assertions: + - attr:tracetest.selected_spans.count = 1 + diff --git a/examples/quick-start-grpc-stream-propagation/tracetest/collector.config.yaml b/examples/quick-start-grpc-stream-propagation/tracetest/collector.config.yaml new file mode 100644 index 0000000000..a61a770ffd --- /dev/null +++ b/examples/quick-start-grpc-stream-propagation/tracetest/collector.config.yaml @@ -0,0 +1,24 @@ +receivers: + otlp: + protocols: + grpc: + http: + +processors: + batch: + timeout: 100ms + +exporters: + logging: + loglevel: debug + otlp/jaeger: + endpoint: jaeger:4317 + tls: + insecure: true + +service: + pipelines: + traces/1: + receivers: [otlp] + processors: [batch] + exporters: [otlp/jaeger] diff --git a/examples/quick-start-grpc-stream-propagation/tracetest/tracetest-config.yaml b/examples/quick-start-grpc-stream-propagation/tracetest/tracetest-config.yaml new file mode 100644 index 0000000000..5e732f6d38 --- /dev/null +++ b/examples/quick-start-grpc-stream-propagation/tracetest/tracetest-config.yaml @@ -0,0 +1,7 @@ +postgres: + host: postgres + user: postgres + password: postgres + port: 5432 + dbname: postgres + params: sslmode=disable diff --git a/examples/quick-start-grpc-stream-propagation/tracetest/tracetest-provision.yaml b/examples/quick-start-grpc-stream-propagation/tracetest/tracetest-provision.yaml new file mode 100644 index 0000000000..bcf9a527e0 --- /dev/null +++ b/examples/quick-start-grpc-stream-propagation/tracetest/tracetest-provision.yaml @@ -0,0 +1,20 @@ +--- +type: PollingProfile +spec: + name: Default + strategy: periodic + default: true + periodic: + retryDelay: 5s + timeout: 10m + +--- +type: DataStore +spec: + name: Jaeger + type: jaeger + default: true + jaeger: + endpoint: jaeger:16685 + tls: + insecure: true diff --git a/examples/quick-start-grpc-stream-propagation/tracetest/tracetest-tracing-backend.yaml b/examples/quick-start-grpc-stream-propagation/tracetest/tracetest-tracing-backend.yaml new file mode 100644 index 0000000000..1595add71d --- /dev/null +++ b/examples/quick-start-grpc-stream-propagation/tracetest/tracetest-tracing-backend.yaml @@ -0,0 +1,11 @@ +--- +type: DataStore +spec: + id: current + name: Jaeger + type: jaeger + default: true + jaeger: + endpoint: jaeger:16685 + tls: + insecure: true