From 0e471fe85781cce3a65e7ce740c1934183b5648f Mon Sep 17 00:00:00 2001 From: Andrey Smirnov Date: Fri, 21 Apr 2023 22:48:43 +0400 Subject: [PATCH] feat: implement distributed image store This is PoC/experiment. Each Talos node runs `registryd` component which acts both as a registry and a fan-out service. For local requests, `registryd` serves manifests/blobs from the containerd content storage. For incoming requests, `registryd` fans out requests to other nodes (cluster members), finding the first one which has the content. I had to disable content store deduplication, as otherwise containerd drops original layers immediately. One not fully solved question is how to inject `registryd`, what I did in my testing is to inject it as the endpoint in the registry mirror scheme, so if `registryd` has nothing, `containerd` falls back to "upstream" registry/mirror. There needs some work to be done to support it for `*` redirects. There is unresolved issues with images protected by authorization. At the moment `registryd` never resolves tags (defers it to the upstream registry), but still it might deliver images without pull secrets given the proper digest. How to secure `registryd` from access outside of the cluster? Signed-off-by: Andrey Smirnov --- hack/cri-plugin.part | 1 - .../v1alpha1/v1alpha1_sequencer_tasks.go | 1 + .../machined/pkg/system/services/registryd.go | 479 ++++++++++++++++++ internal/pkg/containers/image/image.go | 3 +- 4 files changed, 481 insertions(+), 3 deletions(-) create mode 100644 internal/app/machined/pkg/system/services/registryd.go diff --git a/hack/cri-plugin.part b/hack/cri-plugin.part index fbfe254110..8315e625e9 100644 --- a/hack/cri-plugin.part +++ b/hack/cri-plugin.part @@ -2,4 +2,3 @@ version = 2 [plugins."io.containerd.grpc.v1.cri".containerd.runtimes.runc] runtime_type = "io.containerd.runc.v2" - discard_unpacked_layers = true diff --git a/internal/app/machined/pkg/runtime/v1alpha1/v1alpha1_sequencer_tasks.go b/internal/app/machined/pkg/runtime/v1alpha1/v1alpha1_sequencer_tasks.go index 1b577b8536..6d67282882 100644 --- a/internal/app/machined/pkg/runtime/v1alpha1/v1alpha1_sequencer_tasks.go +++ b/internal/app/machined/pkg/runtime/v1alpha1/v1alpha1_sequencer_tasks.go @@ -872,6 +872,7 @@ func StartAllServices(runtime.Sequence, any) (runtime.TaskExecutionFunc, string) serviceList := []system.Service{ &services.CRI{}, + &services.Registryd{}, } switch t := r.Config().Machine().Type(); t { diff --git a/internal/app/machined/pkg/system/services/registryd.go b/internal/app/machined/pkg/system/services/registryd.go new file mode 100644 index 0000000000..5452b118b7 --- /dev/null +++ b/internal/app/machined/pkg/system/services/registryd.go @@ -0,0 +1,479 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package services + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net" + "net/http" + "net/http/httputil" + "net/netip" + "net/url" + "path" + "strconv" + "strings" + "time" + + containerdapi "github.com/containerd/containerd" + "github.com/containerd/containerd/content" + "github.com/containerd/containerd/namespaces" + criconstants "github.com/containerd/containerd/pkg/cri/constants" + "github.com/cosi-project/runtime/pkg/resource" + "github.com/cosi-project/runtime/pkg/safe" + "github.com/cosi-project/runtime/pkg/state" + "github.com/docker/distribution/reference" + "github.com/docker/docker/errdefs" + "github.com/hashicorp/go-cleanhttp" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" + "github.com/siderolabs/gen/channel" + "github.com/siderolabs/gen/slices" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + + "github.com/siderolabs/talos/internal/app/machined/pkg/runtime" + "github.com/siderolabs/talos/internal/app/machined/pkg/system" + "github.com/siderolabs/talos/internal/app/machined/pkg/system/events" + "github.com/siderolabs/talos/internal/app/machined/pkg/system/health" + "github.com/siderolabs/talos/internal/app/machined/pkg/system/runner" + "github.com/siderolabs/talos/internal/app/machined/pkg/system/runner/goroutine" + "github.com/siderolabs/talos/pkg/conditions" + "github.com/siderolabs/talos/pkg/logging" + "github.com/siderolabs/talos/pkg/machinery/constants" + "github.com/siderolabs/talos/pkg/machinery/resources/cluster" +) + +type registrydService struct { + logger *zap.Logger + resources state.State + client *containerdapi.Client + httpClient *http.Client +} + +// Main is an entrypoint to the API service. +func (s *registrydService) Main(ctx context.Context, r runtime.Runtime, logWriter io.Writer) error { + s.logger = logging.ZapLogger( + logging.NewLogDestination(logWriter, zapcore.DebugLevel, logging.WithColoredLevels()), + ) + s.resources = r.State().V1Alpha2().Resources() + s.httpClient = cleanhttp.DefaultPooledClient() + + var err error + + s.client, err = containerdapi.New(constants.CRIContainerdAddress) + if err != nil { + return err + } + //nolint:errcheck + defer s.client.Close() + + server := http.Server{ + Addr: ":3172", + Handler: s, + } + + go func() { + server.ListenAndServe() //nolint:errcheck + }() + + <-ctx.Done() + + shutdownCtx, shutdownCtxCancel := context.WithTimeout(context.Background(), 5*time.Second) + defer shutdownCtxCancel() + + return server.Shutdown(shutdownCtx) +} + +func (s *registrydService) ServeHTTP(w http.ResponseWriter, req *http.Request) { + isProxied := req.Header.Get("X-Talos-Registry-Proxy") == "true" + + logger := s.logger.With( + zap.String("method", req.Method), + zap.String("url", req.URL.String()), + zap.Bool("proxied", isProxied), + zap.String("remote_addr", req.RemoteAddr), + ) + + logger.Info("got request") + + switch req.Method { + case http.MethodGet, http.MethodHead: + // accepted + default: + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + + return + } + + requestPath := path.Clean(req.URL.Path) + + // quickly respond to health check + switch requestPath { + case "/v2", "/healthz": + w.WriteHeader(http.StatusOK) + + return + } + + registry := req.URL.Query().Get("ns") + if registry == "" { + logger.Error("no registry specified") + w.WriteHeader(http.StatusNotFound) + + return + } + + parts := strings.Split(requestPath, "/") + if len(parts) < 5 { + logger.Error("wrong paths count") + w.WriteHeader(http.StatusNotFound) + + return + } + + l := len(parts) + + var ( + name, digest string + isBlob bool + ) + + switch { + case parts[1] == "v2" && parts[l-2] == "manifests": + name = strings.Join(parts[2:l-2], "/") + digest = parts[l-1] + case parts[1] == "v2" && parts[l-2] == "blobs": + name = strings.Join(parts[2:l-2], "/") + digest = parts[l-1] + + isBlob = true + default: + logger.Error("wrong path") + w.WriteHeader(http.StatusNotFound) + + return + } + + if !reference.NameRegexp.MatchString(name) { + w.WriteHeader(http.StatusBadRequest) + + return + } + + if !reference.DigestRegexp.MatchString(digest) { + w.WriteHeader(http.StatusNotFound) + + return + } + + logger.Info("image request", zap.String("name", name), zap.String("digest", digest), zap.Bool("is_blob", isBlob), zap.String("registry", registry)) + + ref, err := reference.Parse(fmt.Sprintf("%s/%s@%s", registry, name, digest)) + if err != nil { + s.logger.Error("failed to parse reference", zap.Error(err)) + w.WriteHeader(http.StatusInternalServerError) + + return + } + + canonical, ok := ref.(reference.Canonical) + if !ok { + logger.Error("not a canonical reference") + w.WriteHeader(http.StatusInternalServerError) + + return + } + + logger.Info("canonical reference", zap.String("canonical", canonical.String())) + + if isProxied { + s.handleLocal(logger.With(zap.String("handler", "local")), w, req, canonical, isBlob) + } else { + s.handleFanout(logger.With(zap.String("handler", "fanout")), w, req, canonical, isBlob) + } +} + +func (s *registrydService) handleLocal(logger *zap.Logger, w http.ResponseWriter, req *http.Request, canonical reference.Canonical, isBlob bool) { + var ( + ctx context.Context + info content.Info + err error + ) + + for _, namespace := range []string{constants.SystemContainerdNamespace, criconstants.K8sContainerdNamespace} { + ctx = namespaces.WithNamespace(req.Context(), namespace) + + info, err = s.client.ContentStore().Info(ctx, canonical.Digest()) + if err != nil { + if errdefs.IsNotFound(err) { + continue + } + + logger.Error("failed to get content info", zap.Error(err)) + w.WriteHeader(http.StatusInternalServerError) + + return + } + + break + } + + if err != nil { + logger.Error("failed to find content info", zap.Error(err)) + w.WriteHeader(http.StatusNotFound) + + return + } + + w.Header().Set("Content-Length", strconv.FormatInt(info.Size, 10)) + w.Header().Set("Docker-Content-Digest", canonical.Digest().String()) + + // for manifestBlob requests, we need to set the content type and read the manifestBlob immediately + var manifestBlob []byte + + if !isBlob { + manifestBlob, err = content.ReadBlob(ctx, s.client.ContentStore(), ocispec.Descriptor{Digest: canonical.Digest()}) + if err != nil { + logger.Error("failed to read content blob", zap.Error(err)) + w.WriteHeader(http.StatusInternalServerError) + + return + } + + var manifest struct { + MediaType string `json:"mediaType"` + } + + if err = json.Unmarshal(manifestBlob, &manifest); err != nil { + logger.Error("failed to unmarshal manifest", zap.Error(err)) + w.WriteHeader(http.StatusInternalServerError) + + return + } + + if manifest.MediaType == "" { + logger.Error("failed to get media type", zap.Error(err)) + w.WriteHeader(http.StatusInternalServerError) + + return + } + + w.Header().Set("Content-Type", manifest.MediaType) + } + + logger.Info("response headers set", zap.Stringer("digest", canonical.Digest()), zap.Int64("size", info.Size)) + + if req.Method == http.MethodHead { + // all done + w.WriteHeader(http.StatusOK) + + return + } + + if !isBlob { + w.WriteHeader(http.StatusOK) + w.Write(manifestBlob) //nolint:errcheck + + logger.Info("manifest sent") + + return + } + + reader, err := s.client.ContentStore().ReaderAt(ctx, ocispec.Descriptor{Digest: info.Digest}) + if err != nil { + logger.Error("failed to get content reader", zap.Error(err)) + w.WriteHeader(http.StatusInternalServerError) + + return + } + defer reader.Close() //nolint:errcheck + + io.Copy(w, content.NewReader(reader)) //nolint:errcheck + + logger.Info("stream sent") +} + +func (s *registrydService) handleFanout(logger *zap.Logger, w http.ResponseWriter, req *http.Request, canonical reference.Canonical, isBlob bool) { + ctx := req.Context() + + members, err := safe.StateList[*cluster.Member](ctx, s.resources, resource.NewMetadata(cluster.NamespaceName, cluster.MemberType, "", resource.VersionUndefined)) + if err != nil { + logger.Error("failed to list members", zap.Error(err)) + w.WriteHeader(http.StatusInternalServerError) + + return + } + + var endpoints []string + + for iter := safe.IteratorFromList(members); iter.Next(); { + endpoints = append(endpoints, slices.Map(iter.Value().TypedSpec().Addresses, func(addr netip.Addr) string { + return net.JoinHostPort(addr.String(), "3172") + })...) + } + + if len(endpoints) == 0 { + logger.Error("no endpoints found") + w.WriteHeader(http.StatusNotFound) + + return + } + + logger.Info("fan out", zap.Strings("endpoints", endpoints), zap.String("canonical", canonical.String()), zap.Bool("is_blob", isBlob)) + + fanoutCtx, fanoutCancel := context.WithTimeout(ctx, 15*time.Second) + defer fanoutCancel() + + result := make(chan string) + + for _, endpoint := range endpoints { + go func(endpoint string) { + channel.SendWithContext(fanoutCtx, result, func() string { + fanoutURL := url.URL{ + Scheme: "http", + Host: endpoint, + Path: req.URL.Path, + RawQuery: req.URL.RawQuery, + } + + fanoutReq, err := http.NewRequestWithContext(fanoutCtx, http.MethodHead, fanoutURL.String(), nil) + if err != nil { + logger.Error("failed to create fanout request", zap.Error(err), zap.String("endpoint", endpoint)) + + return "" + } + + fanoutReq.Header.Set("X-Talos-Registry-Proxy", "true") + + resp, err := s.httpClient.Do(fanoutReq) + if err != nil { + logger.Error("failed to fanout request", zap.Error(err), zap.String("endpoint", endpoint)) + + return "" + } + + if resp.Body != nil { + resp.Body.Close() //nolint:errcheck + } + + if resp.StatusCode != http.StatusOK { + logger.Error("fanout request failed", zap.Int("status", resp.StatusCode), zap.String("endpoint", endpoint)) + + return "" + } + + logger.Info("fanout request succeeded", zap.String("endpoint", endpoint)) + + return endpoint + }()) + }(endpoint) + } + + var ( + goodEndpoint string + responses int + ) + +collectLoop: + for { + select { + case <-fanoutCtx.Done(): + logger.Error("fanout timed out") + w.WriteHeader(http.StatusNotFound) + + return + case endpoint := <-result: + logger.Info("fanout response", zap.String("endpoint", endpoint)) + responses++ + + if endpoint != "" { + goodEndpoint = endpoint + + fanoutCancel() + + break collectLoop + } + + if responses == len(endpoints) { + logger.Error("no good endpoints found") + w.WriteHeader(http.StatusNotFound) + + return + } + } + } + + logger.Info("good endpoint", zap.String("endpoint", goodEndpoint)) + + if req.Method == http.MethodHead { + // all done + w.WriteHeader(http.StatusOK) + + return + } + + // we have a good endpoint, proxy the request + req.Header.Set("X-Talos-Registry-Proxy", "true") + + proxy := httputil.NewSingleHostReverseProxy(&url.URL{Scheme: "http", Host: goodEndpoint}) + proxy.Transport = s.httpClient.Transport + + proxy.ServeHTTP(w, req) +} + +var _ system.HealthcheckedService = (*Registryd)(nil) + +// Registryd implements the Service interface. It serves as the concrete type with +// the required methods. +type Registryd struct { + Controller runtime.Controller +} + +// ID implements the Service interface. +func (m *Registryd) ID(r runtime.Runtime) string { + return "registryd" +} + +// PreFunc implements the Service interface. +func (m *Registryd) PreFunc(ctx context.Context, r runtime.Runtime) error { + return nil +} + +// PostFunc implements the Service interface. +func (m *Registryd) PostFunc(r runtime.Runtime, state events.ServiceState) (err error) { + return nil +} + +// Condition implements the Service interface. +func (m *Registryd) Condition(r runtime.Runtime) conditions.Condition { + return nil +} + +// DependsOn implements the Service interface. +func (m *Registryd) DependsOn(r runtime.Runtime) []string { + return []string{"cri"} +} + +// Runner implements the Service interface. +func (m *Registryd) Runner(r runtime.Runtime) (runner.Runner, error) { + svc := ®istrydService{} + + return goroutine.NewRunner(r, "registryd", svc.Main, runner.WithLoggingManager(r.Logging())), nil +} + +// HealthFunc implements the HealthcheckedService interface. +func (m *Registryd) HealthFunc(runtime.Runtime) health.Check { + return func(ctx context.Context) error { + // TODO: implement me + return nil + } +} + +// HealthSettings implements the HealthcheckedService interface. +func (m *Registryd) HealthSettings(runtime.Runtime) *health.Settings { + return &health.DefaultSettings +} diff --git a/internal/pkg/containers/image/image.go b/internal/pkg/containers/image/image.go index 630ab3dc1e..33e1caffd1 100644 --- a/internal/pkg/containers/image/image.go +++ b/internal/pkg/containers/image/image.go @@ -12,7 +12,6 @@ import ( "github.com/containerd/containerd" "github.com/containerd/containerd/errdefs" - "github.com/containerd/containerd/images" "github.com/containerd/containerd/pkg/kmutex" "github.com/siderolabs/go-retry/retry" @@ -80,7 +79,7 @@ func Pull(ctx context.Context, reg config.Registries, client *containerd.Client, ref, containerd.WithPullUnpack, containerd.WithResolver(resolver), - containerd.WithChildLabelMap(images.ChildGCLabelsFilterLayers), + //containerd.WithChildLabelMap(images.ChildGCLabelsFilterLayers), containerd.WithUnpackOpts( []containerd.UnpackOpt{ containerd.WithUnpackDuplicationSuppressor(unpackDuplicationSuppressor),