diff --git a/cmd/lakefs/cmd/run.go b/cmd/lakefs/cmd/run.go index ff497cef1f3..d76664c2bca 100644 --- a/cmd/lakefs/cmd/run.go +++ b/cmd/lakefs/cmd/run.go @@ -298,6 +298,7 @@ var runCmd = &cobra.Command{ s3FallbackURL, cfg.Logging.AuditLogLevel, cfg.Logging.TraceRequestHeaders, + cfg.Gateways.S3.VerifyUnsupported, ) s3gatewayHandler = apiAuthenticator(s3gatewayHandler) diff --git a/docs/reference/configuration.md b/docs/reference/configuration.md index b4fbe46389b..b82df994baf 100644 --- a/docs/reference/configuration.md +++ b/docs/reference/configuration.md @@ -195,6 +195,7 @@ This reference uses `.` to denote the nesting of values. local development, if using [virtual-host addressing](https://docs.aws.amazon.com/AmazonS3/latest/userguide/VirtualHosting.html). * `gateways.s3.region` `(string : "us-east-1")` - AWS region we're pretending to be in, it should match the region configuration used in AWS SDK clients * `gateways.s3.fallback_url` `(string)` - If specified, requests with a non-existing repository will be forwarded to this URL. This can be useful for using lakeFS side-by-side with S3, with the URL pointing at an [S3Proxy](https://github.com/gaul/s3proxy) instance. +* `gateways.s3.verify_unsupported` `(bool : true)` - The S3 gateway errors on unsupported requests, but when disabled, defers to target-based handlers. * `stats.enabled` `(bool : true)` - Whether to periodically collect anonymous usage statistics * `stats.flush_interval` `(duration : 30s)` - Interval used to post anonymous statistics collected * `stats.flush_size` `(int : 100)` - A size (in records) of anonymous statistics collected in which we post diff --git a/esti/s3_gateway_test.go b/esti/s3_gateway_test.go index 0d01a9c0c13..0571ceaae6d 100644 --- a/esti/s3_gateway_test.go +++ b/esti/s3_gateway_test.go @@ -76,34 +76,33 @@ func TestS3UploadAndDownload(t *testing.T) { objects = make(chan Object, parallelism*2) ) + client := newMinioClient(t, sig.GetCredentials) + wg.Add(parallelism) for i := 0; i < parallelism; i++ { - client := newMinioClient(t, sig.GetCredentials) - - wg.Add(1) go func() { + defer wg.Done() for o := range objects { - _, err := client.PutObject( - ctx, repo, o.Path, strings.NewReader(o.Content), int64(len(o.Content)), minio.PutObjectOptions{}) + _, err := client.PutObject(ctx, repo, o.Path, strings.NewReader(o.Content), int64(len(o.Content)), minio.PutObjectOptions{}) if err != nil { t.Errorf("minio.Client.PutObject(%s): %s", o.Path, err) + continue } - download, err := client.GetObject( - ctx, repo, o.Path, minio.GetObjectOptions{}) + download, err := client.GetObject(ctx, repo, o.Path, minio.GetObjectOptions{}) if err != nil { t.Errorf("minio.Client.GetObject(%s): %s", o.Path, err) + continue } contents := bytes.NewBuffer(nil) _, err = io.Copy(contents, download) if err != nil { t.Errorf("download %s: %s", o.Path, err) + continue } if strings.Compare(contents.String(), o.Content) != 0 { - t.Errorf( - "Downloaded bytes %v from uploaded bytes %v", contents.Bytes(), o.Content) + t.Errorf("Downloaded bytes %v from uploaded bytes %v", contents.Bytes(), o.Content) } } - wg.Done() }() } diff --git a/pkg/config/config.go b/pkg/config/config.go index cf6db120837..f6e7ef5a4b3 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -320,9 +320,10 @@ type Config struct { } `mapstructure:"graveler"` Gateways struct { S3 struct { - DomainNames Strings `mapstructure:"domain_name"` - Region string `mapstructure:"region"` - FallbackURL string `mapstructure:"fallback_url"` + DomainNames Strings `mapstructure:"domain_name"` + Region string `mapstructure:"region"` + FallbackURL string `mapstructure:"fallback_url"` + VerifyUnsupported bool `mapstructure:"verify_unsupported"` } `mapstructure:"s3"` } Stats struct { diff --git a/pkg/config/defaults.go b/pkg/config/defaults.go index 31cee5ba0b8..17ef70a7c4a 100644 --- a/pkg/config/defaults.go +++ b/pkg/config/defaults.go @@ -85,6 +85,7 @@ func setDefaults(cfgType string) { viper.SetDefault("gateways.s3.domain_name", "s3.local.lakefs.io") viper.SetDefault("gateways.s3.region", "us-east-1") + viper.SetDefault("gateways.s3.verify_unsupported", true) viper.SetDefault("blockstore.gs.s3_endpoint", "https://storage.googleapis.com") viper.SetDefault("blockstore.gs.pre_signed_expiry", 15*time.Minute) diff --git a/pkg/gateway/handler.go b/pkg/gateway/handler.go index 0c4a4e6122f..32816df33bc 100644 --- a/pkg/gateway/handler.go +++ b/pkg/gateway/handler.go @@ -49,17 +49,18 @@ type handler struct { } type ServerContext struct { - region string - bareDomains []string - catalog *catalog.Catalog - multipartTracker multipart.Tracker - blockStore block.Adapter - authService auth.GatewayService - stats stats.Collector - pathProvider upload.PathProvider + region string + bareDomains []string + catalog *catalog.Catalog + multipartTracker multipart.Tracker + blockStore block.Adapter + authService auth.GatewayService + stats stats.Collector + pathProvider upload.PathProvider + verifyUnsupported bool } -func NewHandler(region string, catalog *catalog.Catalog, multipartTracker multipart.Tracker, blockStore block.Adapter, authService auth.GatewayService, bareDomains []string, stats stats.Collector, pathProvider upload.PathProvider, fallbackURL *url.URL, auditLogLevel string, traceRequestHeaders bool) http.Handler { +func NewHandler(region string, catalog *catalog.Catalog, multipartTracker multipart.Tracker, blockStore block.Adapter, authService auth.GatewayService, bareDomains []string, stats stats.Collector, pathProvider upload.PathProvider, fallbackURL *url.URL, auditLogLevel string, traceRequestHeaders bool, verifyUnsupported bool) http.Handler { var fallbackHandler http.Handler if fallbackURL != nil { fallbackProxy := gohttputil.NewSingleHostReverseProxy(fallbackURL) @@ -75,14 +76,15 @@ func NewHandler(region string, catalog *catalog.Catalog, multipartTracker multip }) } sc := &ServerContext{ - catalog: catalog, - multipartTracker: multipartTracker, - region: region, - bareDomains: bareDomains, - blockStore: blockStore, - authService: authService, - stats: stats, - pathProvider: pathProvider, + catalog: catalog, + multipartTracker: multipartTracker, + region: region, + bareDomains: bareDomains, + blockStore: blockStore, + authService: authService, + stats: stats, + pathProvider: pathProvider, + verifyUnsupported: verifyUnsupported, } // setup routes diff --git a/pkg/gateway/middleware.go b/pkg/gateway/middleware.go index 409df49838d..f4d195fc584 100644 --- a/pkg/gateway/middleware.go +++ b/pkg/gateway/middleware.go @@ -112,12 +112,13 @@ func EnrichWithOperation(sc *ServerContext, next http.Handler) http.Handler { ctx := req.Context() client := httputil.GetRequestLakeFSClient(req) o := &operations.Operation{ - Region: sc.region, - FQDN: getBareDomain(stripPort(req.Host), sc.bareDomains), - Catalog: sc.catalog, - MultipartTracker: sc.multipartTracker, - BlockStore: sc.blockStore, - Auth: sc.authService, + Region: sc.region, + FQDN: getBareDomain(stripPort(req.Host), sc.bareDomains), + Catalog: sc.catalog, + MultipartTracker: sc.multipartTracker, + BlockStore: sc.blockStore, + Auth: sc.authService, + VerifyUnsupported: sc.verifyUnsupported, Incr: func(action, userID, repository, ref string) { logging.FromContext(ctx). WithFields(logging.Fields{ @@ -199,28 +200,21 @@ func OperationLookupHandler(next http.Handler) http.Handler { ctx := req.Context() o := ctx.Value(ContextKeyOperation).(*operations.Operation) repoID := ctx.Value(ContextKeyRepositoryID).(string) - o.OperationID = operations.OperationIDOperationNotFound - if repoID == "" { - if req.Method == http.MethodGet { - o.OperationID = operations.OperationIDListBuckets - } else { - _ = o.EncodeError(w, req, nil, gatewayerrors.ERRLakeFSNotSupported.ToAPIErr()) - return - } - } else { - ref := ctx.Value(ContextKeyRef).(string) - pth := ctx.Value(ContextKeyPath).(string) - switch { - case ref != "" && pth != "": - req = req.WithContext(ctx) - o.OperationID = pathBasedOperationID(req.Method) - case ref == "" && pth == "": - o.OperationID = repositoryBasedOperationID(req.Method) - default: - w.WriteHeader(http.StatusNotFound) - return - } + ref := ctx.Value(ContextKeyRef).(string) + pth := ctx.Value(ContextKeyPath).(string) + + // based on the operation level, we can determine the operation id + switch { + case repoID == "": + o.OperationID = rootBasedOperationID(req.Method) + case ref != "" && pth != "": + o.OperationID = pathBasedOperationID(req.Method) + case ref == "" && pth == "": + o.OperationID = repositoryBasedOperationID(req.Method) + default: + o.OperationID = operations.OperationIDOperationNotFound } + req = req.WithContext(logging.AddFields(ctx, logging.Fields{"operation_id": o.OperationID})) next.ServeHTTP(w, req) }) @@ -277,7 +271,7 @@ func ParseRequestParts(host string, urlPath string, bareDomains []string) Reques } if !parts.MatchedHost { - // assume path based for domains we don't explicitly know + // assume path-based for domains we don't explicitly know p = strings.SplitN(urlPath, path.Separator, 3) //nolint: gomnd parts.Repository = p[0] if len(p) >= 1 { @@ -295,6 +289,13 @@ func ParseRequestParts(host string, urlPath string, bareDomains []string) Reques return parts } +func rootBasedOperationID(method string) operations.OperationID { + if method == http.MethodGet { + return operations.OperationIDListBuckets + } + return operations.OperationIDOperationNotFound +} + func pathBasedOperationID(method string) operations.OperationID { switch method { case http.MethodDelete: diff --git a/pkg/gateway/operations/base.go b/pkg/gateway/operations/base.go index 9892599be84..98786e94bd7 100644 --- a/pkg/gateway/operations/base.go +++ b/pkg/gateway/operations/base.go @@ -7,6 +7,7 @@ import ( "fmt" "io" "net/http" + "slices" "github.com/treeverse/lakefs/pkg/auth" "github.com/treeverse/lakefs/pkg/auth/keys" @@ -44,16 +45,17 @@ const ( type ActionIncr func(action, userID, repository, ref string) type Operation struct { - OperationID OperationID - Region string - FQDN string - Catalog *catalog.Catalog - MultipartTracker multipart.Tracker - BlockStore block.Adapter - Auth auth.GatewayService - Incr ActionIncr - MatchedHost bool - PathProvider upload.PathProvider + OperationID OperationID + Region string + FQDN string + Catalog *catalog.Catalog + MultipartTracker multipart.Tracker + BlockStore block.Adapter + Auth auth.GatewayService + Incr ActionIncr + MatchedHost bool + PathProvider upload.PathProvider + VerifyUnsupported bool } func StorageClassFromHeader(header http.Header) *string { @@ -84,6 +86,18 @@ func (o *Operation) EncodeXMLBytes(w http.ResponseWriter, req *http.Request, t [ } } +func (o *Operation) HandleUnsupported(w http.ResponseWriter, req *http.Request, keys ...string) bool { + if !o.VerifyUnsupported { + return false + } + query := req.URL.Query() + if slices.ContainsFunc(keys, query.Has) { + _ = o.EncodeError(w, req, nil, gwerrors.ERRLakeFSNotSupported.ToAPIErr()) + return true + } + return false +} + func EncodeResponse(w http.ResponseWriter, entity interface{}, statusCode int) error { // We don't indent the XML document because of Java. // See: https://github.com/spulec/moto/issues/1870 diff --git a/pkg/gateway/operations/deleteobject.go b/pkg/gateway/operations/deleteobject.go index 332ea0d748e..b4c83f2200a 100644 --- a/pkg/gateway/operations/deleteobject.go +++ b/pkg/gateway/operations/deleteobject.go @@ -60,10 +60,11 @@ func (controller *DeleteObject) HandleAbortMultipartUpload(w http.ResponseWriter } func (controller *DeleteObject) Handle(w http.ResponseWriter, req *http.Request, o *PathOperation) { + if o.HandleUnsupported(w, req, "tagging", "acl", "torrent") { + return + } query := req.URL.Query() - - _, hasUploadID := query[QueryParamUploadID] - if hasUploadID { + if query.Has(QueryParamUploadID) { controller.HandleAbortMultipartUpload(w, req, o) return } diff --git a/pkg/gateway/operations/deleteobjects.go b/pkg/gateway/operations/deleteobjects.go index 297c9fdd53c..a10f0e0a02f 100644 --- a/pkg/gateway/operations/deleteobjects.go +++ b/pkg/gateway/operations/deleteobjects.go @@ -27,6 +27,13 @@ func (controller *DeleteObjects) RequiredPermissions(_ *http.Request, _ string) } func (controller *DeleteObjects) Handle(w http.ResponseWriter, req *http.Request, o *RepoOperation) { + // verify we only handle delete request + query := req.URL.Query() + if !query.Has("delete") { + _ = o.EncodeError(w, req, nil, gerrors.ERRLakeFSNotSupported.ToAPIErr()) + return + } + o.Incr("delete_objects", o.Principal, o.Repository.Name, "") decodedXML := &serde.Delete{} err := DecodeXMLBody(req.Body, decodedXML) diff --git a/pkg/gateway/operations/getobject.go b/pkg/gateway/operations/getobject.go index 838494aa4b7..d7e9be94517 100644 --- a/pkg/gateway/operations/getobject.go +++ b/pkg/gateway/operations/getobject.go @@ -28,6 +28,9 @@ func (controller *GetObject) RequiredPermissions(_ *http.Request, repoID, _, pat } func (controller *GetObject) Handle(w http.ResponseWriter, req *http.Request, o *PathOperation) { + if o.HandleUnsupported(w, req, "torrent", "acl", "retention", "legal-hold", "lambdaArn") { + return + } o.Incr("get_object", o.Principal, o.Repository.Name, o.Reference) ctx := req.Context() query := req.URL.Query() diff --git a/pkg/gateway/operations/headbucket.go b/pkg/gateway/operations/headbucket.go index 3e97178ae71..6d13382328a 100644 --- a/pkg/gateway/operations/headbucket.go +++ b/pkg/gateway/operations/headbucket.go @@ -17,7 +17,10 @@ func (controller *HeadBucket) RequiredPermissions(_ *http.Request, repoID string }, nil } -func (controller *HeadBucket) Handle(w http.ResponseWriter, _ *http.Request, o *RepoOperation) { +func (controller *HeadBucket) Handle(w http.ResponseWriter, req *http.Request, o *RepoOperation) { + if o.HandleUnsupported(w, req, "acl") { + return + } o.Incr("get_repo", o.Principal, o.Repository.Name, "") w.WriteHeader(http.StatusOK) } diff --git a/pkg/gateway/operations/listbuckets.go b/pkg/gateway/operations/listbuckets.go index 370bae99539..8fde6c8151c 100644 --- a/pkg/gateway/operations/listbuckets.go +++ b/pkg/gateway/operations/listbuckets.go @@ -21,6 +21,10 @@ func (controller *ListBuckets) RequiredPermissions(_ *http.Request) (permissions // Handle - list buckets (repositories) func (controller *ListBuckets) Handle(w http.ResponseWriter, req *http.Request, o *AuthorizedOperation) { + if o.HandleUnsupported(w, req, "events") { + return + } + o.Incr("list_repos", o.Principal, "", "") buckets := make([]serde.Bucket, 0) diff --git a/pkg/gateway/operations/listobjects.go b/pkg/gateway/operations/listobjects.go index d97d27c9f22..e124d75fecd 100644 --- a/pkg/gateway/operations/listobjects.go +++ b/pkg/gateway/operations/listobjects.go @@ -18,6 +18,9 @@ import ( const ( ListObjectMaxKeys = 1000 + + // defaultBucketLocation used to identify if we need to specify the location constraint + defaultBucketLocation = "us-east-1" ) type ListObjects struct{} @@ -349,16 +352,34 @@ func (controller *ListObjects) ListV1(w http.ResponseWriter, req *http.Request, } func (controller *ListObjects) Handle(w http.ResponseWriter, req *http.Request, o *RepoOperation) { - o.Incr("list_objects", o.Principal, o.Repository.Name, "") - // parse request parameters - // GET /example?list-type=2&prefix=main%2F&delimiter=%2F&encoding-type=url HTTP/1.1 - - // handle GET /?versioning + if o.HandleUnsupported(w, req, "inventory", "metrics", "publicAccessBlock", "ownershipControls", + "intelligent-tiering", "analytics", "policy", "lifecycle", "encryption", "object-lock", "replication", + "notification", "events", "acl", "cors", "website", "accelerate", + "requestPayment", "logging", "tagging", "uploads", "versions", "policyStatus") { + return + } query := req.URL.Query() - if _, found := query["versioning"]; found { + + // getbucketlocation support + if query.Has("location") { + o.Incr("get_bucket_location", o.Principal, o.Repository.Name, "") + response := serde.LocationResponse{} + if o.Region != "" && o.Region != defaultBucketLocation { + response.Location = o.Region + } + o.EncodeResponse(w, req, response, http.StatusOK) + return + } + + // getbucketversioing support + if query.Has("versioning") { o.EncodeXMLBytes(w, req, []byte(serde.VersioningResponse), http.StatusOK) return } + o.Incr("list_objects", o.Principal, o.Repository.Name, "") + + // parse request parameters + // GET /example?list-type=2&prefix=main%2F&delimiter=%2F&encoding-type=url HTTP/1.1 // handle ListObjects versions listType := query.Get("list-type") diff --git a/pkg/gateway/operations/postobject.go b/pkg/gateway/operations/postobject.go index 787102d46f1..da927df10bd 100644 --- a/pkg/gateway/operations/postobject.go +++ b/pkg/gateway/operations/postobject.go @@ -163,20 +163,20 @@ func normalizeMultipartUploadCompletion(list *block.MultipartUploadCompletion) { } func (controller *PostObject) Handle(w http.ResponseWriter, req *http.Request, o *PathOperation) { + if o.HandleUnsupported(w, req, "select", "restore") { + return + } + // POST is only supported for CreateMultipartUpload/CompleteMultipartUpload // https://docs.aws.amazon.com/AmazonS3/latest/API/API_CreateMultipartUpload.html // https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html - _, mpuCreateParamExist := req.URL.Query()[CreateMultipartUploadQueryParam] - if mpuCreateParamExist { + query := req.URL.Query() + switch { + case query.Has(CreateMultipartUploadQueryParam): controller.HandleCreateMultipartUpload(w, req, o) - return - } - - _, mpuCompleteParamExist := req.URL.Query()[CompleteMultipartUploadQueryParam] - if mpuCompleteParamExist { + case query.Has(CompleteMultipartUploadQueryParam): controller.HandleCompleteMultipartUpload(w, req, o) - return + default: + w.WriteHeader(http.StatusMethodNotAllowed) } - // otherwise - w.WriteHeader(http.StatusMethodNotAllowed) } diff --git a/pkg/gateway/operations/putbucket.go b/pkg/gateway/operations/putbucket.go index 6793c9c2efd..853824127ef 100644 --- a/pkg/gateway/operations/putbucket.go +++ b/pkg/gateway/operations/putbucket.go @@ -25,11 +25,12 @@ func (controller *PutBucket) RequiredPermissions(_ *http.Request, repoID string) } func (controller *PutBucket) Handle(w http.ResponseWriter, req *http.Request, o *RepoOperation) { - o.Incr("put_repo", o.Principal, o.Repository.Name, "") - if o.Repository == nil { - // No repo, would have to create it, but not enough - // information -- so not supported. - o.EncodeError(w, req, nil, gatewayerrors.ERRLakeFSNotSupported.ToAPIErr()) + if o.HandleUnsupported(w, req, "cors", "metrics", "website", "logging", "accelerate", + "requestPayment", "acl", "publicAccessBlock", "ownershipControls", "intelligent-tiering", "analytics", + "lifecycle", "replication", "encryption", "policy", "object-lock", "tagging", "versioning") { + return } + + o.Incr("put_repo", o.Principal, o.Repository.Name, "") o.EncodeError(w, req, nil, gatewayerrors.ErrBucketAlreadyExists.ToAPIErr()) } diff --git a/pkg/gateway/operations/putobject.go b/pkg/gateway/operations/putobject.go index 11a334c8502..787a6420c3a 100644 --- a/pkg/gateway/operations/putobject.go +++ b/pkg/gateway/operations/putobject.go @@ -214,6 +214,10 @@ func handleUploadPart(w http.ResponseWriter, req *http.Request, o *PathOperation } func (controller *PutObject) Handle(w http.ResponseWriter, req *http.Request, o *PathOperation) { + if o.HandleUnsupported(w, req, "torrent", "acl") { + return + } + // verify branch before we upload data - fail early branchExists, err := o.Catalog.BranchExists(req.Context(), o.Repository.Name, o.Reference) if err != nil { @@ -230,8 +234,7 @@ func (controller *PutObject) Handle(w http.ResponseWriter, req *http.Request, o query := req.URL.Query() // check if this is a multipart upload creation call - _, hasUploadID := query[QueryParamUploadID] - if hasUploadID { + if query.Has(QueryParamUploadID) { handleUploadPart(w, req, o) return } diff --git a/pkg/gateway/serde/xml.go b/pkg/gateway/serde/xml.go index b441d446c3d..9082337fb07 100644 --- a/pkg/gateway/serde/xml.go +++ b/pkg/gateway/serde/xml.go @@ -168,3 +168,8 @@ type Tagging struct { XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ Tagging"` TagSet TagSet `xml:"TagSet"` } + +type LocationResponse struct { + XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ LocationConstraint"` + Location string `xml:",chardata"` +} diff --git a/pkg/gateway/testutil/gateway_setup.go b/pkg/gateway/testutil/gateway_setup.go index 595817a7ede..04b4b9859de 100644 --- a/pkg/gateway/testutil/gateway_setup.go +++ b/pkg/gateway/testutil/gateway_setup.go @@ -63,19 +63,7 @@ func GetBasicHandler(t *testing.T, authService *FakeAuthService, repoName string _, err = c.CreateRepository(ctx, repoName, storageNamespace, "main") testutil.Must(t, err) - handler := gateway.NewHandler( - authService.Region, - c, - multipartTracker, - blockAdapter, - authService, - []string{authService.BareDomain}, - &stats.NullCollector{}, - upload.DefaultPathProvider, - nil, - config.DefaultLoggingAuditLogLevel, - true, - ) + handler := gateway.NewHandler(authService.Region, c, multipartTracker, blockAdapter, authService, []string{authService.BareDomain}, &stats.NullCollector{}, upload.DefaultPathProvider, nil, config.DefaultLoggingAuditLogLevel, true, false) return handler, &Dependencies{ blocks: blockAdapter,