diff --git a/pkg/pipeline/sink/uploader/s3.go b/pkg/pipeline/sink/uploader/s3.go index a68b55ae..bf81fd30 100644 --- a/pkg/pipeline/sink/uploader/s3.go +++ b/pkg/pipeline/sink/uploader/s3.go @@ -20,6 +20,7 @@ import ( "net/http" "net/url" "os" + "strings" "sync" "github.com/aws/aws-sdk-go/aws" @@ -56,27 +57,30 @@ func (r S3Retryer) ShouldRetry(_ *request.Request) bool { type S3Logger struct { mu sync.Mutex msgs []string + idx int } func (l *S3Logger) Log(args ...interface{}) { - msg := "aws sdk:" + var sb strings.Builder + sb.WriteString("aws sdk:") for range len(args) { - msg += " %v" + sb.WriteString(" %v") } l.mu.Lock() - if len(l.msgs) >= 10 { - l.msgs = append(l.msgs[1:], msg) - } else { - l.msgs = append(l.msgs, msg) - } + l.msgs[l.idx%len(l.msgs)] = fmt.Sprintf(sb.String(), args...) + l.idx++ l.mu.Unlock() } func (l *S3Logger) PrintLogs() { l.mu.Lock() - for _, msg := range l.msgs { - logger.Debugw(msg) + size := len(l.msgs) + for range size { + if msg := l.msgs[l.idx%size]; msg != "" { + logger.Debugw(msg) + } + l.idx++ } l.mu.Unlock() } @@ -201,7 +205,9 @@ func (u *S3Uploader) getBucketLocation() (string, error) { func (u *S3Uploader) upload(localFilepath, storageFilepath string, outputType types.OutputType) (string, int64, error) { // use a separate logger for each upload - l := &S3Logger{} + l := &S3Logger{ + msgs: make([]string, 10), + } u.mu.Lock() u.awsConfig.Logger = l sess, err := session.NewSession(u.awsConfig)