Remove ADLS hint from import (#7581)
* Remove ADLS hint from import

* CR Fixes

* Fix regex

* CR Fix

* CR Fix 2
N-o-Z authored Mar 20, 2024
1 parent 0e4a17d commit c6d0c12
Showing 11 changed files with 25 additions and 126 deletions.
1 change: 0 additions & 1 deletion .github/workflows/esti.yaml
@@ -956,7 +956,6 @@ jobs:
LAKEFS_BLOCKSTORE_TYPE: azure
ESTI_BLOCKSTORE_TYPE: azure
ESTI_STORAGE_NAMESPACE: https://esti4hns.blob.core.windows.net/esti-system-testing/${{ github.run_number }}/${{ steps.unique.outputs.value }}
ESTI_ADLS_IMPORT_BASE_URL: https://esti4hns.adls.core.windows.net/esti-system-testing-data/
ESTI_AZURE_STORAGE_ACCOUNT: esti4hns
ESTI_AZURE_STORAGE_ACCESS_KEY: ${{ secrets.LAKEFS_BLOCKSTORE_AZURE_STORAGE_GEN2_ACCESS_KEY }}

5 changes: 5 additions & 0 deletions cmd/lakectl/cmd/common_helpers.go
@@ -34,6 +34,7 @@ const (
LakectlInteractive = "LAKECTL_INTERACTIVE"
DeathMessage = "{{.Error|red}}\nError executing command.\n"
DeathMessageWithFields = "{{.Message|red}}\n{{.Status}}\n"
WarnMessage = "{{.Warning|yellow}}\n\n"
)

const (
@@ -178,6 +179,10 @@ func WriteIfVerbose(tpl string, data interface{}) {
}
}

func Warning(message string) {
WriteTo(WarnMessage, struct{ Warning string }{Warning: "Warning: " + message}, os.Stderr)
}

func Die(errMsg string, code int) {
WriteTo(DeathMessage, struct{ Error string }{Error: errMsg}, os.Stderr)
os.Exit(code)
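
For context, a minimal sketch of the pattern the new Warning helper follows, using only Go's standard text/template. lakectl's own WriteTo helper and the |yellow color filter in WarnMessage are not reproduced here, so this is an approximation rather than the actual implementation:

```go
package main

import (
	"os"
	"text/template"
)

// warnMessage mirrors the shape of the new WarnMessage constant: a template
// that renders a Warning field (the color filter is omitted in this sketch).
const warnMessage = "{{.Warning}}\n\n"

// warning writes a prefixed warning to stderr, similar in spirit to the new
// lakectl Warning helper shown in the diff above.
func warning(message string) {
	tpl := template.Must(template.New("warn").Parse(warnMessage))
	_ = tpl.Execute(os.Stderr, struct{ Warning string }{Warning: "Warning: " + message})
}

func main() {
	warning("'adls' hint is deprecated")
}
```
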
7 changes: 7 additions & 0 deletions cmd/lakectl/cmd/import.go
@@ -7,6 +7,7 @@ import (
"os"
"os/signal"
"regexp"
"strings"
"syscall"
"time"

@@ -160,6 +161,12 @@ func newImportProgressBar(visible bool) *progressbar.ProgressBar {
}

func verifySourceMatchConfiguredStorage(ctx context.Context, client *apigen.ClientWithResponses, source string) {
// Adds backwards compatibility for ADLS Gen2 storage import `hint`
if strings.Contains(source, "adls.core.windows.net") {
source = strings.Replace(source, "adls.core.windows.net", "blob.core.windows.net", 1)
Warning(fmt.Sprintf("'adls' hint is deprecated\n Using %s", source))
}

confResp, err := client.GetConfigWithResponse(ctx)
DieOnErrorOrUnexpectedStatusCode(confResp, err, http.StatusOK)
storageConfig := confResp.JSON200.StorageConfig
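
The backwards-compatibility rewrite above is a plain substring replacement on the source URL, applied before the source is checked against the configured blockstore. A self-contained sketch of the same transformation; the function name and the sample account are illustrative, not from the lakeFS codebase:

```go
package main

import (
	"fmt"
	"strings"
)

// normalizeAzureImportSource is a hypothetical helper mirroring the
// backwards-compatibility rewrite: a deprecated "adls" hint in the host
// is replaced with the canonical "blob" endpoint.
func normalizeAzureImportSource(source string) (string, bool) {
	if !strings.Contains(source, "adls.core.windows.net") {
		return source, false
	}
	return strings.Replace(source, "adls.core.windows.net", "blob.core.windows.net", 1), true
}

func main() {
	src := "https://myaccount.adls.core.windows.net/container/path/to/import/"
	rewritten, deprecated := normalizeAzureImportSource(src)
	if deprecated {
		fmt.Println("deprecated hint rewritten to:", rewritten)
	}
}
```
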
16 changes: 3 additions & 13 deletions docs/howto/import.md
@@ -106,7 +106,6 @@ In addition, the following for provider-specific permissions may be required:
</ul>
<div markdown="1" id="aws-s3">


## AWS S3: Importing from public buckets
{:.no_toc}

@@ -149,20 +148,11 @@ the following policy needs to be attached to the lakeFS S3 service-account to al

</div>
<div markdown="1" id="azure-storage">
See [Azure deployment][deploy-azure-storage-account-creds] on limitations when using account credentials.

### Azure Data Lake Gen2
{:.no_toc}

lakeFS requires a hint in the import source URL to understand that the provided storage account is ADLS Gen2
**Note:** The use of the `adls` hint for ADLS Gen2 storage accounts is deprecated; please use the original source URL for import.
{: .note}

```
For source account URL:
https://<my-account>.core.windows.net/path/to/import/
Please add the *adls* subdomain to the URL as follows:
https://<my-account>.adls.core.windows.net/path/to/import/
```
See [Azure deployment][deploy-azure-storage-account-creds] on limitations when using account credentials.

</div>
<div markdown="1" id="gcs">
1 change: 0 additions & 1 deletion esti/ops/docker-compose-external-db.yaml
@@ -51,7 +51,6 @@ services:
- ESTI_GOTEST_FLAGS
- ESTI_FLAGS
- ESTI_FORCE_PATH_STYLE
- ESTI_ADLS_IMPORT_BASE_URL
- ESTI_AZURE_STORAGE_ACCOUNT
- ESTI_AZURE_STORAGE_ACCESS_KEY
working_dir: /lakefs
1 change: 0 additions & 1 deletion esti/ops/docker-compose.yaml
@@ -59,7 +59,6 @@ services:
- ESTI_GOTEST_FLAGS
- ESTI_FLAGS
- ESTI_FORCE_PATH_STYLE
- ESTI_ADLS_IMPORT_BASE_URL
- ESTI_AZURE_STORAGE_ACCOUNT
- ESTI_AZURE_STORAGE_ACCESS_KEY
working_dir: /lakefs
3 changes: 2 additions & 1 deletion pkg/actions/lua/storage/azure/blob.go
@@ -127,7 +127,8 @@ func parsePath(l *lua.State, path string) (string, string) {
func transformPathToAbfss(l *lua.State) int {
path := lua.CheckString(l, 1)
const numOfParts = 3
r := regexp.MustCompile(`^https://(\w+)\.blob\.core\.windows\.net/([^/]*)/(.+)$`)
// Added adls for backwards compatibility with imports created before the fix for bug: https://github.com/treeverse/lakeFS/issues/7580
r := regexp.MustCompile(`^https://(\w+)\.(?:blob|adls)\.core\.windows\.net/([^/]*)/(.+)$`)
parts := r.FindStringSubmatch(path)
if len(parts) != numOfParts+1 {
lua.Errorf(l, "expected valid Azure https URL: %s", path)
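
To see what the widened pattern accepts, here is a standalone check of the new regular expression against both host forms. The account, container, and key in the sample URLs are made up, and the abfss transformation itself is not shown:

```go
package main

import (
	"fmt"
	"regexp"
)

func main() {
	// The same pattern as in transformPathToAbfss, now accepting both the
	// canonical "blob" host and the deprecated "adls" hint.
	r := regexp.MustCompile(`^https://(\w+)\.(?:blob|adls)\.core\.windows\.net/([^/]*)/(.+)$`)

	for _, u := range []string{
		"https://myaccount.blob.core.windows.net/mycontainer/some/key",
		"https://myaccount.adls.core.windows.net/mycontainer/some/key",
	} {
		parts := r.FindStringSubmatch(u)
		// parts[1] = storage account, parts[2] = container, parts[3] = key
		fmt.Println(parts[1:])
	}
}
```
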
8 changes: 4 additions & 4 deletions pkg/block/azure/adapter.go
@@ -209,7 +209,7 @@ func (a *Adapter) GetWalker(uri *url.URL) (block.Walker, error) {
return nil, err
}

return NewAzureBlobWalker(client)
return NewAzureDataLakeWalker(client, false)
}

func (a *Adapter) GetPreSignedURL(ctx context.Context, obj block.ObjectPointer, mode block.PreSignMode) (string, time.Time, error) {
@@ -568,11 +568,11 @@ func (a *Adapter) CompleteMultiPartUpload(ctx context.Context, obj block.ObjectP

func (a *Adapter) GetStorageNamespaceInfo() block.StorageNamespaceInfo {
info := block.DefaultStorageNamespaceInfo(block.BlockstoreTypeAzure)
info.ImportValidityRegex = `^https?://[a-z0-9_-]+\.(blob|adls)\.core\.windows\.net` // added adls for import hint validation in UI
info.ImportValidityRegex = `^https?://[a-z0-9_-]+\.blob\.core\.windows\.net`
info.ValidityRegex = `^https?://[a-z0-9_-]+\.blob\.core\.windows\.net`

if a.chinaCloud {
info.ImportValidityRegex = `^https?://[a-z0-9_-]+\.(blob|adls)\.core\.chinacloudapi\.cn`
info.ImportValidityRegex = `^https?://[a-z0-9_-]+\.blob\.core\.chinacloudapi\.cn`
info.ValidityRegex = `^https?://[a-z0-9_-]+\.blob\.core\.chinacloudapi\.cn`
}

@@ -598,6 +598,6 @@ func (a *Adapter) newPreSignedTime() time.Time {
return time.Now().UTC().Add(a.preSignedExpiry)
}

func (a *Adapter) GetPresignUploadPartURL(ctx context.Context, obj block.ObjectPointer, uploadID string, partNumber int) (string, error) {
func (a *Adapter) GetPresignUploadPartURL(_ context.Context, _ block.ObjectPointer, _ string, _ int) (string, error) {
return "", block.ErrOperationNotSupported
}
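
A quick illustration of the tightened ImportValidityRegex: URLs carrying the old adls hint no longer pass import validation, while blob URLs still do. This is a sketch using the regex from the diff; the surrounding validation wiring in lakeFS is not shown:

```go
package main

import (
	"fmt"
	"regexp"
)

func main() {
	// New import validity pattern from the adapter (public Azure cloud).
	importValidity := regexp.MustCompile(`^https?://[a-z0-9_-]+\.blob\.core\.windows\.net`)

	fmt.Println(importValidity.MatchString("https://myaccount.blob.core.windows.net/container/prefix")) // true
	fmt.Println(importValidity.MatchString("https://myaccount.adls.core.windows.net/container/prefix")) // false
}
```
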
85 changes: 0 additions & 85 deletions pkg/block/azure/walker.go
@@ -18,18 +18,6 @@ const DirectoryBlobMetadataKey = "hdi_isfolder"

var ErrAzureInvalidURL = errors.New("invalid Azure storage URL")

func NewAzureBlobWalker(svc *service.Client) (*BlobWalker, error) {
return &BlobWalker{
client: svc,
mark: block.Mark{HasMore: true},
}, nil
}

type BlobWalker struct {
client *service.Client
mark block.Mark
}

// extractAzurePrefix takes a URL that looks like this: https://storageaccount.blob.core.windows.net/container/prefix
// and return the URL for the container and a prefix, if one exists
func extractAzurePrefix(storageURI *url.URL) (*url.URL, string, error) {
@@ -52,71 +40,6 @@ func getAzureBlobURL(containerURL *url.URL, blobName string) *url.URL {
return containerURL.ResolveReference(&relativePath)
}

func (a *BlobWalker) Walk(ctx context.Context, storageURI *url.URL, op block.WalkOptions, walkFn func(e block.ObjectStoreEntry) error) error {
// we use bucket as container and prefix as path
containerURL, prefix, err := extractAzurePrefix(storageURI)
if err != nil {
return err
}
var basePath string
if idx := strings.LastIndex(prefix, "/"); idx != -1 {
basePath = prefix[:idx+1]
}

qk, err := ResolveBlobURLInfoFromURL(containerURL)
if err != nil {
return err
}

containerClient := a.client.NewContainerClient(qk.ContainerName)
listBlob := containerClient.NewListBlobsFlatPager(&azblob.ListBlobsFlatOptions{
Prefix: &prefix,
Marker: &op.ContinuationToken,
Include: container.ListBlobsInclude{
Metadata: true,
},
})

for listBlob.More() {
resp, err := listBlob.NextPage(ctx)
if err != nil {
return err
}
if resp.Marker != nil {
a.mark.ContinuationToken = *resp.Marker
}
for _, blobInfo := range resp.Segment.BlobItems {
// skipping everything in the page which is before 'After' (without forgetting the possible empty string key!)
if op.After != "" && *blobInfo.Name <= op.After {
continue
}

// Skip folders
if isBlobItemFolder(blobInfo) {
continue
}

a.mark.LastKey = *blobInfo.Name
if err := walkFn(block.ObjectStoreEntry{
FullKey: *blobInfo.Name,
RelativeKey: strings.TrimPrefix(*blobInfo.Name, basePath),
Address: getAzureBlobURL(containerURL, *blobInfo.Name).String(),
ETag: extractBlobItemEtag(blobInfo),
Mtime: *blobInfo.Properties.LastModified,
Size: *blobInfo.Properties.ContentLength,
}); err != nil {
return err
}
}
}

a.mark = block.Mark{
HasMore: false,
}

return nil
}

// isBlobItemFolder returns true if the blob item is a folder.
// Make sure that metadata is populated before calling this function.
// Example: for listing using blob API passing options with `Include: container.ListBlobsInclude{ Metadata: true }`
@@ -147,14 +70,6 @@ func extractBlobItemEtag(blobItem *container.BlobItem) string {
return ""
}

func (a *BlobWalker) Marker() block.Mark {
return a.mark
}

func (a *BlobWalker) GetSkippedEntries() []block.ObjectStoreEntry {
return nil
}

//
// DataLakeWalker
//
6 changes: 3 additions & 3 deletions pkg/catalog/catalog.go
@@ -805,19 +805,19 @@ func (c *Catalog) GetBranchReference(ctx context.Context, repositoryID string, b

func (c *Catalog) HardResetBranch(ctx context.Context, repositoryID, branch, refExpr string, opts ...graveler.SetOptionsFunc) error {
branchID := graveler.BranchID(branch)
ref := graveler.Ref(refExpr)
reference := graveler.Ref(refExpr)
if err := validator.Validate([]validator.ValidateArg{
{Name: "repository", Value: repositoryID, Fn: graveler.ValidateRepositoryID},
{Name: "branch", Value: branchID, Fn: graveler.ValidateBranchID},
{Name: "ref", Value: ref, Fn: graveler.ValidateRef},
{Name: "ref", Value: reference, Fn: graveler.ValidateRef},
}); err != nil {
return err
}
repository, err := c.getRepository(ctx, repositoryID)
if err != nil {
return err
}
return c.Store.ResetHard(ctx, repository, branchID, ref, opts...)
return c.Store.ResetHard(ctx, repository, branchID, reference, opts...)
}

func (c *Catalog) ResetBranch(ctx context.Context, repositoryID string, branch string, opts ...graveler.SetOptionsFunc) error {
18 changes: 1 addition & 17 deletions pkg/ingest/store/factory.go
@@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"net/url"
"strings"

"cloud.google.com/go/storage"
"github.com/aws/aws-sdk-go-v2/aws"
@@ -128,22 +127,7 @@ func (f *WalkerFactory) buildAzureWalker(importURL *url.URL, skipOutOfOrder bool
return nil, err
}

isHNS := isHierarchicalNamespaceEnabled(importURL)
if isHNS {
return azure.NewAzureDataLakeWalker(client, skipOutOfOrder)
}
return azure.NewAzureBlobWalker(client)
}

// isHierarchicalNamespaceEnabled - identify if hns enabled on the account,
// based on the import URL.
// Until we enable a way to extract the account information, we assume it based on the domain used in import:
// https://<account>.<blob|adls>.core.windows.net/
// adls - azure data lake storage
func isHierarchicalNamespaceEnabled(u *url.URL) bool {
const importURLParts = 3
n := strings.SplitN(u.Host, ".", importURLParts)
return len(n) == importURLParts && n[1] == "adls"
return azure.NewAzureDataLakeWalker(client, skipOutOfOrder)
}

func (f *WalkerFactory) GetWalker(ctx context.Context, opts WalkerOptions) (*WalkerWrapper, error) {
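
For reference, a standalone demonstration of the heuristic removed here, with the host-splitting logic copied from the deleted isHierarchicalNamespaceEnabled and made-up sample URLs. After this change the factory no longer needs it, since NewAzureDataLakeWalker is used for every Azure import:

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// isHierarchicalNamespaceEnabled reproduces the deleted heuristic: an "adls"
// hint in the second host label used to mark an ADLS Gen2 (HNS) account.
func isHierarchicalNamespaceEnabled(u *url.URL) bool {
	const importURLParts = 3
	n := strings.SplitN(u.Host, ".", importURLParts)
	return len(n) == importURLParts && n[1] == "adls"
}

func main() {
	for _, raw := range []string{
		"https://myaccount.adls.core.windows.net/container/prefix", // true
		"https://myaccount.blob.core.windows.net/container/prefix", // false
	} {
		u, err := url.Parse(raw)
		if err != nil {
			panic(err)
		}
		fmt.Println(raw, "->", isHierarchicalNamespaceEnabled(u))
	}
}
```
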
