From 7332f108ab45e95090c8ed89c5d7e3c7f4e50ca1 Mon Sep 17 00:00:00 2001 From: Michael Woolnough <130465766+mjkw31@users.noreply.github.com> Date: Fri, 6 Dec 2024 14:00:18 +0000 Subject: [PATCH] Restore expected speed of walk (#117) * Allocate buffers for the buffered writers * Use multiple filePath pools to avoid also creating 4KB buffers * Add additional space in quoted buffer for newline byte * Export FilePath and NewFilePath --- walk/dirent.go | 111 +++++++++++++++++++++++++++++++++++----------- walk/file.go | 19 +++++--- walk/walk.go | 49 ++++++++++---------- walk/walk_test.go | 4 ++ 4 files changed, 124 insertions(+), 59 deletions(-) diff --git a/walk/dirent.go b/walk/dirent.go index ff3ffac4..4a51a898 100644 --- a/walk/dirent.go +++ b/walk/dirent.go @@ -1,7 +1,8 @@ /******************************************************************************* - * Copyright (c) 2023 Genome Research Ltd. + * Copyright (c) 2023, 2024 Genome Research Ltd. * * Author: Sendu Bala + * Michael Woolnough * * Permission is hereby granted, free of charge, to any person obtaining * a copy of this software and associated documentation files (the @@ -34,52 +35,108 @@ import ( "github.com/wtsi-hgi/godirwalk" ) -const maxPathLength = 4096 +var ( + filePathPool64 = sync.Pool{New: func() any { x := make(FilePath, 0, 64); return &x }} //nolint:gochecknoglobals,mnd,nlreturn,lll + filePathPool128 = sync.Pool{New: func() any { x := make(FilePath, 0, 128); return &x }} //nolint:gochecknoglobals,mnd,nlreturn,lll + filePathPool256 = sync.Pool{New: func() any { x := make(FilePath, 0, 256); return &x }} //nolint:gochecknoglobals,mnd,nlreturn,lll + filePathPool512 = sync.Pool{New: func() any { x := make(FilePath, 0, 512); return &x }} //nolint:gochecknoglobals,mnd,nlreturn,lll + filePathPool1024 = sync.Pool{New: func() any { x := make(FilePath, 0, 1024); return &x }} //nolint:gochecknoglobals,mnd,nlreturn,lll + filePathPool2048 = sync.Pool{New: func() any { x := make(FilePath, 0, 2048); return &x }} 
//nolint:gochecknoglobals,mnd,nlreturn,lll + filePathPool4096 = sync.Pool{New: func() any { x := make(FilePath, 0, 4096); return &x }} //nolint:gochecknoglobals,mnd,nlreturn,lll +) -var filepathPool = sync.Pool{ //nolint:gochecknoglobals - New: func() any { - return new(filePath) - }, -} +// FilePath is a byte-slice of a path, utilising object pools to reduce memory +// allocations. +// +// It is the client's responsibility to call the Done method once it is no longer +// needed. +type FilePath []byte + +func newFilePathSize(size int) *FilePath { + switch { + case size <= 64: //nolint:mnd + return filePathPool64.Get().(*FilePath) //nolint:forcetypeassert + case size <= 128: //nolint:mnd + return filePathPool128.Get().(*FilePath) //nolint:forcetypeassert + case size <= 256: //nolint:mnd + return filePathPool256.Get().(*FilePath) //nolint:forcetypeassert + case size <= 512: //nolint:mnd + return filePathPool512.Get().(*FilePath) //nolint:forcetypeassert + case size <= 1024: //nolint:mnd + return filePathPool1024.Get().(*FilePath) //nolint:forcetypeassert + case size <= 2048: //nolint:mnd + return filePathPool2048.Get().(*FilePath) //nolint:forcetypeassert + } -type filePath struct { - buf [maxPathLength]byte - len int + return filePathPool4096.Get().(*FilePath) //nolint:forcetypeassert } -func newFilePath(path string) *filePath { - c := filepathPool.Get().(*filePath) //nolint:errcheck,forcetypeassert - c.len = copy(c.buf[:], path) +// NewFilePath creates a new FilePath, setting the value to the given string. +func NewFilePath(path string) *FilePath { + c := newFilePathSize(len(path)) + c.writeString(path) return c } -func (f *filePath) Done() { - f.len = 0 +func (f *FilePath) writeString(str string) { + *f = append(*f, str...) +} + +func (f *FilePath) writeBytes(p []byte) { + *f = append(*f, p...) +} - filepathPool.Put(f) +// Done deallocates the underlying byte-slice; any uses of the Bytes method are +// now invalid and may change.
+func (f *FilePath) Done() { //nolint:gocyclo + *f = (*f)[:0] + + switch cap(*f) { + case 64: //nolint:mnd + filePathPool64.Put(f) + case 128: //nolint:mnd + filePathPool128.Put(f) + case 256: //nolint:mnd + filePathPool256.Put(f) + case 512: //nolint:mnd + filePathPool512.Put(f) + case 1024: //nolint:mnd + filePathPool1024.Put(f) + case 2048: //nolint:mnd + filePathPool2048.Put(f) + case 4096: //nolint:mnd + filePathPool4096.Put(f) + } } -func (f *filePath) Sub(d *godirwalk.Dirent) *filePath { - c := filepathPool.Get().(*filePath) //nolint:errcheck,forcetypeassert +func (f *FilePath) sub(d *godirwalk.Dirent) *FilePath { + name := d.Name() + size := len(*f) + len(name) + + if d.IsDir() { + size++ + } - copy(c.buf[:f.len], f.buf[:f.len]) + c := newFilePathSize(size) - c.len = len(append(c.buf[:f.len], d.Name()...)) + c.writeBytes(*f) + c.writeString(name) if d.IsDir() { - c.len = len(append(c.buf[:c.len], '/')) + c.writeString("/") } return c } -func (f *filePath) Bytes() []byte { - return f.buf[:f.len] +// Bytes returns the FilePath as a literal byte-slice. +func (f *FilePath) Bytes() []byte { + return *f } -func (f *filePath) String() string { - return unsafe.String(&f.buf[0], f.len) +func (f *FilePath) string() string { + return unsafe.String(&(*f)[0], len(*f)) } // Dirent represents a file system directory entry (a file or a directory), @@ -87,7 +144,7 @@ func (f *filePath) String() string { type Dirent struct { // Path is the complete path to the directory entry (including both // directory and basename) - Path *filePath + Path *FilePath // Type is the type bits of the file mode of this entry. Type os.FileMode @@ -99,7 +156,7 @@ type Dirent struct { // newDirentForDirectoryPath returns a Dirent for the given directory, with // a Type for directories and no Inode. 
func newDirentForDirectoryPath(dir string) Dirent { - return Dirent{Path: newFilePath(dir), Type: fs.ModeDir} + return Dirent{Path: NewFilePath(dir), Type: fs.ModeDir} } // IsDir returns true if we are a directory. diff --git a/walk/file.go b/walk/file.go index 2f19e8f2..2019b21c 100644 --- a/walk/file.go +++ b/walk/file.go @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (c) 2022, 2023 Genome Research Ltd. + * Copyright (c) 2022, 2023, 2024 Genome Research Ltd. * * Author: Sendu Bala * @@ -37,8 +37,11 @@ import ( const userOnlyPerm = 0700 -// non-ascii bytes could become \xXX (4x the length at worst) and the two speech-marks are +2. -const maxQuotedPathLength = maxPathLength*4 + 2 +// non-ascii bytes could become \xXX (4x the length at worst), the two +// speech-marks are +2 and a newline is +1. +const maxQuotedPathLength = 4096*4 + 2 + 1 + +const bufferSize = 1 << 20 // WriteError is an error received when trying to write strings to disk. 
type WriteError struct { @@ -50,7 +53,7 @@ func (e *WriteError) Error() string { return e.Err.Error() } func (e *WriteError) Unwrap() error { return e.Err } type bufferedFile struct { - bufio.Writer + *bufio.Writer io.Closer } @@ -98,7 +101,7 @@ func NewFiles(outDir string, n int) (*Files, error) { return nil, err } - files[i].Reset(file) + files[i].Writer = bufio.NewWriterSize(file, bufferSize) files[i].Closer = file outPaths[i] = path @@ -121,7 +124,9 @@ func (f *Files) WritePaths() PathCallback { var quoted [maxQuotedPathLength]byte return func(entry *Dirent) error { - return f.writePath(append(strconv.AppendQuote(quoted[:0], entry.Path.String()), '\n')) + defer entry.Path.Done() + + return f.writePath(append(strconv.AppendQuote(quoted[:0], entry.Path.string()), '\n')) } } @@ -147,7 +152,7 @@ func (f *Files) writePath(path []byte) error { func (f *Files) Close() error { for _, file := range f.files { if err := file.Close(); err != nil { - return err + return &WriteError{err} } } diff --git a/walk/walk.go b/walk/walk.go index bc868f7a..bc60e5a0 100644 --- a/walk/walk.go +++ b/walk/walk.go @@ -1,8 +1,8 @@ /******************************************************************************* - * Copyright (c) 2022, 2023 Genome Research Ltd. + * Copyright (c) 2022, 2023, 2024 Genome Research Ltd. 
* * Author: Sendu Bala - * Partially based on github.com/MichaelTJones/walk + * Michael Woolnough * * Permission is hereby granted, free of charge, to any person obtaining * a copy of this software and associated documentation files (the @@ -78,7 +78,7 @@ func New(cb PathCallback, includDirs, ignoreSymlinks bool) *Walker { type ErrorCallback func(path string, err error) type pathRequest struct { - path *filePath + path *FilePath response chan []Dirent } @@ -124,8 +124,8 @@ func (w *Walker) Walk(dir string, errCB ErrorCallback) error { return w.sendDirentsToPathCallback(direntCh) } -func createPathRequestor(requestCh chan *pathRequest) func(*filePath) []Dirent { - return func(path *filePath) []Dirent { +func createPathRequestor(requestCh chan *pathRequest) func(*FilePath) []Dirent { + return func(path *FilePath) []Dirent { pr := pathRequestPool.Get().(*pathRequest) //nolint:errcheck,forcetypeassert defer pathRequestPool.Put(pr) @@ -142,8 +142,6 @@ func (w *Walker) sendDirentsToPathCallback(direntCh <-chan Dirent) error { if err := w.pathCB(&dirent); err != nil { return err } - - dirent.Path.Done() } return nil @@ -152,7 +150,7 @@ func (w *Walker) sendDirentsToPathCallback(direntCh <-chan Dirent) error { type heap []*pathRequest func pathCompare(a, b *pathRequest) int { - return strings.Compare(b.path.String(), a.path.String()) + return strings.Compare(b.path.string(), a.path.string()) } func (h *heap) Insert(req *pathRequest) { @@ -206,7 +204,7 @@ Loop: case <-ctx.Done(): break Loop case request := <-requests: - children, err := godirwalk.ReadDirents(request.path.String(), buffer) + children, err := godirwalk.ReadDirents(request.path.string(), buffer) if err != nil { errCB(string(request.path.Bytes()), err) } @@ -216,12 +214,12 @@ Loop: } } -func (w *Walker) childrenToDirents(children godirwalk.Dirents, parent *filePath) []Dirent { +func (w *Walker) childrenToDirents(children godirwalk.Dirents, parent *FilePath) []Dirent { dirents := make([]Dirent, 0, len(children)) 
for _, child := range children { dirent := Dirent{ - Path: parent.Sub(child), + Path: parent.sub(child), Type: child.ModeType(), Inode: child.Inode(), } @@ -234,7 +232,7 @@ func (w *Walker) childrenToDirents(children godirwalk.Dirents, parent *filePath) } sort.Slice(dirents, func(i, j int) bool { - return dirents[i].Path.String() < dirents[j].Path.String() + return dirents[i].Path.string() < dirents[j].Path.string() }) return dirents @@ -245,7 +243,7 @@ type flowController struct { } func newController() *flowController { - return controllerPool.Get().(*flowController) //nolint:forcetypeassert,errcheck + return controllerPool.Get().(*flowController) //nolint:forcetypeassert } func (f *flowController) GetControl() chan<- Dirent { @@ -271,27 +269,28 @@ var controllerPool = sync.Pool{ //nolint:gochecknoglobals } func walkDirectory(ctx context.Context, dirent Dirent, - flowControl *flowController, request func(*filePath) []Dirent, sendDirs bool) { + flowControl *flowController, request func(*FilePath) []Dirent, sendDirs bool) { children := request(dirent.Path) - childChans := make([]*flowController, len(children)) - control := flowControl.GetControl() - - if sendDirs { - sendEntry(ctx, dirent, control) - } + childControllers := make([]*flowController, len(children)) for n, child := range children { - childChans[n] = newController() + childControllers[n] = newController() if child.IsDir() { - go walkDirectory(ctx, child, childChans[n], request, sendDirs) + go walkDirectory(ctx, child, childControllers[n], request, sendDirs) } else { - go sendFileEntry(ctx, child, childChans[n]) + go sendFileEntry(ctx, child, childControllers[n]) } } - for _, childChan := range childChans { - childChan.PassControl(control) + control := flowControl.GetControl() + + if sendDirs { + sendEntry(ctx, dirent, control) + } + + for _, childController := range childControllers { + childController.PassControl(control) } flowControl.EndControl() diff --git a/walk/walk_test.go b/walk/walk_test.go 
index 4d8af362..e1645d45 100644 --- a/walk/walk_test.go +++ b/walk/walk_test.go @@ -131,6 +131,10 @@ func TestWalk(t *testing.T) { So(err, ShouldBeNil) err = w.Walk(walkDir, cb) + if err == nil { + err = files.Close() + } + So(err, ShouldNotBeNil) var writeError *WriteError