Skip to content

Commit

Permalink
Restore expected speed of walk (#117)
Browse files Browse the repository at this point in the history
* Allocate buffers for the buffered writers

* Use multiple filePath pools to avoid also creating 4KB buffers

* Add additional space in quoted buffer for newline byte

* Export FilePath and NewFilePath
  • Loading branch information
mjkw31 authored Dec 6, 2024
1 parent 00c0003 commit 7332f10
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 59 deletions.
111 changes: 84 additions & 27 deletions walk/dirent.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
/*******************************************************************************
* Copyright (c) 2023 Genome Research Ltd.
* Copyright (c) 2023, 2024 Genome Research Ltd.
*
* Author: Sendu Bala <[email protected]>
* Michael Woolnough <[email protected]>
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
Expand Down Expand Up @@ -34,60 +35,116 @@ import (
"github.com/wtsi-hgi/godirwalk"
)

const maxPathLength = 4096
var (
filePathPool64 = sync.Pool{New: func() any { x := make(FilePath, 0, 64); return &x }} //nolint:gochecknoglobals,mnd,nlreturn,lll
filePathPool128 = sync.Pool{New: func() any { x := make(FilePath, 0, 128); return &x }} //nolint:gochecknoglobals,mnd,nlreturn,lll
filePathPool256 = sync.Pool{New: func() any { x := make(FilePath, 0, 256); return &x }} //nolint:gochecknoglobals,mnd,nlreturn,lll
filePathPool512 = sync.Pool{New: func() any { x := make(FilePath, 0, 512); return &x }} //nolint:gochecknoglobals,mnd,nlreturn,lll
filePathPool1024 = sync.Pool{New: func() any { x := make(FilePath, 0, 1024); return &x }} //nolint:gochecknoglobals,mnd,nlreturn,lll
filePathPool2048 = sync.Pool{New: func() any { x := make(FilePath, 0, 2048); return &x }} //nolint:gochecknoglobals,mnd,nlreturn,lll
filePathPool4096 = sync.Pool{New: func() any { x := make(FilePath, 0, 4096); return &x }} //nolint:gochecknoglobals,mnd,nlreturn,lll
)

var filepathPool = sync.Pool{ //nolint:gochecknoglobals
New: func() any {
return new(filePath)
},
}
// FilePath is a byte-slice of a path, utilising object pools to reduce memory
// allocations.
//
// It is the clients responsibility to call the Done method once it is no longer
// needed.
type FilePath []byte

func newFilePathSize(size int) *FilePath {
switch {
case size <= 64: //nolint:mnd
return filePathPool64.Get().(*FilePath) //nolint:forcetypeassert
case size <= 128: //nolint:mnd
return filePathPool128.Get().(*FilePath) //nolint:forcetypeassert
case size <= 256: //nolint:mnd
return filePathPool256.Get().(*FilePath) //nolint:forcetypeassert
case size <= 512: //nolint:mnd
return filePathPool512.Get().(*FilePath) //nolint:forcetypeassert
case size <= 1024: //nolint:mnd
return filePathPool1024.Get().(*FilePath) //nolint:forcetypeassert
case size <= 2048: //nolint:mnd
return filePathPool2048.Get().(*FilePath) //nolint:forcetypeassert
}

type filePath struct {
buf [maxPathLength]byte
len int
return filePathPool4096.Get().(*FilePath) //nolint:forcetypeassert
}

func newFilePath(path string) *filePath {
c := filepathPool.Get().(*filePath) //nolint:errcheck,forcetypeassert
c.len = copy(c.buf[:], path)
// NewFilePath creates a new FilePath, setting the value to the given string.
func NewFilePath(path string) *FilePath {
c := newFilePathSize(len(path))
c.writeString(path)

return c
}

func (f *filePath) Done() {
f.len = 0
func (f *FilePath) writeString(str string) {
*f = append(*f, str...)
}

func (f *FilePath) writeBytes(p []byte) {
*f = append(*f, p...)
}

filepathPool.Put(f)
// Done deallocates the underlying byte-slice; any uses of the Bytes method are
// now invalid and may change.
func (f *FilePath) Done() { //nolint:gocyclo
*f = (*f)[:0]

switch cap(*f) {
case 64: //nolint:mnd
filePathPool64.Put(f)
case 128: //nolint:mnd
filePathPool128.Put(f)
case 256: //nolint:mnd
filePathPool256.Put(f)
case 512: //nolint:mnd
filePathPool512.Put(f)
case 1024: //nolint:mnd
filePathPool1024.Put(f)
case 2048: //nolint:mnd
filePathPool2048.Put(f)
case 4096: //nolint:mnd
filePathPool4096.Put(f)
}
}

func (f *filePath) Sub(d *godirwalk.Dirent) *filePath {
c := filepathPool.Get().(*filePath) //nolint:errcheck,forcetypeassert
func (f *FilePath) sub(d *godirwalk.Dirent) *FilePath {
name := d.Name()
size := len(*f) + len(name)

if d.IsDir() {
size++
}

copy(c.buf[:f.len], f.buf[:f.len])
c := newFilePathSize(size)

c.len = len(append(c.buf[:f.len], d.Name()...))
c.writeBytes(*f)
c.writeString(name)

if d.IsDir() {
c.len = len(append(c.buf[:c.len], '/'))
c.writeString("/")
}

return c
}

func (f *filePath) Bytes() []byte {
return f.buf[:f.len]
// Bytes returns the FilePath as a literal byte-slice.
func (f *FilePath) Bytes() []byte {
return *f
}

func (f *filePath) String() string {
return unsafe.String(&f.buf[0], f.len)
func (f *FilePath) string() string {
return unsafe.String(&(*f)[0], len(*f))
}

// Dirent represents a file system directory entry (a file or a directory),
// providing information about the entry's path, type and inode.
type Dirent struct {
// Path is the complete path to the directory entry (including both
// directory and basename)
Path *filePath
Path *FilePath

// Type is the type bits of the file mode of this entry.
Type os.FileMode
Expand All @@ -99,7 +156,7 @@ type Dirent struct {
// newDirentForDirectoryPath returns a Dirent for the given directory, with
// a Type for directories and no Inode.
func newDirentForDirectoryPath(dir string) Dirent {
return Dirent{Path: newFilePath(dir), Type: fs.ModeDir}
return Dirent{Path: NewFilePath(dir), Type: fs.ModeDir}
}

// IsDir returns true if we are a directory.
Expand Down
19 changes: 12 additions & 7 deletions walk/file.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2022, 2023 Genome Research Ltd.
* Copyright (c) 2022, 2023, 2024 Genome Research Ltd.
*
* Author: Sendu Bala <[email protected]>
*
Expand Down Expand Up @@ -37,8 +37,11 @@ import (

const userOnlyPerm = 0700

// non-ascii bytes could become \xXX (4x the length at worst) and the two speech-marks are +2.
const maxQuotedPathLength = maxPathLength*4 + 2
// non-ascii bytes could become \xXX (4x the length at worst), the two
// speech-marks are +2 and a newline is +1.
const maxQuotedPathLength = 4096*4 + 2 + 1

const bufferSize = 1 << 20

// WriteError is an error received when trying to write strings to disk.
type WriteError struct {
Expand All @@ -50,7 +53,7 @@ func (e *WriteError) Error() string { return e.Err.Error() }
func (e *WriteError) Unwrap() error { return e.Err }

type bufferedFile struct {
bufio.Writer
*bufio.Writer
io.Closer
}

Expand Down Expand Up @@ -98,7 +101,7 @@ func NewFiles(outDir string, n int) (*Files, error) {
return nil, err
}

files[i].Reset(file)
files[i].Writer = bufio.NewWriterSize(file, bufferSize)
files[i].Closer = file

outPaths[i] = path
Expand All @@ -121,7 +124,9 @@ func (f *Files) WritePaths() PathCallback {
var quoted [maxQuotedPathLength]byte

return func(entry *Dirent) error {
return f.writePath(append(strconv.AppendQuote(quoted[:0], entry.Path.String()), '\n'))
defer entry.Path.Done()

return f.writePath(append(strconv.AppendQuote(quoted[:0], entry.Path.string()), '\n'))
}
}

Expand All @@ -147,7 +152,7 @@ func (f *Files) writePath(path []byte) error {
func (f *Files) Close() error {
for _, file := range f.files {
if err := file.Close(); err != nil {
return err
return &WriteError{err}
}
}

Expand Down
49 changes: 24 additions & 25 deletions walk/walk.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
/*******************************************************************************
* Copyright (c) 2022, 2023 Genome Research Ltd.
* Copyright (c) 2022, 2023, 2024 Genome Research Ltd.
*
* Author: Sendu Bala <[email protected]>
* Partially based on github.com/MichaelTJones/walk
* Michael Woolnough <[email protected]>
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
Expand Down Expand Up @@ -78,7 +78,7 @@ func New(cb PathCallback, includDirs, ignoreSymlinks bool) *Walker {
type ErrorCallback func(path string, err error)

type pathRequest struct {
path *filePath
path *FilePath
response chan []Dirent
}

Expand Down Expand Up @@ -124,8 +124,8 @@ func (w *Walker) Walk(dir string, errCB ErrorCallback) error {
return w.sendDirentsToPathCallback(direntCh)
}

func createPathRequestor(requestCh chan *pathRequest) func(*filePath) []Dirent {
return func(path *filePath) []Dirent {
func createPathRequestor(requestCh chan *pathRequest) func(*FilePath) []Dirent {
return func(path *FilePath) []Dirent {
pr := pathRequestPool.Get().(*pathRequest) //nolint:errcheck,forcetypeassert
defer pathRequestPool.Put(pr)

Expand All @@ -142,8 +142,6 @@ func (w *Walker) sendDirentsToPathCallback(direntCh <-chan Dirent) error {
if err := w.pathCB(&dirent); err != nil {
return err
}

dirent.Path.Done()
}

return nil
Expand All @@ -152,7 +150,7 @@ func (w *Walker) sendDirentsToPathCallback(direntCh <-chan Dirent) error {
type heap []*pathRequest

func pathCompare(a, b *pathRequest) int {
return strings.Compare(b.path.String(), a.path.String())
return strings.Compare(b.path.string(), a.path.string())
}

func (h *heap) Insert(req *pathRequest) {
Expand Down Expand Up @@ -206,7 +204,7 @@ Loop:
case <-ctx.Done():
break Loop
case request := <-requests:
children, err := godirwalk.ReadDirents(request.path.String(), buffer)
children, err := godirwalk.ReadDirents(request.path.string(), buffer)
if err != nil {
errCB(string(request.path.Bytes()), err)
}
Expand All @@ -216,12 +214,12 @@ Loop:
}
}

func (w *Walker) childrenToDirents(children godirwalk.Dirents, parent *filePath) []Dirent {
func (w *Walker) childrenToDirents(children godirwalk.Dirents, parent *FilePath) []Dirent {
dirents := make([]Dirent, 0, len(children))

for _, child := range children {
dirent := Dirent{
Path: parent.Sub(child),
Path: parent.sub(child),
Type: child.ModeType(),
Inode: child.Inode(),
}
Expand All @@ -234,7 +232,7 @@ func (w *Walker) childrenToDirents(children godirwalk.Dirents, parent *filePath)
}

sort.Slice(dirents, func(i, j int) bool {
return dirents[i].Path.String() < dirents[j].Path.String()
return dirents[i].Path.string() < dirents[j].Path.string()
})

return dirents
Expand All @@ -245,7 +243,7 @@ type flowController struct {
}

func newController() *flowController {
return controllerPool.Get().(*flowController) //nolint:forcetypeassert,errcheck
return controllerPool.Get().(*flowController) //nolint:forcetypeassert
}

func (f *flowController) GetControl() chan<- Dirent {
Expand All @@ -271,27 +269,28 @@ var controllerPool = sync.Pool{ //nolint:gochecknoglobals
}

func walkDirectory(ctx context.Context, dirent Dirent,
flowControl *flowController, request func(*filePath) []Dirent, sendDirs bool) {
flowControl *flowController, request func(*FilePath) []Dirent, sendDirs bool) {
children := request(dirent.Path)
childChans := make([]*flowController, len(children))
control := flowControl.GetControl()

if sendDirs {
sendEntry(ctx, dirent, control)
}
childControllers := make([]*flowController, len(children))

for n, child := range children {
childChans[n] = newController()
childControllers[n] = newController()

if child.IsDir() {
go walkDirectory(ctx, child, childChans[n], request, sendDirs)
go walkDirectory(ctx, child, childControllers[n], request, sendDirs)
} else {
go sendFileEntry(ctx, child, childChans[n])
go sendFileEntry(ctx, child, childControllers[n])
}
}

for _, childChan := range childChans {
childChan.PassControl(control)
control := flowControl.GetControl()

if sendDirs {
sendEntry(ctx, dirent, control)
}

for _, childController := range childControllers {
childController.PassControl(control)
}

flowControl.EndControl()
Expand Down
4 changes: 4 additions & 0 deletions walk/walk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,10 @@ func TestWalk(t *testing.T) {
So(err, ShouldBeNil)

err = w.Walk(walkDir, cb)
if err == nil {
err = files.Close()
}

So(err, ShouldNotBeNil)

var writeError *WriteError
Expand Down

0 comments on commit 7332f10

Please sign in to comment.