From a445db89fae039337ac19c429f0cbd6a4b5e52fe Mon Sep 17 00:00:00 2001 From: mrekucci Date: Fri, 16 Aug 2024 18:14:09 +0200 Subject: [PATCH] feat: instrument grpc API calls (#345) --- .../jobs/mev-commit-emulator.nomad.j2 | 5 +-- .../templates/jobs/mev-commit.nomad.j2 | 2 +- .../nomad/playbooks/variables/profiles.yml | 7 ++++ p2p/go.mod | 1 + p2p/go.sum | 2 ++ p2p/integrationtest/bidder/main.go | 35 +++++++++++++++++++ p2p/integrationtest/provider/client.go | 2 ++ p2p/integrationtest/provider/main.go | 34 ++++++++++++++++++ p2p/integrationtest/real-bidder/main.go | 35 +++++++++++++++++++ p2p/pkg/node/node.go | 7 +++- x/util/otelutil/otel.go | 19 +++++++--- 11 files changed, 141 insertions(+), 8 deletions(-) diff --git a/infrastructure/nomad/playbooks/templates/jobs/mev-commit-emulator.nomad.j2 b/infrastructure/nomad/playbooks/templates/jobs/mev-commit-emulator.nomad.j2 index afa29d354..0ad1f422e 100644 --- a/infrastructure/nomad/playbooks/templates/jobs/mev-commit-emulator.nomad.j2 +++ b/infrastructure/nomad/playbooks/templates/jobs/mev-commit-emulator.nomad.j2 @@ -67,7 +67,7 @@ job "{{ job.name }}" { if job.get('env') and job.env.get('log-tags') else 'service:' + job.name + '-{{ env "NOMAD_ALLOC_INDEX" }}' }}" - + EMULATOR_OTEL_COLLECTOR_ENDPOINT_URL="{{ job.env.get('otel-collector-endpoint-url', '') }}" {%- raw %} {{- $idx := add (env "NOMAD_ALLOC_INDEX" | parseInt) 1 }} {{- range nomadService (printf "%s%d" "{% endraw %}{{ job.target_name }}{% raw %}" $idx) }} @@ -103,7 +103,8 @@ job "{{ job.name }}" { -rpc-addr "${EMULATOR_L1_RPC_URL}" \ {% endif %} -log-tags "${EMULATOR_LOG_TAGS}" \ - -log-fmt "${EMULATOR_LOG_FMT}" + -log-fmt "${EMULATOR_LOG_FMT}" \ + -otel-collector-endpoint-url "${EMULATOR_OTEL_COLLECTOR_ENDPOINT_URL}" \ EOH destination = "local/run.sh" perms = "0755" diff --git a/infrastructure/nomad/playbooks/templates/jobs/mev-commit.nomad.j2 b/infrastructure/nomad/playbooks/templates/jobs/mev-commit.nomad.j2 index d673deca1..33918f059 100644 --- a/infrastructure/nomad/playbooks/templates/jobs/mev-commit.nomad.j2 +++ b/infrastructure/nomad/playbooks/templates/jobs/mev-commit.nomad.j2 @@ -93,7 +93,7 @@ job "{{ job.name }}" { if job.env['log-tags'] is defined and job.env['log-tags'] else 'service:' + job.name + '-{{ env "NOMAD_ALLOC_INDEX" }}' }}" - MEV_COMMIT_OTEL_COLLECTOR_ENDPOINT_URL="grpc://{{ ansible_facts['default_ipv4']['address'] }}:4317" + MEV_COMMIT_OTEL_COLLECTOR_ENDPOINT_URL="{{ job.env['otel-collector-endpoint-url'] }}" {%- raw %} MEV_COMMIT_KEYSTORE_PATH="/local/data-{{ env "NOMAD_ALLOC_INDEX" }}/keystore" MEV_COMMIT_KEYSTORE_FILENAME="{{ with secret "secret/data/mev-commit" }}{{ .Data.data.{% endraw %}{{ job.artifacts | selectattr('keystore', 'defined') | map(attribute='keystore.name') | first }}{% raw %}_filename }}{{ end }}" diff --git a/infrastructure/nomad/playbooks/variables/profiles.yml b/infrastructure/nomad/playbooks/variables/profiles.yml index 9853dabbc..bdb2d1530 100644 --- a/infrastructure/nomad/playbooks/variables/profiles.yml +++ b/infrastructure/nomad/playbooks/variables/profiles.yml @@ -1,4 +1,5 @@ datacenter: "dc1" +otel_collector_endpoint_url: "grpc://{{ ansible_facts['default_ipv4']['address'] }}:4317" artifacts: bidder_emulator: &bidder_emulator_artifact @@ -145,6 +146,7 @@ jobs: type: bootnode tls_crt_file: "{{ tls_crt_file }}" tls_key_file: "{{ tls_key_file }}" + otel-collector-endpoint-url: "{{ otel_collector_endpoint_url }}" mev_commit_provider_node1: &mev_commit_provider_node1_job name: mev-commit-provider-node1 @@ -171,6 +173,7 @@ jobs: nat_address: "{{ ansible_facts['default_ipv4']['address'] }}" tls_crt_file: "{{ tls_crt_file }}" tls_key_file: "{{ tls_key_file }}" + otel-collector-endpoint-url: "{{ otel_collector_endpoint_url }}" mev_commit_provider_node2: &mev_commit_provider_node2_job name: mev-commit-provider-node2 @@ -253,6 +256,8 @@ jobs: ports: - metrics: to: 8080 + env: + otel-collector-endpoint-url: "{{ otel_collector_endpoint_url }}" mev-commit-provider-emulator-nodes: &mev_commit_provider_emulator_nodes_job name: mev-commit-provider-emulator-nodes @@ -290,6 +295,7 @@ jobs: type: bidder tls_crt_file: "{{ tls_crt_file }}" tls_key_file: "{{ tls_key_file }}" + otel-collector-endpoint-url: "{{ otel_collector_endpoint_url }}" mev_commit_bidder_node2: &mev_commit_bidder_node2_job name: mev-commit-bidder-node2 @@ -434,6 +440,7 @@ jobs: to: 8080 env: l1_rpc_url: "{{ l1_rpc_url }}" + otel-collector-endpoint-url: "{{ otel_collector_endpoint_url }}" mev_commit_bidder_emulator_nodes: &mev_commit_bidder_emulator_nodes_job name: mev-commit-bidder-emulator-nodes diff --git a/p2p/go.mod b/p2p/go.mod index d78353174..b40ab525a 100644 --- a/p2p/go.mod +++ b/p2p/go.mod @@ -22,6 +22,7 @@ require ( github.com/stretchr/testify v1.9.0 github.com/urfave/cli/v2 v2.27.1 github.com/vmihailenco/msgpack/v5 v5.4.1 + go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.53.0 go.opentelemetry.io/otel v1.28.0 golang.org/x/crypto v0.25.0 golang.org/x/sync v0.7.0 diff --git a/p2p/go.sum b/p2p/go.sum index e5476f676..267bc1903 100644 --- a/p2p/go.sum +++ b/p2p/go.sum @@ -499,6 +499,8 @@ github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9dec github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= go.opencensus.io v0.18.0/go.mod h1:vKdFvxhtzZ9onBp9VKHK8z/sRpBMnKAsufL7wlDrCOA= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.53.0 h1:9G6E0TXzGFVfTnawRzrPl83iHOAV7L8NJiR8RSGYV1g= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.53.0/go.mod h1:azvtTADFQJA8mX80jIH/akaE7h+dbm/sVuaHqN13w74= go.opentelemetry.io/otel v1.28.0 h1:/SqNcYk+idO0CxKEUOtKQClMK/MimZihKYMruSMViUo= go.opentelemetry.io/otel v1.28.0/go.mod h1:q68ijF8Fc8CnMHKyzqL6akLO46ePnjkgfIMIjUIX9z4= go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.28.0 h1:3Q/xZUyC1BBkualc9ROb4G8qkH90LXEIICcs5zv1OYY= diff --git a/p2p/integrationtest/bidder/main.go b/p2p/integrationtest/bidder/main.go index 2265db30e..c1112350a 100644 --- a/p2p/integrationtest/bidder/main.go +++ b/p2p/integrationtest/bidder/main.go @@ -20,10 +20,14 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethclient" + "github.com/go-logr/logr" pb "github.com/primev/mev-commit/p2p/gen/go/bidderapi/v1" "github.com/primev/mev-commit/x/util" + "github.com/primev/mev-commit/x/util/otelutil" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" + "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" + "go.opentelemetry.io/otel" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" @@ -56,6 +60,11 @@ var ( "", "Comma-separated list of pairs that will be inserted into each log line", ) + otelCollectorEndpointURL = flag.String( + "otel-collector-endpoint-url", + "", + "URL for OpenTelemetry collector endpoint", + ) httpPort = flag.Int( "http-port", 8080, @@ -111,6 +120,31 @@ func main() { return } + if *otelCollectorEndpointURL != "" { + logger.Info("setting up OpenTelemetry SDK", "endpoint", *otelCollectorEndpointURL) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + shutdown, err := otelutil.SetupOTelSDK( + ctx, + *otelCollectorEndpointURL, + *logTags, + ) + if err != nil { + logger.Warn("failed to setup OpenTelemetry SDK; continuing without telemetry", "error", err) + } else { + otel.SetLogger(logr.FromSlogHandler( + logger.Handler().WithAttrs([]slog.Attr{ + {Key: "component", Value: slog.StringValue("otel")}, + }), + )) + defer func() { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + err = errors.Join(err, shutdown(ctx)) + cancel() + }() + } + } + registry := prometheus.NewRegistry() registry.MustRegister( receivedPreconfs, @@ -162,6 +196,7 @@ func main() { *serverAddr, grpc.WithBlock(), grpc.WithTransportCredentials(e.credential), + grpc.WithStatsHandler(otelgrpc.NewClientHandler()), ) if err != nil { logger.Error("failed to dial grpc server", "error", err) diff --git a/p2p/integrationtest/provider/client.go b/p2p/integrationtest/provider/client.go index c694c3a48..7692c0b0b 100644 --- a/p2p/integrationtest/provider/client.go +++ b/p2p/integrationtest/provider/client.go @@ -11,6 +11,7 @@ import ( "time" providerapiv1 "github.com/primev/mev-commit/p2p/gen/go/providerapi/v1" + "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" @@ -55,6 +56,7 @@ func NewProviderClient( serverAddr, grpc.WithBlock(), grpc.WithTransportCredentials(e.credential), + grpc.WithStatsHandler(otelgrpc.NewClientHandler()), ) if err != nil { logger.Error("failed to dial grpc server", "error", err) diff --git a/p2p/integrationtest/provider/main.go b/p2p/integrationtest/provider/main.go index 01a0bba6d..ca0a51628 100644 --- a/p2p/integrationtest/provider/main.go +++ b/p2p/integrationtest/provider/main.go @@ -6,15 +6,19 @@ import ( "errors" "flag" "fmt" + "log/slog" "math/rand" "net/http" "os" "time" + "github.com/go-logr/logr" providerapiv1 "github.com/primev/mev-commit/p2p/gen/go/providerapi/v1" "github.com/primev/mev-commit/x/util" + "github.com/primev/mev-commit/x/util/otelutil" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" + "go.opentelemetry.io/otel" ) // The following const block contains the name of the cli flags, especially @@ -54,6 +58,11 @@ var ( 8080, "The port to serve the HTTP metrics endpoint on", ) + otelCollectorEndpointURL = flag.String( + "otel-collector-endpoint-url", + "", + "URL for OpenTelemetry collector endpoint", + ) errorProbability = flag.Int( errorProbabilityFlagName, 20, @@ -91,6 +100,31 @@ func main() { return } + if *otelCollectorEndpointURL != "" { + logger.Info("setting up OpenTelemetry SDK", "endpoint", *otelCollectorEndpointURL) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + shutdown, err := otelutil.SetupOTelSDK( + ctx, + *otelCollectorEndpointURL, + *logTags, + ) + if err != nil { + logger.Warn("failed to setup OpenTelemetry SDK; continuing without telemetry", "error", err) + } else { + otel.SetLogger(logr.FromSlogHandler( + logger.Handler().WithAttrs([]slog.Attr{ + {Key: "component", Value: slog.StringValue("otel")}, + }), + )) + defer func() { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + err = errors.Join(err, shutdown(ctx)) + cancel() + }() + } + } + if *serverAddr == "" { fmt.Printf("please provide a valid server address with the -%s flag\n", serverAddrFlagName) return diff --git a/p2p/integrationtest/real-bidder/main.go b/p2p/integrationtest/real-bidder/main.go index 356a892ca..066a5ce37 100644 --- a/p2p/integrationtest/real-bidder/main.go +++ b/p2p/integrationtest/real-bidder/main.go @@ -18,10 +18,14 @@ import ( "time" "github.com/ethereum/go-ethereum/ethclient" + "github.com/go-logr/logr" pb "github.com/primev/mev-commit/p2p/gen/go/bidderapi/v1" "github.com/primev/mev-commit/x/util" + "github.com/primev/mev-commit/x/util/otelutil" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" + "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" + "go.opentelemetry.io/otel" "google.golang.org/grpc" "google.golang.org/grpc/credentials" ) @@ -52,6 +56,11 @@ var ( "", "Comma-separated list of pairs that will be inserted into each log line", ) + otelCollectorEndpointURL = flag.String( + "otel-collector-endpoint-url", + "", + "URL for OpenTelemetry collector endpoint", + ) httpPort = flag.Int( "http-port", 8080, @@ -97,6 +106,31 @@ func main() { return } + if *otelCollectorEndpointURL != "" { + logger.Info("setting up OpenTelemetry SDK", "endpoint", *otelCollectorEndpointURL) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + shutdown, err := otelutil.SetupOTelSDK( + ctx, + *otelCollectorEndpointURL, + *logTags, + ) + if err != nil { + logger.Warn("failed to setup OpenTelemetry SDK; continuing without telemetry", "error", err) + } else { + otel.SetLogger(logr.FromSlogHandler( + logger.Handler().WithAttrs([]slog.Attr{ + {Key: "component", Value: slog.StringValue("otel")}, + }), + )) + defer func() { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + err = errors.Join(err, shutdown(ctx)) + cancel() + }() + } + } + if *serverAddr == "" { fmt.Println("please provide a valid server address with the -serverAddr flag") return @@ -133,6 +167,7 @@ func main() { // thus we do not expect machine-in-the-middle attacks. &tls.Config{InsecureSkipVerify: true}, )), + grpc.WithStatsHandler(otelgrpc.NewClientHandler()), ) if err != nil { logger.Error("failed to connect to server", "err", err) diff --git a/p2p/pkg/node/node.go b/p2p/pkg/node/node.go index edc30695c..a392b3774 100644 --- a/p2p/pkg/node/node.go +++ b/p2p/pkg/node/node.go @@ -61,6 +61,7 @@ import ( "github.com/primev/mev-commit/x/contracts/txmonitor" "github.com/primev/mev-commit/x/health" "github.com/primev/mev-commit/x/keysigner" + "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" @@ -316,7 +317,10 @@ func NewNode(opts *Options) (*Node, error) { } } - grpcServer := grpc.NewServer(grpc.Creds(tlsCredentials)) + grpcServer := grpc.NewServer( + grpc.Creds(tlsCredentials), + grpc.StatsHandler(otelgrpc.NewServerHandler()), + ) debugService := debugapi.NewService( txnStore, @@ -627,6 +631,7 @@ func NewNode(opts *Options) (*Node, error) { opts.RPCAddr, grpc.WithBlock(), grpc.WithTransportCredentials(e.credential), + grpc.WithStatsHandler(otelgrpc.NewClientHandler()), ) if err != nil { opts.Logger.Error("failed to dial grpc server", "error", err) diff --git a/x/util/otelutil/otel.go b/x/util/otelutil/otel.go index 47a3f4c3b..efcc13baf 100644 --- a/x/util/otelutil/otel.go +++ b/x/util/otelutil/otel.go @@ -91,15 +91,26 @@ func SetupOTelSDK(ctx context.Context, epurl, tags string) (func(context.Context return nil, fmt.Errorf("unsupported scheme %q in: %s", val.Scheme, val) } - exporter, err := otlptrace.New(ctx, client) + exp, err := otlptrace.New(ctx, client) + if err != nil { + return nil, err + } + shutdownFuncs = append(shutdownFuncs, exp.Shutdown) + + res, err := resource.Merge( + resource.Default(), + resource.NewWithAttributes( + semconv.SchemaURL, + args..., + ), + ) if err != nil { return nil, err } - shutdownFuncs = append(shutdownFuncs, exporter.Shutdown) tracerProvider := trace.NewTracerProvider( - trace.WithBatcher(exporter), - trace.WithResource(resource.NewWithAttributes(semconv.SchemaURL, args...)), + trace.WithBatcher(exp), + trace.WithResource(res), ) shutdownFuncs = append(shutdownFuncs, tracerProvider.Shutdown)