Skip to content

Commit

Permalink
now storage of uplog files would be limited
Browse files Browse the repository at this point in the history
  • Loading branch information
bachue committed Jul 2, 2024
1 parent 710aab2 commit 921a54e
Show file tree
Hide file tree
Showing 4 changed files with 158 additions and 4 deletions.
14 changes: 12 additions & 2 deletions storagev2/internal/uplog/uplog_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"path/filepath"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/gofrs/flock"
Expand All @@ -24,7 +25,8 @@ var (
uplogFileBuffer *os.File
uplogFileBufferFileLocker *flock.Flock
uplogFileBufferLock sync.Mutex
uplogFileBufferThreshold int64 = 4 * 1024 * 1024
uplogFileBufferThreshold uint64 = 4 * 1024 * 1024
uplogMaxStorageBytes uint64 = 100 * 1024 * 1024
uplogWriteFileBufferTicker *time.Ticker
uplogWriteFileBufferInterval time.Duration = 1 * time.Minute
uplogWriteFileBufferTimerLock sync.Mutex
Expand Down Expand Up @@ -93,6 +95,14 @@ func IsUplogEnabled() bool {
return !uplogDisabled
}

func GetUplogMaxStorageBytes() uint64 {
return atomic.LoadUint64(&uplogMaxStorageBytes)
}

func SetUplogMaxStorageBytes(max uint64) {
atomic.StoreUint64(&uplogMaxStorageBytes, max)
}

func SetUplogFileBufferDirPath(path string) {
uplogFileBufferDirPathMutex.Lock()
defer uplogFileBufferDirPathMutex.Unlock()
Expand Down Expand Up @@ -148,7 +158,7 @@ func writeMemoryBufferToFileBuffer(data []byte) (n int, err error) {
return
}

if fi, serr := os.Stat(getUplogFileBufferPath(true)); serr == nil && fi.Size() >= uplogFileBufferThreshold {
if fi, serr := os.Stat(getUplogFileBufferPath(true)); serr == nil && uint64(fi.Size()) >= uplogFileBufferThreshold {
tryToArchiveFileBuffer(false)
}
return
Expand Down
109 changes: 109 additions & 0 deletions storagev2/internal/uplog/uplog_buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"net/http/httptest"
"os"
"path/filepath"
"strings"
"sync"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -163,4 +164,112 @@ func TestUplogArchiveFileBuffer(t *testing.T) {
if !bytes.Equal(md5HasherClient.Sum(nil), md5HasherServer.Sum(nil)) {
t.Fatal("unexpected request body")
}
entries, err := ioutil.ReadDir(tmpDir)
if err != nil {
t.Fatal(err)
}
if len(entries) != 2 && len(entries) != 3 {
t.Fatalf("unexpected uplog buffer files count")
}
for _, entry := range entries {
if !strings.HasSuffix(entry.Name(), ".lock") && entry.Name() != UPLOG_FILE_BUFFER_NAME {
t.Fatalf("unexpected uplog buffer file: %s", entry.Name())
}
}
}

func TestUplogArchiveFileBufferFailed(t *testing.T) {
testLock.Lock()
defer testLock.Unlock()

tmpDir, err := ioutil.TempDir("", "test-uplog-*")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(tmpDir)

defer func() {
if err := FlushBuffer(); err != nil {
t.Fatal(err)
}
}()

var called int32
httpServerMux := http.NewServeMux()
httpServerMux.Handle("/log/4", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
t.Fatalf("Unexpected method: %s", r.Method)
}
if r.URL.Query().Get("compressed") != "gzip" {
t.Fatalf("Unexpected compressed: %s", r.URL.Query().Get("compressed"))
}
if r.Header.Get("Authorization") != "UpToken fakeuptoken" {
t.Fatalf("Unexpected Authorization: %s", r.Header.Get("Authorization"))
}
if atomic.AddInt32(&called, 1) > 1 {
if r.Header.Get(X_LOG_CLIENT_ID) != "fake-x-log-client-id" {
t.Fatalf("Unexpected X-Log-Client-Id: %s", r.Header.Get("X_LOG_CLIENT_ID"))
}
}
w.Header().Add(X_LOG_CLIENT_ID, "fake-x-log-client-id")
w.WriteHeader(http.StatusInternalServerError)
}))
httpServer := httptest.NewServer(httpServerMux)
defer httpServer.Close()

SetUplogUrl(httpServer.URL)
defer SetUplogUrl("")

getUpToken = func() (string, error) { return "fakeuptoken", nil }
defer func() { getUpToken = nil }()

SetUplogFileBufferDirPath(tmpDir)
defer SetUplogFileBufferDirPath("")

DisableUplog()
defer EnableUplog()

originalUplogMaxStorageBytes := GetUplogMaxStorageBytes()
SetUplogMaxStorageBytes(48 * 1024)
defer SetUplogMaxStorageBytes(originalUplogMaxStorageBytes)

originalUplogFileBufferThreshold := uplogFileBufferThreshold
uplogFileBufferThreshold = 24 * 1024
defer func() {
uplogFileBufferThreshold = originalUplogFileBufferThreshold
}()

uplogBuffer := bytes.NewBuffer(make([]byte, 0, 4*1024))
r := rand.New(rand.NewSource(time.Now().UnixNano()))

for i := 0; i < 4*24; i++ {
n, err := io.CopyN(uplogBuffer, r, 1024)
if err != nil {
t.Fatal(err)
} else if n != 1024 {
t.Fatalf("unexpected n: %d", n)
}

writeMemoryBufferToFileBuffer(uplogBuffer.Bytes())
uplogBuffer.Reset()
time.Sleep(10 * time.Nanosecond)
}
tryToArchiveFileBuffer(true)
time.Sleep(100 * time.Millisecond)
c := atomic.LoadInt32(&called)
if c == 0 {
t.Fatal("unexpected upload count")
}

entries, err := ioutil.ReadDir(tmpDir)
if err != nil {
t.Fatal(err)
}
totalSize := uint64(0)
for _, entry := range entries {
totalSize += uint64(entry.Size())
}
if totalSize > 48*1024 {
t.Fatalf("unexpected uplog buffer file size: %d", totalSize)
}
}
29 changes: 27 additions & 2 deletions storagev2/internal/uplog/uplog_upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"net/http"
"os"
"path/filepath"
"sort"
"sync"

