Skip to content

Commit

Permalink
Removes usage of pipeline Library (#2354)
Browse files Browse the repository at this point in the history
  • Loading branch information
nakulkar-msft authored Sep 11, 2023
1 parent 588c912 commit 7e08e2f
Show file tree
Hide file tree
Showing 71 changed files with 455 additions and 538 deletions.
11 changes: 5 additions & 6 deletions cmd/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@ import (

"github.com/Azure/azure-storage-azcopy/v10/jobsAdmin"

"github.com/Azure/azure-pipeline-go/pipeline"

"github.com/spf13/cobra"

"github.com/Azure/azure-storage-azcopy/v10/common"
Expand Down Expand Up @@ -1283,7 +1281,7 @@ func (cca *CookedCopyCmdArgs) processRedirectionDownload(blobResource common.Res
}

// step 1: create client options
options := createClientOptions(pipeline.LogNone, nil, nil)
options := createClientOptions(common.LogNone, nil, nil)

// step 2: parse source url
u, err := blobResource.FullURL()
Expand Down Expand Up @@ -1330,7 +1328,7 @@ func (cca *CookedCopyCmdArgs) processRedirectionUpload(blobResource common.Resou
}

// step 0: initialize pipeline
options := createClientOptions(pipeline.LogNone, nil, nil)
options := createClientOptions(common.LogNone, nil, nil)

// step 1: parse destination url
u, err := blobResource.FullURL()
Expand Down Expand Up @@ -1777,7 +1775,7 @@ Final Job Status: %v%s%s
// log to job log
jobMan, exists := jobsAdmin.JobsAdmin.JobMgr(summary.JobID)
if exists {
jobMan.Log(pipeline.LogInfo, logStats+"\n"+output)
jobMan.Log(common.LogInfo, logStats+"\n"+output)
}
return output
}
Expand Down Expand Up @@ -1973,7 +1971,8 @@ func init() {
cpCmd.PersistentFlags().StringVar(&raw.excludeBlobType, "exclude-blob-type", "", "Optionally specifies the type of blob (BlockBlob/ PageBlob/ AppendBlob) to exclude when copying blobs from the container "+
"or the account. Use of this flag is not applicable for copying data from non azure-service to service. More than one blob should be separated by ';'. ")
// options change how the transfers are performed
cpCmd.PersistentFlags().Float64Var(&raw.blockSizeMB, "block-size-mb", 0, "Use this block size (specified in MiB) when uploading to Azure Storage, and downloading from Azure Storage. The default value is automatically calculated based on file size. Decimal fractions are allowed (For example: 0.25).")
cpCmd.PersistentFlags().Float64Var(&raw.blockSizeMB, "block-size-mb", 0, "Use this block size (specified in MiB) when uploading to Azure Storage, and downloading from Azure Storage. The default value is automatically calculated based on file size. Decimal fractions are allowed (For example: 0.25)."+
" When uploading or downloading, maximum allowed block size is 0.75 * AZCOPY_BUFFER_GB. Please refer https://learn.microsoft.com/en-us/azure/storage/common/storage-use-azcopy-optimize#optimize-memory-use.")
cpCmd.PersistentFlags().StringVar(&raw.blobType, "blob-type", "Detect", "Defines the type of blob at the destination. This is used for uploading blobs and when copying between accounts (default 'Detect'). Valid values include 'Detect', 'BlockBlob', 'PageBlob', and 'AppendBlob'. "+
"When copying between accounts, a value of 'Detect' causes AzCopy to use the type of source blob to determine the type of the destination blob. When uploading a file, 'Detect' determines if the file is a VHD or a VHDX file based on the file extension. If the file is either a VHD or VHDX file, AzCopy treats the file as a page blob.")
cpCmd.PersistentFlags().StringVar(&raw.blockBlobTier, "block-blob-tier", "None", "upload block blob to Azure Storage using this blob tier. (default 'None'). Valid options are Hot, Cold, Cool, Archive")
Expand Down
3 changes: 1 addition & 2 deletions cmd/copyEnumeratorHelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package cmd

import (
"fmt"
"github.com/Azure/azure-pipeline-go/pipeline"
"github.com/Azure/azure-storage-azcopy/v10/common"
"github.com/Azure/azure-storage-azcopy/v10/jobsAdmin"
"math/rand"
Expand Down Expand Up @@ -85,7 +84,7 @@ func dispatchFinalPart(e *common.CopyJobPartOrderRequest, cca *CookedCopyCmdArgs
}

if jobsAdmin.JobsAdmin != nil {
jobsAdmin.JobsAdmin.LogToJobLog(FinalPartCreatedMessage, pipeline.LogInfo)
jobsAdmin.JobsAdmin.LogToJobLog(FinalPartCreatedMessage, common.LogInfo)
}

// set the flag on cca, to indicate the enumeration is done
Expand Down
33 changes: 15 additions & 18 deletions cmd/copyEnumeratorInit.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ import (

"github.com/Azure/azure-storage-azcopy/v10/jobsAdmin"

"github.com/Azure/azure-pipeline-go/pipeline"

"github.com/Azure/azure-storage-azcopy/v10/common"
)

Expand Down Expand Up @@ -74,7 +72,7 @@ func (cca *CookedCopyCmdArgs) initEnumerator(jobPartOrder common.CopyJobPartOrde
jobPartOrder.S2SPreserveBlobTags = cca.S2sPreserveBlobTags

dest := cca.FromTo.To()
traverser, err = InitResourceTraverser(cca.Source, cca.FromTo.From(), &ctx, &srcCredInfo, cca.SymlinkHandling, cca.ListOfFilesChannel, cca.Recursive, getRemoteProperties, cca.IncludeDirectoryStubs, cca.permanentDeleteOption, func(common.EntityType) {}, cca.ListOfVersionIDs, cca.S2sPreserveBlobTags, common.ESyncHashType.None(), cca.preservePermissions, azcopyLogVerbosity.ToPipelineLogLevel(), cca.CpkOptions, nil, cca.StripTopDir, cca.trailingDot, nil, &dest)
traverser, err = InitResourceTraverser(cca.Source, cca.FromTo.From(), &ctx, &srcCredInfo, cca.SymlinkHandling, cca.ListOfFilesChannel, cca.Recursive, getRemoteProperties, cca.IncludeDirectoryStubs, cca.permanentDeleteOption, func(common.EntityType) {}, cca.ListOfVersionIDs, cca.S2sPreserveBlobTags, common.ESyncHashType.None(), cca.preservePermissions, azcopyLogVerbosity, cca.CpkOptions, nil, cca.StripTopDir, cca.trailingDot, &dest)

if err != nil {
return nil, err
Expand Down Expand Up @@ -161,8 +159,8 @@ func (cca *CookedCopyCmdArgs) initEnumerator(jobPartOrder common.CopyJobPartOrde
logDstContainerCreateFailureOnce.Do(func() {
glcm.Info("Failed to create one or more destination container(s). Your transfers may still succeed if the container already exists.")
})
jobsAdmin.JobsAdmin.LogToJobLog(fmt.Sprintf("Failed to create destination container %s. The transfer will continue if the container exists", dstContainerName), pipeline.LogWarning)
jobsAdmin.JobsAdmin.LogToJobLog(fmt.Sprintf("Error %s", err), pipeline.LogDebug)
jobsAdmin.JobsAdmin.LogToJobLog(fmt.Sprintf("Failed to create destination container %s. The transfer will continue if the container exists", dstContainerName), common.LogWarning)
jobsAdmin.JobsAdmin.LogToJobLog(fmt.Sprintf("Error %s", err), common.LogDebug)
seenFailedContainers[dstContainerName] = true
}
} else if cca.FromTo.From().IsRemote() { // if the destination has implicit container names
Expand Down Expand Up @@ -198,8 +196,8 @@ func (cca *CookedCopyCmdArgs) initEnumerator(jobPartOrder common.CopyJobPartOrde
logDstContainerCreateFailureOnce.Do(func() {
glcm.Info("Failed to create one or more destination container(s). Your transfers may still succeed if the container already exists.")
})
jobsAdmin.JobsAdmin.LogToJobLog(fmt.Sprintf("failed to initialize destination container %s; the transfer will continue (but be wary it may fail).", bucketName), pipeline.LogWarning)
jobsAdmin.JobsAdmin.LogToJobLog(fmt.Sprintf("Error %s", err), pipeline.LogDebug)
jobsAdmin.JobsAdmin.LogToJobLog(fmt.Sprintf("failed to initialize destination container %s; the transfer will continue (but be wary it may fail).", bucketName), common.LogWarning)
jobsAdmin.JobsAdmin.LogToJobLog(fmt.Sprintf("Error %s", err), common.LogDebug)
seenFailedContainers[bucketName] = true
}
}
Expand All @@ -219,8 +217,8 @@ func (cca *CookedCopyCmdArgs) initEnumerator(jobPartOrder common.CopyJobPartOrde
logDstContainerCreateFailureOnce.Do(func() {
glcm.Info("Failed to create one or more destination container(s). Your transfers may still succeed if the container already exists.")
})
jobsAdmin.JobsAdmin.LogToJobLog(fmt.Sprintf("failed to initialize destination container %s; the transfer will continue (but be wary it may fail).", resName), pipeline.LogWarning)
jobsAdmin.JobsAdmin.LogToJobLog(fmt.Sprintf("Error %s", err), pipeline.LogDebug)
jobsAdmin.JobsAdmin.LogToJobLog(fmt.Sprintf("failed to initialize destination container %s; the transfer will continue (but be wary it may fail).", resName), common.LogWarning)
jobsAdmin.JobsAdmin.LogToJobLog(fmt.Sprintf("Error %s", err), common.LogDebug)
seenFailedContainers[dstContainerName] = true
}
}
Expand All @@ -237,7 +235,7 @@ func (cca *CookedCopyCmdArgs) initEnumerator(jobPartOrder common.CopyJobPartOrde
glcm.Info(message)
}
if jobsAdmin.JobsAdmin != nil {
jobsAdmin.JobsAdmin.LogToJobLog(message, pipeline.LogInfo)
jobsAdmin.JobsAdmin.LogToJobLog(message, common.LogInfo)
}

processor := func(object StoredObject) error {
Expand Down Expand Up @@ -345,7 +343,7 @@ func (cca *CookedCopyCmdArgs) isDestDirectory(dst common.ResourceString, ctx *co
return false
}

rt, err := InitResourceTraverser(dst, cca.FromTo.To(), ctx, &dstCredInfo, common.ESymlinkHandlingType.Skip(), nil, false, false, false, common.EPermanentDeleteOption.None(), func(common.EntityType) {}, cca.ListOfVersionIDs, false, common.ESyncHashType.None(), cca.preservePermissions, pipeline.LogNone, cca.CpkOptions, nil, cca.StripTopDir, cca.trailingDot, nil, nil)
rt, err := InitResourceTraverser(dst, cca.FromTo.To(), ctx, &dstCredInfo, common.ESymlinkHandlingType.Skip(), nil, false, false, false, common.EPermanentDeleteOption.None(), func(common.EntityType) {}, cca.ListOfVersionIDs, false, common.ESyncHashType.None(), cca.preservePermissions, common.LogNone, cca.CpkOptions, nil, cca.StripTopDir, cca.trailingDot, nil)

if err != nil {
return false
Expand Down Expand Up @@ -415,7 +413,7 @@ func (cca *CookedCopyCmdArgs) InitModularFilters() []ObjectFilter {
// finally, log any search prefix computed from these
if jobsAdmin.JobsAdmin != nil {
if prefixFilter := FilterSet(filters).GetEnumerationPreFilter(cca.Recursive); prefixFilter != "" {
jobsAdmin.JobsAdmin.LogToJobLog("Search prefix, which may be used to optimize scanning, is: "+prefixFilter, pipeline.LogInfo) // "May be used" because we don't know here which enumerators will use it
jobsAdmin.JobsAdmin.LogToJobLog("Search prefix, which may be used to optimize scanning, is: "+prefixFilter, common.LogInfo) // "May be used" because we don't know here which enumerators will use it
}
}

Expand Down Expand Up @@ -452,12 +450,7 @@ func (cca *CookedCopyCmdArgs) createDstContainer(containerName string, dstWithSA
trailingDot = &cca.trailingDot
from = to.Ptr(cca.FromTo.From())
}
options := createClientOptions(logLevel.ToPipelineLogLevel(), trailingDot, from)
// TODO: we can pass cred here as well
dstPipeline, err := InitPipeline(ctx, cca.FromTo.To(), dstCredInfo, logLevel.ToPipelineLogLevel(), cca.trailingDot, cca.FromTo.From())
if err != nil {
return
}
options := createClientOptions(logLevel, trailingDot, from)

// Because the only use-cases for createDstContainer will be on service-level S2S and service-level download
// We only need to create "containers" on local and blob.
Expand Down Expand Up @@ -521,6 +514,10 @@ func (cca *CookedCopyCmdArgs) createDstContainer(containerName string, dstWithSA
return err
}

dstPipeline, err := createBlobFSPipeline(ctx, cca.credentialInfo, azcopyLogVerbosity)
if err != nil {
return err
}
serviceURL := azbfs.NewServiceURL(*dstURL, dstPipeline)
fsURL := serviceURL.NewFileSystemURL(containerName)
_, err = fsURL.GetProperties(ctx)
Expand Down
17 changes: 8 additions & 9 deletions cmd/credentialUtil.go
Original file line number Diff line number Diff line change
Expand Up @@ -511,7 +511,7 @@ func logAuthType(ct common.CredentialType, location common.Location, isSource bo
if _, exists := authMessagesAlreadyLogged.Load(message); !exists {
authMessagesAlreadyLogged.Store(message, struct{}{}) // dedup because source is auth'd by both enumerator and STE
if jobsAdmin.JobsAdmin != nil {
jobsAdmin.JobsAdmin.LogToJobLog(message, pipeline.LogInfo)
jobsAdmin.JobsAdmin.LogToJobLog(message, common.LogInfo)
}
glcm.Info(message)
}
Expand Down Expand Up @@ -633,13 +633,12 @@ func getCredentialType(ctx context.Context, raw rawFromToInfo, cpkOptions common
// ==============================================================================================
// pipeline factory methods
// ==============================================================================================
func createClientOptions(logLevel pipeline.LogLevel, trailingDot *common.TrailingDotOption, from *common.Location) azcore.ClientOptions {
func createClientOptions(logLevel common.LogLevel, trailingDot *common.TrailingDotOption, from *common.Location) azcore.ClientOptions {
logOptions := ste.LogOptions{}
logOptions.ShouldLog = func(level common.LogLevel) bool {return level <= logLevel}

if azcopyScanningLogger != nil {
logOptions.LogOptions = pipeline.LogOptions{
Log: azcopyScanningLogger.Log,
ShouldLog: func(level pipeline.LogLevel) bool { return level <= logLevel },
}
logOptions.Log = azcopyScanningLogger.Log
}
return ste.NewClientOptions(policy.RetryOptions{
MaxRetries: ste.UploadMaxTries,
Expand All @@ -653,7 +652,7 @@ func createClientOptions(logLevel pipeline.LogLevel, trailingDot *common.Trailin

const frontEndMaxIdleConnectionsPerHost = http.DefaultMaxIdleConnsPerHost

func createBlobFSPipeline(ctx context.Context, credInfo common.CredentialInfo, logLevel pipeline.LogLevel) (pipeline.Pipeline, error) {
func createBlobFSPipeline(ctx context.Context, credInfo common.CredentialInfo, logLevel common.LogLevel) (pipeline.Pipeline, error) {
credential := common.CreateBlobFSCredential(ctx, credInfo, common.CredentialOpOptions{
// LogInfo: glcm.Info, //Comment out for debugging
LogError: glcm.Info,
Expand All @@ -662,8 +661,8 @@ func createBlobFSPipeline(ctx context.Context, credInfo common.CredentialInfo, l
logOption := pipeline.LogOptions{}
if azcopyScanningLogger != nil {
logOption = pipeline.LogOptions{
Log: azcopyScanningLogger.Log,
ShouldLog: func(level pipeline.LogLevel) bool { return level <= logLevel },
Log: func(l pipeline.LogLevel, msg string) { azcopyScanningLogger.Log(common.LogLevel(l), msg) },
ShouldLog: func(level pipeline.LogLevel) bool { return common.LogLevel(level) <= logLevel },
}
}

Expand Down
4 changes: 1 addition & 3 deletions cmd/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ import (
"strconv"
"strings"

"github.com/Azure/azure-pipeline-go/pipeline"

"github.com/spf13/cobra"

"github.com/Azure/azure-storage-azcopy/v10/common"
Expand Down Expand Up @@ -239,7 +237,7 @@ func (cooked cookedListCmdArgs) HandleListContainerCommand() (err error) {
}
}

traverser, err := InitResourceTraverser(source, cooked.location, &ctx, &credentialInfo, common.ESymlinkHandlingType.Skip(), nil, true, true, false, common.EPermanentDeleteOption.None(), func(common.EntityType) {}, nil, false, common.ESyncHashType.None(), common.EPreservePermissionsOption.None(), pipeline.LogNone, common.CpkOptions{}, nil, false, cooked.trailingDot, nil, nil)
traverser, err := InitResourceTraverser(source, cooked.location, &ctx, &credentialInfo, common.ESymlinkHandlingType.Skip(), nil, true, true, false, common.EPermanentDeleteOption.None(), func(common.EntityType) {}, nil, false, common.ESyncHashType.None(), common.EPreservePermissionsOption.None(), common.LogNone, common.CpkOptions{}, nil, false, cooked.trailingDot, nil)

if err != nil {
return fmt.Errorf("failed to initialize traverser: %s", err.Error())
Expand Down
5 changes: 2 additions & 3 deletions cmd/make.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ package cmd
import (
"context"
"fmt"
pipeline2 "github.com/Azure/azure-pipeline-go/pipeline"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/fileerror"
Expand Down Expand Up @@ -89,11 +88,11 @@ func (cookedArgs cookedMakeCmdArgs) process() (err error) {
}

// Note : trailing dot is only applicable to file operations anyway, so setting this to false
options := createClientOptions(pipeline2.LogNone, to.Ptr(common.ETrailingDotOption.Disable()), &cookedArgs.resourceLocation)
options := createClientOptions(common.LogNone, to.Ptr(common.ETrailingDotOption.Disable()), &cookedArgs.resourceLocation)

switch cookedArgs.resourceLocation {
case common.ELocation.BlobFS():
p, err := createBlobFSPipeline(ctx, credentialInfo, pipeline2.LogNone)
p, err := createBlobFSPipeline(ctx, credentialInfo, common.LogNone)
if err != nil {
return err
}
Expand Down
22 changes: 9 additions & 13 deletions cmd/removeEnumerator.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ import (

"github.com/Azure/azure-storage-azcopy/v10/jobsAdmin"

"github.com/Azure/azure-pipeline-go/pipeline"

"github.com/Azure/azure-storage-azcopy/v10/azbfs"
"github.com/Azure/azure-storage-azcopy/v10/common"
"github.com/Azure/azure-storage-azcopy/v10/ste"
Expand All @@ -49,7 +47,7 @@ func newRemoveEnumerator(cca *CookedCopyCmdArgs) (enumerator *CopyEnumerator, er
ctx := context.WithValue(context.TODO(), ste.ServiceAPIVersionOverride, ste.DefaultServiceApiVersion)

// Include-path is handled by ListOfFilesChannel.
sourceTraverser, err = InitResourceTraverser(cca.Source, cca.FromTo.From(), &ctx, &cca.credentialInfo, common.ESymlinkHandlingType.Skip(), cca.ListOfFilesChannel, cca.Recursive, true, cca.IncludeDirectoryStubs, cca.permanentDeleteOption, func(common.EntityType) {}, cca.ListOfVersionIDs, false, common.ESyncHashType.None(), common.EPreservePermissionsOption.None(), azcopyLogVerbosity.ToPipelineLogLevel(), cca.CpkOptions, nil, cca.StripTopDir, cca.trailingDot, nil, nil)
sourceTraverser, err = InitResourceTraverser(cca.Source, cca.FromTo.From(), &ctx, &cca.credentialInfo, common.ESymlinkHandlingType.Skip(), cca.ListOfFilesChannel, cca.Recursive, true, cca.IncludeDirectoryStubs, cca.permanentDeleteOption, func(common.EntityType) {}, cca.ListOfVersionIDs, false, common.ESyncHashType.None(), common.EPreservePermissionsOption.None(), azcopyLogVerbosity, cca.CpkOptions, nil, cca.StripTopDir, cca.trailingDot, nil)

// report failure to create traverser
if err != nil {
Expand Down Expand Up @@ -83,7 +81,7 @@ func newRemoveEnumerator(cca *CookedCopyCmdArgs) (enumerator *CopyEnumerator, er
glcm.Info(message)
}
if jobsAdmin.JobsAdmin != nil {
jobsAdmin.JobsAdmin.LogToJobLog(message, pipeline.LogInfo)
jobsAdmin.JobsAdmin.LogToJobLog(message, common.LogInfo)
}

transferScheduler := newRemoveTransferProcessor(cca, NumOfFilesPerDispatchJobPart, fpo)
Expand Down Expand Up @@ -134,12 +132,6 @@ func removeBfsResources(cca *CookedCopyCmdArgs) (err error) {
return errors.New("pattern matches are not supported in this command")
}

// create bfs pipeline
p, err := createBlobFSPipeline(ctx, cca.credentialInfo, azcopyLogVerbosity.ToPipelineLogLevel())
if err != nil {
return err
}

// attempt to parse the source url
sourceURL, err := cca.Source.FullURL()
if err != nil {
Expand All @@ -151,7 +143,7 @@ func removeBfsResources(cca *CookedCopyCmdArgs) (err error) {

if cca.ListOfFilesChannel == nil {
if cca.dryrunMode {
return dryrunRemoveSingleDFSResource(ctx, urlParts, p, cca.Recursive)
return dryrunRemoveSingleDFSResource(ctx, cca.credentialInfo, urlParts, cca.Recursive)
} else {
err := transferProcessor.scheduleCopyTransfer(newStoredObject(
nil,
Expand Down Expand Up @@ -180,7 +172,7 @@ func removeBfsResources(cca *CookedCopyCmdArgs) (err error) {
//remove the child path
urlParts.DirectoryOrFilePath = common.GenerateFullPath(parentPath, childPath)
if cca.dryrunMode {
return dryrunRemoveSingleDFSResource(ctx, urlParts, p, cca.Recursive)
return dryrunRemoveSingleDFSResource(ctx, cca.credentialInfo, urlParts, cca.Recursive)
} else {
err := transferProcessor.scheduleCopyTransfer(newStoredObject(
nil,
Expand All @@ -206,7 +198,7 @@ func removeBfsResources(cca *CookedCopyCmdArgs) (err error) {
return err
}

func dryrunRemoveSingleDFSResource(ctx context.Context, urlParts azbfs.BfsURLParts, p pipeline.Pipeline, recursive bool) error {
func dryrunRemoveSingleDFSResource(ctx context.Context, credInfo common.CredentialInfo, urlParts azbfs.BfsURLParts, recursive bool) error {
//deleting a filesystem
if urlParts.DirectoryOrFilePath == "" {

Expand All @@ -216,6 +208,10 @@ func dryrunRemoveSingleDFSResource(ctx context.Context, urlParts azbfs.BfsURLPar
return nil
}

p, err := createBlobFSPipeline(ctx, credInfo, azcopyLogVerbosity)
if err != nil {
return err
}
// we do not know if the source is a file or a directory
// we assume it is a directory and get its properties
directoryURL := azbfs.NewDirectoryURL(urlParts.URL(), p)
Expand Down
Loading

0 comments on commit 7e08e2f

Please sign in to comment.