Skip to content

Commit

Permalink
support wild card, align with v3io 2.3+
Browse files Browse the repository at this point in the history
  • Loading branch information
yaronha committed Jul 7, 2019
1 parent 71b677c commit 90a996a
Show file tree
Hide file tree
Showing 8 changed files with 88 additions and 45 deletions.
18 changes: 18 additions & 0 deletions backends/parse_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package backends

import (
"fmt"
"testing"
)

// TestFileParser checks how ParseFilename splits plain, directory and
// wildcard paths when forceDir is set. Each case pins the resulting Path and
// filter, or that the call is rejected.
func TestFileParser(t *testing.T) {
	tests := []struct {
		path       string
		wantPath   string
		wantFilter string
		wantErr    bool
	}{
		{path: "aa/bb", wantPath: "aa/bb/"},
		{path: "aa/", wantPath: "aa/"},
		{path: "a*a/b", wantErr: true}, // wildcard in the directory part is rejected
		{path: "aa/b*", wantPath: "aa/", wantFilter: "b*"},
		{path: "aa", wantPath: "aa/"},
	}

	for _, tc := range tests {
		p := PathParams{}
		err := ParseFilename(tc.path, &p, true)
		call := fmt.Sprintf("ParseFilename(%q, forceDir=true)", tc.path)

		if tc.wantErr {
			if err == nil {
				t.Errorf("%s: expected an error, got none", call)
			}
			continue
		}
		if err != nil {
			t.Errorf("%s: unexpected error: %v", call, err)
			continue
		}
		if p.Path != tc.wantPath {
			t.Errorf("%s: Path = %q, want %q", call, p.Path, tc.wantPath)
		}
		if p.filter != tc.wantFilter {
			t.Errorf("%s: filter = %q, want %q", call, p.filter, tc.wantFilter)
		}
	}
}
49 changes: 41 additions & 8 deletions backends/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@ type ListDirTask struct {
Since time.Time
MinSize int64
MaxSize int64
Filter string
Recursive bool
InclEmpty bool
Hidden bool
WithMeta bool

dir string
filter string
}

