From 43ba0899c2033c0fdb25a5e734ac861735ff61dd Mon Sep 17 00:00:00 2001 From: Yoshida Hiroshi Date: Tue, 17 Oct 2023 16:19:50 +0900 Subject: [PATCH] =?UTF-8?q?=E3=82=A2=E3=83=83=E3=83=97=E3=83=AD=E3=83=BC?= =?UTF-8?q?=E3=83=89=E9=80=9F=E5=BA=A6=E3=81=AE=E5=88=B6=E9=99=90=E3=82=92?= =?UTF-8?q?=E8=BF=BD=E5=8A=A0=E3=81=99=E3=82=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config.go | 2 +- go.mod | 2 ++ go.sum | 2 ++ runner.go | 5 +++++ s3.go | 64 ++++++++++++++++++++++--------------------------------- s3/s3.go | 17 ++++++++++++--- 6 files changed, 49 insertions(+), 43 deletions(-) diff --git a/config.go b/config.go index 7ac15d3..a16a3d9 100644 --- a/config.go +++ b/config.go @@ -43,7 +43,7 @@ type Config struct { UploadWorkers int `ini:"upload_workers"` // 1 ファイルあたりのアップロードレート制限 - UploadFileRateLimitMbps float64 `ini:"upload_file_rate_limit_mbps"` + UploadFileRateLimitMbps int `ini:"upload_file_rate_limit_mbps"` UploadedFileCacheSize int `ini:"uploaded_file_cache_size"` diff --git a/go.mod b/go.mod index 5498eff..2c3f291 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,8 @@ require ( gopkg.in/natefinch/lumberjack.v2 v2.2.1 ) +require github.com/conduitio/bwlimit v0.1.0 // indirect + require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/dustin/go-humanize v1.0.1 // indirect diff --git a/go.sum b/go.sum index b9a61ac..83f2556 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +github.com/conduitio/bwlimit v0.1.0 h1:x3ijON0TSghQob4tFKaEvKixFmYKfVJQeSpXluC2JvE= +github.com/conduitio/bwlimit v0.1.0/go.mod h1:E+ASZ1/5L33MTb8hJTERs5Xnmh6Ulq3jbRh7LrdbXWU= github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= diff --git a/runner.go b/runner.go index d177fa2..74a7dda 100644 --- a/runner.go +++ b/runner.go @@ -151,6 +151,11 @@ func Run(configFilePath *string) { resp.Body.Close() } + // 指定は 0 (制限なし) または 10 Mbps 以上 + if (config.UploadFileRateLimitMbps > 0) && (config.UploadFileRateLimitMbps < 10) { + zlog.Fatal().Msg("UPLOAD-FILE-RATE-LIMIT-MBPS-ERROR") + } + zlog.Info().Msg("STARTED-SORA-ARCHIVE-UPLOADER") // シグナルをキャッチして停止処理 diff --git a/s3.go b/s3.go index 8cfc8c9..ff76b63 100644 --- a/s3.go +++ b/s3.go @@ -3,17 +3,19 @@ package archive import ( "context" "fmt" - "io" + "net" + "net/http" "net/url" "os" "path/filepath" - - "golang.org/x/time/rate" + "time" "github.com/minio/minio-go/v7" "github.com/minio/minio-go/v7/pkg/credentials" zlog "github.com/rs/zerolog/log" "github.com/shiguredo/sora-archive-uploader/s3" + + "github.com/conduitio/bwlimit" ) func uploadJSONFile( @@ -34,7 +36,7 @@ func uploadJSONFile( creds = credentials.NewIAM("") } - s3Client, err := s3.NewClient(osConfig.Endpoint, creds) + s3Client, err := s3.NewClient(osConfig.Endpoint, creds, nil) if err != nil { return "", err } @@ -78,7 +80,7 @@ func uploadWebMFile(ctx context.Context, osConfig *s3.S3CompatibleObjectStorage, } else { creds = credentials.NewIAM("") } - s3Client, err := s3.NewClient(osConfig.Endpoint, creds) + s3Client, err := s3.NewClient(osConfig.Endpoint, creds, nil) if err != nil { return "", err } @@ -127,7 +129,7 @@ func isFileContinuous(err error) bool { } func uploadWebMFileWithRateLimit(ctx context.Context, osConfig *s3.S3CompatibleObjectStorage, dst, filePath string, - rateLimitMpbs float64) (string, error) { + rateLimitMpbs int) (string, error) { var creds *credentials.Credentials if (osConfig.AccessKeyID != "") || (osConfig.SecretAccessKey != "") { creds = credentials.NewStaticV4( @@ -140,7 +142,20 @@ func uploadWebMFileWithRateLimit(ctx context.Context, osConfig *s3.S3CompatibleO } else { creds = credentials.NewIAM("") } - s3Client, err := s3.NewClient(osConfig.Endpoint, creds) + + // bit を byte にする + rateLimitMByteps := (bwlimit.Byte(rateLimitMpbs) * bwlimit.MiB) / 8 + + // 受信には制限をかけない + dialer := bwlimit.NewDialer(&net.Dialer{ + Timeout: 30 * time.Second, + KeepAlive: 30 * time.Second, + }, rateLimitMByteps, 0) + + transport := http.DefaultTransport + transport.(*http.Transport).DialContext = dialer.DialContext + + s3Client, err := s3.NewClient(osConfig.Endpoint, creds, &transport) if err != nil { return "", err } @@ -160,15 +175,9 @@ func uploadWebMFileWithRateLimit(ctx context.Context, osConfig *s3.S3CompatibleO // Save the file size. fileSize := fileStat.Size() - // bit を byte にする - rateLimitMByteps := rateLimitMpbs / 8 - limiter := rate.NewLimiter(rate.Limit(rateLimitMByteps*1024*1024), int(rateLimitMByteps*1024*1024)) - - // リミッタを適用したReaderを作成 - rateLimitedFileReader := NewRateLimitedReader(fileReader, limiter) - - n, err := s3Client.PutObject(ctx, osConfig.BucketName, dst, rateLimitedFileReader, fileSize, - minio.PutObjectOptions{ContentType: "application/octet-stream"}) + // 制限時にはマルチパートアップロードを行わない + n, err := s3Client.PutObject(ctx, osConfig.BucketName, dst, fileReader, fileSize, + minio.PutObjectOptions{ContentType: "application/octet-stream", DisableMultipart: true}) if err != nil { return "", err } @@ -176,26 +185,3 @@ func uploadWebMFileWithRateLimit(ctx context.Context, osConfig *s3.S3CompatibleO objectURL := fmt.Sprintf("s3://%s/%s", n.Bucket, n.Key) return objectURL, nil } - -type RateLimitedReader struct { - reader io.Reader - limiter *rate.Limiter -} - -func (r *RateLimitedReader) Read(p []byte) (int, error) { - n, err := r.reader.Read(p) - if err != nil { - return n, err - } - - // ここで制限をかける - err = r.limiter.WaitN(context.TODO(), n) - return n, err -} - -func NewRateLimitedReader(r io.Reader, l *rate.Limiter) *RateLimitedReader { - return &RateLimitedReader{ - reader: r, - limiter: l, - } -} diff --git a/s3/s3.go b/s3/s3.go index 612bec0..593dcc3 100644 --- a/s3/s3.go +++ b/s3/s3.go @@ -1,6 +1,7 @@ package s3 import ( + "net/http" "net/url" "github.com/minio/minio-go/v7" @@ -41,12 +42,22 @@ func maybeEndpointURL(endpoint string) (string, bool) { return endpoint, secure } -func NewClient(endpoint string, credentials *credentials.Credentials) (*minio.Client, error) { +func NewClient(endpoint string, credentials *credentials.Credentials, transport *http.RoundTripper) (*minio.Client, error) { newEndpoint, secure := maybeEndpointURL(endpoint) + if transport == nil { + return minio.New( + newEndpoint, + &minio.Options{ + Creds: credentials, + Secure: secure, + }) + } + return minio.New( newEndpoint, &minio.Options{ - Creds: credentials, - Secure: secure, + Creds: credentials, + Secure: secure, + Transport: *transport, }) }