diff --git a/storagev2/internal/uplog/uplog_buffer.go b/storagev2/internal/uplog/uplog_buffer.go index 6ea89693..2699e484 100644 --- a/storagev2/internal/uplog/uplog_buffer.go +++ b/storagev2/internal/uplog/uplog_buffer.go @@ -9,6 +9,7 @@ import ( "path/filepath" "strings" "sync" + "sync/atomic" "time" "github.com/gofrs/flock" @@ -24,7 +25,8 @@ var ( uplogFileBuffer *os.File uplogFileBufferFileLocker *flock.Flock uplogFileBufferLock sync.Mutex - uplogFileBufferThreshold int64 = 4 * 1024 * 1024 + uplogFileBufferThreshold uint64 = 4 * 1024 * 1024 + uplogMaxStorageBytes uint64 = 100 * 1024 * 1024 uplogWriteFileBufferTicker *time.Ticker uplogWriteFileBufferInterval time.Duration = 1 * time.Minute uplogWriteFileBufferTimerLock sync.Mutex @@ -93,6 +95,14 @@ func IsUplogEnabled() bool { return !uplogDisabled } +func GetUplogMaxStorageBytes() uint64 { + return atomic.LoadUint64(&uplogMaxStorageBytes) +} + +func SetUplogMaxStorageBytes(max uint64) { + atomic.StoreUint64(&uplogMaxStorageBytes, max) +} + func SetUplogFileBufferDirPath(path string) { uplogFileBufferDirPathMutex.Lock() defer uplogFileBufferDirPathMutex.Unlock() @@ -148,7 +158,7 @@ func writeMemoryBufferToFileBuffer(data []byte) (n int, err error) { return } - if fi, serr := os.Stat(getUplogFileBufferPath(true)); serr == nil && fi.Size() >= uplogFileBufferThreshold { + if fi, serr := os.Stat(getUplogFileBufferPath(true)); serr == nil && uint64(fi.Size()) >= uplogFileBufferThreshold { tryToArchiveFileBuffer(false) } return diff --git a/storagev2/internal/uplog/uplog_buffer_test.go b/storagev2/internal/uplog/uplog_buffer_test.go index 47fbf4a9..a7945b65 100644 --- a/storagev2/internal/uplog/uplog_buffer_test.go +++ b/storagev2/internal/uplog/uplog_buffer_test.go @@ -15,6 +15,7 @@ import ( "net/http/httptest" "os" "path/filepath" + "strings" "sync" "sync/atomic" "testing" @@ -163,4 +164,112 @@ func TestUplogArchiveFileBuffer(t *testing.T) { if !bytes.Equal(md5HasherClient.Sum(nil), md5HasherServer.Sum(nil)) { t.Fatal("unexpected request body") } + entries, err := ioutil.ReadDir(tmpDir) + if err != nil { + t.Fatal(err) + } + if len(entries) != 2 && len(entries) != 3 { + t.Fatalf("unexpected uplog buffer files count") + } + for _, entry := range entries { + if !strings.HasSuffix(entry.Name(), ".lock") && entry.Name() != UPLOG_FILE_BUFFER_NAME { + t.Fatalf("unexpected uplog buffer file: %s", entry.Name()) + } + } +} + +func TestUplogArchiveFileBufferFailed(t *testing.T) { + testLock.Lock() + defer testLock.Unlock() + + tmpDir, err := ioutil.TempDir("", "test-uplog-*") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(tmpDir) + + defer func() { + if err := FlushBuffer(); err != nil { + t.Fatal(err) + } + }() + + var called int32 + httpServerMux := http.NewServeMux() + httpServerMux.Handle("/log/4", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + t.Fatalf("Unexpected method: %s", r.Method) + } + if r.URL.Query().Get("compressed") != "gzip" { + t.Fatalf("Unexpected compressed: %s", r.URL.Query().Get("compressed")) + } + if r.Header.Get("Authorization") != "UpToken fakeuptoken" { + t.Fatalf("Unexpected Authorization: %s", r.Header.Get("Authorization")) + } + if atomic.AddInt32(&called, 1) > 1 { + if r.Header.Get(X_LOG_CLIENT_ID) != "fake-x-log-client-id" { + t.Fatalf("Unexpected X-Log-Client-Id: %s", r.Header.Get("X_LOG_CLIENT_ID")) + } + } + w.Header().Add(X_LOG_CLIENT_ID, "fake-x-log-client-id") + w.WriteHeader(http.StatusInternalServerError) + })) + httpServer := httptest.NewServer(httpServerMux) + defer httpServer.Close() + + SetUplogUrl(httpServer.URL) + defer SetUplogUrl("") + + getUpToken = func() (string, error) { return "fakeuptoken", nil } + defer func() { getUpToken = nil }() + + SetUplogFileBufferDirPath(tmpDir) + defer SetUplogFileBufferDirPath("") + + DisableUplog() + defer EnableUplog() + + originalUplogMaxStorageBytes := GetUplogMaxStorageBytes() + SetUplogMaxStorageBytes(48 * 1024) + defer SetUplogMaxStorageBytes(originalUplogMaxStorageBytes) + + originalUplogFileBufferThreshold := uplogFileBufferThreshold + uplogFileBufferThreshold = 24 * 1024 + defer func() { + uplogFileBufferThreshold = originalUplogFileBufferThreshold + }() + + uplogBuffer := bytes.NewBuffer(make([]byte, 0, 4*1024)) + r := rand.New(rand.NewSource(time.Now().UnixNano())) + + for i := 0; i < 4*24; i++ { + n, err := io.CopyN(uplogBuffer, r, 1024) + if err != nil { + t.Fatal(err) + } else if n != 1024 { + t.Fatalf("unexpected n: %d", n) + } + + writeMemoryBufferToFileBuffer(uplogBuffer.Bytes()) + uplogBuffer.Reset() + time.Sleep(10 * time.Nanosecond) + } + tryToArchiveFileBuffer(true) + time.Sleep(100 * time.Millisecond) + c := atomic.LoadInt32(&called) + if c == 0 { + t.Fatal("unexpected upload count") + } + + entries, err := ioutil.ReadDir(tmpDir) + if err != nil { + t.Fatal(err) + } + totalSize := uint64(0) + for _, entry := range entries { + totalSize += uint64(entry.Size()) + } + if totalSize > 48*1024 { + t.Fatalf("unexpected uplog buffer file size: %d", totalSize) + } } diff --git a/storagev2/internal/uplog/uplog_upload.go b/storagev2/internal/uplog/uplog_upload.go index 4202125b..e687006d 100644 --- a/storagev2/internal/uplog/uplog_upload.go +++ b/storagev2/internal/uplog/uplog_upload.go @@ -6,6 +6,7 @@ import ( "net/http" "os" "path/filepath" + "sort" "sync" clientv1 "github.com/qiniu/go-sdk/v7/client" @@ -72,8 +73,32 @@ func uploadAllClosedFileBuffers() { } if err = uploadUplogLog(archivedPaths); err == nil { - for _, archarchivedPath := range archivedPaths { - os.Remove(archarchivedPath) + for _, archivedPath := range archivedPaths { + os.Remove(archivedPath) + } + } else { + sort.Strings(archivedPaths) + var ( + archivedPathsLen = len(archivedPaths) + totalSize uint64 = 0 + deleteAllRest bool = false + ) + for i := range archivedPaths { + archivedPath := archivedPaths[archivedPathsLen-i-1] + if !deleteAllRest { + fileInfo, err := os.Stat(archivedPath) + if err != nil { + return + } + if totalSize+uint64(fileInfo.Size()) > GetUplogMaxStorageBytes() { + deleteAllRest = true + } else { + totalSize += uint64(fileInfo.Size()) + } + } + if deleteAllRest { + os.Remove(archivedPath) + } } } } diff --git a/storagev2/uplog/uplog.go b/storagev2/uplog/uplog.go index 14d177fa..3ef94d97 100644 --- a/storagev2/uplog/uplog.go +++ b/storagev2/uplog/uplog.go @@ -51,6 +51,16 @@ func IsUplogEnabled() bool { return uplog.IsUplogEnabled() } +// GetUplogMaxStorageBytes 获取日志最大存储容量 +func GetUplogMaxStorageBytes() uint64 { + return uplog.GetUplogMaxStorageBytes() +} + +// SetUplogMaxStorageBytes 设置日志最大存储容量 +func SetUplogMaxStorageBytes(max uint64) { + uplog.SetUplogMaxStorageBytes(max) +} + // SetUplogFileBufferDirPath 设置日志文件缓存目录 func SetUplogFileBufferDirPath(path string) { uplog.SetUplogFileBufferDirPath(path)