type FileDetails struct {
Expand All @@ -50,17 +52,15 @@ type PathParams struct {
UserKey string `json:"userKey,omitempty"`
Secret string `json:"secret,omitempty"`
Token string `json:"token,omitempty"`

filter string
isFile bool
}

// String renders the parameters in URL-like form:
// <Kind>://<Endpoint>/<Bucket>/<Path>.
func (p *PathParams) String() string {
	location := p.Endpoint + "/" + p.Bucket + "/" + p.Path
	return p.Kind + "://" + location
}

// WriteOptions groups a timestamp and a mode value.
// NOTE(review): presumably these are applied to a file when it is written;
// no use of this type is visible here — confirm against callers.
type WriteOptions struct {
// Mtime is a timestamp (presumably the file's modification time — confirm).
Mtime time.Time
// Mode is a 32-bit mode value (presumably file permission bits — confirm).
Mode uint32
}

type FileMeta struct {
Mtime time.Time
Mode uint32
Expand Down Expand Up @@ -145,12 +145,45 @@ func IsMatch(task *ListDirTask, name string, mtime time.Time, size int64) bool {
return false
}

if task.Filter != "" {
match, err := filepath.Match(task.Filter, name)
if task.Source.filter != "" {
match, err := filepath.Match(task.Source.filter, name)
if err != nil || !match {
return false
}
}

return true
}

// ParseFilename splits fullpath into a directory part and a trailing element
// (using filepath.Split) and records the result in params.
//
//   - If the trailing element is empty or contains a wildcard character (see
//     hasMagics), params.Path is set to the directory part and params.filter
//     to the trailing element.
//   - Otherwise the trailing element is treated as a name: params.Path is set
//     to the whole of fullpath, with "/" appended when forceDir is true and
//     the path does not already end with a slash, and params.isFile is set.
//
// Wildcards in the directory part are not supported; in that case an error is
// returned and params is left unmodified.
func ParseFilename(fullpath string, params *PathParams, forceDir bool) error {
	dir, name := filepath.Split(fullpath)
	if hasMagics(dir) {
		return fmt.Errorf("No support for wildcard directory names")
	}

	// Clear the results of any earlier parse so that a reused params never
	// carries a stale filter or file flag into this one.
	params.filter = ""
	params.isFile = false

	if name == "" || hasMagics(name) {
		params.Path = dir
		params.filter = name
		return nil
	}

	params.Path = dir + name
	if forceDir && !endWithSlash(params.Path) {
		params.Path += "/"
	}
	// NOTE(review): isFile is set even when forceDir appended a slash above —
	// confirm this is what the backends expect.
	params.isFile = true
	return nil
}

// hasMagics reports whether text contains any of the wildcard characters
// '*', '?' or '['.
func hasMagics(text string) bool {
	return strings.ContainsAny(text, "*?[")
}

// endWithSlash reports whether path ends with a forward slash or a backslash.
func endWithSlash(path string) bool {
	for _, sep := range []string{"/", "\\"} {
		if strings.HasSuffix(path, sep) {
			return true
		}
	}
	return false
}
2 changes: 1 addition & 1 deletion backends/v3io.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (c *V3ioClient) getDir(path string, fileChan chan *FileDetails, summary *Li
continue
}

t, err := time.Parse(time.RFC3339, obj.LastModified+"Z")
t, err := time.Parse(time.RFC3339, obj.LastModified)
if err != nil {
return errors.Wrap(err, "Invalid object time string - not an RFC 3339 time format.")
}
Expand Down
22 changes: 13 additions & 9 deletions common/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,23 +33,29 @@ func NewLogger(level string) (logger.Logger, error) {
return log, nil
}

func UrlParse(fullpath string) (*backends.PathParams, error) {
func UrlParse(fullpath string, forceDir bool) (*backends.PathParams, error) {
if !strings.Contains(fullpath, "://") {
return &backends.PathParams{Path: fullpath}, nil
params := &backends.PathParams{}
err := backends.ParseFilename(fullpath, params, forceDir)
return params, err
}

u, err := url.Parse(fullpath)
if err != nil {
return nil, err
}

if strings.HasPrefix(u.Path, "/") {
u.Path = u.Path[1:]
}
pathParams := backends.PathParams{
Kind: strings.ToLower(u.Scheme),
Tag: u.Fragment,
}
if strings.HasPrefix(u.Path, "/") {
u.Path = u.Path[1:]
}
err = backends.ParseFilename(u.Path, &pathParams, forceDir)
if err != nil {
return nil, err
}

password, hasPassword := u.User.Password()
if hasPassword && u.User.Username() == "" {
Expand All @@ -63,20 +69,18 @@ func UrlParse(fullpath string) (*backends.PathParams, error) {
case "s3":
// TODO: region url
pathParams.Bucket = u.Host
pathParams.Path = u.Path
case "v3io", "v3ios":
pathParams.Secure = (pathParams.Kind == "v3ios")
pathParams.Kind = "v3io"
pathParams.Endpoint = u.Host
pathParams.Bucket, pathParams.Path = backends.SplitPath(u.Path)
pathParams.Bucket, pathParams.Path = backends.SplitPath(pathParams.Path)
case "http", "https":
pathParams.Secure = (pathParams.Kind == "https")
pathParams.Kind = "s3"
pathParams.Endpoint = u.Host
pathParams.Bucket, pathParams.Path = backends.SplitPath(u.Path)
pathParams.Bucket, pathParams.Path = backends.SplitPath(pathParams.Path)
default:
pathParams.Endpoint = u.Host
pathParams.Path = u.Path
}

return &pathParams, nil
Expand Down
8 changes: 0 additions & 8 deletions operators/copydir.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,11 @@ import (
"sync/atomic"
)

// endWithSlash reports whether path is terminated by "/" or by "\".
func endWithSlash(path string) bool {
	if strings.HasSuffix(path, "/") {
		return true
	}
	return strings.HasSuffix(path, "\\")
}

func CopyDir(task *backends.ListDirTask, target *backends.PathParams, logger logger.Logger, workers int) error {
fileChan := make(chan *backends.FileDetails, 1000)
summary := &backends.ListSummary{}
withMeta := task.WithMeta

if task.Source.Path != "" && !endWithSlash(task.Source.Path) {
task.Source.Path += "/"
}

logger.InfoWith("copy task", "from", task.Source, "to", target)
client, err := backends.GetNewClient(logger, task.Source)
if err != nil {
Expand Down
6 changes: 1 addition & 5 deletions operators/listdir.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,7 @@ func ListDir(task *backends.ListDirTask, logger logger.Logger) (*listResults, er
errChan: make(chan error, 60),
}

if task.Source.Path != "" && !endWithSlash(task.Source.Path) {
task.Source.Path += "/"
}

logger.InfoWith("list task", "from", task.Source, "filter", task.Filter)
logger.InfoWith("list task", "from", task.Source)
client, err := backends.GetNewClient(logger, task.Source)
if err != nil {
return nil, fmt.Errorf("failed to get list source, %v", err)
Expand Down
22 changes: 12 additions & 10 deletions tests/backends_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type testLocalBackend struct {

func (suite *testLocalBackend) SetupSuite() {
AWS_TEST_BUCKET = os.Getenv("AWS_TEST_BUCKET")
src, err := common.UrlParse(tempdir)
src, err := common.UrlParse(tempdir, true)
suite.Require().Nil(err)

client, err := backends.NewLocalClient(log, src)
Expand All @@ -51,7 +51,7 @@ func (suite *testLocalBackend) SetupSuite() {
}

func (suite *testLocalBackend) TestRead() {
src, err := common.UrlParse(tempdir)
src, err := common.UrlParse(tempdir, true)
suite.Require().Nil(err)

client, err := backends.NewLocalClient(log, src)
Expand All @@ -67,8 +67,9 @@ func (suite *testLocalBackend) TestRead() {

func (suite *testLocalBackend) TestList() {

src, err := common.UrlParse(tempdir)
listTask := backends.ListDirTask{Source: src, Filter: "*.*"}
src, err := common.UrlParse(tempdir, true)
listTask := backends.ListDirTask{Source: src}
fmt.Println(src)
iter, err := operators.ListDir(&listTask, log)
suite.Require().Nil(err)

Expand All @@ -81,7 +82,8 @@ func (suite *testLocalBackend) TestList() {
summary.TotalFiles, summary.TotalBytes)
suite.Require().Equal(summary.TotalFiles, 2)

listTask = backends.ListDirTask{Source: src, Filter: "*.csv"}
src, _ = common.UrlParse(tempdir+"/*.csv", true)
listTask = backends.ListDirTask{Source: src}
iter, err = operators.ListDir(&listTask, log)
suite.Require().Nil(err)
_, err = iter.ReadAll()
Expand All @@ -94,21 +96,21 @@ func (suite *testLocalBackend) TestList() {
}

func (suite *testLocalBackend) TestCopyToS3() {
src, err := common.UrlParse(tempdir)
src, err := common.UrlParse(tempdir, true)
suite.Require().Nil(err)

listTask := backends.ListDirTask{Source: src, Filter: "*.*", WithMeta: true}
dst, err := common.UrlParse("s3://" + AWS_TEST_BUCKET + "/xcptests")
listTask := backends.ListDirTask{Source: src, WithMeta: true}
dst, err := common.UrlParse("s3://"+AWS_TEST_BUCKET+"/xcptests/*.*", true)
suite.Require().Nil(err)

err = operators.CopyDir(&listTask, dst, log, 1)
suite.Require().Nil(err)

// read list dir content from S3
listTask = backends.ListDirTask{Source: dst, Filter: "*.*"}
listTask = backends.ListDirTask{Source: dst}
dstdir, err := ioutil.TempDir("", "xcptest-dst")
suite.Require().Nil(err)
newdst, err := common.UrlParse(dstdir)
newdst, err := common.UrlParse(dstdir, true)
suite.Require().Nil(err)

err = operators.CopyDir(&listTask, newdst, log, 1)
Expand Down
6 changes: 2 additions & 4 deletions xcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ func main() {
maxSize := flag.Int("m", 0, "maximum file size")
minSize := flag.Int("n", 0, "minimum file size")
workers := flag.Int("w", 8, "num of worker routines")
filter := flag.String("f", "", "filter string e.g. *.png")
logLevel := flag.String("v", "debug", "log level: info | debug")
mtime := flag.String("t", "", "minimal file time e.g. 'now-7d' or RFC3339 date")
flag.Parse()
Expand All @@ -30,11 +29,11 @@ func main() {
os.Exit(1)
}

src, err := common.UrlParse(args[0])
src, err := common.UrlParse(args[0], true)
if err != nil {
panic(err)
}
dst, err := common.UrlParse(args[1])
dst, err := common.UrlParse(args[1], true)
if err != nil {
panic(err)
}
Expand All @@ -46,7 +45,6 @@ func main() {
listTask := backends.ListDirTask{
Source: src,
Since: since,
Filter: *filter,
Recursive: *recursive,
MaxSize: int64(*maxSize),
MinSize: int64(*minSize),
Expand Down

0 comments on commit 90a996a

Please sign in to comment.