From 71b677c53b40b2efbeedb437b664cb5e0b7b5743 Mon Sep 17 00:00:00 2001
From: Yaron Haviv
Date: Mon, 13 May 2019 19:00:29 +0300
Subject: [PATCH] handle metadata updates (mtime, mode)

---
 backends/local.go      |  42 +++++++++++--
 backends/s3.go         |  65 +++++++++++++-------
 backends/types.go      |  33 ++++++++---
 backends/v3io.go       |  20 +++++--
 operators/copydir.go   |  25 +++++---
 operators/listdir.go   |  85 ++++++++++++++++++++++++++
 tests/backends_test.go | 132 +++++++++++++++++++++++++++++++++++++++++
 tests/localfs_test.go  | 101 -------------------------------
 8 files changed, 353 insertions(+), 150 deletions(-)
 create mode 100644 operators/listdir.go
 create mode 100644 tests/backends_test.go
 delete mode 100644 tests/localfs_test.go

diff --git a/backends/local.go b/backends/local.go
index 7e4f057..f12fab4 100644
--- a/backends/local.go
+++ b/backends/local.go
@@ -27,6 +27,10 @@ func (c *LocalClient) ListDir(fileChan chan *FileDetails, task *ListDirTask, sum
     defer close(fileChan)
 
     visit := func(localPath string, fi os.FileInfo, err error) error {
+        if err != nil {
+            c.logger.Error("List walk error with path %s, %v", localPath, err)
+            return err
+        }
         localPath = filepath.ToSlash(localPath)
 
         if fi.IsDir() {
@@ -46,10 +50,10 @@
         }
 
         fileDetails := &FileDetails{
-            Key: localPath, Size: fi.Size(), Mtime: fi.ModTime(), Mode: uint32(fi.Mode()), OriginalMtime: fi.ModTime(),
+            Key: localPath, Size: fi.Size(), Mtime: fi.ModTime(), Mode: uint32(fi.Mode()),
         }
         c.logger.DebugWith("List file", "key", localPath,
-            "modified", fi.ModTime(), "size", fi.Size(), "mode", fi.Mode())
+            "modified", fi.ModTime(), "size", fi.Size(), "mode", uint32(fi.Mode()))
 
         summary.TotalBytes += fi.Size()
         summary.TotalFiles += 1
@@ -61,11 +65,36 @@
     return filepath.Walk(c.params.Path, visit)
 }
 
-func (c *LocalClient) Reader(path string) (io.ReadCloser, error) {
-    return os.Open(path)
+func (c *LocalClient) Reader(path string) (FSReader, error) {
+    f, err := os.Open(path)
+    if err != nil {
+        return nil, err
+    }
+    return &fileReader{f}, err
 }
 
-func (c *LocalClient) Writer(path string, opts *WriteOptions) (io.WriteCloser, error) {
+type fileReader struct {
+    f *os.File
+}
+
+func (r *fileReader) Read(p []byte) (n int, err error) {
+    return r.f.Read(p)
+}
+
+func (r *fileReader) Close() error {
+    return r.f.Close()
+}
+
+func (r *fileReader) Stat() (*FileMeta, error) {
+    stat, err := r.f.Stat()
+    if err != nil {
+        return nil, err
+    }
+    meta := FileMeta{Mtime: stat.ModTime(), Mode: uint32(stat.Mode())}
+    return &meta, err
+}
+
+func (c *LocalClient) Writer(path string, opts *FileMeta) (io.WriteCloser, error) {
     if err := ValidFSTarget(path); err != nil {
         return nil, err
     }
@@ -103,5 +132,8 @@ func (w *fileWriter) Close() error {
     if err != nil {
         return err
     }
+    if w.mtime.IsZero() {
+        return nil
+    }
     return os.Chtimes(w.path, w.mtime, w.mtime)
 }
diff --git a/backends/s3.go b/backends/s3.go
index d8a3ec7..d72b739 100644
--- a/backends/s3.go
+++ b/backends/s3.go
@@ -78,24 +78,10 @@ func (c *s3client) ListDir(fileChan chan *FileDetails, task *ListDirTask, summar
             continue
         }
 
-        var mode uint32
-        if obj.Metadata.Get(OriginalModeKey) != "" {
-            if i, err := strconv.Atoi(obj.Metadata.Get(OriginalModeKey)); err == nil {
-                mode = uint32(i)
-            }
-        }
-
-        var originalTime time.Time
-        if obj.Metadata.Get(OriginalMtimeKey) != "" {
-            if t, err := time.Parse(time.RFC3339, obj.Metadata.Get(OriginalMtimeKey)); err == nil {
-                originalTime = t
-            }
-        }
-
-        c.logger.DebugWith("List dir:", "key", obj.Key, "modified", obj.LastModified, "size", obj.Size)
+        c.logger.DebugWith("List dir:", "key", obj.Key,
+            "modified", obj.LastModified, "size", obj.Size)
         fileDetails := &FileDetails{
-            Key: c.params.Bucket + "/" + obj.Key, Size: obj.Size,
-            Mtime: obj.LastModified, Mode: mode, OriginalMtime: originalTime,
+            Key: c.params.Bucket + "/" + obj.Key, Size: obj.Size, Mtime: obj.LastModified,
         }
 
         summary.TotalBytes += obj.Size
@@ -112,7 +98,7 @@ func (c *s3client) PutObject(objectPath, filePath string) (n int64, err error) {
         context.Background(), bucket, objectName, filePath, minio.PutObjectOptions{})
 }
 
-func (c *s3client) Reader(path string) (io.ReadCloser, error) {
+func (c *s3client) Reader(path string) (FSReader, error) {
     bucket, objectName := SplitPath(path)
     if err := s3utils.CheckValidBucketName(bucket); err != nil {
         return nil, err
     }
@@ -123,12 +109,47 @@ func (c *s3client) Reader(path string) (io.ReadCloser, error) {
 
     obj, err := c.minioClient.GetObject(bucket, objectName, minio.GetObjectOptions{})
     if err != nil {
-        return nil, errors.WithStack(err)
+        return nil, err
+    }
+    return &s3Reader{obj}, nil
+}
+
+type s3Reader struct {
+    obj *minio.Object
+}
+
+func (r *s3Reader) Read(p []byte) (n int, err error) {
+    return r.obj.Read(p)
+}
+
+func (r *s3Reader) Close() error {
+    return r.obj.Close()
+}
+
+func (r *s3Reader) Stat() (*FileMeta, error) {
+    stat, err := r.obj.Stat()
+    if err != nil {
+        return nil, err
     }
-    return obj, nil
+    var mode uint32
+    if stat.Metadata.Get(OriginalModeS3Key) != "" {
+        if i, err := strconv.Atoi(stat.Metadata.Get(OriginalModeS3Key)); err == nil {
+            mode = uint32(i)
+        }
+    }
+
+    modified := stat.LastModified
+    if stat.Metadata.Get(OriginalMtimeS3Key) != "" {
+        if t, err := time.Parse(time.RFC3339, stat.Metadata.Get(OriginalMtimeS3Key)); err == nil {
+            modified = t
+        }
+    }
+
+    meta := FileMeta{Mtime: modified, Mode: uint32(mode)}
+    return &meta, err
 }
 
-func (c *s3client) Writer(path string, opts *WriteOptions) (io.WriteCloser, error) {
+func (c *s3client) Writer(path string, opts *FileMeta) (io.WriteCloser, error) {
     return &s3Writer{bucket: c.params.Bucket, path: path, client: c, opts: opts}, nil
 }
 
@@ -136,7 +157,7 @@ type s3Writer struct {
     bucket string
     path   string
     buf    []byte
-    opts   *WriteOptions
+    opts   *FileMeta
     client *s3client
 }
 
diff --git a/backends/types.go b/backends/types.go
index 3d81698..12dca52 100644
--- a/backends/types.go
+++ b/backends/types.go
@@ -13,6 +13,9 @@ import (
 const OriginalMtimeKey = "original_mtime"
 const OriginalModeKey = "original_mode"
 
+const OriginalMtimeS3Key = "X-Amz-Meta-Original_mtime"
+const OriginalModeS3Key = "X-Amz-Meta-Original_mode"
+
 type ListDirTask struct {
     Source    *PathParams
     Since     time.Time
@@ -22,14 +25,14 @@ type ListDirTask struct {
     Recursive bool
     InclEmpty bool
     Hidden    bool
+    WithMeta  bool
 }
 
 type FileDetails struct {
-    Key           string
-    Mtime         time.Time
-    OriginalMtime time.Time
-    Mode          uint32
-    Size          int64
+    Key   string
+    Mtime time.Time
+    Mode  uint32
+    Size  int64
 }
 
 type ListSummary struct {
@@ -49,15 +52,31 @@ type PathParams struct {
     Token    string `json:"token,omitempty"`
 }
 
+func (p *PathParams) String() string {
+    return fmt.Sprintf("%s://%s/%s/%s", p.Kind, p.Endpoint, p.Bucket, p.Path)
+}
+
 type WriteOptions struct {
     Mtime time.Time
     Mode  uint32
 }
 
+type FileMeta struct {
+    Mtime time.Time
+    Mode  uint32
+    Attrs map[string]interface{}
+}
+
 type FSClient interface {
     ListDir(fileChan chan *FileDetails, task *ListDirTask, summary *ListSummary) error
-    Reader(path string) (io.ReadCloser, error)
-    Writer(path string, opts *WriteOptions) (io.WriteCloser, error)
+    Reader(path string) (FSReader, error)
+    Writer(path string, opts *FileMeta) (io.WriteCloser, error)
+}
+
+type FSReader interface {
+    Read(p []byte) (n int, err error)
+    Close() error
+    Stat() (*FileMeta, error)
 }
 
 func GetNewClient(logger logger.Logger, params *PathParams) (FSClient, error) {
diff --git a/backends/v3io.go b/backends/v3io.go
index d9b031d..e7a413d 100644
--- a/backends/v3io.go
+++ b/backends/v3io.go
@@ -124,34 +124,41 @@ func (c *V3ioClient) getDir(path string, fileChan chan *FileDetails, summary *Li
 
     return nil
 }
 
-func (c *V3ioClient) Reader(path string) (io.ReadCloser, error) {
+func (c *V3ioClient) Reader(path string) (FSReader, error) {
     resp, err := c.container.Sync.GetObject(&v3io.GetObjectInput{Path: url.PathEscape(path)})
     if err != nil {
         return nil, fmt.Errorf("Error in GetObject operation (%v)", err)
     }
-    return v3ioReader{reader: bytes.NewReader(resp.Body())}, nil
+    return &v3ioReader{reader: bytes.NewReader(resp.Body())}, nil
 }
 
 type v3ioReader struct {
     reader io.Reader
 }
 
-func (r v3ioReader) Read(p []byte) (n int, err error) {
+func (r *v3ioReader) Read(p []byte) (n int, err error) {
     return r.reader.Read(p)
 }
 
-func (r v3ioReader) Close() error {
+func (r *v3ioReader) Close() error {
     return nil
 }
 
-func (c *V3ioClient) Writer(path string, opts *WriteOptions) (io.WriteCloser, error) {
-    return &v3ioWriter{path: path, container: c.container}, nil
+func (r *v3ioReader) Stat() (*FileMeta, error) {
+    // TBD return Mtime, ..
+    meta := FileMeta{}
+    return &meta, nil
+}
+
+func (c *V3ioClient) Writer(path string, opts *FileMeta) (io.WriteCloser, error) {
+    return &v3ioWriter{path: path, container: c.container, opts: opts}, nil
 }
 
 type v3ioWriter struct {
     path      string
     buf       []byte
+    opts      *FileMeta
     container *v3io.Container
 }
 
@@ -161,6 +168,7 @@ func (w *v3ioWriter) Write(p []byte) (n int, err error) {
 }
 
 func (w *v3ioWriter) Close() error {
+    // TBD write time, mode, kv metadata
     return w.container.Sync.PutObject(&v3io.PutObjectInput{Path: url.PathEscape(w.path), Body: w.buf})
 }
 
diff --git a/operators/copydir.go b/operators/copydir.go
index 65b6f67..73fe916 100644
--- a/operators/copydir.go
+++ b/operators/copydir.go
@@ -1,8 +1,8 @@
 package operators
 
 import (
+    "fmt"
     "github.com/nuclio/logger"
-    "github.com/pkg/errors"
     "github.com/v3io/xcp/backends"
     "io"
     "path"
@@ -18,6 +18,7 @@ func endWithSlash(path string) bool {
 func CopyDir(task *backends.ListDirTask, target *backends.PathParams, logger logger.Logger, workers int) error {
     fileChan := make(chan *backends.FileDetails, 1000)
     summary := &backends.ListSummary{}
+    withMeta := task.WithMeta
 
     if task.Source.Path != "" && !endWithSlash(task.Source.Path) {
         task.Source.Path += "/"
@@ -26,7 +27,7 @@ func CopyDir(task *backends.ListDirTask, target *backends.PathParams, logger log
     logger.InfoWith("copy task", "from", task.Source, "to", target)
     client, err := backends.GetNewClient(logger, task.Source)
     if err != nil {
-        return errors.Wrap(err, "failed to get list source")
+        return fmt.Errorf("failed to get list source, %v", err)
     }
 
     errChan := make(chan error, 60)
@@ -35,7 +36,7 @@ func CopyDir(task *backends.ListDirTask, target *backends.PathParams, logger log
         var err error
         err = client.ListDir(fileChan, task, summary)
         if err != nil {
-            errChan <- errors.Wrap(err, "failed in list dir")
+            errChan <- fmt.Errorf("failed in list dir, %v", err)
         }
     }(errChan)
 
@@ -47,13 +48,13 @@ func CopyDir(task *backends.ListDirTask, target *backends.PathParams, logger log
             defer wg.Done()
             src, err := backends.GetNewClient(logger, task.Source)
             if err != nil {
-                errChan <- errors.Wrap(err, "failed to get source")
+                errChan <- fmt.Errorf("failed to get source, %v", err)
                 return
             }
 
             dst, err := backends.GetNewClient(logger, target)
             if err != nil {
-                errChan <- errors.Wrap(err, "failed to get target")
+                errChan <- fmt.Errorf("failed to get target, %v", err)
                 return
             }
 
@@ -63,9 +64,9 @@ func CopyDir(task *backends.ListDirTask, target *backends.PathParams, logger log
                 logger.DebugWith("copy file", "src", f.Key, "dst", targetPath,
                     "bucket", target.Bucket, "size", f.Size, "mtime", f.Mtime)
-                err = copyFile(dst, src, f, targetPath)
+                err = copyFile(dst, src, f, targetPath, withMeta)
                 if err != nil {
-                    errChan <- errors.Wrap(err, "failed in copy file")
+                    errChan <- fmt.Errorf("failed in copy file, %v", err)
                     break
                 }
                 atomic.AddInt64(&transferred, 1)
@@ -86,7 +87,7 @@ func CopyDir(task *backends.ListDirTask, target *backends.PathParams, logger log
     return nil
 }
 
-func copyFile(dst, src backends.FSClient, fileObj *backends.FileDetails, targetPath string) error {
+func copyFile(dst, src backends.FSClient, fileObj *backends.FileDetails, targetPath string, withMeta bool) error {
 
     reader, err := src.Reader(fileObj.Key)
     if err != nil {
@@ -94,7 +95,13 @@ func copyFile(dst, src backends.FSClient, fileObj *backends.FileDetails, targetP
     }
     defer reader.Close()
 
-    writer, err := dst.Writer(targetPath, nil)
+    opts := backends.FileMeta{}
+    if withMeta {
+        opts.Mode = fileObj.Mode
+        opts.Mtime = fileObj.Mtime
+    }
+
+    writer, err := dst.Writer(targetPath, &opts)
     if err != nil {
         return err
     }
diff --git a/operators/listdir.go b/operators/listdir.go
new file mode 100644
index 0000000..df478b5
--- /dev/null
+++ b/operators/listdir.go
@@ -0,0 +1,85 @@
+package operators
+
+import (
+    "fmt"
+    "github.com/nuclio/logger"
+    "github.com/v3io/xcp/backends"
+)
+
+func ListDir(task *backends.ListDirTask, logger logger.Logger) (*listResults, error) {
+
+    list := listResults{
+        fileChan: make(chan *backends.FileDetails, 1000),
+        summary:  &backends.ListSummary{},
+        errChan:  make(chan error, 60),
+    }
+
+    if task.Source.Path != "" && !endWithSlash(task.Source.Path) {
+        task.Source.Path += "/"
+    }
+
+    logger.InfoWith("list task", "from", task.Source, "filter", task.Filter)
+    client, err := backends.GetNewClient(logger, task.Source)
+    if err != nil {
+        return nil, fmt.Errorf("failed to get list source, %v", err)
+    }
+
+    go func(errChan chan error) {
+        var err error
+        err = client.ListDir(list.fileChan, task, list.summary)
+        if err != nil {
+            errChan <- fmt.Errorf("failed in list dir, %v", err)
+            fmt.Println(err)
+        }
+    }(list.errChan)
+
+    return &list, nil
+}
+
+type listResults struct {
+    fileChan chan *backends.FileDetails
+    summary  *backends.ListSummary
+    errChan  chan error
+
+    currFile *backends.FileDetails
+    err      error
+}
+
+func (l *listResults) Next() bool {
+    var more bool
+    l.currFile, more = <-l.fileChan
+
+    if !more {
+        select {
+        case err := <-l.errChan:
+            l.err = err
+        default:
+        }
+        return false
+    }
+    return true
+}
+
+func (l *listResults) ReadAll() ([]*backends.FileDetails, error) {
+    list := []*backends.FileDetails{}
+    for l.Next() {
+        list = append(list, l.At())
+    }
+    return list, l.err
+}
+
+func (l *listResults) Err() error {
+    return l.err
+}
+
+func (l *listResults) At() *backends.FileDetails {
+    return l.currFile
+}
+
+func (l *listResults) Name() string {
+    return l.currFile.Key
+}
+
+func (l *listResults) Summary() *backends.ListSummary {
+    return l.summary
+}
diff --git a/tests/backends_test.go b/tests/backends_test.go
new file mode 100644
index 0000000..6234698
--- /dev/null
+++ b/tests/backends_test.go
@@ -0,0 +1,132 @@
+package tests
+
+import (
+    "fmt"
+    "github.com/nuclio/logger"
+    "github.com/stretchr/testify/suite"
+    "github.com/v3io/xcp/backends"
+    "github.com/v3io/xcp/common"
+    "github.com/v3io/xcp/operators"
+    "io/ioutil"
+    "os"
+    "path/filepath"
+    "testing"
+    "time"
+)
+
+var log logger.Logger
+var tempdir string
+var AWS_TEST_BUCKET string
+
+var dummyContent = []byte("dummy content")
+
+type testLocalBackend struct {
+    suite.Suite
+}
+
+func (suite *testLocalBackend) SetupSuite() {
+    AWS_TEST_BUCKET = os.Getenv("AWS_TEST_BUCKET")
+    src, err := common.UrlParse(tempdir)
+    suite.Require().Nil(err)
+
+    client, err := backends.NewLocalClient(log, src)
+    suite.Require().Nil(err)
+
+    w, err := client.Writer(filepath.Join(tempdir, "a.txt"), nil)
+    suite.Require().Nil(err)
+    n, err := w.Write(dummyContent)
+    suite.Require().Nil(err)
+    suite.Require().Equal(n, len(dummyContent))
+
+    opts := backends.FileMeta{
+        Mtime: time.Now().Add(-23 * time.Hour),
+        Mode:  777}
+    w, err = client.Writer(filepath.Join(tempdir, "a.csv"), &opts)
+    suite.Require().Nil(err)
+    n, err = w.Write(dummyContent)
+    suite.Require().Nil(err)
+    suite.Require().Equal(n, len(dummyContent))
+    err = w.Close()
+    suite.Require().Nil(err)
+}
+
+func (suite *testLocalBackend) TestRead() {
+    src, err := common.UrlParse(tempdir)
+    suite.Require().Nil(err)
+
+    client, err := backends.NewLocalClient(log, src)
+    suite.Require().Nil(err)
+
+    r, err := client.Reader(filepath.Join(tempdir, "a.csv"))
+    data := make([]byte, 100)
+    n, err := r.Read(data)
+    suite.Require().Nil(err)
+    suite.Require().Equal(n, len(dummyContent))
+    suite.Require().Equal(data[0:n], dummyContent)
+}
+
+func (suite *testLocalBackend) TestList() {
+
+    src, err := common.UrlParse(tempdir)
+    listTask := backends.ListDirTask{Source: src, Filter: "*.*"}
+    iter, err := operators.ListDir(&listTask, log)
+    suite.Require().Nil(err)
+
+    for iter.Next() {
+        fmt.Printf("File %s: %+v", iter.Name(), iter.At())
+    }
+    suite.Require().Nil(iter.Err())
+    summary := iter.Summary()
+    fmt.Printf("Total files: %d, Total size: %d\n",
+        summary.TotalFiles, summary.TotalBytes)
+    suite.Require().Equal(summary.TotalFiles, 2)
+
+    listTask = backends.ListDirTask{Source: src, Filter: "*.csv"}
+    iter, err = operators.ListDir(&listTask, log)
+    suite.Require().Nil(err)
+    _, err = iter.ReadAll()
+    suite.Require().Nil(err)
+    summary = iter.Summary()
+
+    fmt.Printf("Total files: %d, Total size: %d\n",
+        summary.TotalFiles, summary.TotalBytes)
+    suite.Require().Equal(summary.TotalFiles, 1)
+}
+
+func (suite *testLocalBackend) TestCopyToS3() {
+    src, err := common.UrlParse(tempdir)
+    suite.Require().Nil(err)
+
+    listTask := backends.ListDirTask{Source: src, Filter: "*.*", WithMeta: true}
+    dst, err := common.UrlParse("s3://" + AWS_TEST_BUCKET + "/xcptests")
+    suite.Require().Nil(err)
+
+    err = operators.CopyDir(&listTask, dst, log, 1)
+    suite.Require().Nil(err)
+
+    // read list dir content from S3
+    listTask = backends.ListDirTask{Source: dst, Filter: "*.*"}
+    dstdir, err := ioutil.TempDir("", "xcptest-dst")
+    suite.Require().Nil(err)
+    newdst, err := common.UrlParse(dstdir)
+    suite.Require().Nil(err)
+
+    err = operators.CopyDir(&listTask, newdst, log, 1)
+    suite.Require().Nil(err)
+}
+
+func TestLocalBackendSuite(t *testing.T) {
+    log, _ = common.NewLogger("debug")
+    var err error
+    tempdir, err = ioutil.TempDir("", "xcptest")
+    //tempdir = "../tst1"
+    if err != nil {
+        t.Fatal(err)
+    }
+
+    fmt.Println("Target dir: ", tempdir)
+
+    os.RemoveAll(tempdir) // clean up
+    suite.Run(t, new(testLocalBackend))
+    os.RemoveAll(tempdir)
+}
diff --git a/tests/localfs_test.go b/tests/localfs_test.go
deleted file mode 100644
index ae6d299..0000000
--- a/tests/localfs_test.go
+++ /dev/null
@@ -1,101 +0,0 @@
-package tests
-
-import (
-    "fmt"
-    "github.com/nuclio/logger"
-    "github.com/stretchr/testify/suite"
-    "github.com/v3io/xcp/backends"
-    "github.com/v3io/xcp/common"
-    "io/ioutil"
-    "os"
-    "path/filepath"
-    "testing"
-    "time"
-)
-
-var log logger.Logger
-var tempdir string
-
-func writeFiles() (string, error) {
-    content := []byte("dummy content")
-
-    tmpfn := filepath.Join(tempdir, "a.txt")
-    if err := ioutil.WriteFile(tmpfn, content, 0666); err != nil {
-        return "", err
-    }
-
-    tmpfn = filepath.Join(tempdir, "a.csv")
-    if err := ioutil.WriteFile(tmpfn, content, 0666); err != nil {
-        return "", err
-    }
-
-    return tempdir, nil
-}
-
-type testLocalBackend struct {
-    suite.Suite
-}
-
-func (suite *testLocalBackend) TestAWrite() {
-    content := []byte("dummy content")
-    src, err := common.UrlParse(tempdir)
-    suite.Require().Nil(err)
-
-    client, err := backends.NewLocalClient(log, src)
-    suite.Require().Nil(err)
-
-    w, err := client.Writer(filepath.Join(tempdir, "a.txt"), nil)
-    suite.Require().Nil(err)
-    n, err := w.Write(content)
-    suite.Require().Nil(err)
-    suite.Require().Equal(n, len(content))
-
-    opts := backends.WriteOptions{
-        Mtime: time.Now().Add(-23 * time.Hour),
-        Mode:  777}
-    w, err = client.Writer(filepath.Join(tempdir, "a.csv"), &opts)
-    suite.Require().Nil(err)
-    n, err = w.Write(content)
-    suite.Require().Nil(err)
-    suite.Require().Equal(n, len(content))
-}
-
-func (suite *testLocalBackend) TestList() {
-
-    src, err := common.UrlParse(tempdir)
-    suite.Require().Nil(err)
-
-    listTask := backends.ListDirTask{
-        Source: src,
-        Filter: "*.*",
-    }
-
-    client, err := backends.NewLocalClient(log, src)
-    suite.Require().Nil(err)
-
-    fileChan := make(chan *backends.FileDetails, 1000)
-    summary := &backends.ListSummary{}
-
-    err = client.ListDir(fileChan, &listTask, summary)
-    suite.Require().Nil(err)
-
-    fmt.Printf("Total files: %d, Total size: %d\n",
-        summary.TotalFiles, summary.TotalBytes)
-
-    suite.Require().Equal(summary.TotalFiles, 2)
-}
-
-func TestLocalBackendSuite(t *testing.T) {
-    log, _ = common.NewLogger("debug")
-    var err error
-    tempdir, err = ioutil.TempDir("", "xcptest")
-    //tempdir = "../tst1"
-    if err != nil {
-        t.Fatal(err)
-    }
-
-    fmt.Println("Target dir: ", tempdir)
-
-    os.RemoveAll(tempdir) // clean up
-    suite.Run(t, new(testLocalBackend))
-}
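
Usage sketch (Go): the snippet below wires together the API this patch introduces (ListDirTask.WithMeta, operators.CopyDir, and the new operators.ListDir iterator) in the same way tests/backends_test.go drives it. The local path, bucket URL, worker count, and log level are placeholder assumptions, and errors from common.UrlParse are ignored for brevity.

package main

import (
    "fmt"

    "github.com/v3io/xcp/backends"
    "github.com/v3io/xcp/common"
    "github.com/v3io/xcp/operators"
)

func main() {
    // log level as used by the package tests
    log, _ := common.NewLogger("debug")

    // placeholder source and target locations
    src, _ := common.UrlParse("/tmp/data")
    dst, _ := common.UrlParse("s3://my-test-bucket/backup")

    // WithMeta asks CopyDir to carry the original mtime and mode to the target via FileMeta
    copyTask := backends.ListDirTask{Source: src, Filter: "*.*", Recursive: true, WithMeta: true}
    if err := operators.CopyDir(&copyTask, dst, log, 4); err != nil {
        fmt.Println("copy failed:", err)
        return
    }

    // iterate the source listing with the new ListDir iterator
    listTask := backends.ListDirTask{Source: src, Filter: "*.csv"}
    iter, err := operators.ListDir(&listTask, log)
    if err != nil {
        fmt.Println("list failed:", err)
        return
    }
    for iter.Next() {
        f := iter.At()
        fmt.Printf("%s  size=%d  mtime=%s\n", f.Key, f.Size, f.Mtime)
    }
    if iter.Err() != nil {
        fmt.Println("list error:", iter.Err())
    }
    summary := iter.Summary()
    fmt.Printf("total: %d files, %d bytes\n", summary.TotalFiles, summary.TotalBytes)
}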