Skip to content

Commit

Permalink
disruption incluster: set source pod in requests
Browse files Browse the repository at this point in the history
  • Loading branch information
vrutkovs committed Nov 14, 2024
1 parent ee7e868 commit 0bb1c3c
Show file tree
Hide file tree
Showing 11 changed files with 75 additions and 18 deletions.
5 changes: 4 additions & 1 deletion pkg/cmd/openshift-tests/run-disruption/disruption.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type RunAPIDisruptionMonitorFlags struct {

ArtifactDir string
LoadBalancerType string
Source string
StopConfigMapName string

genericclioptions.IOStreams
Expand Down Expand Up @@ -100,6 +101,7 @@ func NewRunInClusterDisruptionMonitorCommand(ioStreams genericclioptions.IOStrea

func (f *RunAPIDisruptionMonitorFlags) AddFlags(flags *pflag.FlagSet) {
flags.StringVar(&f.LoadBalancerType, "lb-type", f.LoadBalancerType, "Set load balancer type, available options: internal-lb, service-network, external-lb (default)")
flags.StringVar(&f.Source, "source-name", f.Source, "Set source identifier")
flags.StringVar(&f.StopConfigMapName, "stop-configmap", f.StopConfigMapName, "the name of the configmap that indicates that this pod should stop all watchers.")

f.ConfigFlags.AddFlags(flags)
Expand Down Expand Up @@ -164,6 +166,7 @@ type RunAPIDisruptionMonitorOptions struct {
KubeClientConfig *rest.Config
OutputFile string
LoadBalancerType string
Source string
StopConfigMapName string
Namespace string

Expand All @@ -188,7 +191,7 @@ func (o *RunAPIDisruptionMonitorOptions) Run(ctx context.Context) error {
lb := backend.ParseStringToLoadBalancerType(o.LoadBalancerType)

recorder := monitor.WrapWithJSONLRecorder(monitor.NewRecorder(), o.IOStreams.Out, nil)
samplers, err := controlplane.StartAPIMonitoringUsingNewBackend(ctx, recorder, o.KubeClientConfig, o.KubeClient, lb)
samplers, err := controlplane.StartAPIMonitoringUsingNewBackend(ctx, recorder, o.KubeClientConfig, o.KubeClient, lb, o.Source)
if err != nil {
return err
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/disruption/backend/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ type RequestContextAssociatedData struct {
// ShutdownResponseHeaderParseErr is set if there was an error parsing the
// 'X-OpenShift-Disruption' response header
ShutdownResponseHeaderParseErr error

// Source contains pod name if incluster monitor is used
Source string
}

// GotConnInfo similar to net/http GotConnInfo without the connection object
Expand Down
8 changes: 8 additions & 0 deletions pkg/disruption/backend/reqresp.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func (rr RequestResponse) Fields() map[string]interface{} {
fields["protocol"] = rr.Protocol()
fields["roundtrip"] = rr.RoundTripDuration.Round(time.Millisecond)
fields["retry-after"] = rr.RetryAfter()
fields["source-ip"] = rr.SourceIP()
if rr.ShutdownResponse != nil {
for k, v := range rr.ShutdownResponse.Fields() {
fields[k] = v
Expand Down Expand Up @@ -119,3 +120,10 @@ func IsRetryAfter(resp *http.Response) (string, bool) {
func (rr RequestResponse) ShutdownInProgress() bool {
return rr.ShutdownResponse != nil && rr.ShutdownResponse.ShutdownInProgress
}

func (rr RequestResponse) SourceIP() string {
if rr.Request != nil {
return rr.Request.Header.Get("Audit-ID")
}
return "<none>"
}
21 changes: 19 additions & 2 deletions pkg/disruption/backend/roundtripper/roundtripper.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ type Config struct {
// HostNameDecoder, if specified, is used to decode the APIServerIdentity
// inside the shutdown response header into the actual human readable hostname.
HostNameDecoder backend.HostNameDecoder

// Source contains pod name if incluster monitor is used
Source string
}

// NewClient returns a new Client instance constructed
Expand All @@ -42,16 +45,17 @@ func NewClient(c Config) (backend.Client, error) {
Transport: c.RT,
Timeout: c.ClientTimeout,
}
return WrapClient(client, c.ClientTimeout, c.UserAgent, c.EnableShutdownResponseHeader, c.HostNameDecoder), nil
return WrapClient(client, c.ClientTimeout, c.UserAgent, c.EnableShutdownResponseHeader, c.HostNameDecoder, c.Source), nil
}

// WrapClient wraps the base http.Client object
func WrapClient(client *http.Client, timeout time.Duration, userAgent string, shutdownResponse bool, decoder backend.HostNameDecoder) backend.Client {
func WrapClient(client *http.Client, timeout time.Duration, userAgent string, shutdownResponse bool, decoder backend.HostNameDecoder, source string) backend.Client {
// This is the preferred order:
// - WithTimeout will set a timeout within which the entire chain should finish
// - WithShutdownResponseHeaderExtractor opts in for shutdown response header
// - WithAuditID attaches an audit ID to the request header
// - WithUserAgent sets the user agent
// - WithSourceIP sets the request source
// - WithGotConnTrace sets the connection trace
// - http.Client.Do executes
// - WithRoundTripLatencyTracking measures the latency of http.Client
Expand All @@ -62,6 +66,7 @@ func WrapClient(client *http.Client, timeout time.Duration, userAgent string, sh
c = WithGotConnTrace(c)
c = WithUserAgent(c, userAgent)
c = WithAuditID(c)
c = WithSource(c, source)
if shutdownResponse {
c = WithShutdownResponseHeaderExtractor(c, decoder)
}
Expand Down Expand Up @@ -201,3 +206,15 @@ func WithTimeout(delegate backend.Client, timeout time.Duration) backend.Client
return delegate.Do(req)
})
}

// WithSource sets pod name for incluster monitor.
func WithSource(delegate backend.Client, source string) backend.Client {
return backend.ClientFunc(func(req *http.Request) (*http.Response, error) {
defer func() {
if data := backend.RequestContextAssociatedDataFrom(req.Context()); data != nil {
data.Source = source
}
}()
return delegate.Do(req)
})
}
2 changes: 1 addition & 1 deletion pkg/disruption/backend/roundtripper/roundtripper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func TestWrapClient(t *testing.T) {
transport.DisableKeepAlives = false

agent := "my-client"
client := WrapClient(ts.Client(), 0, agent, false, nil)
client := WrapClient(ts.Client(), 0, agent, false, nil, "")

req, err := http.NewRequest(http.MethodGet, ts.URL+"/echo", nil)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/disruption/backend/sampler/producer_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func TestProducer(t *testing.T) {
transport.DisableKeepAlives = false

wantAgent := "test"
client := roundtripper.WrapClient(ts.Client(), 0, wantAgent, true, nil)
client := roundtripper.WrapClient(ts.Client(), 0, wantAgent, true, nil, "")
var producer sampler.Producer
producer = NewSampleProducerConsumer(client, NewHostPathRequestor(ts.URL, "/echo"), NewResponseChecker(), nil)
info, err := producer.Produce(context.TODO(), 1)
Expand Down
4 changes: 4 additions & 0 deletions pkg/disruption/ci/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ type TestConfiguration struct {
// response header extractor, this should be true only when the
// request(s) are being sent to the kube-apiserver.
EnableShutdownResponseHeader bool

// Source contains pod name if incluster monitor is used
Source string
}

// TestDescriptor defines the disruption test type, the user must
Expand Down Expand Up @@ -177,6 +180,7 @@ func (b *testFactory) New(c TestConfiguration) (Sampler, error) {
UserAgent: c.Name(),
EnableShutdownResponseHeader: c.EnableShutdownResponseHeader,
HostNameDecoder: b.hostNameDecoder,
Source: c.Source,
})
if err != nil {
return nil, err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ spec:
- run-disruption
- --output-file=/var/log/disruption-data/monitor-events/internal-lb-$(DEPLOYMENT_ID).txt
- --lb-type=$(LOAD_BALANCER)
- --source-name=$(POD_NAME)
- --stop-configmap=stop-configmap
env:
- name: KUBERNETES_SERVICE_HOST
Expand All @@ -42,6 +43,10 @@ spec:
- name: DEPLOYMENT_ID
#to be overwritten at deployment initialization time
value: "DEFAULT"
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
image: to-be-replaced
volumeMounts:
- mountPath: /var/log/disruption-data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ spec:
- openshift-tests
- run-disruption
- --output-file=/var/log/disruption-data/monitor-events/localhost-monitor-$(DEPLOYMENT_ID).txt
- --source-name=$(POD_NAME)
- --lb-type=$(LOAD_BALANCER)
- --stop-configmap=stop-configmap
env:
Expand All @@ -40,6 +41,10 @@ spec:
- name: DEPLOYMENT_ID
#to be overwritten at deployment initialization time
value: "DEFAULT"
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
image: to-be-replaced
volumeMounts:
- mountPath: /var/log/disruption-data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ spec:
- openshift-tests
- run-disruption
- --output-file=/var/log/disruption-data/monitor-events/service-network-monitor-$(DEPLOYMENT_ID).txt
- --source-name=$(POD_NAME)
- --lb-type=$(LOAD_BALANCER)
- --stop-configmap=stop-configmap
env:
Expand All @@ -38,6 +39,10 @@ spec:
- name: DEPLOYMENT_ID
#to be overwritten at deployment initialization time
value: "DEFAULT"
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
image: to-be-replaced
volumeMounts:
- mountPath: /var/log/disruption-data
Expand Down
33 changes: 20 additions & 13 deletions test/extended/util/disruption/controlplane/known_backends.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ func StartAPIMonitoringUsingNewBackend(
recorder monitorapi.Recorder,
clusterConfig *rest.Config,
kubeClient kubernetes.Interface,
lb backend.LoadBalancerType) ([]disruptionci.Sampler, error) {
lb backend.LoadBalancerType,
source string) ([]disruptionci.Sampler, error) {

samplers := []disruptionci.Sampler{}
errs := []error{}
Expand All @@ -34,19 +35,19 @@ func StartAPIMonitoringUsingNewBackend(
}
path := fmt.Sprintf("/api/v1/namespaces/default?resourceVersion=%s", ns.ResourceVersion)

sampler, err := createKubeAPIMonitoringWithNewConnectionsHTTP2(factory, lb, path)
sampler, err := createKubeAPIMonitoringWithNewConnectionsHTTP2(factory, lb, path, source)
samplers = append(samplers, sampler)
errs = append(errs, err)

sampler, err = createKubeAPIMonitoringWithConnectionReuseHTTP2(factory, lb, path)
sampler, err = createKubeAPIMonitoringWithConnectionReuseHTTP2(factory, lb, path, source)
samplers = append(samplers, sampler)
errs = append(errs, err)

sampler, err = createKubeAPIMonitoringWithNewConnectionsHTTP1(factory, lb, path)
sampler, err = createKubeAPIMonitoringWithNewConnectionsHTTP1(factory, lb, path, source)
samplers = append(samplers, sampler)
errs = append(errs, err)

sampler, err = createKubeAPIMonitoringWithConnectionReuseHTTP1(factory, lb, path)
sampler, err = createKubeAPIMonitoringWithConnectionReuseHTTP1(factory, lb, path, source)
samplers = append(samplers, sampler)
errs = append(errs, err)

Expand All @@ -64,11 +65,11 @@ func StartAPIMonitoringUsingNewBackend(
stream := streams.Items[0]
path = fmt.Sprintf("/apis/image.openshift.io/v1/namespaces/openshift/imagestreams/%s?resourceVersion=%s", stream.Name, stream.ResourceVersion)

sampler, err = createOpenShiftAPIMonitoringWithNewConnectionsHTTP2(factory, lb, path)
sampler, err = createOpenShiftAPIMonitoringWithNewConnectionsHTTP2(factory, lb, path, source)
samplers = append(samplers, sampler)
errs = append(errs, err)

sampler, err = createOpenShiftAPIMonitoringWithConnectionReuseHTTP2(factory, lb, path)
sampler, err = createOpenShiftAPIMonitoringWithConnectionReuseHTTP2(factory, lb, path, source)
samplers = append(samplers, sampler)
errs = append(errs, err)

Expand All @@ -84,7 +85,7 @@ func StartAPIMonitoringUsingNewBackend(
return samplers, utilerrors.NewAggregate(errs)
}

func createKubeAPIMonitoringWithNewConnectionsHTTP2(factory disruptionci.Factory, lb backend.LoadBalancerType, path string) (disruptionci.Sampler, error) {
func createKubeAPIMonitoringWithNewConnectionsHTTP2(factory disruptionci.Factory, lb backend.LoadBalancerType, path string, source string) (disruptionci.Sampler, error) {
return factory.New(disruptionci.TestConfiguration{
TestDescriptor: disruptionci.TestDescriptor{
TargetServer: disruptionci.KubeAPIServer,
Expand All @@ -96,10 +97,11 @@ func createKubeAPIMonitoringWithNewConnectionsHTTP2(factory disruptionci.Factory
Timeout: 15 * time.Second,
SampleInterval: time.Second,
EnableShutdownResponseHeader: true,
Source: source,
})
}

func createKubeAPIMonitoringWithConnectionReuseHTTP2(factory disruptionci.Factory, lb backend.LoadBalancerType, path string) (disruptionci.Sampler, error) {
func createKubeAPIMonitoringWithConnectionReuseHTTP2(factory disruptionci.Factory, lb backend.LoadBalancerType, path string, source string) (disruptionci.Sampler, error) {
return factory.New(disruptionci.TestConfiguration{
TestDescriptor: disruptionci.TestDescriptor{
TargetServer: disruptionci.KubeAPIServer,
Expand All @@ -111,10 +113,11 @@ func createKubeAPIMonitoringWithConnectionReuseHTTP2(factory disruptionci.Factor
Timeout: 15 * time.Second,
SampleInterval: time.Second,
EnableShutdownResponseHeader: true,
Source: source,
})
}

func createKubeAPIMonitoringWithNewConnectionsHTTP1(factory disruptionci.Factory, lb backend.LoadBalancerType, path string) (disruptionci.Sampler, error) {
func createKubeAPIMonitoringWithNewConnectionsHTTP1(factory disruptionci.Factory, lb backend.LoadBalancerType, path string, source string) (disruptionci.Sampler, error) {
return factory.New(disruptionci.TestConfiguration{
TestDescriptor: disruptionci.TestDescriptor{
TargetServer: disruptionci.KubeAPIServer,
Expand All @@ -126,10 +129,11 @@ func createKubeAPIMonitoringWithNewConnectionsHTTP1(factory disruptionci.Factory
Timeout: 15 * time.Second,
SampleInterval: time.Second,
EnableShutdownResponseHeader: true,
Source: source,
})
}

func createKubeAPIMonitoringWithConnectionReuseHTTP1(factory disruptionci.Factory, lb backend.LoadBalancerType, path string) (disruptionci.Sampler, error) {
func createKubeAPIMonitoringWithConnectionReuseHTTP1(factory disruptionci.Factory, lb backend.LoadBalancerType, path string, source string) (disruptionci.Sampler, error) {
return factory.New(disruptionci.TestConfiguration{
TestDescriptor: disruptionci.TestDescriptor{
TargetServer: disruptionci.KubeAPIServer,
Expand All @@ -141,10 +145,11 @@ func createKubeAPIMonitoringWithConnectionReuseHTTP1(factory disruptionci.Factor
Timeout: 15 * time.Second,
SampleInterval: time.Second,
EnableShutdownResponseHeader: true,
Source: source,
})
}

func createOpenShiftAPIMonitoringWithNewConnectionsHTTP2(factory disruptionci.Factory, lb backend.LoadBalancerType, path string) (disruptionci.Sampler, error) {
func createOpenShiftAPIMonitoringWithNewConnectionsHTTP2(factory disruptionci.Factory, lb backend.LoadBalancerType, path string, source string) (disruptionci.Sampler, error) {
return factory.New(disruptionci.TestConfiguration{
TestDescriptor: disruptionci.TestDescriptor{
TargetServer: disruptionci.OpenShiftAPIServer,
Expand All @@ -156,10 +161,11 @@ func createOpenShiftAPIMonitoringWithNewConnectionsHTTP2(factory disruptionci.Fa
Timeout: 15 * time.Second,
SampleInterval: time.Second,
EnableShutdownResponseHeader: true,
Source: source,
})
}

func createOpenShiftAPIMonitoringWithConnectionReuseHTTP2(factory disruptionci.Factory, lb backend.LoadBalancerType, path string) (disruptionci.Sampler, error) {
func createOpenShiftAPIMonitoringWithConnectionReuseHTTP2(factory disruptionci.Factory, lb backend.LoadBalancerType, path string, source string) (disruptionci.Sampler, error) {
return factory.New(disruptionci.TestConfiguration{
TestDescriptor: disruptionci.TestDescriptor{
TargetServer: disruptionci.OpenShiftAPIServer,
Expand All @@ -171,5 +177,6 @@ func createOpenShiftAPIMonitoringWithConnectionReuseHTTP2(factory disruptionci.F
Timeout: 15 * time.Second,
SampleInterval: time.Second,
EnableShutdownResponseHeader: true,
Source: source,
})
}

0 comments on commit 0bb1c3c

Please sign in to comment.