Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use kodo form upload for small content #10

Open
wants to merge 1 commit into
base: 2.6+kodoWithResume
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 32 additions & 27 deletions registry/storage/driver/kodo/kodo.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,15 @@ func New(params *DriverParameters) (*Driver, error) {
Zone: params.Zone,
}

uploader := storage.NewResumeUploader(&cfg)
largeUploader := storage.NewResumeUploader(&cfg)
smallUploader := storage.NewFormUploader(&cfg)
bucketManager := storage.NewBucketManager(mac, &cfg)

d := &driver{
params: params,
uploader: uploader,
bucketManager: bucketManager,
params: params,
largeFileUploader: largeUploader,
smallFileUploader: smallUploader,
bucketManager: bucketManager,
}

return &Driver{
Expand All @@ -137,9 +139,10 @@ func New(params *DriverParameters) (*Driver, error) {
}

type driver struct {
params *DriverParameters
uploader *storage.ResumeUploader
bucketManager *storage.BucketManager
params *DriverParameters
largeFileUploader *storage.ResumeUploader
smallFileUploader *storage.FormUploader
bucketManager *storage.BucketManager
}

// Name returns the human-readable "name" of the driver, useful in error
Expand Down Expand Up @@ -230,21 +233,29 @@ func (d *driver) PutContent(ctx context.Context, path string, content []byte) er
"path": path, "func": "PutContent",
})

writer, err := d.Writer(ctx, path, false)
if err != nil {
logger.Error("d.Writer:", err)
return err
kodoKey := d.kodoKey(path)
putPolicy := storage.PutPolicy{
Scope: fmt.Sprintf("%s:%s", BUCKET, kodoKey), //覆盖上传
Expires: 3600 * 3, //token过期时间 3小时
}
defer writer.Close()
upToken := putPolicy.UploadToken(mac)

ret := storage.PutRet{}
//设置kodo的reqid,不知道为啥reqid的key需要设置为0
//没有找到相关文档,我是直接从rpc的代码里找到的
kodoCtx := context.WithValue(ctx, 0, ctx.Value("trace.id"))

putExtra := storage.PutExtra{
OnProgress: func(fsize, uploaded int64) {
logger.Infof("file size: %d, uploaded: %d", fsize, uploaded)
},
}
err := d.smallFileUploader.Put(kodoCtx, &ret, upToken, kodoKey, bytes.NewReader(content), int64(len(content)), &putExtra)

_, err = io.Copy(writer, bytes.NewReader(content))
if err != nil {
writer.Cancel()
logger.Error("io.Copy:", err)
return err
logger.Error("PutFile:", err)
}
logger.Info("ok")
return writer.Commit()
return err
}

// Writer returns a FileWriter which will store the content written to it
Expand Down Expand Up @@ -479,7 +490,7 @@ func (d *driver) privateURL(ctx context.Context, path string) string {
return privateAccessURL
}

func (d *driver) upload(ctx context.Context, localFile, kodoKey string) error {
func (d *driver) upload(ctx context.Context, localFile string, fileSize int64, kodoKey string) (err error) {
logger := ReqEntry(ctx).WithFields(logrus.Fields{
"kodoKey": kodoKey, "func": "upload",
})
Expand All @@ -490,13 +501,6 @@ func (d *driver) upload(ctx context.Context, localFile, kodoKey string) error {
}
upToken := putPolicy.UploadToken(mac)

fileInfo, err := os.Stat(localFile)
if err != nil {
logger.Error("os.Stat:", err)
return err
}

fileSize := fileInfo.Size()
logger = logger.WithFields(logrus.Fields{
"fileSize": fileSize,
"blockCount": storage.BlockCount(fileSize),
Expand All @@ -512,7 +516,8 @@ func (d *driver) upload(ctx context.Context, localFile, kodoKey string) error {
//设置kodo的reqid,不知道为啥reqid的key需要设置为0
//没有找到相关文档,我是直接从rpc的代码里找到的
kodoCtx := context.WithValue(ctx, 0, ctx.Value("trace.id"))
err = d.uploader.PutFile(kodoCtx, &ret, upToken, kodoKey, localFile, &putExtra)
err = d.largeFileUploader.PutFile(kodoCtx, &ret, upToken, kodoKey, localFile, &putExtra)

if err != nil {
logger.Error("PutFile:", err)
}
Expand Down
3 changes: 2 additions & 1 deletion registry/storage/driver/kodo/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ func (fw *fileWriter) Commit() error {

//调用驱动上传文件
localFile := fw.file.Name()
if err := fw.driver.upload(fw.ctx, localFile, fw.kodoKey); err != nil {
fileSize := fw.Size()
if err := fw.driver.upload(fw.ctx, localFile, fileSize, fw.kodoKey); err != nil {
return err
}

Expand Down