Skip to content

Commit

Permalink
Asynchronously write to walk output files (#118)
Browse files Browse the repository at this point in the history
* Asynchronously write to walk output files

* Reduce number of goroutines to reduce memory usage (by ~3/4)

* Update linter version
  • Loading branch information
mjkw31 authored Dec 12, 2024
1 parent 7332f10 commit f624f07
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 17 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/golangci-lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,5 @@ jobs:
- name: golangci-lint
uses: golangci/golangci-lint-action@v3
with:
version: v1.59.1
only-new-issues: true
version: v1.62.0
only-new-issues: true
45 changes: 43 additions & 2 deletions walk/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,45 @@ func (b *bufferedFile) Close() error {
return b.Closer.Close()
}

type asyncWriter struct {
mu sync.Mutex
buffer [bufferSize]byte
len int
err error
io.WriteCloser
}

func (a *asyncWriter) Write(p []byte) (int, error) {
a.mu.Lock()

if a.err != nil {
defer a.mu.Unlock()

return 0, a.err
}

a.len = copy(a.buffer[:], p)

go func() {
defer a.mu.Unlock()

_, a.err = a.WriteCloser.Write(a.buffer[:a.len])
}()

return len(p), nil
}

func (a *asyncWriter) Close() error {
a.mu.Lock()
defer a.mu.Unlock()

if a.err != nil {
return a.err
}

return a.WriteCloser.Close()
}

// Files represents a collection of output files that can be written to in a
// round-robin.
type Files struct {
Expand Down Expand Up @@ -101,8 +140,10 @@ func NewFiles(outDir string, n int) (*Files, error) {
return nil, err
}

files[i].Writer = bufio.NewWriterSize(file, bufferSize)
files[i].Closer = file
f := &asyncWriter{WriteCloser: file}

files[i].Writer = bufio.NewWriterSize(f, bufferSize)
files[i].Closer = f

outPaths[i] = path
}
Expand Down
21 changes: 8 additions & 13 deletions walk/walk.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,12 +274,10 @@ func walkDirectory(ctx context.Context, dirent Dirent,
childControllers := make([]*flowController, len(children))

for n, child := range children {
childControllers[n] = newController()

if child.IsDir() {
childControllers[n] = newController()

go walkDirectory(ctx, child, childControllers[n], request, sendDirs)
} else {
go sendFileEntry(ctx, child, childControllers[n])
}
}

Expand All @@ -289,20 +287,17 @@ func walkDirectory(ctx context.Context, dirent Dirent,
sendEntry(ctx, dirent, control)
}

for _, childController := range childControllers {
childController.PassControl(control)
for n, childController := range childControllers {
if childController == nil {
sendEntry(ctx, children[n], control)
} else {
childController.PassControl(control)
}
}

flowControl.EndControl()
}

func sendFileEntry(ctx context.Context, dirent Dirent, flowControl *flowController) {
control := flowControl.GetControl()

sendEntry(ctx, dirent, control)
flowControl.EndControl()
}

func sendEntry(ctx context.Context, dirent Dirent, direntCh chan<- Dirent) {
select {
case <-ctx.Done():
Expand Down

0 comments on commit f624f07

Please sign in to comment.