diff --git a/CHANGELOG.md b/CHANGELOG.md index 5beddbba6d2c..8063214858a4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -156,6 +156,9 @@ v0.37.4 (2023-11-06) - Fix a bug where reloading the configuration of a `loki.write` component lead to a panic. (@tpaschalis) +- Added Kubernetes service resolver to static node's loadbalancing exporter + and to Flow's `otelcol.exporter.loadbalancing`. (@ptodev) + v0.37.3 (2023-10-26) ----------------- diff --git a/component/otelcol/exporter/loadbalancing/loadbalancing.go b/component/otelcol/exporter/loadbalancing/loadbalancing.go index 5fee871a7f03..3a8c2f87fdd0 100644 --- a/component/otelcol/exporter/loadbalancing/loadbalancing.go +++ b/component/otelcol/exporter/loadbalancing/loadbalancing.go @@ -137,8 +137,9 @@ func (otlpConfig OtlpConfig) Convert() otlpexporter.Config { // ResolverSettings defines the configurations for the backend resolver type ResolverSettings struct { - Static *StaticResolver `river:"static,block,optional"` - DNS *DNSResolver `river:"dns,block,optional"` + Static *StaticResolver `river:"static,block,optional"` + DNS *DNSResolver `river:"dns,block,optional"` + Kubernetes *KubernetesResolver `river:"kubernetes,block,optional"` } func (resolverSettings ResolverSettings) Convert() loadbalancingexporter.ResolverSettings { @@ -154,6 +155,11 @@ func (resolverSettings ResolverSettings) Convert() loadbalancingexporter.Resolve res.DNS = &dnsResolver } + if resolverSettings.Kubernetes != nil { + kubernetesResolver := resolverSettings.Kubernetes.Convert() + res.K8sSvc = &kubernetesResolver + } + return res } @@ -199,6 +205,29 @@ func (dnsResolver *DNSResolver) Convert() loadbalancingexporter.DNSResolver { } } +// KubernetesResolver defines the configuration for the k8s resolver +type KubernetesResolver struct { + Service string `river:"service,attr"` + Ports []int32 `river:"ports,attr,optional"` +} + +var _ river.Defaulter = &KubernetesResolver{} + +// SetToDefault implements river.Defaulter. +func (args *KubernetesResolver) SetToDefault() { + if args == nil { + args = &KubernetesResolver{} + } + args.Ports = []int32{4317} +} + +func (k8sSvcResolver *KubernetesResolver) Convert() loadbalancingexporter.K8sSvcResolver { + return loadbalancingexporter.K8sSvcResolver{ + Service: k8sSvcResolver.Service, + Ports: append([]int32{}, k8sSvcResolver.Ports...), + } +} + // Extensions implements exporter.Arguments. func (args Arguments) Extensions() map[otelcomponent.ID]otelextension.Extension { return args.Protocol.OTLP.Client.Extensions() diff --git a/component/otelcol/exporter/loadbalancing/loadbalancing_test.go b/component/otelcol/exporter/loadbalancing/loadbalancing_test.go index a5e0851fef3d..5e528dd373a3 100644 --- a/component/otelcol/exporter/loadbalancing/loadbalancing_test.go +++ b/component/otelcol/exporter/loadbalancing/loadbalancing_test.go @@ -203,6 +203,59 @@ func TestConfigConversion(t *testing.T) { Protocol: defaultProtocol, }, }, + { + testName: "k8s with defaults", + agentCfg: ` + resolver { + kubernetes { + service = "lb-svc.lb-ns" + } + } + protocol { + otlp { + client {} + } + } + `, + expected: loadbalancingexporter.Config{ + Resolver: loadbalancingexporter.ResolverSettings{ + Static: nil, + K8sSvc: &loadbalancingexporter.K8sSvcResolver{ + Service: "lb-svc.lb-ns", + Ports: []int32{4317}, + }, + }, + RoutingKey: "traceID", + Protocol: defaultProtocol, + }, + }, + { + testName: "k8s with non-defaults", + agentCfg: ` + resolver { + kubernetes { + service = "lb-svc.lb-ns" + ports = [55690, 55691] + } + } + protocol { + otlp { + client {} + } + } + `, + expected: loadbalancingexporter.Config{ + Resolver: loadbalancingexporter.ResolverSettings{ + Static: nil, + K8sSvc: &loadbalancingexporter.K8sSvcResolver{ + Service: "lb-svc.lb-ns", + Ports: []int32{55690, 55691}, + }, + }, + RoutingKey: "traceID", + Protocol: defaultProtocol, + }, + }, } for _, tc := range tests { diff --git a/docs/sources/flow/reference/components/otelcol.exporter.loadbalancing.md b/docs/sources/flow/reference/components/otelcol.exporter.loadbalancing.md index 5e42e1d318cf..f6951d828cf8 100644 --- a/docs/sources/flow/reference/components/otelcol.exporter.loadbalancing.md +++ b/docs/sources/flow/reference/components/otelcol.exporter.loadbalancing.md @@ -81,6 +81,7 @@ Hierarchy | Block | Description | Required resolver | [resolver][] | Configures discovering the endpoints to export to. | yes resolver > static | [static][] | Static list of endpoints to export to. | no resolver > dns | [dns][] | DNS-sourced list of endpoints to export to. | no +resolver > kubernetes | [kubernetes][] | Kubernetes-sourced list of endpoints to export to. | no protocol | [protocol][] | Protocol settings. Only OTLP is supported at the moment. | no protocol > otlp | [otlp][] | Configures an OTLP exporter. | no protocol > otlp > client | [client][] | Configures the exporter gRPC client. | no @@ -96,6 +97,7 @@ refers to a `static` block defined inside a `resolver` block. [resolver]: #resolver-block [static]: #static-block [dns]: #dns-block +[kubernetes]: #kubernetes-block [protocol]: #protocol-block [otlp]: #otlp-block [client]: #client-block @@ -137,6 +139,26 @@ Name | Type | Description | Default | Required `timeout` | `duration` | Resolver timeout. | `"1s"` | no `port` | `string` | Port to be used with the IP addresses resolved from the DNS hostname. | `"4317"` | no +### kubernetes block + +You can use the `kubernetes` block to load balance across the pods of a Kubernetes service. The Agent will be notified +by the Kubernetes API whenever a new pod is added or removed from the service. + +The following arguments are supported: + +Name | Type | Description | Default | Required +---- | ---- | ----------- | ------- | -------- +`service` | `string` | Kubernetes service to resolve. | | yes +`ports` | `list(number)` | Ports to use with the IP addresses resolved from `service`. | `[4317]` | no + +If no namespace is specified inside `service`, an attempt will be made to infer the namespace for this Agent. +If this fails, the `default` namespace will be used. + +Each of the ports listed in `ports` will be used with each of the IPs resolved from `service`. + +The "get", "list", and "watch" [roles](https://kubernetes.io/docs/reference/access-authn-authz/rbac/#role-example) +must be granted in Kubernetes for the resolver to work. + ### protocol block The `protocol` block configures protocol-related settings for exporting. diff --git a/docs/sources/static/configuration/traces-config.md b/docs/sources/static/configuration/traces-config.md index b8c822f28bf8..57a2a724ea58 100644 --- a/docs/sources/static/configuration/traces-config.md +++ b/docs/sources/static/configuration/traces-config.md @@ -332,6 +332,9 @@ load_balancing: [ interval: | default = 5s ] # Resolver timeout [ timeout: | default = 1s ] + kubernetes: + service: + [ ports: | default = 4317 ] # routing_key can be either "traceID" or "service": # * "service": exports spans based on their service name. diff --git a/pkg/traces/config.go b/pkg/traces/config.go index be9adf1f56a9..69d7792a599f 100644 --- a/pkg/traces/config.go +++ b/pkg/traces/config.go @@ -64,8 +64,9 @@ const ( // defaultLoadBalancingPort is the default port the agent uses for internal load balancing defaultLoadBalancingPort = "4318" // agent's load balancing options - dnsTagName = "dns" - staticTagName = "static" + dnsTagName = "dns" + staticTagName = "static" + kubernetesTagName = "kubernetes" // sampling policies alwaysSamplePolicy = "always_sample" @@ -587,6 +588,8 @@ func resolver(config map[string]interface{}) (map[string]interface{}, error) { switch typ { case dnsTagName, staticTagName: resolverCfg[typ] = cfg + case kubernetesTagName: + resolverCfg["k8s"] = cfg default: return nil, fmt.Errorf("unsupported resolver config type: %s", typ) } diff --git a/pkg/traces/config_test.go b/pkg/traces/config_test.go index 80edb01b82f5..9ee9e0b32abf 100644 --- a/pkg/traces/config_test.go +++ b/pkg/traces/config_test.go @@ -754,7 +754,7 @@ service: `, }, { - name: "tail sampling config with load balancing", + name: "tail sampling config with DNS load balancing", cfg: ` receivers: jaeger: @@ -829,6 +829,94 @@ processors: - value1 - value2 extensions: {} +service: + pipelines: + traces/0: + exporters: ["loadbalancing"] + processors: [] + receivers: ["jaeger", "push_receiver"] + traces/1: + exporters: ["otlp/0"] + processors: ["tail_sampling"] + receivers: ["otlp/lb"] +`, + }, + { + name: "tail sampling config with Kubernetes load balancing", + cfg: ` +receivers: + jaeger: + protocols: + grpc: +remote_write: + - endpoint: example.com:12345 +tail_sampling: + policies: + - type: always_sample + - type: string_attribute + string_attribute: + key: key + values: + - value1 + - value2 +load_balancing: + receiver_port: 8181 + routing_key: service + exporter: + insecure: true + resolver: + kubernetes: + service: lb-svc.lb-ns + ports: + - 55690 + - 55691 +`, + expectedConfig: ` +receivers: + jaeger: + protocols: + grpc: + push_receiver: {} + otlp/lb: + protocols: + grpc: + endpoint: "0.0.0.0:8181" +exporters: + otlp/0: + endpoint: example.com:12345 + compression: gzip + retry_on_failure: + max_elapsed_time: 60s + loadbalancing: + routing_key: service + protocol: + otlp: + tls: + insecure: true + endpoint: noop + retry_on_failure: + max_elapsed_time: 60s + compression: none + resolver: + k8s: + service: lb-svc.lb-ns + ports: + - 55690 + - 55691 +processors: + tail_sampling: + decision_wait: 5s + policies: + - name: always_sample/0 + type: always_sample + - name: string_attribute/1 + type: string_attribute + string_attribute: + key: key + values: + - value1 + - value2 +extensions: {} service: pipelines: traces/0: