Skip to content

Commit

Permalink
Experimental: Support pre-signed redirects for S3A (#7630)
Browse files Browse the repository at this point in the history
* experimental: Supports redirects for S3A

* Fixes + Tests

* Update docs/integrations/spark.md

* Update docs/integrations/spark.md

* Fix workflow

* Fix workflow 2

* Fix workflow 3

* Update esti/s3_gateway_test.go

Co-authored-by: Ariel Shaqed (Scolnicov) <[email protected]>

* CR Fixes

* Fix docs

* Fix docs

* Rebase fixes

---------

Co-authored-by: Nir Ozery <[email protected]>
Co-authored-by: N-o-Z <[email protected]>
Co-authored-by: Ariel Shaqed (Scolnicov) <[email protected]>
  • Loading branch information
4 people authored Apr 11, 2024
1 parent 0bd9aa9 commit dc37a59
Show file tree
Hide file tree
Showing 10 changed files with 261 additions and 13 deletions.
23 changes: 20 additions & 3 deletions .github/workflows/esti.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -481,10 +481,11 @@ jobs:
LAKEFS_BLOCKSTORE_S3_CREDENTIALS_SECRET_ACCESS_KEY: ${{ secrets.ESTI_AWS_SECRET_ACCESS_KEY }}

- name: Spark${{ matrix.spark.tag }} + S3 gateway
timeout-minutes: 8
working-directory: test/spark
run: |
python ./run-test.py \
--storage_namespace s3://esti-system-testing/${{ github.run_number }}-spark${{ matrix.spark.tag }}/${{ steps.unique.outputs.value }} \
--storage_namespace s3://esti-system-testing/${{ github.run_number }}-spark${{ matrix.spark.tag }}-gw/${{ steps.unique.outputs.value }} \
--repository gateway-test-spark${{ matrix.spark.tag }} \
--sonnet_jar ${{ matrix.spark.sonnet_jar }}
- name: lakeFS Logs on Spark with gateway failure
Expand All @@ -498,13 +499,29 @@ jobs:
working-directory: test/spark
run: |
python ./run-test.py \
--storage_namespace s3://esti-system-testing/${{ github.run_number }}-spark${{ matrix.spark.tag }}-client/${{ steps.unique.outputs.value }} \
--repository thick-client-test \
--storage_namespace s3://esti-system-testing/${{ github.run_number }}-spark${{ matrix.spark.tag }}-lakefsfs/${{ steps.unique.outputs.value }} \
--repository lakefsfs-test-spark \
--sonnet_jar ${{ matrix.spark.sonnet_jar }} \
--access_mode hadoopfs \
--aws_access_key ${{ secrets.ESTI_AWS_ACCESS_KEY_ID }} \
--aws_secret_key ${{ secrets.ESTI_AWS_SECRET_ACCESS_KEY }}
- name: lakeFS Logs on HadoopFS test failure
if: ${{ failure() }}
continue-on-error: true
working-directory: test/spark
run: docker-compose logs --tail=2500 lakefs

- name: Spark${{ matrix.spark.tag }} + lakeFS GW + Redirect
if: env.SPARK_TAG == '3'
timeout-minutes: 8
working-directory: test/spark
run: |
python ./run-test.py \
--storage_namespace s3://esti-system-testing/${{ github.run_number }}-spark${{ matrix.spark.tag }}-gw-redirect/${{ steps.unique.outputs.value }} \
--repository gateway-redirect-test-spark${{ matrix.spark.tag }} \
--sonnet_jar ${{ matrix.spark.sonnet_jar }} \
--redirect
- name: lakeFS Logs on Spark with gateway redirect failure
if: ${{ failure() }}
continue-on-error: true
working-directory: test/spark
Expand Down
27 changes: 27 additions & 0 deletions docs/integrations/spark.md
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,33 @@ spark.hadoop.fs.s3a.bucket.example-repo.path.style.access true
```sql
SELECT * FROM delta.`s3a://example-repo/main/datasets/delta-table/` LIMIT 100
```
### ⚠️ Experimental: Pre-signed mode for S3A

In Hadoop version 3.1.4 and above (as tested using our lakeFS Hadoop FS), it is possible to use pre-signed URLs as return values from the lakeFS S3 Gateway.

This has the immediate benefit of reducing the amount of traffic that has to go through the lakeFS server, thus improving I/O performance.
To read more about pre-signed URLs, see [this guide](../reference/security/presigned-url.html).

Here's an example Spark configuration to enable this support:

```
spark.hadoop.fs.s3a.impl shaded.databricks.org.apache.hadoop.fs.s3a.S3AFileSystem
spark.hadoop.fs.s3a.bucket.example-repo.access.key AKIAIOSFODNN7EXAMPLE // The access key to your lakeFS server
spark.hadoop.fs.s3a.bucket.example-repo.secret.key wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY // The secret key to your lakeFS server
spark.hadoop.fs.s3a.path.style.access true
spark.hadoop.fs.s3a.bucket.example-repo.signing-algorithm QueryStringSignerType
spark.hadoop.fs.s3a.bucket.example-repo.user.agent.prefix s3RedirectionSupport
```

`user.agent.prefix` should **contain** the string `s3RedirectionSupport` but does not have to match the string exactly.
{: .note }


Once configured, requests will include the string `s3RedirectionSupport` in the `User-Agent` HTTP header sent with GetObject requests, resulting in lakeFS responding with a pre-signed URL.
Setting the `signing-algorithm` to `QueryStringSignerType` is required to stop S3A from signing a pre-signed URL, since S3 returns an error when a request contains more than one signature.

ℹ This feature requires a lakeFS server of version `>1.18.0`
{: .note }

## lakeFS Hadoop FileSystem

Expand Down
29 changes: 29 additions & 0 deletions esti/s3_gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -626,3 +626,32 @@ func TestS3CopyObjectErrors(t *testing.T) {
require.Contains(t, err.Error(), "NoSuchKey")
})
}

// TestS3ReadObjectRedirect verifies that a GetObject request whose User-Agent
// advertises s3RedirectionSupport is answered with a 307 redirect to a
// pre-signed URL instead of the object body.
func TestS3ReadObjectRedirect(t *testing.T) {
	const (
		contents = "the quick brown fox jumps over the lazy dog"
		goodPath = "main/exists.txt"
	)

	ctx, _, repo := setupTest(t)
	defer tearDownTest(repo)

	// Upload an object
	minioClient := newMinioClient(t, credentials.NewStaticV4)

	_, err := minioClient.PutObject(ctx, repo, goodPath, strings.NewReader(contents), int64(len(contents)), minio.PutObjectOptions{})
	if err != nil {
		// The subtest below is meaningless without the object, so stop here.
		t.Fatalf("PutObject(%s, %s): %s", repo, goodPath, err)
	}

	t.Run("get_exists", func(t *testing.T) {
		opts := minio.GetObjectOptions{}
		opts.Set("User-Agent", "client with s3RedirectionSupport set")
		res, err := minioClient.GetObject(ctx, repo, goodPath, opts)
		require.NoError(t, err)

		// Verify we got a redirect: the minio client does not follow it, so
		// reading the body surfaces the 307 status as an error.
		_, err = io.ReadAll(res)
		require.Error(t, err)
		require.Contains(t, err.Error(), "307 Temporary Redirect")
	})
}
1 change: 1 addition & 0 deletions pkg/gateway/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func AuthenticationHandler(authService auth.GatewayService, next http.Handler) h
authenticator := sig.ChainedAuthenticator(
sig.NewV4Authenticator(req),
sig.NewV2SigAuthenticator(req, o.FQDN),
sig.NewJavaV2SigAuthenticator(req, o.FQDN),
)
authContext, err := authenticator.Parse()
if err != nil {
Expand Down
22 changes: 21 additions & 1 deletion pkg/gateway/operations/getobject.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io"
"net/http"
"strconv"
"strings"
"time"

"github.com/treeverse/lakefs/pkg/block"
Expand All @@ -22,7 +23,8 @@ import (
const (
QueryParamMaxParts = "max-parts"
// QueryParamPartNumberMarker Specifies the part after which listing should begin. Only parts with higher part numbers will be listed.
QueryParamPartNumberMarker = "part-number-marker"
QueryParamPartNumberMarker = "part-number-marker"
s3RedirectionSupportUserAgentTag = "s3RedirectionSupport"
)

type GetObject struct{}
Expand All @@ -40,6 +42,11 @@ func (controller *GetObject) Handle(w http.ResponseWriter, req *http.Request, o
if o.HandleUnsupported(w, req, "torrent", "acl", "retention", "legal-hold", "lambdaArn") {
return
}
userAgent := req.Header.Get("User-Agent")
redirect := strings.Contains(userAgent, s3RedirectionSupportUserAgentTag)
if redirect {
req = req.WithContext(logging.AddFields(req.Context(), logging.Fields{"S3_redirect": true}))
}
o.Incr("get_object", o.Principal, o.Repository.Name, o.Reference)
ctx := req.Context()
query := req.URL.Query()
Expand Down Expand Up @@ -108,6 +115,19 @@ func (controller *GetObject) Handle(w http.ResponseWriter, req *http.Request, o
Identifier: entry.PhysicalAddress,
}

if redirect {
preSignedURL, _, err := o.BlockStore.GetPreSignedURL(ctx, objectPointer, block.PreSignModeRead)
if err != nil {
code := gatewayerrors.ErrInternalError
_ = o.EncodeError(w, req, err, gatewayerrors.Codes.ToAPIErr(code))
return
}

o.SetHeader(w, "Location", preSignedURL)
w.WriteHeader(http.StatusTemporaryRedirect)
return
}

if rangeSpec == "" || err != nil {
// assemble a response body (range-less query)
data, err = o.BlockStore.Get(ctx, objectPointer, entry.Size)
Expand Down
2 changes: 1 addition & 1 deletion pkg/gateway/operations/putobject_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ const (
cheapString = "CHEAP"
)

func TestReadBlob(t *testing.T) {
func TestWriteBlob(t *testing.T) {
tt := []struct {
name string
size int64
Expand Down
146 changes: 146 additions & 0 deletions pkg/gateway/sig/javav2.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
package sig

import (
"crypto/hmac"
"crypto/sha256"
"encoding/base64"
"net/http"
"net/url"
"sort"
"strings"

"github.com/treeverse/lakefs/pkg/auth/model"
"github.com/treeverse/lakefs/pkg/gateway/errors"
"github.com/treeverse/lakefs/pkg/logging"
)

// Implements the "signing protocol" as exists at aws-sdk-java @ 1.12.390 (commit 07926f08a70).
// This should never have worked, on any version of S3, but the implementation is there, unmodified since 2015.
// The code is below, for reference
/*
private String calculateStringToSignV2(SignableRequest<?> request) throws SdkClientException {
URI endpoint = request.getEndpoint();
StringBuilder data = new StringBuilder();
data.append("POST")
.append("\n")
.append(getCanonicalizedEndpoint(endpoint)) // <----- bare host, lower-cased
.append("\n")
.append(getCanonicalizedResourcePath(request)) // <----- path relative to the endpoint
.append("\n")
.append(getCanonicalizedQueryString(request.getParameters())); // <----- ordered query string parameters
return data.toString();
}
*/

func canonicalJavaV2String(host string, query url.Values, path string) string {
cs := strings.ToUpper(http.MethodPost) // so weird.
cs += "\n"
cs += host
cs += "\n"
cs += path
cs += "\n"
cs += canonicalJavaV2Query(query)
return cs
}

func canonicalJavaV2Query(q url.Values) string {
escaped := make([][2]string, 0)
for k, vs := range q {
if strings.EqualFold(k, "signature") {
continue
}
escapedKey := url.QueryEscape(k)
for _, v := range vs {
pair := [2]string{escapedKey, url.QueryEscape(v)}
escaped = append(escaped, pair)
}
}
// sort
sort.Slice(escaped, func(i, j int) bool {
return escaped[i][0] < escaped[j][0]
})
// output
out := ""
for i, pair := range escaped {
out += pair[0] + "=" + pair[1]
isLast := i == len(escaped)-1
if !isLast {
out += "&"
}
}
return out
}

// JavaV2Signer authenticates requests signed with aws-sdk-java's V2 query
// "signing protocol" described in the comment above: Parse extracts the
// signature parameters from the request, and Verify checks them against
// a credential's secret key.
type JavaV2Signer struct {
	bareDomain string               // gateway bare domain, used to derive the signed resource path
	req        *http.Request        // the request being authenticated
	sigCtx     *JavaV2SignerContext // parse result; set by Parse, read by Verify
}

// JavaV2SignerContext holds the identity parsed out of a request: the access
// key that claims to have signed it and the decoded signature bytes.
type JavaV2SignerContext struct {
	awsAccessKeyID string
	signature      []byte // raw HMAC bytes, base64-decoded from the Signature query parameter
}

// NewJavaV2SigAuthenticator returns a JavaV2Signer that authenticates r
// against bareDomain using the Java SDK's V2 query-string signature scheme.
func NewJavaV2SigAuthenticator(r *http.Request, bareDomain string) *JavaV2Signer {
	signer := &JavaV2Signer{
		bareDomain: bareDomain,
		req:        r,
	}
	return signer
}

// GetAccessKeyID returns the access key ID extracted from the request's
// AWSAccessKeyId query parameter.
func (j *JavaV2SignerContext) GetAccessKeyID() string {
	return j.awsAccessKeyID
}

// Parse extracts and validates the V2 signature query parameters
// (AWSAccessKeyId, Signature, SignatureMethod, SignatureVersion) from the
// request. It returns a context holding the claimed access key ID and the
// base64-decoded signature, or ErrHeaderMalformed when any parameter is
// missing or invalid.
func (j *JavaV2Signer) Parse() (SigContext, error) {
	ctx := j.req.Context()
	params := j.req.URL.Query()

	accessKey := params.Get("AWSAccessKeyId")
	if accessKey == "" {
		return nil, ErrHeaderMalformed
	}
	encodedSig := params.Get("Signature")
	if encodedSig == "" {
		return nil, ErrHeaderMalformed
	}
	rawSig, err := base64.StdEncoding.DecodeString(encodedSig)
	if err != nil {
		logging.FromContext(ctx).Error("log header does not match v2 structure (isn't proper base64)")
		return nil, ErrHeaderMalformed
	}
	// Only HMAC-SHA256 version-2 signatures are supported.
	if params.Get("SignatureMethod") != "HmacSHA256" || params.Get("SignatureVersion") != "2" {
		return nil, ErrHeaderMalformed
	}

	sigCtx := &JavaV2SignerContext{
		awsAccessKeyID: accessKey,
		signature:      rawSig,
	}
	j.sigCtx = sigCtx
	return sigCtx, nil
}

// signCanonicalJavaV2String computes the HMAC-SHA256 of msg keyed with key
// (the caller's secret access key) and returns the raw digest bytes.
// The parameter was previously (mis)named "signature" -- it is the HMAC key,
// not a signature.
func signCanonicalJavaV2String(msg string, key []byte) []byte {
	h := hmac.New(sha256.New, key)
	_, _ = h.Write([]byte(msg)) // hash.Hash.Write never returns an error
	return h.Sum(nil)
}

// Verify recomputes the Java-V2 string-to-sign for the request, signs it with
// the given credential's secret key, and compares the result against the
// signature parsed from the request. It returns ErrSignatureDoesNotMatch on
// mismatch.
func (j *JavaV2Signer) Verify(creds *model.Credential) error {
	path := buildPath(j.req.Host, j.bareDomain, j.req.URL.EscapedPath())
	stringToSign := canonicalJavaV2String(j.req.Host, j.req.URL.Query(), path)
	expected := signCanonicalJavaV2String(stringToSign, []byte(creds.SecretAccessKey))
	if Equal(expected, j.sigCtx.signature) {
		return nil
	}
	return errors.ErrSignatureDoesNotMatch
}
14 changes: 8 additions & 6 deletions pkg/gateway/sig/v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,31 +89,33 @@ func NewV2SigAuthenticator(r *http.Request, bareDomain string) *V2SigAuthenticat

func (a *V2SigAuthenticator) Parse() (SigContext, error) {
ctx := a.req.Context()
var sigCtx v2Context
headerValue := a.req.Header.Get(v2authHeaderName)
if len(headerValue) > 0 {
match := V2AuthHeaderRegexp.FindStringSubmatch(headerValue)
if len(match) == 0 {
logging.FromContext(ctx).Error("log header does not match v2 structure")
return sigCtx, ErrHeaderMalformed
return nil, ErrHeaderMalformed
}
result := make(map[string]string)
for i, name := range V2AuthHeaderRegexp.SubexpNames() {
if i != 0 && name != "" {
result[name] = match[i]
}
}
sigCtx.accessKeyID = result["AccessKeyId"]
sigCtx := v2Context{
accessKeyID: result["AccessKeyId"],
}
// parse signature
sig, err := base64.StdEncoding.DecodeString(result["Signature"])
if err != nil {
logging.FromContext(ctx).Error("log header does not match v2 structure (isn't proper base64)")
return sigCtx, ErrHeaderMalformed
return nil, ErrHeaderMalformed
}
sigCtx.signature = sig
a.sigCtx = sigCtx
return sigCtx, nil
}
a.sigCtx = sigCtx
return sigCtx, nil
return nil, ErrHeaderMalformed
}

func headerValueToString(val []string) string {
Expand Down
1 change: 0 additions & 1 deletion test/spark/docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
version: '3.9'
x-lakefs-common:
&lakefs-common
image: "${REPO:-treeverse}/lakefs:${TAG:-latest}"
Expand Down
Loading

0 comments on commit dc37a59

Please sign in to comment.