From a7350abde832cecb97b24c0761d07fa9670b9860 Mon Sep 17 00:00:00 2001 From: John Doak Date: Mon, 12 Dec 2022 19:15:17 -0800 Subject: [PATCH] Diskmap now has explicit support for duplicate keys --- diskmap/diskmap.go | 71 ++++++++++++++++++++++++++++++++++------- diskmap/diskmap_test.go | 61 +++++++++++++++++++++++++++++++++-- 2 files changed, 119 insertions(+), 13 deletions(-) diff --git a/diskmap/diskmap.go b/diskmap/diskmap.go index 9b14f31..9d07eb4 100644 --- a/diskmap/diskmap.go +++ b/diskmap/diskmap.go @@ -3,6 +3,9 @@ Package diskmap provides disk storage of key/value pairs. The data is immutable In addition the diskmap utilizes mmap on reads to make the random access faster. On Linux, diskmap uses directio to speed up writes. +Unlike a regular map, keys can have duplicates. If you need this filtered out you +must do it before adding to the diskmap. + Usage is simplistic: // Create a new diskmap. @@ -101,11 +104,19 @@ const reservedHeader = 64 // endian is the endianess of all our binary encodings. var endian = binary.LittleEndian -// Reader provides read access to the the diskmap file. +// ErrKeyNotFound indicates that a searched for key was not found. +var ErrKeyNotFound = fmt.Errorf("key was not found") + +// Reader provides read access to the the diskmap file. If you fake this, you need +// to embed it in your fake. type Reader interface { - // Get fetches key "k" and returns the value. Errors when key not found. Thread-safe. + // Read fetches key "k" and returns the value. If there are multi-key matches, + // it returns the last key added. Errors when key not found. Thread-safe. Read(k []byte) ([]byte, error) + // ReadAll fetches all matches to key "k". Does not error if not found. Thread-safe. + ReadAll(k []byte) ([][]byte, error) + // Range allows iteration over all the key/value pairs stored in the diskmap. If not interating // over all values, Cancel() or a timeout should be used on the Context to prevent a goroutine leak. Range(ctx context.Context) chan KeyValue @@ -115,6 +126,7 @@ type Reader interface { } // Writer provides write access to the diskmap file. An error on write makes the Writer unusable. +// If you fake this, you need to embed it in your fake. type Writer interface { // Write writes a key/value pair to disk. Thread-safe. Write(k, v []byte) error @@ -267,11 +279,13 @@ func (w *writer) Close() error { // reader implements Reader. type reader struct { // index is the key to offset data mapping. - index map[string]value + index map[string][]value // file holds the mapping file in mmap. file *os.File + // The mutex is protecting the *os.File. Otherwise + // we can have the file pointer moving while doing a read operation. sync.Mutex } @@ -300,7 +314,7 @@ func Open(p string) (Reader, error) { return nil, fmt.Errorf("cannot seek to index offset: %q", err) } - kv := make(map[string]value, num) + kv := make(map[string][]value, num) var dOff, dLen, kLen int64 @@ -324,10 +338,14 @@ func Open(p string) (Reader, error) { return nil, fmt.Errorf("error reading in a key from the index: %q", err) } - kv[string(key)] = value{ - offset: dOff, - length: dLen, + strKey := byteSlice2String(key) + if sl, ok := kv[strKey]; ok { + sl = append(sl, value{offset: dOff, length: dLen}) + kv[strKey] = sl + continue } + + kv[string(key)] = []value{{offset: dOff, length: dLen}} } return &reader{index: kv, file: f}, nil @@ -338,7 +356,35 @@ func (r *reader) Read(k []byte) ([]byte, error) { r.Lock() defer r.Unlock() - if v, ok := r.index[string(k)]; ok { + vals, ok := r.index[byteSlice2String(k)] + if !ok { + return nil, ErrKeyNotFound + } + // If there are multiple values with the same key, only return the last one. + v := vals[len(vals)-1] + + if _, err := r.file.Seek(v.offset, 0); err != nil { + return nil, fmt.Errorf("cannot reach offset for key supplied by the index: %q", err) + } + b := make([]byte, v.length) + if _, err := r.file.Read(b); err != nil { + return nil, fmt.Errorf("error reading value from file: %q", err) + } + return b, nil +} + +// ReadAll implements Reader.ReadAll(). +func (r *reader) ReadAll(k []byte) ([][]byte, error) { + r.Lock() + defer r.Unlock() + + vals, ok := r.index[byteSlice2String(k)] + if !ok { + return nil, nil + } + + sl := make([][]byte, 0, len(vals)) + for _, v := range vals { if _, err := r.file.Seek(v.offset, 0); err != nil { return nil, fmt.Errorf("cannot reach offset for key supplied by the index: %q", err) } @@ -346,10 +392,9 @@ func (r *reader) Read(k []byte) ([]byte, error) { if _, err := r.file.Read(b); err != nil { return nil, fmt.Errorf("error reading value from file: %q", err) } - return b, nil + sl = append(sl, b) } - - return nil, fmt.Errorf("key was not found") + return sl, nil } // Range implements Reader.Range(). @@ -393,3 +438,7 @@ func UnsafeGetBytes(s string) []byte { (*reflect.StringHeader)(unsafe.Pointer(&s)).Data), )[:len(s):len(s)] } + +func byteSlice2String(bs []byte) string { + return *(*string)(unsafe.Pointer(&bs)) +} diff --git a/diskmap/diskmap_test.go b/diskmap/diskmap_test.go index f065991..f9401ff 100644 --- a/diskmap/diskmap_test.go +++ b/diskmap/diskmap_test.go @@ -55,7 +55,7 @@ func TestDiskMap(t *testing.T) { continue } - if bytes.Compare(val, v) != 0 { + if !bytes.Equal(val, v) { t.Errorf("a value was not correctly stored") } } @@ -65,7 +65,64 @@ func TestDiskMap(t *testing.T) { } } -func BenchmarkDiskMap(b *testing.B) { +func TestDiskMapDuplicateKeys(t *testing.T) { + p := path.Join(os.TempDir(), nextSuffix()) + w, err := New(p) + if err != nil { + panic(err) + } + defer os.Remove(p) + + _1stKey := []byte(nextSuffix()) + _1stData := randStringBytes() + dupKey := []byte(nextSuffix()) + dupData0 := randStringBytes() + dupData1 := randStringBytes() + _2ndKey := []byte(nextSuffix()) + _2ndData := randStringBytes() + + for _, kv := range []KeyValue{ + {Key: _1stKey, Value: _1stData}, + {Key: dupKey, Value: dupData0}, + {Key: _2ndKey, Value: _2ndData}, + {Key: dupKey, Value: dupData1}, + } { + if err := w.Write(kv.Key, kv.Value); err != nil { + t.Fatalf("error writing:\nkey:%q\nvalue:%q\n", kv.Key, kv.Value) + } + } + + w.Close() + + r, err := Open(p) + if err != nil { + t.Fatalf("error opening diskmap %q", err) + } + + got, err := r.Read(dupKey) + if err != nil { + t.Fatalf("TestDiskMapDuplicateKeys(r.Read()): got err == %s, want err == nil", err) + } + if !bytes.Equal(got, dupData1) { + t.Fatalf("TestDiskMapDuplicateKeys(r.Read()): got incorrect data") + } + + gotBatch, err := r.ReadAll(dupKey) + if err != nil { + t.Fatalf("TestDiskMapDuplicateKeys(r.ReadAll()): got err == %s, want err == nil", err) + } + if len(gotBatch) != 2 { + t.Fatalf("TestDiskMapDuplicateKeys(r.ReadAll()): got %d return values, want %d", len(gotBatch), 2) + } + want := [][]byte{dupData0, dupData1} + for i := 0; i < len(gotBatch); i++ { + if !bytes.Equal(gotBatch[i], want[i]) { + t.Fatalf("TestDiskMapDuplicateKeys(r.ReadAll()): returned value %d was incorrect", i) + } + } +} + +func BenchmarkDiskMapWriter(b *testing.B) { b.ReportAllocs() p := path.Join(os.TempDir(), nextSuffix())