Skip to content

Commit

Permalink
UploaderManager に limiter を持たせる
Browse files Browse the repository at this point in the history
  • Loading branch information
Hexa committed Oct 12, 2023
1 parent 8411f6e commit 880e326
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 12 deletions.
9 changes: 8 additions & 1 deletion runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

zlog "github.com/rs/zerolog/log"
"golang.org/x/time/rate"
)

type Main struct {
Expand Down Expand Up @@ -62,7 +63,13 @@ func (m *Main) run(ctx context.Context, cancel context.CancelFunc) error {
gateKeeper := newGateKeeper(m.config)
recordingFileStream := gateKeeper.run(processContext, foundFiles)

uploaderManager := newUploaderManager()
rateLimitMpbs := m.config.UploadFileRateLimitMbps
// bit を byte にする
rateLimitMByteps := rateLimitMpbs / 8
limiter := rate.NewLimiter(rate.Limit(rateLimitMByteps*1024*1024), int(rateLimitMByteps*1024*1024))

// rate.Limiter は共通
uploaderManager := newUploaderManager(limiter)
_, err = uploaderManager.run(processContext, m.config, recordingFileStream)
if err != nil {
processContextCancel()
Expand Down
6 changes: 1 addition & 5 deletions s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func isFileContinuous(err error) bool {
}

func uploadWebMFileWithRateLimit(ctx context.Context, osConfig *s3.S3CompatibleObjectStorage, dst, filePath string,
rateLimitMpbs float64) (string, error) {
limiter *rate.Limiter) (string, error) {
var creds *credentials.Credentials
if (osConfig.AccessKeyID != "") || (osConfig.SecretAccessKey != "") {
creds = credentials.NewStaticV4(
Expand Down Expand Up @@ -160,10 +160,6 @@ func uploadWebMFileWithRateLimit(ctx context.Context, osConfig *s3.S3CompatibleO
// Save the file size.
fileSize := fileStat.Size()

// bit を byte にする
rateLimitMByteps := rateLimitMpbs / 8
limiter := rate.NewLimiter(rate.Limit(rateLimitMByteps*1024*1024), int(rateLimitMByteps*1024*1024))

// リミッタを適用したReaderを作成
rateLimitedFileReader := NewRateLimitedReader(fileReader, limiter)

Expand Down
16 changes: 10 additions & 6 deletions uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/google/uuid"
"github.com/shiguredo/sora-archive-uploader/s3"
base32 "github.com/shogo82148/go-clockwork-base32"
"golang.org/x/time/rate"

zlog "github.com/rs/zerolog/log"
)
Expand All @@ -28,6 +29,7 @@ type UploaderManager struct {
ArchiveEndStream chan UploaderResult
ReportStream chan UploaderResult
uploaders []Uploader
Limiter *rate.Limiter
}

type ArchiveMetadata struct {
Expand All @@ -52,7 +54,7 @@ type ArchiveEndMetadata struct {
Filename string `json:"filename"`
}

func newUploaderManager() *UploaderManager {
func newUploaderManager(limiter *rate.Limiter) *UploaderManager {
var uploaders []Uploader
archiveStream := make(chan UploaderResult)
archiveEndStream := make(chan UploaderResult)
Expand All @@ -62,6 +64,7 @@ func newUploaderManager() *UploaderManager {
ArchiveEndStream: archiveEndStream,
ReportStream: reportStream,
uploaders: uploaders,
Limiter: limiter,
}
}

Expand All @@ -71,7 +74,7 @@ func (um *UploaderManager) run(ctx context.Context, config *Config, fileStream <
if err != nil {
return nil, err
}
uploader.run(fileStream, um.ArchiveStream, um.ArchiveEndStream, um.ReportStream)
uploader.run(fileStream, um.ArchiveStream, um.ArchiveEndStream, um.ReportStream, um.Limiter)
um.uploaders = append(um.uploaders, *uploader)
}
go func() {
Expand Down Expand Up @@ -119,6 +122,7 @@ func (u Uploader) run(
outArchive chan UploaderResult,
outArchiveEnd chan UploaderResult,
outReport chan UploaderResult,
limiter *rate.Limiter,
) {
go func() {
for {
Expand Down Expand Up @@ -166,7 +170,7 @@ func (u Uploader) run(
Int("uploader_id", u.id).
Str("file_path", inputFilepath).
Msg("FOUND-AT-STARTUP")
ok := u.handleArchive(inputFilepath, false)
ok := u.handleArchive(inputFilepath, false, limiter)
select {
case <-u.ctx.Done():
return
Expand All @@ -180,7 +184,7 @@ func (u Uploader) run(
Int("uploader_id", u.id).
Str("file_path", inputFilepath).
Msg("FOUND-AT-STARTUP")
ok := u.handleArchive(inputFilepath, true)
ok := u.handleArchive(inputFilepath, true, limiter)
select {
case <-u.ctx.Done():
return
Expand All @@ -199,7 +203,7 @@ func (u Uploader) Stop() {
u.cancel()
}

func (u Uploader) handleArchive(archiveJSONFilePath string, split bool) bool {
func (u Uploader) handleArchive(archiveJSONFilePath string, split bool, limiter *rate.Limiter) bool {
fileInfo, err := os.Stat(archiveJSONFilePath)
if err != nil {
zlog.Error().
Expand Down Expand Up @@ -297,7 +301,7 @@ func (u Uploader) handleArchive(archiveJSONFilePath string, split bool) bool {
if u.config.UploadFileRateLimitMbps == 0 {
fileURL, err = uploadWebMFile(u.ctx, osConfig, webmObjectKey, webmFilepath)
} else {
fileURL, err = uploadWebMFileWithRateLimit(u.ctx, osConfig, webmObjectKey, webmFilepath, u.config.UploadFileRateLimitMbps)
fileURL, err = uploadWebMFileWithRateLimit(u.ctx, osConfig, webmObjectKey, webmFilepath, limiter)
}

if err != nil {
Expand Down

0 comments on commit 880e326

Please sign in to comment.