Skip to content

Commit

Permalink
アップロード速度の制限を追加する
Browse files Browse the repository at this point in the history
  • Loading branch information
Hexa committed Oct 17, 2023
1 parent a93c9ee commit 43ba089
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 43 deletions.
2 changes: 1 addition & 1 deletion config.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type Config struct {
UploadWorkers int `ini:"upload_workers"`

// 1 ファイルあたりのアップロードレート制限
UploadFileRateLimitMbps float64 `ini:"upload_file_rate_limit_mbps"`
UploadFileRateLimitMbps int `ini:"upload_file_rate_limit_mbps"`

UploadedFileCacheSize int `ini:"uploaded_file_cache_size"`

Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ require (
gopkg.in/natefinch/lumberjack.v2 v2.2.1
)

require github.com/conduitio/bwlimit v0.1.0 // indirect

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
github.com/conduitio/bwlimit v0.1.0 h1:x3ijON0TSghQob4tFKaEvKixFmYKfVJQeSpXluC2JvE=
github.com/conduitio/bwlimit v0.1.0/go.mod h1:E+ASZ1/5L33MTb8hJTERs5Xnmh6Ulq3jbRh7LrdbXWU=
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
Expand Down
5 changes: 5 additions & 0 deletions runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,11 @@ func Run(configFilePath *string) {
resp.Body.Close()
}

// 指定は 0 (制限なし) または 10 Mbps 以上
if (config.UploadFileRateLimitMbps > 0) && (config.UploadFileRateLimitMbps < 10) {
zlog.Fatal().Msg("UPLOAD-FILE-RATE-LIMIT-MBPS-ERROR")
}

zlog.Info().Msg("STARTED-SORA-ARCHIVE-UPLOADER")

// シグナルをキャッチして停止処理
Expand Down
64 changes: 25 additions & 39 deletions s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,19 @@ package archive
import (
"context"
"fmt"
"io"
"net"
"net/http"
"net/url"
"os"
"path/filepath"

"golang.org/x/time/rate"
"time"

"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
zlog "github.com/rs/zerolog/log"
"github.com/shiguredo/sora-archive-uploader/s3"

"github.com/conduitio/bwlimit"
)

func uploadJSONFile(
Expand All @@ -34,7 +36,7 @@ func uploadJSONFile(
creds = credentials.NewIAM("")
}

s3Client, err := s3.NewClient(osConfig.Endpoint, creds)
s3Client, err := s3.NewClient(osConfig.Endpoint, creds, nil)
if err != nil {
return "", err
}
Expand Down Expand Up @@ -78,7 +80,7 @@ func uploadWebMFile(ctx context.Context, osConfig *s3.S3CompatibleObjectStorage,
} else {
creds = credentials.NewIAM("")
}
s3Client, err := s3.NewClient(osConfig.Endpoint, creds)
s3Client, err := s3.NewClient(osConfig.Endpoint, creds, nil)
if err != nil {
return "", err
}
Expand Down Expand Up @@ -127,7 +129,7 @@ func isFileContinuous(err error) bool {
}

func uploadWebMFileWithRateLimit(ctx context.Context, osConfig *s3.S3CompatibleObjectStorage, dst, filePath string,
rateLimitMpbs float64) (string, error) {
rateLimitMpbs int) (string, error) {
var creds *credentials.Credentials
if (osConfig.AccessKeyID != "") || (osConfig.SecretAccessKey != "") {
creds = credentials.NewStaticV4(
Expand All @@ -140,7 +142,20 @@ func uploadWebMFileWithRateLimit(ctx context.Context, osConfig *s3.S3CompatibleO
} else {
creds = credentials.NewIAM("")
}
s3Client, err := s3.NewClient(osConfig.Endpoint, creds)

// bit を byte にする
rateLimitMByteps := (bwlimit.Byte(rateLimitMpbs) * bwlimit.MiB) / 8

// 受信には制限をかけない
dialer := bwlimit.NewDialer(&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}, rateLimitMByteps, 0)

transport := http.DefaultTransport
transport.(*http.Transport).DialContext = dialer.DialContext

s3Client, err := s3.NewClient(osConfig.Endpoint, creds, &transport)
if err != nil {
return "", err
}
Expand All @@ -160,42 +175,13 @@ func uploadWebMFileWithRateLimit(ctx context.Context, osConfig *s3.S3CompatibleO
// Save the file size.
fileSize := fileStat.Size()

// bit を byte にする
rateLimitMByteps := rateLimitMpbs / 8
limiter := rate.NewLimiter(rate.Limit(rateLimitMByteps*1024*1024), int(rateLimitMByteps*1024*1024))

// リミッタを適用したReaderを作成
rateLimitedFileReader := NewRateLimitedReader(fileReader, limiter)

n, err := s3Client.PutObject(ctx, osConfig.BucketName, dst, rateLimitedFileReader, fileSize,
minio.PutObjectOptions{ContentType: "application/octet-stream"})
// 制限時にはマルチパートアップロードを行わない
n, err := s3Client.PutObject(ctx, osConfig.BucketName, dst, fileReader, fileSize,
minio.PutObjectOptions{ContentType: "application/octet-stream", DisableMultipart: true})
if err != nil {
return "", err
}

objectURL := fmt.Sprintf("s3://%s/%s", n.Bucket, n.Key)
return objectURL, nil
}

type RateLimitedReader struct {
reader io.Reader
limiter *rate.Limiter
}

func (r *RateLimitedReader) Read(p []byte) (int, error) {
n, err := r.reader.Read(p)
if err != nil {
return n, err
}

// ここで制限をかける
err = r.limiter.WaitN(context.TODO(), n)
return n, err
}

func NewRateLimitedReader(r io.Reader, l *rate.Limiter) *RateLimitedReader {
return &RateLimitedReader{
reader: r,
limiter: l,
}
}
17 changes: 14 additions & 3 deletions s3/s3.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package s3

import (
"net/http"
"net/url"

"github.com/minio/minio-go/v7"
Expand Down Expand Up @@ -41,12 +42,22 @@ func maybeEndpointURL(endpoint string) (string, bool) {
return endpoint, secure
}

func NewClient(endpoint string, credentials *credentials.Credentials) (*minio.Client, error) {
func NewClient(endpoint string, credentials *credentials.Credentials, transport *http.RoundTripper) (*minio.Client, error) {
newEndpoint, secure := maybeEndpointURL(endpoint)
if transport == nil {
return minio.New(
newEndpoint,
&minio.Options{
Creds: credentials,
Secure: secure,
})
}

return minio.New(
newEndpoint,
&minio.Options{
Creds: credentials,
Secure: secure,
Creds: credentials,
Secure: secure,
Transport: *transport,
})
}

0 comments on commit 43ba089

Please sign in to comment.