From ddf767aee5d781ef0ab956723fd844fd0e76dff7 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Fri, 13 Sep 2024 11:53:09 +0800 Subject: [PATCH 1/5] interrupt requests after timeout --- fuse/api.go | 5 +++++ fuse/server.go | 52 +++++++++++++++++++++++++++++++++++++++++++++----- 2 files changed, 52 insertions(+), 5 deletions(-) diff --git a/fuse/api.go b/fuse/api.go index 7db5f0d0..6486538d 100644 --- a/fuse/api.go +++ b/fuse/api.go @@ -90,6 +90,8 @@ // you care about correctness. package fuse +import "time" + // Types for users to implement. // The result of Read is an array of bytes, but for performance @@ -227,6 +229,9 @@ type MountOptions struct { // don't alloc buffer for read operation NoAllocForRead bool + + // max duration for a request + Timeout time.Duration } // RawFileSystem is an interface close to the FUSE wire protocol. diff --git a/fuse/server.go b/fuse/server.go index 98fd4d0c..626650ed 100644 --- a/fuse/server.go +++ b/fuse/server.go @@ -52,6 +52,7 @@ type Server struct { // maxReaders is the maximum number of goroutines reading requests maxReaders int + maxUnique uint64 // Pools for []byte buffers bufferPool @@ -118,6 +119,13 @@ func (ms *Server) RecordLatencies(l LatencyMap) { // Unmount calls fusermount -u on the mount. This has the effect of // shutting down the filesystem. After the Server is unmounted, it // should be discarded. +// +// Does not work when we were mounted with the magic /dev/fd/N mountpoint syntax, +// as we do not know the real mountpoint. Unmount using +// +// fusermount -u /path/to/real/mountpoint +// +// / in this case. func (ms *Server) Unmount() (err error) { if ms.mountPoint == "" { return nil @@ -382,9 +390,7 @@ func (ms *Server) readRequest(exitIdle bool) (req *request, code Status) { } req = ms.reqPool.Get().(*request) - if ms.latencies != nil { - req.startTime = time.Now() - } + req.startTime = time.Now() gobbled := req.setInput(dest[:n]) if !gobbled { ms.readPool.Put(dest) @@ -400,6 +406,9 @@ func (ms *Server) readRequest(exitIdle bool) (req *request, code Status) { if ms.recentUnique != nil { ms.recentUnique = append(ms.recentUnique, req.inHeader.Unique) } + if req.inHeader.Unique > ms.maxUnique { + ms.maxUnique = req.inHeader.Unique + } req.inflightIndex = len(ms.reqInflight) ms.reqInflight = append(ms.reqInflight, req) @@ -422,13 +431,14 @@ func (ms *Server) checkLostRequests() { var recentUnique []uint64 time.Sleep(time.Second * 3) for { - if len(ms.recentUnique) > 10 { - ms.reqMu.Lock() + ms.reqMu.Lock() + if len(ms.recentUnique) >= 30 { recentUnique = ms.recentUnique ms.recentUnique = nil ms.reqMu.Unlock() break } + ms.reqMu.Unlock() time.Sleep(time.Second) } @@ -523,6 +533,9 @@ func (ms *Server) recordStats(req *request) { // Each filesystem operation executes in a separate goroutine. func (ms *Server) Serve() { ms.loop(false) + if ms.opts.Timeout > 0 { + go ms.checkHangRequests(ms.opts.Timeout) + } ms.loops.Wait() // shutdown in-flight cache retrieves. @@ -560,6 +573,35 @@ func (ms *Server) wakeupReader() { _ = cmd.Run() } +func (ms *Server) checkHangRequests(timeout time.Duration) { + for { + time.Sleep(timeout / 100) + var batch = 100 + for i := 0; ; i += batch { + now := time.Now() + ms.reqMu.Lock() + if ms.shutdown { + ms.reqMu.Unlock() + break + } + for j := 0; j < batch && i+j < len(ms.reqInflight); j++ { + req := ms.reqInflight[i+j] + if req.interrupted { + unique := req.inHeader.Unique + ms.reqMu.Unlock() + ms.returnInterrupted(unique) + ms.reqMu.Lock() + } else if used := now.Sub(req.startTime); used > timeout || req.inHeader.Unique+2e6 > ms.maxUnique { + log.Printf("interrupt request %d after %s: %+v", req.inHeader.Unique, used, req.inHeader) + req.interrupted = true + close(req.cancel) + } + } + ms.reqMu.Unlock() + } + } +} + func (ms *Server) Shutdown() bool { log.Printf("try to restart gracefully") start := time.Now() From 4c46d3657ae140aa8c65f32ad5c7a54dfa18de74 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Fri, 13 Sep 2024 12:34:24 +0800 Subject: [PATCH 2/5] fix build in mac --- fuse/mount_darwin.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/fuse/mount_darwin.go b/fuse/mount_darwin.go index 4a1d0904..979ab5e8 100644 --- a/fuse/mount_darwin.go +++ b/fuse/mount_darwin.go @@ -194,3 +194,9 @@ func mountV4(mountPoint string, opts *MountOptions, ready chan<- error) (fd int, func unmount(dir string, opts *MountOptions) error { return syscall.Unmount(dir, 0) } + +// parseFuseFd checks if `mountPoint` is the special form /dev/fd/N (with N >= 0), +// and returns N in this case. Returns -1 otherwise. +func parseFuseFd(mountPoint string) (fd int) { + return -1 +} From 26334db5fb09040051f8fec6255560fefdfc6319 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Sat, 14 Sep 2024 16:56:47 +0800 Subject: [PATCH 3/5] bugfix --- fuse/server.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/fuse/server.go b/fuse/server.go index 626650ed..e82942ae 100644 --- a/fuse/server.go +++ b/fuse/server.go @@ -532,10 +532,10 @@ func (ms *Server) recordStats(req *request) { // // Each filesystem operation executes in a separate goroutine. func (ms *Server) Serve() { - ms.loop(false) if ms.opts.Timeout > 0 { - go ms.checkHangRequests(ms.opts.Timeout) + go ms.checkRequestTimeout(ms.opts.Timeout) } + ms.loop(false) ms.loops.Wait() // shutdown in-flight cache retrieves. @@ -573,7 +573,7 @@ func (ms *Server) wakeupReader() { _ = cmd.Run() } -func (ms *Server) checkHangRequests(timeout time.Duration) { +func (ms *Server) checkRequestTimeout(timeout time.Duration) { for { time.Sleep(timeout / 100) var batch = 100 From 952ad4c1a8869872cb9fc5878ad271e0672ccc14 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Mon, 16 Sep 2024 11:26:31 +0800 Subject: [PATCH 4/5] interrupt long standing requests --- fuse/mount_darwin.go | 6 ------ fuse/server.go | 8 ++++---- 2 files changed, 4 insertions(+), 10 deletions(-) diff --git a/fuse/mount_darwin.go b/fuse/mount_darwin.go index 979ab5e8..4a1d0904 100644 --- a/fuse/mount_darwin.go +++ b/fuse/mount_darwin.go @@ -194,9 +194,3 @@ func mountV4(mountPoint string, opts *MountOptions, ready chan<- error) (fd int, func unmount(dir string, opts *MountOptions) error { return syscall.Unmount(dir, 0) } - -// parseFuseFd checks if `mountPoint` is the special form /dev/fd/N (with N >= 0), -// and returns N in this case. Returns -1 otherwise. -func parseFuseFd(mountPoint string) (fd int) { - return -1 -} diff --git a/fuse/server.go b/fuse/server.go index e82942ae..382f2345 100644 --- a/fuse/server.go +++ b/fuse/server.go @@ -454,7 +454,7 @@ func (ms *Server) checkLostRequests() { // interrupt historic ones last = recentUnique[0] - 1 var c int - for last > 0 && c < 3e6 { + for last > 0 && c < 6e6 { ms.returnInterrupted(last) last-- c++ @@ -575,12 +575,12 @@ func (ms *Server) wakeupReader() { func (ms *Server) checkRequestTimeout(timeout time.Duration) { for { - time.Sleep(timeout / 100) + time.Sleep(time.Second) var batch = 100 for i := 0; ; i += batch { now := time.Now() ms.reqMu.Lock() - if ms.shutdown { + if ms.shutdown || i >= len(ms.reqInflight) { ms.reqMu.Unlock() break } @@ -591,7 +591,7 @@ func (ms *Server) checkRequestTimeout(timeout time.Duration) { ms.reqMu.Unlock() ms.returnInterrupted(unique) ms.reqMu.Lock() - } else if used := now.Sub(req.startTime); used > timeout || req.inHeader.Unique+2e6 > ms.maxUnique { + } else if used := now.Sub(req.startTime); used > timeout || req.inHeader.Unique+5.5e6 < ms.maxUnique { log.Printf("interrupt request %d after %s: %+v", req.inHeader.Unique, used, req.inHeader) req.interrupted = true close(req.cancel) From ddfae82be0b3cf454045267c5d43a3b753ea20d0 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Mon, 16 Sep 2024 11:40:53 +0800 Subject: [PATCH 5/5] cleanup --- fuse/server.go | 7 ------- 1 file changed, 7 deletions(-) diff --git a/fuse/server.go b/fuse/server.go index 382f2345..1fd5587c 100644 --- a/fuse/server.go +++ b/fuse/server.go @@ -119,13 +119,6 @@ func (ms *Server) RecordLatencies(l LatencyMap) { // Unmount calls fusermount -u on the mount. This has the effect of // shutting down the filesystem. After the Server is unmounted, it // should be discarded. -// -// Does not work when we were mounted with the magic /dev/fd/N mountpoint syntax, -// as we do not know the real mountpoint. Unmount using -// -// fusermount -u /path/to/real/mountpoint -// -// / in this case. func (ms *Server) Unmount() (err error) { if ms.mountPoint == "" { return nil