diff --git a/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peerproxy_handler.go b/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peerproxy_handler.go index 7cafef4446cd6..8e236a7505c7c 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peerproxy_handler.go +++ b/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peerproxy_handler.go @@ -82,12 +82,6 @@ type peerProxyHandler struct { finishedSync atomic.Bool } -type serviceableByResponse struct { - locallyServiceable bool - errorFetchingAddressFromLease bool - peerEndpoints []string -} - // responder implements rest.Responder for assisting a connector in writing objects or errors. type responder struct { w http.ResponseWriter @@ -149,84 +143,97 @@ func (h *peerProxyHandler) WrapHandler(handler http.Handler) http.Handler { gvr.Group = "core" } - // find servers that are capable of serving this request - serviceableByResp, err := h.findServiceableByServers(gvr) + apiservers, err := h.findServiceableByServers(gvr) if err != nil { - // this means that resource is an aggregated API or a CR since it wasn't found in SV informer cache, pass as it is - handler.ServeHTTP(w, r) - return - } - // found the gvr locally, pass request to the next handler in local apiserver - if serviceableByResp.locallyServiceable { + // resource wasn't found in SV informer cache which means that resource is an aggregated API + // or a CR. This situation is ok to be handled by local handler. handler.ServeHTTP(w, r) return } - gv := schema.GroupVersion{Group: gvr.Group, Version: gvr.Version} - if serviceableByResp.errorFetchingAddressFromLease { - klog.ErrorS(err, "error fetching ip and port of remote server while proxying") + locallyServiceable, peerEndpoints, err := h.resolveServingLocation(apiservers) + if err != nil { + gv := schema.GroupVersion{Group: gvr.Group, Version: gvr.Version} + klog.ErrorS(err, "error finding serviceable-by apiservers for the requested resource", "gvr", gvr) responsewriters.ErrorNegotiated(apierrors.NewServiceUnavailable("Error getting ip and port info of the remote server while proxying"), h.serializer, gv, w, r) return } - // no apiservers were found that could serve the request, pass request to - // next handler, that should eventually serve 404 - + // pass request to the next handler if found the gvr locally. // TODO: maintain locally serviceable GVRs somewhere so that we dont have to // consult the storageversion-informed map for those - if len(serviceableByResp.peerEndpoints) == 0 { + if locallyServiceable { + handler.ServeHTTP(w, r) + return + } + + if len(peerEndpoints) == 0 { klog.Errorf("gvr %v is not served by anything in this cluster", gvr) handler.ServeHTTP(w, r) return } // otherwise, randomly select an apiserver and proxy request to it - rand := rand.Intn(len(serviceableByResp.peerEndpoints)) - destServerHostPort := serviceableByResp.peerEndpoints[rand] + rand := rand.Intn(len(peerEndpoints)) + destServerHostPort := peerEndpoints[rand] h.proxyRequestToDestinationAPIServer(r, w, destServerHostPort) - }) } -func (h *peerProxyHandler) findServiceableByServers(gvr schema.GroupVersionResource) (serviceableByResponse, error) { - +func (h *peerProxyHandler) findServiceableByServers(gvr schema.GroupVersionResource) (*sync.Map, error) { apiserversi, ok := h.svMap.Load(gvr) - - // no value found for the requested gvr in svMap if !ok || apiserversi == nil { - return serviceableByResponse{}, fmt.Errorf("no StorageVersions found for the GVR: %v", gvr) + return nil, fmt.Errorf("no storageVersions found for the GVR: %v", gvr) } - apiservers := apiserversi.(*sync.Map) - response := serviceableByResponse{} + + apiservers, _ := apiserversi.(*sync.Map) + return apiservers, nil +} + +func (h *peerProxyHandler) resolveServingLocation(apiservers *sync.Map) (bool, []string, error) { var peerServerEndpoints []string + var locallyServiceable bool + var respErr error + apiservers.Range(func(key, value interface{}) bool { apiserverKey := key.(string) if apiserverKey == h.serverId { - response.locallyServiceable = true + locallyServiceable = true // stop iteration return false } - hostPort, err := h.reconciler.GetEndpoint(apiserverKey) + hostPort, err := h.hostportInfo(apiserverKey) if err != nil { - response.errorFetchingAddressFromLease = true - klog.ErrorS(err, "failed to get peer ip from storage lease for server", "serverID", apiserverKey) + respErr = err // continue with iteration return true } - // check ip format - _, _, err = net.SplitHostPort(hostPort) - if err != nil { - response.errorFetchingAddressFromLease = true - klog.ErrorS(err, "invalid address found for server", "serverID", apiserverKey) - return true - } + peerServerEndpoints = append(peerServerEndpoints, hostPort) return true }) - response.peerEndpoints = peerServerEndpoints - return response, nil + // reset err if there was atleast one valid peer server found. + if len(peerServerEndpoints) > 0 { + respErr = nil + } + + return locallyServiceable, peerServerEndpoints, respErr +} + +func (h *peerProxyHandler) hostportInfo(apiserverKey string) (string, error) { + hostport, err := h.reconciler.GetEndpoint(apiserverKey) + if err != nil { + return "", err + } + // check ip format + _, _, err = net.SplitHostPort(hostport) + if err != nil { + return "", err + } + + return hostport, nil } func (h *peerProxyHandler) proxyRequestToDestinationAPIServer(req *http.Request, rw http.ResponseWriter, host string) { @@ -248,13 +255,11 @@ func (h *peerProxyHandler) proxyRequestToDestinationAPIServer(req *http.Request, defer cancelFn() proxyRoundTripper := transport.NewAuthProxyRoundTripper(user.GetName(), user.GetUID(), user.GetGroups(), user.GetExtra(), h.proxyTransport) - delegate := &epmetrics.ResponseWriterDelegator{ResponseWriter: rw} w := responsewriter.WrapForHTTP1Or2(delegate) handler := proxy.NewUpgradeAwareHandler(location, proxyRoundTripper, true, false, &responder{w: w, ctx: req.Context()}) handler.ServeHTTP(w, newReq) - // Increment the count of proxied requests metrics.IncPeerProxiedRequest(req.Context(), strconv.Itoa(delegate.Status())) } @@ -280,11 +285,13 @@ func (h *peerProxyHandler) updateSV(oldObj interface{}, newObj interface{}) { klog.Error("Invalid StorageVersion provided to updateSV()") return } + newSV, ok := newObj.(*v1alpha1.StorageVersion) if !ok { klog.Error("Invalid StorageVersion provided to updateSV()") return } + h.updateSVMap(oldSV, newSV) } @@ -295,17 +302,17 @@ func (h *peerProxyHandler) deleteSV(obj interface{}) { klog.Error("Invalid StorageVersion provided to deleteSV()") return } + h.updateSVMap(sv, nil) } // Delete old storageversion, add new storagversion func (h *peerProxyHandler) updateSVMap(oldSV *v1alpha1.StorageVersion, newSV *v1alpha1.StorageVersion) { if oldSV != nil { - // delete old SV entries h.deleteSVFromMap(oldSV) } + if newSV != nil { - // add new SV entries h.addSVToMap(newSV) } } diff --git a/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peerproxy_handler_test.go b/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peerproxy_handler_test.go index 22784168627ce..f9c9aec9e26d0 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peerproxy_handler_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peerproxy_handler_test.go @@ -54,19 +54,23 @@ import ( const ( requestTimeout = 30 * time.Second - localServerId = "local-apiserver" - remoteServerId = "remote-apiserver" + localServerID = "local-apiserver" + remoteServerID = "remote-apiserver" ) type FakeSVMapData struct { - gvr schema.GroupVersionResource - serverId string + gvr schema.GroupVersionResource + serverIDs []string } -type reconciler struct { - do bool +type server struct { publicIP string - serverId string + serverID string +} + +type reconciler struct { + do bool + servers []server } func TestPeerProxy(t *testing.T) { @@ -116,7 +120,7 @@ func TestPeerProxy(t *testing.T) { Group: "core", Version: "bar", Resource: "baz"}, - serverId: ""}, + serverIDs: []string{}}, }, { desc: "503 if no endpoint fetched from lease", @@ -128,7 +132,7 @@ func TestPeerProxy(t *testing.T) { Group: "core", Version: "foo", Resource: "bar"}, - serverId: remoteServerId}, + serverIDs: []string{remoteServerID}}, }, { desc: "200 if locally serviceable", @@ -140,7 +144,7 @@ func TestPeerProxy(t *testing.T) { Group: "core", Version: "foo", Resource: "bar"}, - serverId: localServerId}, + serverIDs: []string{localServerID}}, }, { desc: "503 unreachable peer bind address", @@ -152,11 +156,15 @@ func TestPeerProxy(t *testing.T) { Group: "core", Version: "foo", Resource: "bar"}, - serverId: remoteServerId}, + serverIDs: []string{remoteServerID}}, reconcilerConfig: reconciler{ - do: true, - publicIP: "1.2.3.4", - serverId: remoteServerId, + do: true, + servers: []server{ + { + publicIP: "1.2.3.4", + serverID: remoteServerID, + }, + }, }, metrics: []string{ "apiserver_rerouted_request_total", @@ -177,11 +185,15 @@ func TestPeerProxy(t *testing.T) { Group: "core", Version: "foo", Resource: "bar"}, - serverId: remoteServerId}, + serverIDs: []string{remoteServerID}}, reconcilerConfig: reconciler{ - do: true, - publicIP: "1.2.3.4", - serverId: remoteServerId, + do: true, + servers: []server{ + { + publicIP: "1.2.3.4", + serverID: remoteServerID, + }, + }, }, metrics: []string{ "apiserver_rerouted_request_total", @@ -192,6 +204,52 @@ func TestPeerProxy(t *testing.T) { apiserver_rerouted_request_total{code="503"} 2 `, }, + { + desc: "503 if one apiserver's endpoint lease wasnt found but another valid (unreachable) apiserver was found", + requestPath: "/api/foo/bar", + expectedStatus: http.StatusServiceUnavailable, + informerFinishedSync: true, + svdata: FakeSVMapData{ + gvr: schema.GroupVersionResource{ + Group: "core", + Version: "foo", + Resource: "bar"}, + serverIDs: []string{"aggregated-apiserver", remoteServerID}}, + reconcilerConfig: reconciler{ + do: true, + servers: []server{ + { + publicIP: "1.2.3.4", + serverID: remoteServerID, + }, + }, + }, + }, + { + desc: "503 if all peers had invalid host:port info", + requestPath: "/api/foo/bar", + expectedStatus: http.StatusServiceUnavailable, + informerFinishedSync: true, + svdata: FakeSVMapData{ + gvr: schema.GroupVersionResource{ + Group: "core", + Version: "foo", + Resource: "bar"}, + serverIDs: []string{"aggregated-apiserver", remoteServerID}}, + reconcilerConfig: reconciler{ + do: true, + servers: []server{ + { + publicIP: "1[2.4", + serverID: "aggregated-apiserver", + }, + { + publicIP: "2.4]6", + serverID: remoteServerID, + }, + }, + }, + }, } metrics.Register() @@ -210,10 +268,15 @@ func TestPeerProxy(t *testing.T) { featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.APIServerIdentity, true) featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StorageVersionAPI, true) - reconciler.UpdateLease(tt.reconcilerConfig.serverId, - tt.reconcilerConfig.publicIP, - []corev1.EndpointPort{{Name: "foo", - Port: 8080, Protocol: "TCP"}}) + for _, server := range tt.reconcilerConfig.servers { + err := reconciler.UpdateLease(server.serverID, + server.publicIP, + []corev1.EndpointPort{{Name: "foo", + Port: 8080, Protocol: "TCP"}}) + if err != nil { + t.Fatalf("failed to update peer endpoint lease - %v", err) + } + } } req, err := http.NewRequest(http.MethodGet, server.URL+tt.requestPath, nil) @@ -261,7 +324,7 @@ func newFakePeerEndpointReconciler(t *testing.T) reconcilers.PeerEndpointLeaseRe func newHandlerChain(t *testing.T, handler http.Handler, reconciler reconcilers.PeerEndpointLeaseReconciler, informerFinishedSync bool, svdata FakeSVMapData) http.Handler { // Add peerproxy handler s := serializer.NewCodecFactory(runtime.NewScheme()).WithoutConversion() - peerProxyHandler, err := newFakePeerProxyHandler(informerFinishedSync, reconciler, svdata, localServerId, s) + peerProxyHandler, err := newFakePeerProxyHandler(reconciler, svdata, localServerID, s) if err != nil { t.Fatalf("Error creating peer proxy handler: %v", err) } @@ -277,7 +340,7 @@ func newHandlerChain(t *testing.T, handler http.Handler, reconciler reconcilers. return handler } -func newFakePeerProxyHandler(informerFinishedSync bool, reconciler reconcilers.PeerEndpointLeaseReconciler, svdata FakeSVMapData, id string, s runtime.NegotiatedSerializer) (*peerProxyHandler, error) { +func newFakePeerProxyHandler(reconciler reconcilers.PeerEndpointLeaseReconciler, svdata FakeSVMapData, id string, s runtime.NegotiatedSerializer) (*peerProxyHandler, error) { clientset := fake.NewSimpleClientset() informerFactory := informers.NewSharedInformerFactory(clientset, 0) clientConfig := &transport.Config{ @@ -290,16 +353,18 @@ func newFakePeerProxyHandler(informerFinishedSync bool, reconciler reconcilers.P } ppI := NewPeerProxyHandler(informerFactory, storageversion.NewDefaultManager(), proxyRoundTripper, id, reconciler, s) if testDataExists(svdata.gvr) { - ppI.addToStorageVersionMap(svdata.gvr, svdata.serverId) + ppI.addToStorageVersionMap(svdata.gvr, svdata.serverIDs) } return ppI, nil } -func (h *peerProxyHandler) addToStorageVersionMap(gvr schema.GroupVersionResource, serverId string) { +func (h *peerProxyHandler) addToStorageVersionMap(gvr schema.GroupVersionResource, serverIDs []string) { apiserversi, _ := h.svMap.LoadOrStore(gvr, &sync.Map{}) apiservers := apiserversi.(*sync.Map) - if serverId != "" { - apiservers.Store(serverId, true) + for _, serverID := range serverIDs { + if serverID != "" { + apiservers.Store(serverID, true) + } } }