Remove ADLS hint from import #7581

Merged
merged 5 commits on Mar 20, 2024
Changes from 1 commit
1 change: 0 additions & 1 deletion .github/workflows/esti.yaml
@@ -956,7 +956,6 @@ jobs:
LAKEFS_BLOCKSTORE_TYPE: azure
ESTI_BLOCKSTORE_TYPE: azure
ESTI_STORAGE_NAMESPACE: https://esti4hns.blob.core.windows.net/esti-system-testing/${{ github.run_number }}/${{ steps.unique.outputs.value }}
ESTI_ADLS_IMPORT_BASE_URL: https://esti4hns.adls.core.windows.net/esti-system-testing-data/
ESTI_AZURE_STORAGE_ACCOUNT: esti4hns
ESTI_AZURE_STORAGE_ACCESS_KEY: ${{ secrets.LAKEFS_BLOCKSTORE_AZURE_STORAGE_GEN2_ACCESS_KEY }}

13 changes: 0 additions & 13 deletions docs/howto/import.md
@@ -151,19 +151,6 @@ the following policy needs to be attached to the lakeFS S3 service-account to al
<div markdown="1" id="azure-storage">
See [Azure deployment][deploy-azure-storage-account-creds] on limitations when using account credentials.

### Azure Data Lake Gen2
{:.no_toc}

lakeFS requires a hint in the import source URL to understand that the provided storage account is ADLS Gen2
Contributor:

We should document the change. Otherwise someone who's already imported has no way of understanding what went wrong!

Member (Author):

Added deprecation message


```
For source account URL:
https://<my-account>.core.windows.net/path/to/import/

Please add the *adls* subdomain to the URL as follows:
https://<my-account>.adls.core.windows.net/path/to/import/
```

</div>
<div markdown="1" id="gcs">
No specific prerequisites
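Anyone who already set up imports with the hinted URL form only needs to switch the host back to the standard Blob endpoint, which is the only form the new import-validity pattern accepts. A minimal Go sketch of that rewrite (the helper name and example URL are illustrative, not part of lakeFS):

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// rewriteADLSImportURL is an illustrative helper: it converts an import URL that
// still carries the old "adls" hint subdomain back to the standard Blob endpoint,
// which is the only host form accepted after this change.
func rewriteADLSImportURL(raw string) (string, error) {
	u, err := url.Parse(raw)
	if err != nil {
		return "", err
	}
	if strings.Contains(u.Host, ".adls.core.windows.net") {
		u.Host = strings.Replace(u.Host, ".adls.core.windows.net", ".blob.core.windows.net", 1)
	}
	return u.String(), nil
}

func main() {
	// Old hinted form -> standard Blob endpoint form.
	fixed, _ := rewriteADLSImportURL("https://my-account.adls.core.windows.net/container/path/to/import/")
	fmt.Println(fixed) // https://my-account.blob.core.windows.net/container/path/to/import/
}
```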
1 change: 0 additions & 1 deletion esti/ops/docker-compose-external-db.yaml
@@ -51,7 +51,6 @@ services:
- ESTI_GOTEST_FLAGS
- ESTI_FLAGS
- ESTI_FORCE_PATH_STYLE
- ESTI_ADLS_IMPORT_BASE_URL
- ESTI_AZURE_STORAGE_ACCOUNT
- ESTI_AZURE_STORAGE_ACCESS_KEY
working_dir: /lakefs
1 change: 0 additions & 1 deletion esti/ops/docker-compose.yaml
@@ -59,7 +59,6 @@ services:
- ESTI_GOTEST_FLAGS
- ESTI_FLAGS
- ESTI_FORCE_PATH_STYLE
- ESTI_ADLS_IMPORT_BASE_URL
- ESTI_AZURE_STORAGE_ACCOUNT
- ESTI_AZURE_STORAGE_ACCESS_KEY
working_dir: /lakefs
8 changes: 4 additions & 4 deletions pkg/block/azure/adapter.go
@@ -209,7 +209,7 @@ func (a *Adapter) GetWalker(uri *url.URL) (block.Walker, error) {
return nil, err
}

return NewAzureBlobWalker(client)
return NewAzureDataLakeWalker(client, false)
}

func (a *Adapter) GetPreSignedURL(ctx context.Context, obj block.ObjectPointer, mode block.PreSignMode) (string, time.Time, error) {
@@ -568,11 +568,11 @@ func (a *Adapter) CompleteMultiPartUpload(ctx context.Context, obj block.ObjectP

func (a *Adapter) GetStorageNamespaceInfo() block.StorageNamespaceInfo {
info := block.DefaultStorageNamespaceInfo(block.BlockstoreTypeAzure)
info.ImportValidityRegex = `^https?://[a-z0-9_-]+\.(blob|adls)\.core\.windows\.net` // added adls for import hint validation in UI
info.ImportValidityRegex = `^https?://[a-z0-9_-]+\.blob\.core\.windows\.net`
Contributor:

This breaks any existing import scripts. Does this mean we need a major release 🤢 ?

Member (Author):

As stated in the PR description, this is a breaking change. But I believe we can avoid a major release, since it only affects specific users we can identify.

Member (Author):

Added a workaround for lakectl so it doesn't break. Effectively this is only going to impact WebUI users (I'm fine with that).

info.ValidityRegex = `^https?://[a-z0-9_-]+\.blob\.core\.windows\.net`

if a.chinaCloud {
info.ImportValidityRegex = `^https?://[a-z0-9_-]+\.(blob|adls)\.core\.chinacloudapi\.cn`
info.ImportValidityRegex = `^https?://[a-z0-9_-]+\.blob\.core\.chinacloudapi\.cn`
info.ValidityRegex = `^https?://[a-z0-9_-]+\.blob\.core\.chinacloudapi\.cn`
}

@@ -598,6 +598,6 @@ func (a *Adapter) newPreSignedTime() time.Time {
return time.Now().UTC().Add(a.preSignedExpiry)
}

func (a *Adapter) GetPresignUploadPartURL(ctx context.Context, obj block.ObjectPointer, uploadID string, partNumber int) (string, error) {
func (a *Adapter) GetPresignUploadPartURL(_ context.Context, _ block.ObjectPointer, _ string, _ int) (string, error) {
return "", block.ErrOperationNotSupported
}
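To make the breaking change discussed in the comments above concrete, this small standalone program checks both URL forms against the new import-validity pattern taken from the diff (the harness itself is illustrative):

```go
package main

import (
	"fmt"
	"regexp"
)

func main() {
	// New import-validity pattern from this change: only the blob endpoint is accepted.
	importValidity := regexp.MustCompile(`^https?://[a-z0-9_-]+\.blob\.core\.windows\.net`)

	urls := []string{
		"https://my-account.blob.core.windows.net/container/prefix/", // still valid
		"https://my-account.adls.core.windows.net/container/prefix/", // previously hinted form, now rejected
	}
	for _, u := range urls {
		fmt.Printf("%-62s -> %v\n", u, importValidity.MatchString(u))
	}
}
```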
85 changes: 0 additions & 85 deletions pkg/block/azure/walker.go
@@ -18,18 +18,6 @@ const DirectoryBlobMetadataKey = "hdi_isfolder"

var ErrAzureInvalidURL = errors.New("invalid Azure storage URL")

func NewAzureBlobWalker(svc *service.Client) (*BlobWalker, error) {
return &BlobWalker{
client: svc,
mark: block.Mark{HasMore: true},
}, nil
}

type BlobWalker struct {
client *service.Client
mark block.Mark
}

// extractAzurePrefix takes a URL that looks like this: https://storageaccount.blob.core.windows.net/container/prefix
// and return the URL for the container and a prefix, if one exists
func extractAzurePrefix(storageURI *url.URL) (*url.URL, string, error) {
@@ -52,71 +40,6 @@ func getAzureBlobURL(containerURL *url.URL, blobName string) *url.URL {
return containerURL.ResolveReference(&relativePath)
}

func (a *BlobWalker) Walk(ctx context.Context, storageURI *url.URL, op block.WalkOptions, walkFn func(e block.ObjectStoreEntry) error) error {
// we use bucket as container and prefix as path
containerURL, prefix, err := extractAzurePrefix(storageURI)
if err != nil {
return err
}
var basePath string
if idx := strings.LastIndex(prefix, "/"); idx != -1 {
basePath = prefix[:idx+1]
}

qk, err := ResolveBlobURLInfoFromURL(containerURL)
if err != nil {
return err
}

containerClient := a.client.NewContainerClient(qk.ContainerName)
listBlob := containerClient.NewListBlobsFlatPager(&azblob.ListBlobsFlatOptions{
Prefix: &prefix,
Marker: &op.ContinuationToken,
Include: container.ListBlobsInclude{
Metadata: true,
},
})

for listBlob.More() {
resp, err := listBlob.NextPage(ctx)
if err != nil {
return err
}
if resp.Marker != nil {
a.mark.ContinuationToken = *resp.Marker
}
for _, blobInfo := range resp.Segment.BlobItems {
// skipping everything in the page which is before 'After' (without forgetting the possible empty string key!)
if op.After != "" && *blobInfo.Name <= op.After {
continue
}

// Skip folders
if isBlobItemFolder(blobInfo) {
continue
}

a.mark.LastKey = *blobInfo.Name
if err := walkFn(block.ObjectStoreEntry{
FullKey: *blobInfo.Name,
RelativeKey: strings.TrimPrefix(*blobInfo.Name, basePath),
Address: getAzureBlobURL(containerURL, *blobInfo.Name).String(),
ETag: extractBlobItemEtag(blobInfo),
Mtime: *blobInfo.Properties.LastModified,
Size: *blobInfo.Properties.ContentLength,
}); err != nil {
return err
}
}
}

a.mark = block.Mark{
HasMore: false,
}

return nil
}

// isBlobItemFolder returns true if the blob item is a folder.
// Make sure that metadata is populated before calling this function.
// Example: for listing using blob API passing options with `Include: container.ListBlobsInclude{ Metadata: true }`
@@ -147,14 +70,6 @@ func extractBlobItemEtag(blobItem *container.BlobItem) string {
return ""
}

func (a *BlobWalker) Marker() block.Mark {
return a.mark
}

func (a *BlobWalker) GetSkippedEntries() []block.ObjectStoreEntry {
return nil
}

//
// DataLakeWalker
//
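With BlobWalker gone, AzureDataLakeWalker is the only implementation left to satisfy the walker contract for Azure. A sketch of that contract as implied by the removed methods (field and method names are taken from the diff; the actual definitions live in pkg/block and may differ in detail):

```go
package blocksketch

import (
	"context"
	"net/url"
	"time"
)

// ObjectStoreEntry mirrors the fields populated by the removed BlobWalker.Walk.
type ObjectStoreEntry struct {
	FullKey     string
	RelativeKey string
	Address     string
	ETag        string
	Mtime       time.Time
	Size        int64
}

// Mark carries pagination state, as seen in the removed walker.
type Mark struct {
	LastKey           string
	ContinuationToken string
	HasMore           bool
}

// WalkOptions holds the listing options the removed walker consumed.
type WalkOptions struct {
	After             string
	ContinuationToken string
}

// Walker is the contract the remaining AzureDataLakeWalker must cover on its own.
type Walker interface {
	Walk(ctx context.Context, storageURI *url.URL, op WalkOptions, walkFn func(e ObjectStoreEntry) error) error
	Marker() Mark
	GetSkippedEntries() []ObjectStoreEntry
}
```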
6 changes: 3 additions & 3 deletions pkg/catalog/catalog.go
@@ -805,19 +805,19 @@ func (c *Catalog) GetBranchReference(ctx context.Context, repositoryID string, b

func (c *Catalog) HardResetBranch(ctx context.Context, repositoryID, branch, refExpr string, opts ...graveler.SetOptionsFunc) error {
branchID := graveler.BranchID(branch)
ref := graveler.Ref(refExpr)
reference := graveler.Ref(refExpr)
if err := validator.Validate([]validator.ValidateArg{
{Name: "repository", Value: repositoryID, Fn: graveler.ValidateRepositoryID},
{Name: "branch", Value: branchID, Fn: graveler.ValidateBranchID},
{Name: "ref", Value: ref, Fn: graveler.ValidateRef},
{Name: "ref", Value: reference, Fn: graveler.ValidateRef},
}); err != nil {
return err
}
repository, err := c.getRepository(ctx, repositoryID)
if err != nil {
return err
}
return c.Store.ResetHard(ctx, repository, branchID, ref, opts...)
return c.Store.ResetHard(ctx, repository, branchID, reference, opts...)
}

func (c *Catalog) ResetBranch(ctx context.Context, repositoryID string, branch string, opts ...graveler.SetOptionsFunc) error {
18 changes: 1 addition & 17 deletions pkg/ingest/store/factory.go
@@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"net/url"
"strings"

"cloud.google.com/go/storage"
"github.com/aws/aws-sdk-go-v2/aws"
@@ -128,22 +127,7 @@ func (f *WalkerFactory) buildAzureWalker(importURL *url.URL, skipOutOfOrder bool
return nil, err
}

isHNS := isHierarchicalNamespaceEnabled(importURL)
if isHNS {
return azure.NewAzureDataLakeWalker(client, skipOutOfOrder)
}
return azure.NewAzureBlobWalker(client)
}

// isHierarchicalNamespaceEnabled - identify if hns enabled on the account,
// based on the import URL.
// Until we enable a way to extract the account information, we assume it based on the domain used in import:
// https://<account>.<blob|adls>.core.windows.net/
// adls - azure data lake storage
func isHierarchicalNamespaceEnabled(u *url.URL) bool {
const importURLParts = 3
n := strings.SplitN(u.Host, ".", importURLParts)
return len(n) == importURLParts && n[1] == "adls"
return azure.NewAzureDataLakeWalker(client, skipOutOfOrder)
}

func (f *WalkerFactory) GetWalker(ctx context.Context, opts WalkerOptions) (*WalkerWrapper, error) {
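For reference, the deleted helper decided HNS support purely from the subdomain of the import URL, which is exactly the hint this PR removes. A standalone reconstruction of that behavior (the helper body is copied from the deleted code above; the main function and example URLs are illustrative):

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// isHierarchicalNamespaceEnabled reproduces the deleted hint-based check:
// it reported HNS (ADLS Gen2) only when the host used the "adls" subdomain.
func isHierarchicalNamespaceEnabled(u *url.URL) bool {
	const importURLParts = 3
	n := strings.SplitN(u.Host, ".", importURLParts)
	return len(n) == importURLParts && n[1] == "adls"
}

func main() {
	for _, raw := range []string{
		"https://my-account.adls.core.windows.net/container/prefix/", // old hint: treated as HNS
		"https://my-account.blob.core.windows.net/container/prefix/", // no hint: treated as flat namespace
	} {
		u, _ := url.Parse(raw)
		fmt.Println(raw, "->", isHierarchicalNamespaceEnabled(u))
	}
}
```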