Skip to content

Commit

Permalink
Update diskmap/diskslice to use directio on linux. Fix filewatcher ex…
Browse files Browse the repository at this point in the history
…ample
  • Loading branch information
John Doak authored and John Doak committed Dec 12, 2022
1 parent 316e0a2 commit d49b462
Show file tree
Hide file tree
Showing 12 changed files with 287 additions and 67 deletions.
67 changes: 38 additions & 29 deletions diskmap/diskmap.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
/*
Package diskmap provides disk storage of key/value pairs. The data is immutable once written.
In addition the diskmap utilizes mmap on reads to make the random access faster.
On Linux, diskmap uses directio to speed up writes.
Usage is simplistic:
Expand Down Expand Up @@ -81,14 +82,17 @@ Reading the file is simply:
package diskmap

import (
"bufio"
"context"
"encoding/binary"
"fmt"
"io"
"os"
"reflect"
"sync"
"unsafe"

"golang.org/x/net/context"
"github.com/brk0v/directio"
)

// reservedHeader is the size, in bytes, of the reserved header portion of the file.
Expand Down Expand Up @@ -157,42 +161,29 @@ type value struct {

// writer implements Writer.
//
// When buf is non-nil, Write and Close send their output through buf;
// otherwise they write straight to file.
type writer struct {
// name is the path of the file being written. Close uses it to reopen the
// file when the buffered writer is in use.
name string
// file is the open handle on the output file.
file *os.File
// dio is the direct I/O writer wrapped around file. Only the Linux
// constructor sets it; Close flushes it after buf.
dio *directio.DirectIO
// buf buffers writes in front of dio. It is nil when direct I/O is not used.
buf *bufio.Writer
// index holds one entry (data offset, data length, key) per stored value;
// Close writes it out after the data.
index index
// offset starts at reservedHeader.
// NOTE(review): presumably the file position of the next value — confirm in Write.
offset int64
// NOTE(review): presumably the number of entries written so far — confirm in Write/Close.
num int64
// Mutex is held for the duration of Write.
sync.Mutex
}

// New returns a new Writer that writes to file "p".
//
// The file is created (or truncated if it already exists), its mode is set to
// 0600, and the write position is moved past the reserved header so that the
// first value lands at offset reservedHeader. If any step after creation
// fails, the file handle is closed before the error is returned.
func New(p string) (Writer, error) {
	f, err := os.Create(p)
	if err != nil {
		return nil, err
	}

	if err := f.Chmod(0600); err != nil {
		_ = f.Close() // best-effort cleanup; the Chmod error is the one worth reporting
		return nil, err
	}

	// Whence 0 is relative to the start of the file.
	if _, err := f.Seek(reservedHeader, 0); err != nil {
		_ = f.Close() // best-effort cleanup; the Seek error is the one worth reporting
		return nil, fmt.Errorf("was unable to seek %d bytes into the file: %q", reservedHeader, err)
	}

	return &writer{
		file:   f,
		offset: reservedHeader,
		index:  make(index, 0, 1000),
		Mutex:  sync.Mutex{},
	}, nil
}

// Write implements Writer.Write().
func (w *writer) Write(k, v []byte) error {
w.Lock()
defer w.Unlock()

if _, err := w.file.Write(v); err != nil {
var writer io.Writer
if w.buf != nil {
writer = w.buf
} else {
writer = w.file
}

if _, err := writer.Write(v); err != nil {
return fmt.Errorf("problem writing key/value to disk: %q", err)
}

Expand All @@ -213,24 +204,42 @@ func (w *writer) Write(k, v []byte) error {

// Close implements Writer.Close().
func (w *writer) Close() error {
var writer io.Writer
if w.buf != nil {
writer = w.buf
} else {
writer = w.file
}

// Write each data offset, then the length of the data, then the length of the key, then finally the key to disk (our index) for each entry.
for _, entry := range w.index {
if err := binary.Write(w.file, endian, entry.offset); err != nil {
if err := binary.Write(writer, endian, entry.offset); err != nil {
return fmt.Errorf("could not write offset value %d: %q", entry.offset, err)
}

if err := binary.Write(w.file, endian, entry.length); err != nil {
if err := binary.Write(writer, endian, entry.length); err != nil {
return fmt.Errorf("could not write data length: %q", err)
}

if err := binary.Write(w.file, endian, int64(len(entry.key))); err != nil {
if err := binary.Write(writer, endian, int64(len(entry.key))); err != nil {
return fmt.Errorf("could not write key length: %q", err)
}

if _, err := w.file.Write(entry.key); err != nil {
if _, err := writer.Write(entry.key); err != nil {
return fmt.Errorf("could not write key to disk: %q", err)
}
}
if w.buf != nil {
w.buf.Flush()
w.dio.Flush()

w.file.Close()
f, err := os.OpenFile(w.name, os.O_RDWR, 0666)
if err != nil {
return err
}
w.file = f
}

// Now that we've written all our data to the end of the file, we can go back to our reserved header
// and write our offset to the index at the beginning of the file.
Expand Down
42 changes: 42 additions & 0 deletions diskmap/diskmap_linux.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
//go:build linux

package diskmap

import (
"bufio"
"os"
"sync"
"syscall"

"github.com/brk0v/directio"
)

// New returns a new Writer that writes to file "p".
//
// On Linux the file is opened with O_DIRECT and written through a directio
// writer fronted by a large in-memory buffer. The file is created if missing
// and truncated if it already exists, and its mode is set to 0600, matching
// the constructor used on other platforms. A zeroed placeholder for the
// reserved header is queued first so that values start at offset
// reservedHeader. If any step after the open fails, the file handle is closed
// before the error is returned.
func New(p string) (Writer, error) {
	// bufSize is the size of the buffer placed in front of the direct I/O writer (64 MiB).
	const bufSize = 64 << 20

	// O_TRUNC keeps stale bytes from a previous file at the same path out of the new file.
	f, err := os.OpenFile(p, os.O_CREATE|os.O_WRONLY|os.O_TRUNC|syscall.O_DIRECT, 0600)
	if err != nil {
		return nil, err
	}

	// The mode passed to OpenFile only applies to newly created files; force it
	// for pre-existing files too.
	if err := f.Chmod(0600); err != nil {
		_ = f.Close() // best-effort cleanup; the Chmod error is the one worth reporting
		return nil, err
	}

	dio, err := directio.New(f)
	if err != nil {
		_ = f.Close() // best-effort cleanup; the directio error is the one worth reporting
		return nil, err
	}

	w := bufio.NewWriterSize(dio, bufSize)

	// Reserve the header region with zeros; it is sized from reservedHeader so it
	// always agrees with the starting offset recorded below.
	if _, err := w.Write(make([]byte, reservedHeader)); err != nil {
		_ = f.Close() // best-effort cleanup; the write error is the one worth reporting
		return nil, err
	}

	return &writer{
		name:   p,
		file:   f,
		dio:    dio,
		buf:    w,
		offset: reservedHeader,
		index:  make(index, 0, 1000),
		Mutex:  sync.Mutex{},
	}, nil
}
32 changes: 32 additions & 0 deletions diskmap/diskmap_other.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
//go:build !linux

package diskmap

import (
"fmt"
"os"
"sync"
)

// New returns a new Writer that writes to file "p".
//
// The file is created (or truncated if it already exists), its mode is set to
// 0600, and the write position is moved past the reserved header so that the
// first value lands at offset reservedHeader. If any step after creation
// fails, the file handle is closed before the error is returned.
func New(p string) (Writer, error) {
	f, err := os.Create(p)
	if err != nil {
		return nil, err
	}

	if err := f.Chmod(0600); err != nil {
		_ = f.Close() // best-effort cleanup; the Chmod error is the one worth reporting
		return nil, err
	}

	// Whence 0 is relative to the start of the file.
	if _, err := f.Seek(reservedHeader, 0); err != nil {
		_ = f.Close() // best-effort cleanup; the Seek error is the one worth reporting
		return nil, fmt.Errorf("was unable to seek %d bytes into the file: %q", reservedHeader, err)
	}

	return &writer{
		file:   f,
		offset: reservedHeader,
		index:  make(index, 0, 1000),
		Mutex:  sync.Mutex{},
	}, nil
}
25 changes: 25 additions & 0 deletions diskmap/diskmap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,31 @@ func TestDiskMap(t *testing.T) {
}
}

// BenchmarkDiskMap measures writing b.N random key/value pairs to a new
// diskmap and closing it. Key/value generation and the final Close are part
// of the timed region.
func BenchmarkDiskMap(b *testing.B) {
	b.ReportAllocs()

	p := path.Join(os.TempDir(), nextSuffix())
	// Registered before New so a partially created file is removed even when
	// construction fails; removing a missing file is harmless.
	defer os.Remove(p)

	w, err := New(p)
	if err != nil {
		b.Fatalf("error creating the Writer: %q", err)
	}

	b.ResetTimer()
	// Scale with b.N so the reported ns/op and allocs/op are per write.
	for i := 0; i < b.N; i++ {
		k := []byte(nextSuffix())
		v := randStringBytes()

		if err := w.Write(k, v); err != nil {
			b.Fatalf("error writing:\nkey:%q\nvalue:%q\nerr:%q\n", k, v, err)
		}
	}

	if err := w.Close(); err != nil {
		b.Fatalf("error closing the Writer: %q", err)
	}
}

func nextSuffix() string {
r := uint32(time.Now().UnixNano() + int64(os.Getpid()))

Expand Down
72 changes: 37 additions & 35 deletions diskslice/diskslice.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,21 @@ that can be managed, moved and trivially read. More complex use cases require mo
involving multiple files, lock files, etc.... This makes no attempt to provide that.
Read call without a cached index consists of:
* A single seek to an index entry
* Two 8 byte reads for data offset and len
* A seek to the data
* A single read of the value
- A single seek to an index entry
- Two 8 byte reads for data offset and len
- A seek to the data
- A single read of the value
Total: 2 seeks and 3 reads
Read call with cached index consists of:
* A single read to the data
- A single read to the data
If doing a range over a large file or lots of range calls, it is optimal to have the Reader cache
the index. Every 131,072 entries consumes 1 MiB of cached memory.
File format is as follows:
<file>
<reserve header space>
[index offset]
Expand All @@ -38,13 +40,16 @@ File format is as follows:
package diskslice

import (
"bufio"
"bytes"
"context"
"encoding/binary"
"fmt"
"io"
"os"
"sync"

"github.com/brk0v/directio"
)

// reservedHeader is the size, in bytes, of the reserved header portion of the file.
Expand Down Expand Up @@ -77,7 +82,10 @@ type value struct {
// Writer provides methods for writing an array of values to disk that can be read without
// reading the file back into memory.
type Writer struct {
name string
file *os.File
dio *directio.DirectIO
buf *bufio.Writer
index index
offset int64
num int64
Expand All @@ -97,33 +105,6 @@ func WriteIntercept(interceptor func(dst io.Writer) io.WriteCloser) WriteOption
}
}

// New is the constructor for Writer.
//
// It creates (or truncates) the file at fpath, sets its mode to 0600, and
// moves the write position past the reserved header so that the first value
// lands at offset reservedHeader. Each option is then applied to the Writer in
// the order given. If any step after creation fails, the file handle is
// closed before the error is returned.
func New(fpath string, options ...WriteOption) (*Writer, error) {
	f, err := os.Create(fpath)
	if err != nil {
		return nil, err
	}

	if err := f.Chmod(0600); err != nil {
		_ = f.Close() // best-effort cleanup; the Chmod error is the one worth reporting
		return nil, err
	}

	// Whence 0 is relative to the start of the file.
	if _, err := f.Seek(reservedHeader, 0); err != nil {
		_ = f.Close() // best-effort cleanup; the Seek error is the one worth reporting
		return nil, fmt.Errorf("was unable to seek %d bytes into the file: %q", reservedHeader, err)
	}

	w := &Writer{
		file:   f,
		offset: reservedHeader,
		index:  make(index, 0, 1000),
		mu:     sync.Mutex{},
	}
	for _, option := range options {
		option(w)
	}
	return w, nil
}

// Write writes a byte slice to the diskslice.
func (w *Writer) Write(b []byte) error {
if w.interceptor != nil {
Expand All @@ -141,7 +122,11 @@ func (w *Writer) Write(b []byte) error {
w.mu.Lock()
defer w.mu.Unlock()

if _, err := w.file.Write(b); err != nil {
var writer io.Writer = w.file
if w.buf != nil {
writer = w.buf
}
if _, err := writer.Write(b); err != nil {
return fmt.Errorf("problem writing value to disk: %q", err)
}

Expand All @@ -161,17 +146,34 @@ func (w *Writer) Write(b []byte) error {

// Close closes the file for writing and writes our index to the file.
func (w *Writer) Close() error {
var writer io.Writer = w.file
if w.buf != nil {
writer = w.buf
}

// Write each data offset, then the length of the data, to disk (our index) for each entry.
for _, entry := range w.index {
if err := binary.Write(w.file, endian, entry.offset); err != nil {
if err := binary.Write(writer, endian, entry.offset); err != nil {
return fmt.Errorf("could not write offset value %d: %q", entry.offset, err)
}

if err := binary.Write(w.file, endian, entry.length); err != nil {
if err := binary.Write(writer, endian, entry.length); err != nil {
return fmt.Errorf("could not write data length: %q", err)
}
}

if w.buf != nil {
w.buf.Flush()
w.dio.Flush()

w.file.Close()
f, err := os.OpenFile(w.name, os.O_RDWR, 0666)
if err != nil {
return err
}
w.file = f
}

// Now that we've written all our data to the end of the file, we can go back to our reserved header
// and write our offset to the index at the beginning of the file.
if _, err := w.file.Seek(0, 0); err != nil {
Expand Down
Loading

0 comments on commit d49b462

Please sign in to comment.