Skip to content

Commit

Permalink
Interrupt long standing requests (#27)
Browse files Browse the repository at this point in the history
  • Loading branch information
davies authored Sep 16, 2024
1 parent 3428c8e commit dbcfaf9
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 6 deletions.
5 changes: 5 additions & 0 deletions fuse/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@
// you care about correctness.
package fuse

import "time"

// Types for users to implement.

// The result of Read is an array of bytes, but for performance
Expand Down Expand Up @@ -227,6 +229,9 @@ type MountOptions struct {

// don't alloc buffer for read operation
NoAllocForRead bool

// max duration for a request
Timeout time.Duration
}

// RawFileSystem is an interface close to the FUSE wire protocol.
Expand Down
47 changes: 41 additions & 6 deletions fuse/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type Server struct {

// maxReaders is the maximum number of goroutines reading requests
maxReaders int
maxUnique uint64

// Pools for []byte
buffers bufferPool
Expand Down Expand Up @@ -382,9 +383,7 @@ func (ms *Server) readRequest(exitIdle bool) (req *request, code Status) {
}

req = ms.reqPool.Get().(*request)
if ms.latencies != nil {
req.startTime = time.Now()
}
req.startTime = time.Now()
gobbled := req.setInput(dest[:n])
if !gobbled {
ms.readPool.Put(dest)
Expand All @@ -400,6 +399,9 @@ func (ms *Server) readRequest(exitIdle bool) (req *request, code Status) {
if ms.recentUnique != nil {
ms.recentUnique = append(ms.recentUnique, req.inHeader.Unique)
}
if req.inHeader.Unique > ms.maxUnique {
ms.maxUnique = req.inHeader.Unique
}
req.inflightIndex = len(ms.reqInflight)
ms.reqInflight = append(ms.reqInflight, req)

Expand All @@ -422,13 +424,14 @@ func (ms *Server) checkLostRequests() {
var recentUnique []uint64
time.Sleep(time.Second * 3)
for {
if len(ms.recentUnique) > 10 {
ms.reqMu.Lock()
ms.reqMu.Lock()
if len(ms.recentUnique) >= 30 {
recentUnique = ms.recentUnique
ms.recentUnique = nil
ms.reqMu.Unlock()
break
}
ms.reqMu.Unlock()
time.Sleep(time.Second)
}

Expand All @@ -444,7 +447,7 @@ func (ms *Server) checkLostRequests() {
// interrupt historic ones
last = recentUnique[0] - 1
var c int
for last > 0 && c < 3e6 {
for last > 0 && c < 6e6 {
ms.returnInterrupted(last)
last--
c++
Expand Down Expand Up @@ -522,6 +525,9 @@ func (ms *Server) recordStats(req *request) {
//
// Each filesystem operation executes in a separate goroutine.
func (ms *Server) Serve() {
if ms.opts.Timeout > 0 {
go ms.checkRequestTimeout(ms.opts.Timeout)
}
ms.loop(false)
ms.loops.Wait()

Expand Down Expand Up @@ -560,6 +566,35 @@ func (ms *Server) wakeupReader() {
_ = cmd.Run()
}

func (ms *Server) checkRequestTimeout(timeout time.Duration) {
for {
time.Sleep(time.Second)
var batch = 100
for i := 0; ; i += batch {
now := time.Now()
ms.reqMu.Lock()
if ms.shutdown || i >= len(ms.reqInflight) {
ms.reqMu.Unlock()
break
}
for j := 0; j < batch && i+j < len(ms.reqInflight); j++ {
req := ms.reqInflight[i+j]
if req.interrupted {
unique := req.inHeader.Unique
ms.reqMu.Unlock()
ms.returnInterrupted(unique)
ms.reqMu.Lock()
} else if used := now.Sub(req.startTime); used > timeout || req.inHeader.Unique+5.5e6 < ms.maxUnique {
log.Printf("interrupt request %d after %s: %+v", req.inHeader.Unique, used, req.inHeader)
req.interrupted = true
close(req.cancel)
}
}
ms.reqMu.Unlock()
}
}
}

func (ms *Server) Shutdown() bool {
log.Printf("try to restart gracefully")
start := time.Now()
Expand Down

0 comments on commit dbcfaf9

Please sign in to comment.