Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[3] [DO NOT MERGE] Support setting DFS group/owner/perms #2201

Draft
wants to merge 16 commits into
base: adreed/dfs-posix-properties
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 44 additions & 77 deletions cmd/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,29 +271,6 @@ func (raw rawCopyCmdArgs) cook() (CookedCopyCmdArgs, error) {
azcopyScanningLogger.CloseLog()
})

/* We support DFS by using blob end-point of the account. We replace dfs by blob in src and dst */
if src, dst := InferArgumentLocation(raw.src), InferArgumentLocation(raw.dst); src == common.ELocation.BlobFS() || dst == common.ELocation.BlobFS() {
srcDfs := src == common.ELocation.BlobFS() && dst != common.ELocation.Local()
if srcDfs {
raw.src = strings.Replace(raw.src, ".dfs", ".blob", 1)
glcm.Info("Switching to use blob endpoint on source account.")

}

dstDfs := dst == common.ELocation.BlobFS() && src != common.ELocation.Local()
if dstDfs {
raw.dst = strings.Replace(raw.dst, ".dfs", ".blob", 1)
msg := fmt.Sprintf("Switching to use blob endpoint on destination account. There are some limitations when switching endpoints. " +
"Please refer to https://learn.microsoft.com/en-us/azure/storage/blobs/data-lake-storage-known-issues#blob-storage-apis")
glcm.Info(msg)
if azcopyScanningLogger != nil {
azcopyScanningLogger.Log(pipeline.LogInfo, msg)
}
}

cooked.isHNStoHNS = srcDfs && dstDfs
}

