Skip to content

Commit

Permalink
Added Kubernetes service resolver to loadbalancing exporters. (#5678)
Browse files Browse the repository at this point in the history
* Added kubernetes service resolver to loadbalancing exporter.

* Mention the need to k8s roles

* Change config attribute from "k8s" to "kubernetes"

---------

Co-authored-by: Clayton Cornell <[email protected]>
  • Loading branch information
ptodev and clayton-cornell authored Nov 14, 2023
1 parent 6301a32 commit 94447c5
Show file tree
Hide file tree
Showing 7 changed files with 206 additions and 5 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,9 @@ v0.37.4 (2023-11-06)
- Fix a bug where reloading the configuration of a `loki.write` component lead
to a panic. (@tpaschalis)

- Added Kubernetes service resolver to static node's loadbalancing exporter
and to Flow's `otelcol.exporter.loadbalancing`. (@ptodev)

v0.37.3 (2023-10-26)
-----------------

Expand Down
33 changes: 31 additions & 2 deletions component/otelcol/exporter/loadbalancing/loadbalancing.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,9 @@ func (otlpConfig OtlpConfig) Convert() otlpexporter.Config {

// ResolverSettings defines the configurations for the backend resolver
type ResolverSettings struct {
Static *StaticResolver `river:"static,block,optional"`
DNS *DNSResolver `river:"dns,block,optional"`
Static *StaticResolver `river:"static,block,optional"`
DNS *DNSResolver `river:"dns,block,optional"`
Kubernetes *KubernetesResolver `river:"kubernetes,block,optional"`
}

func (resolverSettings ResolverSettings) Convert() loadbalancingexporter.ResolverSettings {
Expand All @@ -154,6 +155,11 @@ func (resolverSettings ResolverSettings) Convert() loadbalancingexporter.Resolve
res.DNS = &dnsResolver
}

if resolverSettings.Kubernetes != nil {
kubernetesResolver := resolverSettings.Kubernetes.Convert()
res.K8sSvc = &kubernetesResolver
}

return res
}

Expand Down Expand Up @@ -199,6 +205,29 @@ func (dnsResolver *DNSResolver) Convert() loadbalancingexporter.DNSResolver {
}
}

// KubernetesResolver defines the configuration for the k8s resolver
type KubernetesResolver struct {
Service string `river:"service,attr"`
Ports []int32 `river:"ports,attr,optional"`
}

var _ river.Defaulter = &KubernetesResolver{}

// SetToDefault implements river.Defaulter.
func (args *KubernetesResolver) SetToDefault() {
if args == nil {
args = &KubernetesResolver{}
}
args.Ports = []int32{4317}
}

func (k8sSvcResolver *KubernetesResolver) Convert() loadbalancingexporter.K8sSvcResolver {
return loadbalancingexporter.K8sSvcResolver{
Service: k8sSvcResolver.Service,
Ports: append([]int32{}, k8sSvcResolver.Ports...),
}
}

// Extensions implements exporter.Arguments.
func (args Arguments) Extensions() map[otelcomponent.ID]otelextension.Extension {
return args.Protocol.OTLP.Client.Extensions()
Expand Down
53 changes: 53 additions & 0 deletions component/otelcol/exporter/loadbalancing/loadbalancing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,59 @@ func TestConfigConversion(t *testing.T) {
Protocol: defaultProtocol,
},
},
{
testName: "k8s with defaults",
agentCfg: `
resolver {
kubernetes {
service = "lb-svc.lb-ns"
}
}
protocol {
otlp {
client {}
}
}
`,
expected: loadbalancingexporter.Config{
Resolver: loadbalancingexporter.ResolverSettings{
Static: nil,
K8sSvc: &loadbalancingexporter.K8sSvcResolver{
Service: "lb-svc.lb-ns",
Ports: []int32{4317},
},
},
RoutingKey: "traceID",
Protocol: defaultProtocol,
},
},
{
testName: "k8s with non-defaults",
agentCfg: `
resolver {
kubernetes {
service = "lb-svc.lb-ns"
ports = [55690, 55691]
}
}
protocol {
otlp {
client {}
}
}
`,
expected: loadbalancingexporter.Config{
Resolver: loadbalancingexporter.ResolverSettings{
Static: nil,
K8sSvc: &loadbalancingexporter.K8sSvcResolver{
Service: "lb-svc.lb-ns",
Ports: []int32{55690, 55691},
},
},
RoutingKey: "traceID",
Protocol: defaultProtocol,
},
},
}