clientv1 "github.com/qiniu/go-sdk/v7/client"
Expand Down Expand Up @@ -72,8 +73,32 @@ func uploadAllClosedFileBuffers() {
}

if err = uploadUplogLog(archivedPaths); err == nil {
for _, archarchivedPath := range archivedPaths {
os.Remove(archarchivedPath)
for _, archivedPath := range archivedPaths {
os.Remove(archivedPath)
}
} else {
sort.Strings(archivedPaths)
var (
archivedPathsLen = len(archivedPaths)
totalSize uint64 = 0
deleteAllRest bool = false
)
for i := range archivedPaths {
archivedPath := archivedPaths[archivedPathsLen-i-1]
if !deleteAllRest {
fileInfo, err := os.Stat(archivedPath)
if err != nil {
return
}
if totalSize+uint64(fileInfo.Size()) > GetUplogMaxStorageBytes() {
deleteAllRest = true
} else {
totalSize += uint64(fileInfo.Size())
}
}
if deleteAllRest {
os.Remove(archivedPath)
}
}
}
}
Expand Down
10 changes: 10 additions & 0 deletions storagev2/uplog/uplog.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,16 @@ func IsUplogEnabled() bool {
return uplog.IsUplogEnabled()
}

// GetUplogMaxStorageBytes 获取日志最大存储容量
func GetUplogMaxStorageBytes() uint64 {
return uplog.GetUplogMaxStorageBytes()
}

// SetUplogMaxStorageBytes 设置日志最大存储容量
func SetUplogMaxStorageBytes(max uint64) {
uplog.SetUplogMaxStorageBytes(max)
}

// SetUplogFileBufferDirPath 设置日志文件缓存目录
func SetUplogFileBufferDirPath(path string) {
uplog.SetUplogFileBufferDirPath(path)
Expand Down

0 comments on commit 921a54e

Please sign in to comment.