Skip to content

Commit

Permalink
feat(cluster): initial commit for scale-out cluster
Browse files Browse the repository at this point in the history
Signed-off-by: Ramkumar Chinchani <[email protected]>
  • Loading branch information
rchincha committed Nov 13, 2023
1 parent b2a9239 commit d1d910f
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 1 deletion.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ require (
github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.21.6
github.com/aws/aws-secretsmanager-caching-go v1.1.2
github.com/containers/image/v5 v5.28.0
github.com/dchest/siphash v1.2.3
github.com/google/go-github/v52 v52.0.0
github.com/gorilla/handlers v1.5.2
github.com/gorilla/securecookie v1.1.2
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -595,6 +595,8 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dchest/siphash v1.2.3 h1:QXwFc8cFOR2dSa/gE6o/HokBMWtLUaNDVd+22aKHeEA=
github.com/dchest/siphash v1.2.3/go.mod h1:0NvQU092bT0ipiFN++/rXm69QG9tVxLAlQHIXMPAkHc=
github.com/decred/dcrd/crypto/blake256 v1.0.1/go.mod h1:2OfgNZ5wDpcsFmHmCK5gZTPcCXqlm2ArzUIkw9czNJo=
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 h1:8UrgZ3GkP4i/CLijOJx79Yu+etlyjdBU4sfcs2WYQMs=
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0/go.mod h1:v57UDF4pDQJcEfFUCRop3lJL149eHGSe9Jvczhzjo/0=
Expand Down
4 changes: 3 additions & 1 deletion pkg/api/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

distspec "github.com/opencontainers/distribution-spec/specs-go"

"zotregistry.io/zot/pkg/cluster"
extconf "zotregistry.io/zot/pkg/extensions/config"
storageConstants "zotregistry.io/zot/pkg/storage/constants"
)
Expand Down Expand Up @@ -203,7 +204,8 @@ type Config struct {
HTTP HTTPConfig
Log *LogConfig
Extensions *extconf.ExtensionConfig
Scheduler *SchedulerConfig `json:"scheduler" mapstructure:",omitempty"`
Scheduler *SchedulerConfig `json:"scheduler" mapstructure:",omitempty"`
Cluster *cluster.ClusterConfig `json:"cluster" mapstructure:",omitempty"`
}

func New() *Config {
Expand Down
7 changes: 7 additions & 0 deletions pkg/api/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,13 @@ func (c *Controller) Run(reloadCtx context.Context) error {
engine.Use(SessionAuditLogger(c.Audit))
}

/*
if c.Cluster != nil {
engine.Use(ProxyCluster)
}
*/

c.Router = engine
c.Router.UseEncodedPath()

Expand Down
92 changes: 92 additions & 0 deletions pkg/cluster/proxy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package cluster

import (
"fmt"
"net/http"

"github.com/dchest/siphash"

"zotregistry.io/zot/pkg/api"
"zotregistry.io/zot/pkg/api/constants"
zreg "zotregistry.io/zot/pkg/regexp"
)

type ProxyRouteHandler struct {
c *api.Controller
}

func NewRouteHandler(c *api.Controller) *RouteHandler {
rh := &ProxyRouteHandler{c: c}
rh.SetupRoutes()

// FIXME: this is a scale-out load balancer cluster so doesn't do replicas

return rh
}

func (rh *ProxyRouteHandler) SetupRoutes() {
prefixedRouter := rh.c.Router.PathPrefix(constants.RoutePrefix).Subrouter()
prefixedDistSpecRouter := prefixedRouter.NewRoute().Subrouter()

prefixedDistSpecRouter.HandleFunc(fmt.Sprintf("/{name:%s}/", zreg.NameRegexp.String()), proxyRequestResponse(rh.c.Config)())
}

func proxyRequestResponse(config rh.c.Config) func(http.HandlerFunc) http.HandlerFunc {
return func(next http.HandlerFunc) http.HandlerFunc {
return http.HandlerFunc(func(response http.ResponseWriter, request *http.Request) {
// if no cluster or single-node cluster, handle locally
if config.Cluster == nil || len(config.Cluster.Members) {
next.ServeHTTP(response, request)
}

vars := mux.Vars(request)

name, ok := vars["name"]

if !ok || name == "" {
response.WriteHeader(http.StatusNotFound)

return
}

h := siphash.New(key)
h.Write([]byte(name)
sum64 := h.Sum64(nil)

member := config.Cluster.Members[sum64%len(config.Cluster.Members)]
/*
if member == localMember {
next.ServeHTTP(response, request)
}
*/
handleHTTP(response, request)
})
}
}

type ClusterConfig {
Members string[]
HashKey string
}

func handleHTTP(w http.ResponseWriter, req *http.Request) {
resp, err := http.DefaultTransport.RoundTrip(req)
if err != nil {
http.Error(w, err.Error(), http.StatusServiceUnavailable)
return
}
defer resp.Body.Close()
copyHeader(w.Header(), resp.Header)
w.WriteHeader(resp.StatusCode)
io.Copy(w, resp.Body)
}

func copyHeader(dst, src http.Header) {
for k, vv := range src {
for _, v := range vv {
dst.Add(k, v)
}
}
}

0 comments on commit d1d910f

Please sign in to comment.