fromTo, err := ValidateFromTo(raw.src, raw.dst, raw.fromTo) // TODO: src/dst
if err != nil {
return cooked, err
Expand Down Expand Up @@ -652,7 +629,6 @@ func (raw rawCopyCmdArgs) cook() (CookedCopyCmdArgs, error) {
if err != nil {
return cooked, err
}
globalBlobFSMd5ValidationOption = cooked.md5ValidationOption // workaround, to avoid having to pass this all the way through the chain of methods in enumeration, just for one weird and (presumably) temporary workaround

cooked.CheckLength = raw.CheckLength
// length of devnull will be 0, thus this will always fail unless downloading an empty file
Expand Down Expand Up @@ -688,14 +664,11 @@ func (raw rawCopyCmdArgs) cook() (CookedCopyCmdArgs, error) {
return cooked, err
}
cooked.preservePermissions = common.NewPreservePermissionsOption(isUserPersistingPermissions, raw.preserveOwner, cooked.FromTo)
if cooked.FromTo == common.EFromTo.BlobBlob() && cooked.preservePermissions.IsTruthy() {
cooked.isHNStoHNS = true // override HNS settings, since if a user is tx'ing blob->blob and copying permissions, it's DEFINITELY going to be HNS (since perms don't exist w/o HNS).
}

// --as-subdir is OK on all sources and destinations, but additional verification has to be done down the line. (e.g. https://account.blob.core.windows.net is not a valid root)
cooked.asSubdir = raw.asSubdir

cooked.IncludeDirectoryStubs = raw.includeDirectoryStubs || (cooked.isHNStoHNS && cooked.preservePermissions.IsTruthy())
cooked.IncludeDirectoryStubs = raw.includeDirectoryStubs

if err = crossValidateSymlinksAndPermissions(cooked.SymlinkHandling, cooked.preservePermissions.IsTruthy()); err != nil {
return cooked, err
Expand Down Expand Up @@ -954,18 +927,24 @@ func areBothLocationsSMBAware(fromTo common.FromTo) bool {
}

// areBothLocationsPOSIXAware reports whether POSIX properties (owner, group,
// mode, mtime, etc.) can be carried between both ends of the given transfer
// direction.
//
// POSIX properties are stored in blob metadata-- They don't need a special
// persistence strategy for S2S methods.
func areBothLocationsPOSIXAware(fromTo common.FromTo) bool {
	switch fromTo {
	case common.EFromTo.BlobLocal(), common.EFromTo.LocalBlob(), common.EFromTo.BlobFSLocal(), common.EFromTo.LocalBlobFS():
		// Uploads/downloads require the local filesystem to speak POSIX,
		// which we only support on Linux.
		return runtime.GOOS == "linux"
	case common.EFromTo.BlobBlob(), common.EFromTo.BlobFSBlobFS(), common.EFromTo.BlobFSBlob(), common.EFromTo.BlobBlobFS():
		// S2S: metadata is copied verbatim service-side, so the local OS
		// is irrelevant.
		return true
	default:
		return false
	}
}

func validatePreserveSMBPropertyOption(toPreserve bool, fromTo common.FromTo, overwrite *common.OverwriteOption, flagName string) error {
if toPreserve && !(fromTo == common.EFromTo.LocalFile() ||
if toPreserve && flagName == PreservePermissionsFlag && (fromTo == common.EFromTo.BlobBlob() || fromTo == common.EFromTo.BlobFSBlob() || fromTo == common.EFromTo.BlobBlobFS() || fromTo == common.EFromTo.BlobFSBlobFS()) {
// the user probably knows what they're doing if they're trying to persist permissions between blob-type endpoints.
return nil
} else if toPreserve && !(fromTo == common.EFromTo.LocalFile() ||
fromTo == common.EFromTo.FileLocal() ||
fromTo == common.EFromTo.FileFile() ||
fromTo == common.EFromTo.BlobBlob()) {
fromTo == common.EFromTo.FileFile()) {
return fmt.Errorf("%s is set but the job is not between %s-aware resources", flagName, common.IffString(flagName == PreservePermissionsFlag, "permission", "SMB"))
}

Expand All @@ -990,9 +969,9 @@ func validatePreserveOwner(preserve bool, fromTo common.FromTo) error {
func validateSymlinkHandlingMode(symlinkHandling common.SymlinkHandlingType, fromTo common.FromTo) error {
if symlinkHandling.Preserve() {
switch fromTo {
case common.EFromTo.LocalBlob(), common.EFromTo.BlobLocal():
case common.EFromTo.LocalBlob(), common.EFromTo.BlobLocal(), common.EFromTo.BlobFSLocal(), common.EFromTo.LocalBlobFS():
return nil // Fine on all OSes that support symlink via the OS package. (Win, MacOS, and Linux do, and that's what we officially support.)
case common.EFromTo.BlobBlob():
case common.EFromTo.BlobBlob(), common.EFromTo.BlobFSBlobFS(), common.EFromTo.BlobBlobFS(), common.EFromTo.BlobFSBlob():
return nil // Blob->Blob doesn't involve any local requirements
default:
return fmt.Errorf("flag --%s can only be used on Blob<->Blob or Local<->Blob", common.PreserveSymlinkFlagName)
Expand Down Expand Up @@ -1109,7 +1088,6 @@ type CookedCopyCmdArgs struct {
// from arguments
Source common.ResourceString
Destination common.ResourceString
isHNStoHNS bool // workaround to indicate that BlobBlob is actually HNS->HNS, since we shift to Blob instead of HNS.
FromTo common.FromTo

// new include/exclude only apply to file names
Expand Down Expand Up @@ -1404,15 +1382,16 @@ func (cca *CookedCopyCmdArgs) processRedirectionUpload(blobResource common.Resou

// get source credential - if there is a token it will be used to get passed along our pipeline
func (cca *CookedCopyCmdArgs) getSrcCredential(ctx context.Context, jpo *common.CopyJobPartOrderRequest) (common.CredentialInfo, error) {

srcCredInfo, isPublic, err := GetCredentialInfoForLocation(ctx, cca.FromTo.From(), cca.Source.Value, cca.Source.SAS, true, cca.CpkOptions)
if err != nil {
return srcCredInfo, err
// If S2S and source takes OAuthToken as its cred type (OR) source takes anonymous as its cred type, but it's not public and there's no SAS
} else if cca.FromTo.IsS2S() &&
((srcCredInfo.CredentialType == common.ECredentialType.OAuthToken() && cca.FromTo.To() != common.ELocation.Blob()) || // Blob can forward OAuth tokens
((srcCredInfo.CredentialType == common.ECredentialType.OAuthToken() && !cca.FromTo.To().CanForwardOAuthTokens()) || // Blob can forward OAuth tokens; BlobFS inherits this.
(srcCredInfo.CredentialType == common.ECredentialType.Anonymous() && !isPublic && cca.Source.SAS == "")) {
return srcCredInfo, errors.New("a SAS token (or S3 access key) is required as a part of the source in S2S transfers, unless the source is a public resource, or the destination is blob storage")
return srcCredInfo, errors.New("a SAS token (or S3 access key) is required as a part of the source in S2S transfers, unless the source is a public resource. Blob and BlobFS additionally support OAuth on both source and destination")
} else if cca.FromTo.IsS2S() && (srcCredInfo.CredentialType == common.ECredentialType.SharedKey() || jpo.CredentialInfo.CredentialType == common.ECredentialType.SharedKey()) {
return srcCredInfo, errors.New("shared key auth is not supported for S2S operations")
}

if cca.Source.SAS != "" && cca.FromTo.IsS2S() && jpo.CredentialInfo.CredentialType == common.ECredentialType.OAuthToken() {
Expand Down Expand Up @@ -1544,51 +1523,39 @@ func (cca *CookedCopyCmdArgs) processCopyJobPartOrders() (err error) {
cca.StripTopDir = true
}

// depending on the source and destination type, we process the cp command differently
// Create enumerator and do enumerating
switch cca.FromTo {
case common.EFromTo.LocalBlob(),
common.EFromTo.LocalBlobFS(),
common.EFromTo.LocalFile(),
common.EFromTo.BlobLocal(),
common.EFromTo.FileLocal(),
common.EFromTo.BlobFSLocal(),
common.EFromTo.BlobBlob(),
common.EFromTo.FileBlob(),
common.EFromTo.FileFile(),
common.EFromTo.BlobFile(),
common.EFromTo.S3Blob(),
common.EFromTo.GCPBlob(),
common.EFromTo.BenchmarkBlob(),
common.EFromTo.BenchmarkBlobFS(),
common.EFromTo.BenchmarkFile():

switch {
case cca.FromTo.IsUpload(), cca.FromTo.IsDownload(), cca.FromTo.IsS2S():
// Execute a standard copy command
var e *CopyEnumerator
srcCredInfo, _ := cca.getSrcCredential(ctx, &jobPartOrder)
var srcCredInfo common.CredentialInfo
srcCredInfo, err = cca.getSrcCredential(ctx, &jobPartOrder)
if err != nil {
return fmt.Errorf("failed to discern source credential type: %w", err)
}

e, err = cca.initEnumerator(jobPartOrder, srcCredInfo, ctx)
if err != nil {
return fmt.Errorf("failed to initialize enumerator: %w", err)
}
err = e.enumerate()
case common.EFromTo.BlobTrash(), common.EFromTo.FileTrash():
e, createErr := newRemoveEnumerator(cca)
if createErr != nil {
return fmt.Errorf("failed to initialize enumerator: %w", createErr)
}

err = e.enumerate()

case common.EFromTo.BlobFSTrash():
// TODO merge with BlobTrash case
err = removeBfsResources(cca)
case cca.FromTo.IsDelete():
// Delete gets ran through copy, so handle delete
if cca.FromTo.From() == common.ELocation.BlobFS() {
// TODO merge with BlobTrash case
// Currently, Blob Delete in STE does not appropriately handle folders. In addition, dfs delete is free-ish.
err = removeBfsResources(cca)
} else {
e, createErr := newRemoveEnumerator(cca)
if createErr != nil {
return fmt.Errorf("failed to initialize enumerator: %w", createErr)
}

// TODO: Hide the File to Blob direction temporarily, as service support on-going.
// case common.EFromTo.FileBlob():
// e := copyFileToNEnumerator(jobPartOrder)
// err = e.enumerate(cca)
err = e.enumerate()
}

case common.EFromTo.BlobNone(), common.EFromTo.BlobFSNone(), common.EFromTo.FileNone():
case cca.FromTo.IsSetProperties():
// Set properties as well
e, createErr := setPropertiesEnumerator(cca)
if createErr != nil {
return fmt.Errorf("failed to initialize enumerator: %w", createErr)
Expand Down
39 changes: 33 additions & 6 deletions cmd/copyEnumeratorInit.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/Azure/azure-storage-azcopy/v10/azbfs"
"log"
"net/url"
"os"
Expand Down Expand Up @@ -230,7 +231,7 @@ func (cca *CookedCopyCmdArgs) initEnumerator(jobPartOrder common.CopyJobPartOrde

// decide our folder transfer strategy
var message string
jobPartOrder.Fpo, message = NewFolderPropertyOption(cca.FromTo, cca.Recursive, cca.StripTopDir, filters, cca.preserveSMBInfo, cca.preservePermissions.IsTruthy(), cca.preservePOSIXProperties, cca.isHNStoHNS, strings.EqualFold(cca.Destination.Value, common.Dev_Null), cca.IncludeDirectoryStubs)
jobPartOrder.Fpo, message = NewFolderPropertyOption(cca.FromTo, cca.Recursive, cca.StripTopDir, filters, cca.preserveSMBInfo, cca.preservePermissions.IsTruthy(), cca.preservePOSIXProperties, strings.EqualFold(cca.Destination.Value, common.Dev_Null), cca.IncludeDirectoryStubs)
if !cca.dryrunMode {
glcm.Info(message)
}
Expand Down Expand Up @@ -505,7 +506,6 @@ func (cca *CookedCopyCmdArgs) createDstContainer(containerName string, dstWithSA
fsu := azfile.NewServiceURL(*dstURL, dstPipeline)
shareURL := fsu.NewShareURL(containerName)
_, err = shareURL.GetProperties(ctx)

if err == nil {
return err
}
Expand All @@ -521,6 +521,33 @@ func (cca *CookedCopyCmdArgs) createDstContainer(containerName string, dstWithSA
} else {
return err
}
case common.ELocation.BlobFS():
// Create the destination filesystem (container) on the BlobFS endpoint.
accountRoot, err := GetAccountRoot(dstWithSAS, cca.FromTo.To())
if err != nil {
return err
}

dstURL, err := url.Parse(accountRoot)
if err != nil {
return err
}

serviceURL := azbfs.NewServiceURL(*dstURL, dstPipeline)
fsURL := serviceURL.NewFileSystemURL(containerName)
_, err = fsURL.GetProperties(ctx)
if err == nil {
return err
}

_, err = fsURL.Create(ctx)
if stgErr, ok := err.(azbfs.StorageError); ok {
if stgErr.ServiceCode() != azbfs.ServiceCodeFileSystemAlreadyExists {
return err
}
} else {
return err
}
default:
panic(fmt.Sprintf("cannot create a destination container at location %s.", cca.FromTo.To()))
}
Expand Down Expand Up @@ -664,25 +691,25 @@ func (cca *CookedCopyCmdArgs) MakeEscapedRelativePath(source bool, dstIsDir bool
}

// we assume that preserveSmbPermissions and preserveSmbInfo have already been validated, such that they are only true if both resource types support them
func NewFolderPropertyOption(fromTo common.FromTo, recursive, stripTopDir bool, filters []ObjectFilter, preserveSmbInfo, preserveSmbPermissions, preservePosixProperties, isDfsDfs, isDstNull, includeDirectoryStubs bool) (common.FolderPropertyOption, string) {
func NewFolderPropertyOption(fromTo common.FromTo, recursive, stripTopDir bool, filters []ObjectFilter, preserveSmbInfo, preservePermissions, preservePosixProperties, isDstNull, includeDirectoryStubs bool) (common.FolderPropertyOption, string) {

getSuffix := func(willProcess bool) string {
willProcessString := common.IffString(willProcess, "will be processed", "will not be processed")

template := ". For the same reason, %s defined on folders %s"
switch {
case preserveSmbPermissions && preserveSmbInfo:
case preservePermissions && preserveSmbInfo:
return fmt.Sprintf(template, "properties and permissions", willProcessString)
case preserveSmbInfo:
return fmt.Sprintf(template, "properties", willProcessString)
case preserveSmbPermissions:
case preservePermissions:
return fmt.Sprintf(template, "permissions", willProcessString)
default:
return "" // no preserve flags set, so we have nothing to say about them
}
}

bothFolderAware := (fromTo.AreBothFolderAware() || isDfsDfs || preservePosixProperties || includeDirectoryStubs) && !isDstNull
bothFolderAware := (fromTo.AreBothFolderAware() || preservePosixProperties || preservePermissions || includeDirectoryStubs) && !isDstNull
isRemoveFromFolderAware := fromTo == common.EFromTo.FileTrash()
if bothFolderAware || isRemoveFromFolderAware {
if !recursive {
Expand Down
19 changes: 0 additions & 19 deletions cmd/copyUtil.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"strings"

"github.com/Azure/azure-pipeline-go/pipeline"
"github.com/Azure/azure-storage-azcopy/v10/azbfs"
"github.com/Azure/azure-storage-azcopy/v10/common"
"github.com/Azure/azure-storage-blob-go/azblob"
"github.com/Azure/azure-storage-file-go/azfile"
Expand Down Expand Up @@ -119,24 +118,6 @@ func (util copyHandlerUtil) ConstructCommandStringFromArgs() string {
return s.String()
}

// urlIsBFSFileSystemOrDirectory reports whether the given BlobFS (ADLS Gen2)
// URL refers to a filesystem (container) or a directory, rather than a file.
func (util copyHandlerUtil) urlIsBFSFileSystemOrDirectory(ctx context.Context, url *url.URL, p pipeline.Pipeline) bool {
	// A container- or virtual-directory-shaped URL can never be a file.
	if util.urlIsContainerOrVirtualDirectory(url) {

		return true
	}
	// Need to get the resource properties and verify if it is a file or directory
	dirURL := azbfs.NewDirectoryURL(*url, p)
	isDir, err := dirURL.IsDirectory(ctx)

	if err != nil {
		// On error we log (when a jobs admin exists) and fall through,
		// returning isDir as-is — presumably false on error, so the
		// destination is assumed to be a file. TODO confirm IsDirectory's
		// return value on error.
		if jobsAdmin.JobsAdmin != nil {
			jobsAdmin.JobsAdmin.LogToJobLog(fmt.Sprintf("Failed to check if destination is a folder or a file (ADLSg2). Assuming the destination is a file: %s", err), pipeline.LogWarning)
		}
	}

	return isDir
}

func (util copyHandlerUtil) urlIsAzureFileDirectory(ctx context.Context, url *url.URL, p pipeline.Pipeline) bool {
// Azure file share case
if util.urlIsContainerOrVirtualDirectory(url) {
Expand Down
25 changes: 15 additions & 10 deletions cmd/credentialUtil.go
Original file line number Diff line number Diff line change
Expand Up @@ -646,15 +646,7 @@ func getCredentialType(ctx context.Context, raw rawFromToInfo, cpkOptions common
// ==============================================================================================
// pipeline factory methods
// ==============================================================================================
func createBlobPipeline(ctx context.Context, credInfo common.CredentialInfo, logLevel pipeline.LogLevel) (pipeline.Pipeline, error) {
// are we getting dest token?
credential := credInfo.SourceBlobToken
if credential == nil {
credential = common.CreateBlobCredential(ctx, credInfo, common.CredentialOpOptions{
// LogInfo: glcm.Info, //Comment out for debugging
LogError: glcm.Info,
})
}
func createBlobPipelineFromCred(credential azblob.Credential, logLevel pipeline.LogLevel) pipeline.Pipeline {
logOption := pipeline.LogOptions{}
if azcopyScanningLogger != nil {
logOption = pipeline.LogOptions{
Expand All @@ -681,7 +673,20 @@ func createBlobPipeline(ctx context.Context, credInfo common.CredentialInfo, log
nil,
ste.NewAzcopyHTTPClient(frontEndMaxIdleConnectionsPerHost),
nil, // we don't gather network stats on the credential pipeline
), nil
)
}

// createBlobPipeline builds an azblob pipeline for the given credential info.
// A pre-resolved source blob token, when present, takes precedence; otherwise
// a credential is derived from credInfo.
func createBlobPipeline(ctx context.Context, credInfo common.CredentialInfo, logLevel pipeline.LogLevel) (pipeline.Pipeline, error) {
	// are we getting dest token?
	cred := credInfo.SourceBlobToken
	if cred == nil {
		opts := common.CredentialOpOptions{
			// LogInfo: glcm.Info, //Comment out for debugging
			LogError: glcm.Info,
		}
		cred = common.CreateBlobCredential(ctx, credInfo, opts)
	}

	return createBlobPipelineFromCred(cred, logLevel), nil
}

const frontEndMaxIdleConnectionsPerHost = http.DefaultMaxIdleConnsPerHost
Expand Down
2 changes: 1 addition & 1 deletion cmd/removeEnumerator.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func newRemoveEnumerator(cca *CookedCopyCmdArgs) (enumerator *CopyEnumerator, er
// (Must enumerate folders when deleting from a folder-aware location. Can't do folder deletion just based on file
// deletion, because that would not handle folders that were empty at the start of the job).
// NOTE(review): the former isHNStoHNS parameter has been removed from NewFolderPropertyOption in this PR; the permissions/SMB preserve flags are passed as false because BlobFS locations don't take this route currently.
fpo, message := NewFolderPropertyOption(cca.FromTo, cca.Recursive, cca.StripTopDir, filters, false, false, false, false, false, cca.IncludeDirectoryStubs)
fpo, message := NewFolderPropertyOption(cca.FromTo, cca.Recursive, cca.StripTopDir, filters, false, false, false, false, cca.IncludeDirectoryStubs)
// do not print Info message if in dry run mode
if !cca.dryrunMode {
glcm.Info(message)
Expand Down
2 changes: 1 addition & 1 deletion cmd/setPropertiesEnumerator.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func setPropertiesEnumerator(cca *CookedCopyCmdArgs) (enumerator *CopyEnumerator
filters = append(filters, excludePathFilters...)
filters = append(filters, includeSoftDelete...)

fpo, message := NewFolderPropertyOption(cca.FromTo, cca.Recursive, cca.StripTopDir, filters, false, false, false, cca.isHNStoHNS, strings.EqualFold(cca.Destination.Value, common.Dev_Null), cca.IncludeDirectoryStubs)
fpo, message := NewFolderPropertyOption(cca.FromTo, cca.Recursive, cca.StripTopDir, filters, false, false, false, strings.EqualFold(cca.Destination.Value, common.Dev_Null), cca.IncludeDirectoryStubs)
// do not print Info message if in dry run mode
if !cca.dryrunMode {
glcm.Info(message)
Expand Down
Loading