Skip to content

Commit

Permalink
Reduce memory usage during walk (#114)
Browse files Browse the repository at this point in the history
* Reuse request channels instead of just throwing them away

* Prevent unbounded memory allocations if path writing stalls

* Reuse flowcontrol channels

* Error callback should only be used for non-fatal errors

* Buffer dirent channel

* Buffer output files and avoid allocation during path quoting.

* Create new filePath type that uses a sync.Pool to avoid string allocations
  • Loading branch information
mjkw31 authored Nov 29, 2024
1 parent bf9da81 commit e162843
Show file tree
Hide file tree
Showing 4 changed files with 185 additions and 77 deletions.
58 changes: 55 additions & 3 deletions walk/dirent.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,66 @@ package walk
import (
"io/fs"
"os"
"sync"
"unsafe"

"github.com/wtsi-hgi/godirwalk"
)

// maxPathLength is the fixed capacity, in bytes, of every filePath buffer.
const maxPathLength = 4096

// filepathPool hands out reusable filePath buffers so that building paths
// during a walk does not allocate a fresh string for every entry.
var filepathPool = sync.Pool{ //nolint:gochecknoglobals
	New: func() any {
		return &filePath{}
	},
}

// filePath is a fixed-capacity, reusable path buffer obtained from
// filepathPool. Only the first len bytes of buf are meaningful.
type filePath struct {
	// buf is the backing storage for the path bytes.
	buf [maxPathLength]byte
	// len is the number of valid bytes at the start of buf.
	len int
}

// newFilePath takes a filePath from filepathPool and fills it with the bytes
// of path. Because the buffer is a fixed-size array, a path longer than
// maxPathLength bytes is truncated to fit.
func newFilePath(path string) *filePath {
	pooled := filepathPool.Get().(*filePath) //nolint:errcheck,forcetypeassert

	written := copy(pooled.buf[:], path)
	pooled.len = written

	return pooled
}

// Done resets the path length to zero and returns f to filepathPool for
// reuse. The buffer may be handed out again and overwritten after this call,
// so f, and any slice or string previously obtained from Bytes or String,
// must not be used afterwards.
func (f *filePath) Done() {
	f.len = 0

	filepathPool.Put(f)
}

// Sub returns a new filePath, taken from filepathPool, holding f's path
// followed by the name of d, plus a trailing '/' when d is a directory. f
// itself is not modified.
//
// The result never exceeds maxPathLength bytes: anything that does not fit in
// the fixed-size buffer is truncated, matching newFilePath. Bounded copies are
// used instead of append, because append on a full array-backed slice would
// reallocate, leaving the bytes outside buf while len pointed past its end.
func (f *filePath) Sub(d *godirwalk.Dirent) *filePath {
	c := filepathPool.Get().(*filePath) //nolint:errcheck,forcetypeassert

	// Each copy is limited by the space remaining in c.buf, so c.len can
	// never grow beyond maxPathLength.
	c.len = copy(c.buf[:], f.buf[:f.len])
	c.len += copy(c.buf[c.len:], d.Name())

	if d.IsDir() {
		c.len += copy(c.buf[c.len:], "/")
	}

	return c
}

// Bytes returns the valid portion of the path as a byte slice. The slice
// shares f's buffer rather than copying it, so it reflects any later change
// made to that buffer.
func (f *filePath) Bytes() []byte {
	return f.buf[:f.len]
}

// String returns the path as a string without copying: the string points
// directly at f's buffer. It is therefore only valid while the buffer is left
// untouched; once f is returned to the pool with Done the buffer may be
// reused and overwritten, so copy the string first if it must outlive f.
func (f *filePath) String() string {
	return unsafe.String(&f.buf[0], f.len)
}

// Dirent represents a file system directory entry (a file or a directory),
// providing information about the entry's path, type and inode.
type Dirent struct {
// Path is the complete path to the directory entry (including both
// directory and basename)
Path string
Path *filePath

// Type is the type bits of the file mode of this entry.
Type os.FileMode
Expand All @@ -46,8 +98,8 @@ type Dirent struct {

// newDirentForDirectoryPath returns a Dirent for the given directory, with
// a Type for directories and no Inode.
func newDirentForDirectoryPath(dir string) *Dirent {
return &Dirent{Path: dir, Type: fs.ModeDir}
func newDirentForDirectoryPath(dir string) Dirent {
return Dirent{Path: newFilePath(dir), Type: fs.ModeDir}
}

// IsDir returns true if we are a directory.
Expand Down
42 changes: 28 additions & 14 deletions walk/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
package walk

import (
"bufio"
"fmt"
"io"
"os"
Expand All @@ -36,6 +37,9 @@ import (

const userOnlyPerm = 0700

// maxQuotedPathLength is the worst-case length of a quoted path: every byte
// may be escaped as \xXX (4 bytes each), plus 2 for the surrounding quote
// marks.
// NOTE(review): WritePaths also appends a trailing '\n' to the quoted path,
// which this bound does not include - confirm whether +3 is intended.
const maxQuotedPathLength = maxPathLength*4 + 2

// WriteError is an error received when trying to write strings to disk.
type WriteError struct {
Err error
Expand All @@ -45,10 +49,23 @@ func (e *WriteError) Error() string { return e.Err.Error() }

func (e *WriteError) Unwrap() error { return e.Err }

// bufferedFile pairs a buffered writer with the Closer of the underlying
// file, so that many small writes are batched before reaching the file.
type bufferedFile struct {
	bufio.Writer
	io.Closer
}

// Close flushes any buffered data and then closes the underlying Closer.
//
// The Closer is always closed, even when the flush fails, so the underlying
// file is not leaked. A flush error takes precedence in the return value;
// otherwise the error from closing (if any) is returned.
func (b *bufferedFile) Close() error {
	flushErr := b.Writer.Flush()
	closeErr := b.Closer.Close()

	if flushErr != nil {
		return flushErr
	}

	return closeErr
}

// Files represents a collection of output files that can be written to in a
// round-robin.
type Files struct {
files []*os.File
files []bufferedFile
Paths []string
filesI int
filesMax int
Expand All @@ -70,19 +87,20 @@ func NewFiles(outDir string, n int) (*Files, error) {
return nil, err
}

files := make([]*os.File, n)
files := make([]bufferedFile, n)
outPaths := make([]string, n)

for i := range files {
var err error

path := filepath.Join(outDir, fmt.Sprintf("walk.%d", i+1))

files[i], err = os.Create(path)
file, err := os.Create(path)
if err != nil {
return nil, err
}

files[i].Reset(file)
files[i].Closer = file

outPaths[i] = path
}

Expand All @@ -100,28 +118,24 @@ func NewFiles(outDir string, n int) (*Files, error) {
//
// It will terminate the walk if writes to our output files fail.
func (f *Files) WritePaths() PathCallback {
var quoted [maxQuotedPathLength]byte

return func(entry *Dirent) error {
return f.writePath(strconv.Quote(entry.Path))
return f.writePath(append(strconv.AppendQuote(quoted[:0], entry.Path.String()), '\n'))
}
}

// writePath is a thread-safe way of writing the given path to our next output
// file. Returns a WriteError on failure to write to an output file.
func (f *Files) writePath(path string) error {
f.mu.Lock()
func (f *Files) writePath(path []byte) error {
i := f.filesI
f.filesI++

if f.filesI == f.filesMax {
f.filesI = 0
}

f.mu.Unlock()

f.mus[i].Lock()
defer f.mus[i].Unlock()

_, err := io.WriteString(f.files[i], path+"\n")
_, err := f.files[i].Write(path)
if err != nil {
err = &WriteError{Err: err}
}
Expand Down
Loading

0 comments on commit e162843

Please sign in to comment.