From f624f073510ca53ca77ca42a87f34ad7d0d7f500 Mon Sep 17 00:00:00 2001
From: Michael Woolnough <130465766+mjkw31@users.noreply.github.com>
Date: Thu, 12 Dec 2024 09:58:33 +0000
Subject: [PATCH] Asynchronously write to walk output files (#118)

* Asynchronously write to walk output files

* Reduce number of goroutines to reduce memory usage (by ~3/4)

* Update linter version
---
 .github/workflows/golangci-lint.yml |  4 +-
 walk/file.go                        | 82 ++++++++++++++++++++++++++++-
 walk/walk.go                        | 21 +++-----
 3 files changed, 90 insertions(+), 17 deletions(-)

diff --git a/.github/workflows/golangci-lint.yml b/.github/workflows/golangci-lint.yml
index a833f396..a13b12b4 100644
--- a/.github/workflows/golangci-lint.yml
+++ b/.github/workflows/golangci-lint.yml
@@ -22,5 +22,5 @@ jobs:
       - name: golangci-lint
         uses: golangci/golangci-lint-action@v3
         with:
-          version: v1.59.1
-          only-new-issues: true
+          version: v1.62.0
+          only-new-issues: true
\ No newline at end of file
diff --git a/walk/file.go b/walk/file.go
index 2019b21c..21b0b03a 100644
--- a/walk/file.go
+++ b/walk/file.go
@@ -65,6 +65,82 @@ func (b *bufferedFile) Close() error {
 	return b.Closer.Close()
 }
 
+// asyncWriter wraps an io.WriteCloser so that the data given to Write is
+// copied into an internal buffer and written to the wrapped writer by a
+// background goroutine, letting the caller continue while the write is in
+// flight.
+//
+// mu is held from the moment data is copied into buffer until its background
+// write has finished, so at most one write is in flight at a time and buffer,
+// len and err are never accessed concurrently.
+type asyncWriter struct {
+	mu     sync.Mutex
+	buffer [bufferSize]byte
+	len    int
+	err    error
+	io.WriteCloser
+}
+
+// Write queues p to be written to the wrapped writer in the background.
+//
+// It blocks until any previous background write has finished; if that write
+// failed, its error is returned and nothing more is queued. Input longer than
+// the internal buffer is queued one buffer-sized chunk at a time: bufio.Writer
+// hands oversized writes straight to its underlying writer, so copying only
+// the first chunk would silently drop the remainder.
+//
+// A nil error only means the data was queued; a failure of the last background
+// write is reported by the next call to Write or by Close.
+func (a *asyncWriter) Write(p []byte) (int, error) {
+	queued := 0
+
+	for {
+		// Released by the background goroutine once its write completes.
+		a.mu.Lock()
+
+		if err := a.err; err != nil {
+			a.mu.Unlock()
+
+			return queued, err
+		}
+
+		n := copy(a.buffer[:], p)
+		a.len = n
+
+		go func() {
+			defer a.mu.Unlock()
+
+			_, a.err = a.WriteCloser.Write(a.buffer[:a.len])
+		}()
+
+		queued += n
+		p = p[n:]
+
+		if len(p) == 0 {
+			return queued, nil
+		}
+	}
+}
+
+// Close waits for any in-flight background write to finish and then closes the
+// wrapped writer.
+//
+// The wrapped writer is closed even when an earlier write failed, so that it is
+// not leaked; in that case the write error takes precedence over any error
+// returned by closing.
+func (a *asyncWriter) Close() error {
+	a.mu.Lock()
+	defer a.mu.Unlock()
+
+	closeErr := a.WriteCloser.Close()
+
+	if a.err != nil {
+		return a.err
+	}
+
+	return closeErr
+}
+
 // Files represents a collection of output files that can be written to in a
 // round-robin.
 type Files struct {
@@ -101,8 +177,10 @@ func NewFiles(outDir string, n int) (*Files, error) {
 			return nil, err
 		}
 
-		files[i].Writer = bufio.NewWriterSize(file, bufferSize)
-		files[i].Closer = file
+		f := &asyncWriter{WriteCloser: file}
+
+		files[i].Writer = bufio.NewWriterSize(f, bufferSize)
+		files[i].Closer = f
 		outPaths[i] = path
 	}
 
diff --git a/walk/walk.go b/walk/walk.go
index bc60e5a0..a42f3d73 100644
--- a/walk/walk.go
+++ b/walk/walk.go
@@ -274,12 +274,10 @@ func walkDirectory(ctx context.Context, dirent Dirent,
 	childControllers := make([]*flowController, len(children))
 
 	for n, child := range children {
-		childControllers[n] = newController()
-
 		if child.IsDir() {
+			childControllers[n] = newController()
+
 			go walkDirectory(ctx, child, childControllers[n], request, sendDirs)
-		} else {
-			go sendFileEntry(ctx, child, childControllers[n])
 		}
 	}
 
@@ -289,20 +287,17 @@ func walkDirectory(ctx context.Context, dirent Dirent,
 		sendEntry(ctx, dirent, control)
 	}
 
-	for _, childController := range childControllers {
-		childController.PassControl(control)
+	for n, childController := range childControllers {
+		if childController == nil {
+			sendEntry(ctx, children[n], control)
+		} else {
+			childController.PassControl(control)
+		}
 	}
 
 	flowControl.EndControl()
 }
 
-func sendFileEntry(ctx context.Context, dirent Dirent, flowControl *flowController) {
-	control := flowControl.GetControl()
-
-	sendEntry(ctx, dirent, control)
-	flowControl.EndControl()
-}
-
 func sendEntry(ctx context.Context, dirent Dirent, direntCh chan<- Dirent) {
 	select {
 	case <-ctx.Done():