for _, tc := range tests {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ Hierarchy | Block | Description | Required
resolver | [resolver][] | Configures discovering the endpoints to export to. | yes
resolver > static | [static][] | Static list of endpoints to export to. | no
resolver > dns | [dns][] | DNS-sourced list of endpoints to export to. | no
resolver > kubernetes | [kubernetes][] | Kubernetes-sourced list of endpoints to export to. | no
protocol | [protocol][] | Protocol settings. Only OTLP is supported at the moment. | no
protocol > otlp | [otlp][] | Configures an OTLP exporter. | no
protocol > otlp > client | [client][] | Configures the exporter gRPC client. | no
Expand All @@ -96,6 +97,7 @@ refers to a `static` block defined inside a `resolver` block.
[resolver]: #resolver-block
[static]: #static-block
[dns]: #dns-block
[kubernetes]: #kubernetes-block
[protocol]: #protocol-block
[otlp]: #otlp-block
[client]: #client-block
Expand Down Expand Up @@ -137,6 +139,26 @@ Name | Type | Description | Default | Required
`timeout` | `duration` | Resolver timeout. | `"1s"` | no
`port` | `string` | Port to be used with the IP addresses resolved from the DNS hostname. | `"4317"` | no

### kubernetes block

You can use the `kubernetes` block to load balance across the pods of a Kubernetes service. The Agent will be notified
by the Kubernetes API whenever a new pod is added or removed from the service.

The following arguments are supported:

Name | Type | Description | Default | Required
---- | ---- | ----------- | ------- | --------
`service` | `string` | Kubernetes service to resolve. | | yes
`ports` | `list(number)` | Ports to use with the IP addresses resolved from `service`. | `[4317]` | no

If no namespace is specified inside `service`, an attempt will be made to infer the namespace for this Agent.
If this fails, the `default` namespace will be used.

Each of the ports listed in `ports` will be used with each of the IPs resolved from `service`.

The "get", "list", and "watch" [roles](https://kubernetes.io/docs/reference/access-authn-authz/rbac/#role-example)
must be granted in Kubernetes for the resolver to work.

### protocol block

The `protocol` block configures protocol-related settings for exporting.
Expand Down
3 changes: 3 additions & 0 deletions docs/sources/static/configuration/traces-config.md
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,9 @@ load_balancing:
[ interval: <duration> | default = 5s ]
# Resolver timeout
[ timeout: <duration> | default = 1s ]
kubernetes:
service: <string>
[ ports: <int array> | default = 4317 ]

# routing_key can be either "traceID" or "service":
# * "service": exports spans based on their service name.
Expand Down
7 changes: 5 additions & 2 deletions pkg/traces/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,9 @@ const (
// defaultLoadBalancingPort is the default port the agent uses for internal load balancing
defaultLoadBalancingPort = "4318"
// agent's load balancing options
dnsTagName = "dns"
staticTagName = "static"
dnsTagName = "dns"
staticTagName = "static"
kubernetesTagName = "kubernetes"

// sampling policies
alwaysSamplePolicy = "always_sample"
Expand Down Expand Up @@ -587,6 +588,8 @@ func resolver(config map[string]interface{}) (map[string]interface{}, error) {
switch typ {
case dnsTagName, staticTagName:
resolverCfg[typ] = cfg
case kubernetesTagName:
resolverCfg["k8s"] = cfg
default:
return nil, fmt.Errorf("unsupported resolver config type: %s", typ)
}
Expand Down
90 changes: 89 additions & 1 deletion pkg/traces/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -754,7 +754,7 @@ service:
`,
},
{
name: "tail sampling config with load balancing",
name: "tail sampling config with DNS load balancing",
cfg: `
receivers:
jaeger:
Expand Down Expand Up @@ -829,6 +829,94 @@ processors:
- value1
- value2
extensions: {}
service:
pipelines:
traces/0:
exporters: ["loadbalancing"]
processors: []
receivers: ["jaeger", "push_receiver"]
traces/1:
exporters: ["otlp/0"]
processors: ["tail_sampling"]
receivers: ["otlp/lb"]
`,
},
{
name: "tail sampling config with Kubernetes load balancing",
cfg: `
receivers:
jaeger:
protocols:
grpc:
remote_write:
- endpoint: example.com:12345
tail_sampling:
policies:
- type: always_sample
- type: string_attribute
string_attribute:
key: key
values:
- value1
- value2
load_balancing:
receiver_port: 8181
routing_key: service
exporter:
insecure: true
resolver:
kubernetes:
service: lb-svc.lb-ns
ports:
- 55690
- 55691
`,
expectedConfig: `
receivers:
jaeger:
protocols:
grpc:
push_receiver: {}
otlp/lb:
protocols:
grpc:
endpoint: "0.0.0.0:8181"
exporters:
otlp/0:
endpoint: example.com:12345
compression: gzip
retry_on_failure:
max_elapsed_time: 60s
loadbalancing:
routing_key: service
protocol:
otlp:
tls:
insecure: true
endpoint: noop
retry_on_failure:
max_elapsed_time: 60s
compression: none
resolver:
k8s:
service: lb-svc.lb-ns
ports:
- 55690
- 55691
processors:
tail_sampling:
decision_wait: 5s
policies:
- name: always_sample/0
type: always_sample
- name: string_attribute/1
type: string_attribute
string_attribute:
key: key
values:
- value1
- value2
extensions: {}
service:
pipelines:
traces/0:
Expand Down

0 comments on commit 94447c5

Please sign in to comment.