diff --git a/pkg/cmd/openshift-tests/run-disruption/disruption.go b/pkg/cmd/openshift-tests/run-disruption/disruption.go index 245a64b2282b..c0b40890f5b8 100644 --- a/pkg/cmd/openshift-tests/run-disruption/disruption.go +++ b/pkg/cmd/openshift-tests/run-disruption/disruption.go @@ -36,6 +36,7 @@ type RunAPIDisruptionMonitorFlags struct { ArtifactDir string LoadBalancerType string + Source string StopConfigMapName string genericclioptions.IOStreams @@ -100,6 +101,7 @@ func NewRunInClusterDisruptionMonitorCommand(ioStreams genericclioptions.IOStrea func (f *RunAPIDisruptionMonitorFlags) AddFlags(flags *pflag.FlagSet) { flags.StringVar(&f.LoadBalancerType, "lb-type", f.LoadBalancerType, "Set load balancer type, available options: internal-lb, service-network, external-lb (default)") + flags.StringVar(&f.Source, "source-name", f.Source, "Set source identifier") flags.StringVar(&f.StopConfigMapName, "stop-configmap", f.StopConfigMapName, "the name of the configmap that indicates that this pod should stop all watchers.") f.ConfigFlags.AddFlags(flags) @@ -164,6 +166,7 @@ type RunAPIDisruptionMonitorOptions struct { KubeClientConfig *rest.Config OutputFile string LoadBalancerType string + Source string StopConfigMapName string Namespace string @@ -188,7 +191,7 @@ func (o *RunAPIDisruptionMonitorOptions) Run(ctx context.Context) error { lb := backend.ParseStringToLoadBalancerType(o.LoadBalancerType) recorder := monitor.WrapWithJSONLRecorder(monitor.NewRecorder(), o.IOStreams.Out, nil) - samplers, err := controlplane.StartAPIMonitoringUsingNewBackend(ctx, recorder, o.KubeClientConfig, o.KubeClient, lb) + samplers, err := controlplane.StartAPIMonitoringUsingNewBackend(ctx, recorder, o.KubeClientConfig, o.KubeClient, lb, o.Source) if err != nil { return err } diff --git a/pkg/disruption/backend/context.go b/pkg/disruption/backend/context.go index 207c118938eb..e2a510028056 100644 --- a/pkg/disruption/backend/context.go +++ b/pkg/disruption/backend/context.go @@ -55,6 +55,9 @@ type RequestContextAssociatedData struct { // ShutdownResponseHeaderParseErr is set if there was an error parsing the // 'X-OpenShift-Disruption' response header ShutdownResponseHeaderParseErr error + + // Source contains pod name if incluster monitor is used + Source string } // GotConnInfo similar to net/http GotConnInfo without the connection object diff --git a/pkg/disruption/backend/reqresp.go b/pkg/disruption/backend/reqresp.go index 8b773ba0273b..d461b2cf9ac3 100644 --- a/pkg/disruption/backend/reqresp.go +++ b/pkg/disruption/backend/reqresp.go @@ -24,8 +24,8 @@ type RequestResponse struct { } func (rr RequestResponse) String() string { - s := fmt.Sprintf("audit-id=%s conn-reused=%s status-code=%s protocol=%s roundtrip=%s retry-after=%s", - rr.GetAuditID(), rr.ConnectionReused(), rr.StatusCode(), rr.Protocol(), rr.RoundTripDuration.Round(time.Millisecond), rr.RetryAfter()) + s := fmt.Sprintf("audit-id=%s conn-reused=%s status-code=%s protocol=%s roundtrip=%s retry-after=%s source=%s", + rr.GetAuditID(), rr.ConnectionReused(), rr.StatusCode(), rr.Protocol(), rr.RoundTripDuration.Round(time.Millisecond), rr.RetryAfter(), rr.Source) if rr.ShutdownResponse != nil { s = fmt.Sprintf("%s %s", s, rr.ShutdownResponse.String()) } @@ -40,6 +40,7 @@ func (rr RequestResponse) Fields() map[string]interface{} { fields["protocol"] = rr.Protocol() fields["roundtrip"] = rr.RoundTripDuration.Round(time.Millisecond) fields["retry-after"] = rr.RetryAfter() + fields["source"] = rr.Source if rr.ShutdownResponse != nil { for k, v := range rr.ShutdownResponse.Fields() { fields[k] = v diff --git a/pkg/disruption/backend/roundtripper/roundtripper.go b/pkg/disruption/backend/roundtripper/roundtripper.go index bc58c912877b..4469373b7eab 100644 --- a/pkg/disruption/backend/roundtripper/roundtripper.go +++ b/pkg/disruption/backend/roundtripper/roundtripper.go @@ -29,6 +29,9 @@ type Config struct { // HostNameDecoder, if specified, is used to decode the APIServerIdentity // inside the shutdown response header into the actual human readable hostname. HostNameDecoder backend.HostNameDecoder + + // Source contains pod name if incluster monitor is used + Source string } // NewClient returns a new Client instance constructed @@ -42,16 +45,17 @@ func NewClient(c Config) (backend.Client, error) { Transport: c.RT, Timeout: c.ClientTimeout, } - return WrapClient(client, c.ClientTimeout, c.UserAgent, c.EnableShutdownResponseHeader, c.HostNameDecoder), nil + return WrapClient(client, c.ClientTimeout, c.UserAgent, c.EnableShutdownResponseHeader, c.HostNameDecoder, c.Source), nil } // WrapClient wraps the base http.Client object -func WrapClient(client *http.Client, timeout time.Duration, userAgent string, shutdownResponse bool, decoder backend.HostNameDecoder) backend.Client { +func WrapClient(client *http.Client, timeout time.Duration, userAgent string, shutdownResponse bool, decoder backend.HostNameDecoder, source string) backend.Client { // This is the preferred order: // - WithTimeout will set a timeout within which the entire chain should finish // - WithShutdownResponseHeaderExtractor opts in for shutdown response header // - WithAuditID attaches an audit ID to the request header // - WithUserAgent sets the user agent + // - WithSource sets the request source // - WithGotConnTrace sets the connection trace // - http.Client.Do executes // - WithRoundTripLatencyTracking measures the latency of http.Client @@ -60,6 +64,7 @@ func WrapClient(client *http.Client, timeout time.Duration, userAgent string, sh c := WithRoundTripLatencyTracking(client) c = WithResponseBodyReader(c) c = WithGotConnTrace(c) + c = WithSource(c, source) c = WithUserAgent(c, userAgent) c = WithAuditID(c) if shutdownResponse { @@ -201,3 +206,15 @@ func WithTimeout(delegate backend.Client, timeout time.Duration) backend.Client return delegate.Do(req) }) } + +// WithSource sets pod name for incluster monitor. +func WithSource(delegate backend.Client, source string) backend.Client { + return backend.ClientFunc(func(req *http.Request) (*http.Response, error) { + defer func() { + if data := backend.RequestContextAssociatedDataFrom(req.Context()); data != nil { + data.Source = source + } + }() + return delegate.Do(req) + }) +} diff --git a/pkg/disruption/backend/roundtripper/roundtripper_test.go b/pkg/disruption/backend/roundtripper/roundtripper_test.go index 91f08a4675a9..3b23438ed623 100644 --- a/pkg/disruption/backend/roundtripper/roundtripper_test.go +++ b/pkg/disruption/backend/roundtripper/roundtripper_test.go @@ -26,7 +26,7 @@ func TestWrapClient(t *testing.T) { transport.DisableKeepAlives = false agent := "my-client" - client := WrapClient(ts.Client(), 0, agent, false, nil) + client := WrapClient(ts.Client(), 0, agent, false, nil, "") req, err := http.NewRequest(http.MethodGet, ts.URL+"/echo", nil) if err != nil { diff --git a/pkg/disruption/backend/sampler/producer_consumer_test.go b/pkg/disruption/backend/sampler/producer_consumer_test.go index 8c072ed2d30a..992111a4ef14 100644 --- a/pkg/disruption/backend/sampler/producer_consumer_test.go +++ b/pkg/disruption/backend/sampler/producer_consumer_test.go @@ -27,7 +27,7 @@ func TestProducer(t *testing.T) { transport.DisableKeepAlives = false wantAgent := "test" - client := roundtripper.WrapClient(ts.Client(), 0, wantAgent, true, nil) + client := roundtripper.WrapClient(ts.Client(), 0, wantAgent, true, nil, "") var producer sampler.Producer producer = NewSampleProducerConsumer(client, NewHostPathRequestor(ts.URL, "/echo"), NewResponseChecker(), nil) info, err := producer.Produce(context.TODO(), 1) diff --git a/pkg/disruption/ci/factory.go b/pkg/disruption/ci/factory.go index f6c67b1c6adf..03c40374bc7a 100644 --- a/pkg/disruption/ci/factory.go +++ b/pkg/disruption/ci/factory.go @@ -69,6 +69,9 @@ type TestConfiguration struct { // response header extractor, this should be true only when the // request(s) are being sent to the kube-apiserver. EnableShutdownResponseHeader bool + + // Source contains pod name if incluster monitor is used + Source string } // TestDescriptor defines the disruption test type, the user must @@ -177,6 +180,7 @@ func (b *testFactory) New(c TestConfiguration) (Sampler, error) { UserAgent: c.Name(), EnableShutdownResponseHeader: c.EnableShutdownResponseHeader, HostNameDecoder: b.hostNameDecoder, + Source: c.Source, }) if err != nil { return nil, err diff --git a/pkg/monitortests/kubeapiserver/disruptioninclusterapiserver/manifests/dep-internal-lb.yaml b/pkg/monitortests/kubeapiserver/disruptioninclusterapiserver/manifests/dep-internal-lb.yaml index d9a62bbbf49e..88726062b97f 100644 --- a/pkg/monitortests/kubeapiserver/disruptioninclusterapiserver/manifests/dep-internal-lb.yaml +++ b/pkg/monitortests/kubeapiserver/disruptioninclusterapiserver/manifests/dep-internal-lb.yaml @@ -31,6 +31,7 @@ spec: - run-disruption - --output-file=/var/log/disruption-data/monitor-events/internal-lb-$(DEPLOYMENT_ID).txt - --lb-type=$(LOAD_BALANCER) + - --source-name=$(POD_NAME) - --stop-configmap=stop-configmap env: - name: KUBERNETES_SERVICE_HOST @@ -42,6 +43,10 @@ spec: - name: DEPLOYMENT_ID #to be overwritten at deployment initialization time value: "DEFAULT" + - name: POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name image: to-be-replaced volumeMounts: - mountPath: /var/log/disruption-data diff --git a/pkg/monitortests/kubeapiserver/disruptioninclusterapiserver/manifests/dep-localhost.yaml b/pkg/monitortests/kubeapiserver/disruptioninclusterapiserver/manifests/dep-localhost.yaml index b6f8b778fa5f..229b6cbc0d2e 100644 --- a/pkg/monitortests/kubeapiserver/disruptioninclusterapiserver/manifests/dep-localhost.yaml +++ b/pkg/monitortests/kubeapiserver/disruptioninclusterapiserver/manifests/dep-localhost.yaml @@ -30,6 +30,7 @@ spec: - openshift-tests - run-disruption - --output-file=/var/log/disruption-data/monitor-events/localhost-monitor-$(DEPLOYMENT_ID).txt + - --source-name=$(POD_NAME) - --lb-type=$(LOAD_BALANCER) - --stop-configmap=stop-configmap env: @@ -40,6 +41,10 @@ spec: - name: DEPLOYMENT_ID #to be overwritten at deployment initialization time value: "DEFAULT" + - name: POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name image: to-be-replaced volumeMounts: - mountPath: /var/log/disruption-data diff --git a/pkg/monitortests/kubeapiserver/disruptioninclusterapiserver/manifests/dep-service-network.yaml b/pkg/monitortests/kubeapiserver/disruptioninclusterapiserver/manifests/dep-service-network.yaml index 7cead915de01..06188262c2ae 100644 --- a/pkg/monitortests/kubeapiserver/disruptioninclusterapiserver/manifests/dep-service-network.yaml +++ b/pkg/monitortests/kubeapiserver/disruptioninclusterapiserver/manifests/dep-service-network.yaml @@ -30,6 +30,7 @@ spec: - openshift-tests - run-disruption - --output-file=/var/log/disruption-data/monitor-events/service-network-monitor-$(DEPLOYMENT_ID).txt + - --source-name=$(POD_NAME) - --lb-type=$(LOAD_BALANCER) - --stop-configmap=stop-configmap env: @@ -38,6 +39,10 @@ spec: - name: DEPLOYMENT_ID #to be overwritten at deployment initialization time value: "DEFAULT" + - name: POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name image: to-be-replaced volumeMounts: - mountPath: /var/log/disruption-data diff --git a/test/extended/util/disruption/controlplane/known_backends.go b/test/extended/util/disruption/controlplane/known_backends.go index 57e2262e540e..8f1a5b6bb166 100644 --- a/test/extended/util/disruption/controlplane/known_backends.go +++ b/test/extended/util/disruption/controlplane/known_backends.go @@ -22,7 +22,8 @@ func StartAPIMonitoringUsingNewBackend( recorder monitorapi.Recorder, clusterConfig *rest.Config, kubeClient kubernetes.Interface, - lb backend.LoadBalancerType) ([]disruptionci.Sampler, error) { + lb backend.LoadBalancerType, + source string) ([]disruptionci.Sampler, error) { samplers := []disruptionci.Sampler{} errs := []error{} @@ -34,19 +35,19 @@ func StartAPIMonitoringUsingNewBackend( } path := fmt.Sprintf("/api/v1/namespaces/default?resourceVersion=%s", ns.ResourceVersion) - sampler, err := createKubeAPIMonitoringWithNewConnectionsHTTP2(factory, lb, path) + sampler, err := createKubeAPIMonitoringWithNewConnectionsHTTP2(factory, lb, path, source) samplers = append(samplers, sampler) errs = append(errs, err) - sampler, err = createKubeAPIMonitoringWithConnectionReuseHTTP2(factory, lb, path) + sampler, err = createKubeAPIMonitoringWithConnectionReuseHTTP2(factory, lb, path, source) samplers = append(samplers, sampler) errs = append(errs, err) - sampler, err = createKubeAPIMonitoringWithNewConnectionsHTTP1(factory, lb, path) + sampler, err = createKubeAPIMonitoringWithNewConnectionsHTTP1(factory, lb, path, source) samplers = append(samplers, sampler) errs = append(errs, err) - sampler, err = createKubeAPIMonitoringWithConnectionReuseHTTP1(factory, lb, path) + sampler, err = createKubeAPIMonitoringWithConnectionReuseHTTP1(factory, lb, path, source) samplers = append(samplers, sampler) errs = append(errs, err) @@ -64,11 +65,11 @@ func StartAPIMonitoringUsingNewBackend( stream := streams.Items[0] path = fmt.Sprintf("/apis/image.openshift.io/v1/namespaces/openshift/imagestreams/%s?resourceVersion=%s", stream.Name, stream.ResourceVersion) - sampler, err = createOpenShiftAPIMonitoringWithNewConnectionsHTTP2(factory, lb, path) + sampler, err = createOpenShiftAPIMonitoringWithNewConnectionsHTTP2(factory, lb, path, source) samplers = append(samplers, sampler) errs = append(errs, err) - sampler, err = createOpenShiftAPIMonitoringWithConnectionReuseHTTP2(factory, lb, path) + sampler, err = createOpenShiftAPIMonitoringWithConnectionReuseHTTP2(factory, lb, path, source) samplers = append(samplers, sampler) errs = append(errs, err) @@ -84,7 +85,7 @@ func StartAPIMonitoringUsingNewBackend( return samplers, utilerrors.NewAggregate(errs) } -func createKubeAPIMonitoringWithNewConnectionsHTTP2(factory disruptionci.Factory, lb backend.LoadBalancerType, path string) (disruptionci.Sampler, error) { +func createKubeAPIMonitoringWithNewConnectionsHTTP2(factory disruptionci.Factory, lb backend.LoadBalancerType, path string, source string) (disruptionci.Sampler, error) { return factory.New(disruptionci.TestConfiguration{ TestDescriptor: disruptionci.TestDescriptor{ TargetServer: disruptionci.KubeAPIServer, @@ -96,10 +97,11 @@ func createKubeAPIMonitoringWithNewConnectionsHTTP2(factory disruptionci.Factory Timeout: 15 * time.Second, SampleInterval: time.Second, EnableShutdownResponseHeader: true, + Source: source, }) } -func createKubeAPIMonitoringWithConnectionReuseHTTP2(factory disruptionci.Factory, lb backend.LoadBalancerType, path string) (disruptionci.Sampler, error) { +func createKubeAPIMonitoringWithConnectionReuseHTTP2(factory disruptionci.Factory, lb backend.LoadBalancerType, path string, source string) (disruptionci.Sampler, error) { return factory.New(disruptionci.TestConfiguration{ TestDescriptor: disruptionci.TestDescriptor{ TargetServer: disruptionci.KubeAPIServer, @@ -111,10 +113,11 @@ func createKubeAPIMonitoringWithConnectionReuseHTTP2(factory disruptionci.Factor Timeout: 15 * time.Second, SampleInterval: time.Second, EnableShutdownResponseHeader: true, + Source: source, }) } -func createKubeAPIMonitoringWithNewConnectionsHTTP1(factory disruptionci.Factory, lb backend.LoadBalancerType, path string) (disruptionci.Sampler, error) { +func createKubeAPIMonitoringWithNewConnectionsHTTP1(factory disruptionci.Factory, lb backend.LoadBalancerType, path string, source string) (disruptionci.Sampler, error) { return factory.New(disruptionci.TestConfiguration{ TestDescriptor: disruptionci.TestDescriptor{ TargetServer: disruptionci.KubeAPIServer, @@ -126,10 +129,11 @@ func createKubeAPIMonitoringWithNewConnectionsHTTP1(factory disruptionci.Factory Timeout: 15 * time.Second, SampleInterval: time.Second, EnableShutdownResponseHeader: true, + Source: source, }) } -func createKubeAPIMonitoringWithConnectionReuseHTTP1(factory disruptionci.Factory, lb backend.LoadBalancerType, path string) (disruptionci.Sampler, error) { +func createKubeAPIMonitoringWithConnectionReuseHTTP1(factory disruptionci.Factory, lb backend.LoadBalancerType, path string, source string) (disruptionci.Sampler, error) { return factory.New(disruptionci.TestConfiguration{ TestDescriptor: disruptionci.TestDescriptor{ TargetServer: disruptionci.KubeAPIServer, @@ -141,10 +145,11 @@ func createKubeAPIMonitoringWithConnectionReuseHTTP1(factory disruptionci.Factor Timeout: 15 * time.Second, SampleInterval: time.Second, EnableShutdownResponseHeader: true, + Source: source, }) } -func createOpenShiftAPIMonitoringWithNewConnectionsHTTP2(factory disruptionci.Factory, lb backend.LoadBalancerType, path string) (disruptionci.Sampler, error) { +func createOpenShiftAPIMonitoringWithNewConnectionsHTTP2(factory disruptionci.Factory, lb backend.LoadBalancerType, path string, source string) (disruptionci.Sampler, error) { return factory.New(disruptionci.TestConfiguration{ TestDescriptor: disruptionci.TestDescriptor{ TargetServer: disruptionci.OpenShiftAPIServer, @@ -156,10 +161,11 @@ func createOpenShiftAPIMonitoringWithNewConnectionsHTTP2(factory disruptionci.Fa Timeout: 15 * time.Second, SampleInterval: time.Second, EnableShutdownResponseHeader: true, + Source: source, }) } -func createOpenShiftAPIMonitoringWithConnectionReuseHTTP2(factory disruptionci.Factory, lb backend.LoadBalancerType, path string) (disruptionci.Sampler, error) { +func createOpenShiftAPIMonitoringWithConnectionReuseHTTP2(factory disruptionci.Factory, lb backend.LoadBalancerType, path string, source string) (disruptionci.Sampler, error) { return factory.New(disruptionci.TestConfiguration{ TestDescriptor: disruptionci.TestDescriptor{ TargetServer: disruptionci.OpenShiftAPIServer, @@ -171,5 +177,6 @@ func createOpenShiftAPIMonitoringWithConnectionReuseHTTP2(factory disruptionci.F Timeout: 15 * time.Second, SampleInterval: time.Second, EnableShutdownResponseHeader: true, + Source: source, }) }