Skip to content

Commit

Permalink
refactor, fix object upload bugs, prep for metadata (mtime/mode) copy
Browse files Browse the repository at this point in the history
  • Loading branch information
yaronha committed May 11, 2019
1 parent e369e7d commit 339cfb2
Show file tree
Hide file tree
Showing 8 changed files with 244 additions and 41 deletions.
8 changes: 5 additions & 3 deletions backends/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func NewLocalClient(logger logger.Logger, params *PathParams) (FSClient, error)
return &LocalClient{logger: logger, params: params}, err
}

func (c *LocalClient) ListDir(fileChan chan *FileDetails, task *CopyTask, summary *ListSummary) error {
func (c *LocalClient) ListDir(fileChan chan *FileDetails, task *ListDirTask, summary *ListSummary) error {
defer close(fileChan)

visit := func(localPath string, fi os.FileInfo, err error) error {
Expand All @@ -45,7 +45,7 @@ func (c *LocalClient) ListDir(fileChan chan *FileDetails, task *CopyTask, summar
}

fileDetails := &FileDetails{
Key: localPath, Size: fi.Size(), Mtime: fi.ModTime(),
Key: localPath, Size: fi.Size(), Mtime: fi.ModTime(), Mode: uint32(fi.Mode()), OriginalMtime: fi.ModTime(),
}
c.logger.DebugWith("List file", "key", localPath, "modified", fi.ModTime(), "size", fi.Size())

Expand All @@ -63,7 +63,7 @@ func (c *LocalClient) Reader(path string) (io.ReadCloser, error) {
return os.Open(path)
}

func (c *LocalClient) Writer(path string) (io.WriteCloser, error) {
func (c *LocalClient) Writer(path string, opts *WriteOptions) (io.WriteCloser, error) {
if err := ValidFSTarget(path); err != nil {
return nil, err
}
Expand All @@ -74,3 +74,5 @@ func (c *LocalClient) Writer(path string) (io.WriteCloser, error) {
0666,
)
}

// err = os.Chtimes(filename, currenttime, currenttime)
53 changes: 44 additions & 9 deletions backends/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ import (
"github.com/pkg/errors"
"io"
"path/filepath"
"strconv"
"strings"
"time"
)

type s3client struct {
Expand Down Expand Up @@ -46,14 +48,17 @@ func NewS3Client(logger logger.Logger, params *PathParams) (FSClient, error) {
}

func SplitPath(path string) (string, string) {
if strings.HasPrefix(path, "/") {
path = path[1:]
}
parts := strings.Split(path, "/")
if len(parts) <= 1 {
return path, ""
}
return parts[0], path[len(parts[0])+1:]
}

func (c *s3client) ListDir(fileChan chan *FileDetails, task *CopyTask, summary *ListSummary) error {
func (c *s3client) ListDir(fileChan chan *FileDetails, task *ListDirTask, summary *ListSummary) error {
doneCh := make(chan struct{})
defer close(doneCh)
defer close(fileChan)
Expand All @@ -73,9 +78,24 @@ func (c *s3client) ListDir(fileChan chan *FileDetails, task *CopyTask, summary *
continue
}

var mode uint32
if obj.Metadata.Get(OriginalModeKey) != "" {
if i, err := strconv.Atoi(obj.Metadata.Get(OriginalModeKey)); err == nil {
mode = uint32(i)
}
}

var originalTime time.Time
if obj.Metadata.Get(OriginalMtimeKey) != "" {
if t, err := time.Parse(time.RFC3339, obj.Metadata.Get(OriginalMtimeKey)); err == nil {
originalTime = t
}
}

c.logger.DebugWith("List dir:", "key", obj.Key, "modified", obj.LastModified, "size", obj.Size)
fileDetails := &FileDetails{
Key: c.params.Bucket + "/" + obj.Key, Size: obj.Size, Mtime: obj.LastModified,
Key: c.params.Bucket + "/" + obj.Key, Size: obj.Size,
Mtime: obj.LastModified, Mode: mode, OriginalMtime: originalTime,
}

summary.TotalBytes += obj.Size
Expand Down Expand Up @@ -108,14 +128,16 @@ func (c *s3client) Reader(path string) (io.ReadCloser, error) {
return obj, nil
}

func (c *s3client) Writer(path string) (io.WriteCloser, error) {
return &s3Writer{path: path, minioClient: c.minioClient}, nil
func (c *s3client) Writer(path string, opts *WriteOptions) (io.WriteCloser, error) {
return &s3Writer{bucket: c.params.Bucket, path: path, client: c, opts: opts}, nil
}

type s3Writer struct {
path string
buf []byte
minioClient *minio.Client
bucket string
path string
buf []byte
opts *WriteOptions
client *s3client
}

func (w *s3Writer) Write(p []byte) (n int, err error) {
Expand All @@ -124,8 +146,21 @@ func (w *s3Writer) Write(p []byte) (n int, err error) {
}

func (w *s3Writer) Close() error {
bucket, objectName := SplitPath(w.path)
r := bytes.NewReader(w.buf)
_, err := w.minioClient.PutObject(bucket, objectName, r, int64(len(w.buf)), minio.PutObjectOptions{})
opts := minio.PutObjectOptions{}
if w.opts != nil {
// optionally set metadata keys with original mode and mtime
opts.UserMetadata = map[string]string{OriginalMtimeKey: w.opts.Mtime.Format(time.RFC3339),
OriginalModeKey: strconv.Itoa(int(w.opts.Mode))}
}

objectName := w.path
if strings.HasPrefix(objectName, "/") {
objectName = objectName[1:]
}
_, err := w.client.minioClient.PutObject(w.bucket, objectName, r, int64(len(w.buf)), opts)
if err != nil {
w.client.logger.Error("obj %s put error (%v)", w.path, err)
}
return err
}
29 changes: 19 additions & 10 deletions backends/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ import (
"time"
)

const OriginalMtimeKey = "original_mtime"
const OriginalModeKey = "original_mode"

type FileSearcher struct {
SourcePath string
TargetPath string
Expand All @@ -21,22 +24,23 @@ type FileSearcher struct {
Hidden bool
}

type CopyTask struct {
type ListDirTask struct {
Source *PathParams
Target *PathParams
Since time.Time
MinSize int64
MaxSize int64
Filter string
Recursive bool
CopyEmpty bool
InclEmpty bool
Hidden bool
}

type FileDetails struct {
Key string
Mtime time.Time
Size int64
Key string
Mtime time.Time
OriginalMtime time.Time
Mode uint32
Size int64
}

type ListSummary struct {
Expand All @@ -56,10 +60,15 @@ type PathParams struct {
Token string `json:"token,omitempty"`
}

type WriteOptions struct {
Mtime time.Time
Mode uint32
}

type FSClient interface {
ListDir(fileChan chan *FileDetails, task *CopyTask, summary *ListSummary) error
ListDir(fileChan chan *FileDetails, task *ListDirTask, summary *ListSummary) error
Reader(path string) (io.ReadCloser, error)
Writer(path string) (io.WriteCloser, error)
Writer(path string, opts *WriteOptions) (io.WriteCloser, error)
}

func GetNewClient(logger logger.Logger, params *PathParams) (FSClient, error) {
Expand Down Expand Up @@ -111,8 +120,8 @@ func defaultFromEnv(param string, envvar string) string {
return param
}

func IsMatch(task *CopyTask, name string, mtime time.Time, size int64) bool {
if !task.CopyEmpty && size == 0 {
func IsMatch(task *ListDirTask, name string, mtime time.Time, size int64) bool {
if !task.InclEmpty && size == 0 {
return false
}

Expand Down
6 changes: 3 additions & 3 deletions backends/v3io.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type V3ioClient struct {
params *PathParams
container *v3io.Container
logger logger.Logger
task *CopyTask
task *ListDirTask
path string
}

Expand Down Expand Up @@ -64,7 +64,7 @@ func NewV3ioClient(logger logger.Logger, params *PathParams) (FSClient, error) {
return &newClient, err
}

func (c *V3ioClient) ListDir(fileChan chan *FileDetails, task *CopyTask, summary *ListSummary) error {
func (c *V3ioClient) ListDir(fileChan chan *FileDetails, task *ListDirTask, summary *ListSummary) error {
//bucket, keyPrefix := splitPath(searcher.Path)
defer close(fileChan)
c.task = task
Expand Down Expand Up @@ -145,7 +145,7 @@ func (r v3ioReader) Close() error {
return nil
}

func (c *V3ioClient) Writer(path string) (io.WriteCloser, error) {
func (c *V3ioClient) Writer(path string, opts *WriteOptions) (io.WriteCloser, error) {
return &v3ioWriter{path: path, container: c.container}, nil
}

Expand Down
1 change: 1 addition & 0 deletions common/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func UrlParse(fullpath string) (*backends.PathParams, error) {
case "s3":
// TODO: region url
pathParams.Bucket = u.Host
pathParams.Path = u.Path
case "v3io", "v3ios":
pathParams.Secure = (pathParams.Kind == "v3ios")
pathParams.Kind = "v3io"
Expand Down
150 changes: 150 additions & 0 deletions hack/ssh1.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/* copied from: https://github.com/bramvdbogaerde/go-scp
* Copyright (c) 2018 Bram Vandenbogaerde
* You may use, distribute or modify this code under the
* terms of the Mozilla Public License 2.0, which is distributed
* along with the source code.
*/
package scp

import (
"bytes"
"errors"
"fmt"
"golang.org/x/crypto/ssh"
"io"
"io/ioutil"
"os"
"path"
"sync"
"time"
)

type Client struct {
// the host to connect to
Host string

// the client config to use
ClientConfig *ssh.ClientConfig

// stores the SSH session while the connection is running
Session *ssh.Session

// stores the SSH connection itself in order to close it after transfer
Conn ssh.Conn

// the clients waits for the given timeout until given up the connection
Timeout time.Duration

// the absolute path to the remote SCP binary
RemoteBinary string
}

// Connects to the remote SSH server, returns error if it couldn't establish a session to the SSH server
func (a *Client) Connect() error {
client, err := ssh.Dial("tcp", a.Host, a.ClientConfig)
if err != nil {
return err
}

a.Conn = client.Conn
a.Session, err = client.NewSession()
if err != nil {
return err
}
return nil
}

//Copies the contents of an os.File to a remote location, it will get the length of the file by looking it up from the filesystem
func (a *Client) CopyFromFile(file os.File, remotePath string, permissions string) error {
stat, _ := file.Stat()
return a.Copy(&file, remotePath, permissions, stat.Size())
}

// Copies the contents of an io.Reader to a remote location, the length is determined by reading the io.Reader until EOF
// if the file length in know in advance please use "Copy" instead
func (a *Client) CopyFile(fileReader io.Reader, remotePath string, permissions string) error {
contents_bytes, _ := ioutil.ReadAll(fileReader)
bytes_reader := bytes.NewReader(contents_bytes)

return a.Copy(bytes_reader, remotePath, permissions, int64(len(contents_bytes)))
}

// waitTimeout waits for the waitgroup for the specified max timeout.
// Returns true if waiting timed out.
func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
c := make(chan struct{})
go func() {
defer close(c)
wg.Wait()
}()
select {
case <-c:
return false // completed normally
case <-time.After(timeout):
return true // timed out
}
}

// Copies the contents of an io.Reader to a remote location
func (a *Client) Copy(r io.Reader, remotePath string, permissions string, size int64) error {
filename := path.Base(remotePath)
directory := path.Dir(remotePath)

wg := sync.WaitGroup{}
wg.Add(2)

errCh := make(chan error, 2)

go func() {
defer wg.Done()
w, err := a.Session.StdinPipe()
if err != nil {
errCh <- err
return
}
defer w.Close()

_, err = fmt.Fprintln(w, "C"+permissions, size, filename)
if err != nil {
errCh <- err
return
}

_, err = io.Copy(w, r)
if err != nil {
errCh <- err
return
}

_, err = fmt.Fprint(w, "\x00")
if err != nil {
errCh <- err
return
}
}()

go func() {
defer wg.Done()
err := a.Session.Run(fmt.Sprintf("%s -qt %s", a.RemoteBinary, directory))
if err != nil {
errCh <- err
return
}
}()

if waitTimeout(&wg, a.Timeout) {
return errors.New("timeout when upload files")
}
close(errCh)
for err := range errCh {
if err != nil {
return err
}
}
return nil
}

func (a *Client) Close() {
a.Session.Close()
a.Conn.Close()
}
Loading

0 comments on commit 339cfb2

Please sign in to comment.