Skip to content

Commit

Permalink
handle metadata updates (mtime, mode)
Browse files Browse the repository at this point in the history
  • Loading branch information
yaronha committed May 13, 2019
1 parent 3637f8c commit 71b677c
Show file tree
Hide file tree
Showing 8 changed files with 353 additions and 150 deletions.
42 changes: 37 additions & 5 deletions backends/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ func (c *LocalClient) ListDir(fileChan chan *FileDetails, task *ListDirTask, sum
defer close(fileChan)

visit := func(localPath string, fi os.FileInfo, err error) error {
if err != nil {
c.logger.Error("List walk error with path %s, %v", localPath, err)
return err
}
localPath = filepath.ToSlash(localPath)

if fi.IsDir() {
Expand All @@ -46,10 +50,10 @@ func (c *LocalClient) ListDir(fileChan chan *FileDetails, task *ListDirTask, sum
}

fileDetails := &FileDetails{
Key: localPath, Size: fi.Size(), Mtime: fi.ModTime(), Mode: uint32(fi.Mode()), OriginalMtime: fi.ModTime(),
Key: localPath, Size: fi.Size(), Mtime: fi.ModTime(), Mode: uint32(fi.Mode()),
}
c.logger.DebugWith("List file", "key", localPath,
"modified", fi.ModTime(), "size", fi.Size(), "mode", fi.Mode())
"modified", fi.ModTime(), "size", fi.Size(), "mode", uint32(fi.Mode()))

summary.TotalBytes += fi.Size()
summary.TotalFiles += 1
Expand All @@ -61,11 +65,36 @@ func (c *LocalClient) ListDir(fileChan chan *FileDetails, task *ListDirTask, sum
return filepath.Walk(c.params.Path, visit)
}

func (c *LocalClient) Reader(path string) (io.ReadCloser, error) {
return os.Open(path)
func (c *LocalClient) Reader(path string) (FSReader, error) {
f, err := os.Open(path)
if err != nil {
return nil, err
}
return &fileReader{f}, err
}

func (c *LocalClient) Writer(path string, opts *WriteOptions) (io.WriteCloser, error) {
// fileReader adapts an open *os.File to the FSReader interface. It is the
// reader handed out by LocalClient.Reader.
type fileReader struct {
	// f is the open file that every method delegates to.
	f *os.File
}

// Read fills p from the underlying file and reports the number of bytes
// read together with any error returned by the file.
func (r *fileReader) Read(p []byte) (int, error) {
	count, readErr := r.f.Read(p)
	return count, readErr
}

// Close closes the underlying file and returns the error from doing so.
func (r *fileReader) Close() error {
	closeErr := r.f.Close()
	return closeErr
}

// Stat reports the metadata of the underlying file: its modification time
// and its os.FileMode converted to uint32. If the file cannot be stat'ed
// the error is returned and the metadata is nil.
func (r *fileReader) Stat() (*FileMeta, error) {
	info, err := r.f.Stat()
	if err != nil {
		return nil, err
	}
	return &FileMeta{
		Mtime: info.ModTime(),
		Mode:  uint32(info.Mode()),
	}, nil
}

func (c *LocalClient) Writer(path string, opts *FileMeta) (io.WriteCloser, error) {
if err := ValidFSTarget(path); err != nil {
return nil, err
}
Expand Down Expand Up @@ -103,5 +132,8 @@ func (w *fileWriter) Close() error {
if err != nil {
return err
}
if w.mtime.IsZero() {
return nil
}
return os.Chtimes(w.path, w.mtime, w.mtime)
}
65 changes: 43 additions & 22 deletions backends/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,24 +78,10 @@ func (c *s3client) ListDir(fileChan chan *FileDetails, task *ListDirTask, summar
continue
}

var mode uint32
if obj.Metadata.Get(OriginalModeKey) != "" {
if i, err := strconv.Atoi(obj.Metadata.Get(OriginalModeKey)); err == nil {
mode = uint32(i)
}
}

var originalTime time.Time
if obj.Metadata.Get(OriginalMtimeKey) != "" {
if t, err := time.Parse(time.RFC3339, obj.Metadata.Get(OriginalMtimeKey)); err == nil {
originalTime = t
}
}

c.logger.DebugWith("List dir:", "key", obj.Key, "modified", obj.LastModified, "size", obj.Size)
c.logger.DebugWith("List dir:", "key", obj.Key,
"modified", obj.LastModified, "size", obj.Size)
fileDetails := &FileDetails{
Key: c.params.Bucket + "/" + obj.Key, Size: obj.Size,
Mtime: obj.LastModified, Mode: mode, OriginalMtime: originalTime,
Key: c.params.Bucket + "/" + obj.Key, Size: obj.Size, Mtime: obj.LastModified,
}

summary.TotalBytes += obj.Size
Expand All @@ -112,7 +98,7 @@ func (c *s3client) PutObject(objectPath, filePath string) (n int64, err error) {
context.Background(), bucket, objectName, filePath, minio.PutObjectOptions{})
}

func (c *s3client) Reader(path string) (io.ReadCloser, error) {
func (c *s3client) Reader(path string) (FSReader, error) {
bucket, objectName := SplitPath(path)
if err := s3utils.CheckValidBucketName(bucket); err != nil {
return nil, err
Expand All @@ -123,20 +109,55 @@ func (c *s3client) Reader(path string) (io.ReadCloser, error) {

obj, err := c.minioClient.GetObject(bucket, objectName, minio.GetObjectOptions{})
if err != nil {
return nil, errors.WithStack(err)
return nil, err
}
return &s3Reader{obj}, nil
}

// s3Reader adapts a *minio.Object to the FSReader interface. It is the
// reader handed out by s3client.Reader.
type s3Reader struct {
	// obj is the fetched object that every method delegates to.
	obj *minio.Object
}

// Read fills p from the underlying object and reports the number of bytes
// read together with any error returned by the object.
func (r *s3Reader) Read(p []byte) (int, error) {
	count, readErr := r.obj.Read(p)
	return count, readErr
}

// Close closes the underlying object and returns the error from doing so.
func (r *s3Reader) Close() error {
	closeErr := r.obj.Close()
	return closeErr
}

func (r *s3Reader) Stat() (*FileMeta, error) {
stat, err := r.obj.Stat()
if err != nil {
return nil, err
}
return obj, nil
var mode uint32
if stat.Metadata.Get(OriginalModeS3Key) != "" {
if i, err := strconv.Atoi(stat.Metadata.Get(OriginalModeS3Key)); err == nil {
mode = uint32(i)
}
}

modified := stat.LastModified
if stat.Metadata.Get(OriginalMtimeS3Key) != "" {
if t, err := time.Parse(time.RFC3339, stat.Metadata.Get(OriginalMtimeS3Key)); err == nil {
modified = t
}
}

meta := FileMeta{Mtime: modified, Mode: uint32(mode)}
return &meta, err
}

func (c *s3client) Writer(path string, opts *WriteOptions) (io.WriteCloser, error) {
func (c *s3client) Writer(path string, opts *FileMeta) (io.WriteCloser, error) {
return &s3Writer{bucket: c.params.Bucket, path: path, client: c, opts: opts}, nil
}

type s3Writer struct {
bucket string
path string
buf []byte
opts *WriteOptions
opts *FileMeta
client *s3client
}

Expand Down
33 changes: 26 additions & 7 deletions backends/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ import (
// Metadata key names for a file's original modification time and mode.
const (
	OriginalMtimeKey = "original_mtime"
	OriginalModeKey  = "original_mode"
)

// Header-form metadata keys that the S3 reader looks up in an object's
// metadata: the mtime value is parsed as RFC 3339 and the mode value as a
// decimal integer.
const (
	OriginalMtimeS3Key = "X-Amz-Meta-Original_mtime"
	OriginalModeS3Key  = "X-Amz-Meta-Original_mode"
)

type ListDirTask struct {
Source *PathParams
Since time.Time
Expand All @@ -22,14 +25,14 @@ type ListDirTask struct {
Recursive bool
InclEmpty bool
Hidden bool
WithMeta bool
}

type FileDetails struct {
Key string
Mtime time.Time
OriginalMtime time.Time
Mode uint32
Size int64
Key string
Mtime time.Time
Mode uint32
Size int64
}

type ListSummary struct {
Expand All @@ -49,15 +52,31 @@ type PathParams struct {
Token string `json:"token,omitempty"`
}

// String renders the path parameters in the form
// "<kind>://<endpoint>/<bucket>/<path>".
func (p *PathParams) String() string {
	const layout = "%s://%s/%s/%s"
	return fmt.Sprintf(layout, p.Kind, p.Endpoint, p.Bucket, p.Path)
}

// WriteOptions carries the modification time and mode to apply to a
// written file.
type WriteOptions struct {
	// Mtime is the modification time.
	Mtime time.Time
	// Mode holds the file mode bits.
	Mode uint32
}

// FileMeta describes the metadata of a file or object. Readers return it
// from Stat, and it is passed to FSClient.Writer as the write options.
type FileMeta struct {
	// Mtime is the modification time.
	Mtime time.Time
	// Mode holds the file mode bits.
	Mode uint32
	// Attrs holds additional attributes keyed by name.
	Attrs map[string]interface{}
}

type FSClient interface {
ListDir(fileChan chan *FileDetails, task *ListDirTask, summary *ListSummary) error
Reader(path string) (io.ReadCloser, error)
Writer(path string, opts *WriteOptions) (io.WriteCloser, error)
Reader(path string) (FSReader, error)
Writer(path string, opts *FileMeta) (io.WriteCloser, error)
}

// FSReader is the reader returned by FSClient.Reader: a readable, closable
// stream that can also report the metadata of what is being read.
type FSReader interface {
	// Read and Close, with the standard io semantics.
	io.ReadCloser
	// Stat returns the metadata of the underlying file or object.
	Stat() (*FileMeta, error)
}

func GetNewClient(logger logger.Logger, params *PathParams) (FSClient, error) {
Expand Down
20 changes: 14 additions & 6 deletions backends/v3io.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,34 +124,41 @@ func (c *V3ioClient) getDir(path string, fileChan chan *FileDetails, summary *Li
return nil
}

func (c *V3ioClient) Reader(path string) (io.ReadCloser, error) {
func (c *V3ioClient) Reader(path string) (FSReader, error) {
resp, err := c.container.Sync.GetObject(&v3io.GetObjectInput{Path: url.PathEscape(path)})
if err != nil {
return nil, fmt.Errorf("Error in GetObject operation (%v)", err)
}

return v3ioReader{reader: bytes.NewReader(resp.Body())}, nil
return &v3ioReader{reader: bytes.NewReader(resp.Body())}, nil
}

// v3ioReader is the reader handed out by V3ioClient.Reader. It wraps a
// reader over the body of an object that has already been fetched.
type v3ioReader struct {
	// reader supplies the object body.
	reader io.Reader
}

func (r v3ioReader) Read(p []byte) (n int, err error) {
func (r *v3ioReader) Read(p []byte) (n int, err error) {
return r.reader.Read(p)
}

func (r v3ioReader) Close() error {
func (r *v3ioReader) Close() error {
return nil
}

func (c *V3ioClient) Writer(path string, opts *WriteOptions) (io.WriteCloser, error) {
return &v3ioWriter{path: path, container: c.container}, nil
// Stat currently returns an empty FileMeta and a nil error; no metadata is
// retrieved for v3io objects yet.
func (r *v3ioReader) Stat() (*FileMeta, error) {
	// TBD return Mtime, ..
	return &FileMeta{}, nil
}

func (c *V3ioClient) Writer(path string, opts *FileMeta) (io.WriteCloser, error) {
return &v3ioWriter{path: path, container: c.container, opts: opts}, nil
}

// v3ioWriter is the writer handed out by V3ioClient.Writer. Its buffered
// content is sent as a single object when the writer is closed.
type v3ioWriter struct {
	// path is the target object path.
	path string
	// buf is the payload sent as the object body on Close.
	buf []byte
	// opts is the metadata passed to V3ioClient.Writer; Close does not
	// apply it yet.
	opts *FileMeta
	// container is the v3io container the object is written to.
	container *v3io.Container
}

Expand All @@ -161,6 +168,7 @@ func (w *v3ioWriter) Write(p []byte) (n int, err error) {
}

// Close sends the buffered content to the container as one object, stored
// under the URL-path-escaped form of the writer's path, and returns the
// result of that put operation.
func (w *v3ioWriter) Close() error {
	// TBD write time, mode, kv metadata
	// NOTE(review): url.PathEscape also escapes "/" separators; confirm
	// this is what the v3io API expects for nested paths.
	input := &v3io.PutObjectInput{
		Path: url.PathEscape(w.path),
		Body: w.buf,
	}
	return w.container.Sync.PutObject(input)
}

Expand Down
25 changes: 16 additions & 9 deletions operators/copydir.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package operators

import (
"fmt"
"github.com/nuclio/logger"
"github.com/pkg/errors"
"github.com/v3io/xcp/backends"
"io"
"path"
Expand All @@ -18,6 +18,7 @@ func endWithSlash(path string) bool {
func CopyDir(task *backends.ListDirTask, target *backends.PathParams, logger logger.Logger, workers int) error {
fileChan := make(chan *backends.FileDetails, 1000)
summary := &backends.ListSummary{}
withMeta := task.WithMeta

if task.Source.Path != "" && !endWithSlash(task.Source.Path) {
task.Source.Path += "/"
Expand All @@ -26,7 +27,7 @@ func CopyDir(task *backends.ListDirTask, target *backends.PathParams, logger log
logger.InfoWith("copy task", "from", task.Source, "to", target)
client, err := backends.GetNewClient(logger, task.Source)
if err != nil {
return errors.Wrap(err, "failed to get list source")
return fmt.Errorf("failed to get list source, %v", err)
}

errChan := make(chan error, 60)
Expand All @@ -35,7 +36,7 @@ func CopyDir(task *backends.ListDirTask, target *backends.PathParams, logger log
var err error
err = client.ListDir(fileChan, task, summary)
if err != nil {
errChan <- errors.Wrap(err, "failed in list dir")
errChan <- fmt.Errorf("failed in list dir, %v", err)
}
}(errChan)

Expand All @@ -47,13 +48,13 @@ func CopyDir(task *backends.ListDirTask, target *backends.PathParams, logger log
defer wg.Done()
src, err := backends.GetNewClient(logger, task.Source)
if err != nil {
errChan <- errors.Wrap(err, "failed to get source")
errChan <- fmt.Errorf("failed to get source, %v", err)
return
}

dst, err := backends.GetNewClient(logger, target)
if err != nil {
errChan <- errors.Wrap(err, "failed to get target")
errChan <- fmt.Errorf("failed to get target, %v", err)
return
}

Expand All @@ -63,9 +64,9 @@ func CopyDir(task *backends.ListDirTask, target *backends.PathParams, logger log

logger.DebugWith("copy file", "src", f.Key, "dst", targetPath,
"bucket", target.Bucket, "size", f.Size, "mtime", f.Mtime)
err = copyFile(dst, src, f, targetPath)
err = copyFile(dst, src, f, targetPath, withMeta)
if err != nil {
errChan <- errors.Wrap(err, "failed in copy file")
errChan <- fmt.Errorf("failed in copy file, %v", err)
break
}
atomic.AddInt64(&transferred, 1)
Expand All @@ -86,15 +87,21 @@ func CopyDir(task *backends.ListDirTask, target *backends.PathParams, logger log
return nil
}

func copyFile(dst, src backends.FSClient, fileObj *backends.FileDetails, targetPath string) error {
func copyFile(dst, src backends.FSClient, fileObj *backends.FileDetails, targetPath string, withMeta bool) error {

reader, err := src.Reader(fileObj.Key)
if err != nil {
return err
}
defer reader.Close()

writer, err := dst.Writer(targetPath, nil)
opts := backends.FileMeta{}
if withMeta {
opts.Mode = fileObj.Mode
opts.Mtime = fileObj.Mtime
}

writer, err := dst.Writer(targetPath, &opts)
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit 71b677c

Please sign in to comment.