From a9aea0b6e5bf1dd68c72061f6193d3e120a14619 Mon Sep 17 00:00:00 2001 From: John Erik Halse Date: Tue, 24 Aug 2021 09:57:04 +0200 Subject: [PATCH] Support for creating Slices from diskbuffer --- internal/diskbuffer/diskbuffer.go | 1 + internal/diskbuffer/diskbuffer_test.go | 58 ------- internal/diskbuffer/slice.go | 170 ++++++++++++++++++ internal/diskbuffer/slice_test.go | 229 +++++++++++++++++++++++++ 4 files changed, 400 insertions(+), 58 deletions(-) create mode 100644 internal/diskbuffer/slice.go create mode 100644 internal/diskbuffer/slice_test.go diff --git a/internal/diskbuffer/diskbuffer.go b/internal/diskbuffer/diskbuffer.go index a61cfd9..9308d8f 100644 --- a/internal/diskbuffer/diskbuffer.go +++ b/internal/diskbuffer/diskbuffer.go @@ -43,6 +43,7 @@ type Buffer interface { ReadString(delim byte) (line string, err error) Peek(n int) (p []byte, err error) Size() int64 + Slice(offset, len int64) Slice } // A Buffer is a variable-sized buffer of bytes with Read and Write methods. diff --git a/internal/diskbuffer/diskbuffer_test.go b/internal/diskbuffer/diskbuffer_test.go index d2f169a..fea0d19 100644 --- a/internal/diskbuffer/diskbuffer_test.go +++ b/internal/diskbuffer/diskbuffer_test.go @@ -21,7 +21,6 @@ import ( "bytes" "crypto/md5" "encoding/hex" - "fmt" "io" "os" "testing" @@ -108,27 +107,6 @@ func TestSeekWithFile(t *testing.T) { assert.Equal(t, tlen, l) } -//func TestSeekFirst(t *testing.T) { -// tlen := int64(1057576) -// r, hash := createReaderOfSize(tlen) -// bb, err := New(r) -// assert.Nil(t, err) -// -// l, err := bb.Size() -// assert.NoError(t, err) -// assert.Equal(t, tlen, l) -// -// assert.NoError(t, err) -// assert.Equal(t, hash, hashOfReader(bb)) -// -// bb.Seek(0, 0) -// -// assert.Equal(t, hash, hashOfReader(bb)) -// l, err = bb.Size() -// assert.NoError(t, err) -// assert.Equal(t, tlen, l) -//} - func TestLimitDoesNotExceed(t *testing.T) { requestSize := int64(1057576) r, hash := createReaderOfSize(requestSize) @@ -234,15 +212,11 @@ func TestReadStringMemory(t *testing.T) { bb.WriteString("line1\n") bb.WriteString("line2") - fmt.Printf("Diskbuffer: %s\n", bb) line, err := bb.ReadString('\n') - fmt.Printf("Line: <%q> :: %s\n", line, bb) assert.NoError(t, err) assert.Equal(t, "line1\n", line) - fmt.Printf("Diskbuffer: %s\n", bb) line, err = bb.ReadString('\n') - fmt.Printf("Line: <%q> :: %s\n", line, bb) assert.Error(t, err) assert.Equal(t, "line2", line) } @@ -272,35 +246,3 @@ func TestReadString(t *testing.T) { assert.Error(t, err) assert.Equal(t, "line4", line) } - -//func TestWriterReaderCalled(t *testing.T) { -// size := int64(1000) -// r, hash := createReaderOfSize(size) -// -// w, err := NewWriterOnce() -// assert.NoError(t, err) -// -// _, err = io.Copy(w, r) -// assert.NoError(t, err) -// assert.NoError(t, w.Close()) -// -// bb, err := w.Reader() -// assert.NoError(t, err) -// -// assert.Equal(t, hash, hashOfReader(bb)) -// -// // Subsequent calls to write and get reader will fail -// _, err = w.Reader() -// assert.Error(t, err) -// -// _, err = w.Write([]byte{1}) -// assert.Error(t, err) -//} - -//func TestWriterNoData(t *testing.T) { -// w, err := NewWriterOnce() -// assert.NoError(t, err) -// -// _, err = w.Reader() -// assert.Error(t, err) -//} diff --git a/internal/diskbuffer/slice.go b/internal/diskbuffer/slice.go new file mode 100644 index 0000000..db8bdf6 --- /dev/null +++ b/internal/diskbuffer/slice.go @@ -0,0 +1,170 @@ +/* + * Copyright 2021 National Library of Norway. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package diskbuffer + +import ( + "bytes" + "io" +) + +type Slice interface { + io.Reader + io.WriterTo + io.Closer + io.Seeker + ReadBytes(delim byte) (line []byte, err error) + ReadString(delim byte) (line string, err error) + Peek(n int) (p []byte, err error) + Size() int64 +} + +// Slice returns a read only subset of a buffer. +func (b *buffer) Slice(offset, len int64) Slice { + if len <= 0 { + len = unlimited + } + return &slice{ + buf: b, + off: offset, + len: len, + } +} + +type slice struct { + buf *buffer + len int64 // Lenght of slice + off int64 // Offset in buffer where this slice starts + pos int64 // Current position in slice +} + +func (s *slice) ReadAtOffset(off int64, p []byte) (n int, err error) { + start := s.off + off + l := s.len - s.pos + if l <= 0 { + return 0, io.EOF + } + + pp := p + if int64(len(p)) > l { + pp = p[:l] + } + n, err = s.buf.ReadAtOffset(start, pp) + return n, err +} + +func (s *slice) Read(p []byte) (n int, err error) { + n, err = s.ReadAtOffset(s.pos, p) + s.pos += int64(n) + return +} + +func (s *slice) WriteTo(w io.Writer) (n int64, err error) { + p := make([]byte, 32*1024) + for { + l1, e1 := s.Read(p) + l2, e2 := w.Write(p[:l1]) + n += int64(l2) + if e1 != nil { + if e1 != io.EOF { + err = e1 + return + } + } + if e2 != nil { + err = e2 + return + } + if l2 == 0 { + break + } + } + return +} + +// Close closes the underlying buffer +func (s *slice) Close() error { + return s.buf.Close() +} + +func (s *slice) Seek(offset int64, whence int) (int64, error) { + switch whence { + case io.SeekStart: + s.pos = offset + case io.SeekCurrent: + s.pos += offset + case io.SeekEnd: + s.pos = s.Size() - offset + } + return s.pos, nil +} + +func (s *slice) ReadBytes(delim byte) (line []byte, err error) { + if s.len-s.pos <= 0 { + return []byte{}, io.EOF + } + + p := make([]byte, 100) + off := s.pos + for { + var n int + n, err = s.ReadAtOffset(off, p) + if n > 0 { + i := bytes.IndexByte(p[:n], delim) + end := i + 1 + if i < 0 { + line = append(line, p[:n]...) + off += int64(n) + } else { + // found delim + line = append(line, p[:end]...) + off += int64(end) + err = nil + break + } + } + if err != nil { + break + } + } + s.pos = off + return line, err +} + +func (s *slice) ReadString(delim byte) (line string, err error) { + var bytes []byte + bytes, err = s.ReadBytes(delim) + return string(bytes), err +} + +func (s *slice) Peek(n int) (p []byte, err error) { + p = make([]byte, n) + n, err = s.ReadAtOffset(s.pos, p) + return p[:n], err +} + +// Len returns the number of bytes of the unread portion of the slice; +func (s *slice) Len() int64 { + return s.Size() - s.pos +} + +func (s *slice) Size() int64 { + if s.len == unlimited { + return s.buf.Size() - s.off + } else { + return s.len - s.off + } +} diff --git a/internal/diskbuffer/slice_test.go b/internal/diskbuffer/slice_test.go new file mode 100644 index 0000000..67f7ce5 --- /dev/null +++ b/internal/diskbuffer/slice_test.go @@ -0,0 +1,229 @@ +/* + * Copyright 2020 National Library of Norway. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +//nolint +package diskbuffer + +import ( + "bytes" + "fmt" + "io" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestSliceReadMemBuffer(t *testing.T) { + bb := New(WithMaxMemBytes(100)) + defer bb.Close() + + bb.WriteString("line1\n") + bb.WriteString("line2\n") + bb.WriteString("line3") + + slice := bb.Slice(6, 100) + p := make([]byte, 100) + total, err := slice.Read(p) + assert.Equal(t, io.EOF, err) + assert.Equal(t, 11, total) + assert.Equal(t, "line2\nline3", string(p[:total])) + + slice = bb.Slice(6, 0) + total, err = slice.Read(p) + assert.Equal(t, io.EOF, err) + assert.Equal(t, 11, total) + assert.Equal(t, "line2\nline3", string(p[:total])) + + slice = bb.Slice(6, 4) + total, err = slice.Read(p) + assert.Equal(t, nil, err) + assert.Equal(t, 4, total) + assert.Equal(t, "line", string(p[:total])) + + total, err = slice.Read(p) + assert.Equal(t, io.EOF, err) + assert.Equal(t, 0, total) + assert.Equal(t, "", string(p[:total])) + + total, err = slice.Read(p) + assert.Equal(t, io.EOF, err) + assert.Equal(t, 0, total) + assert.Equal(t, "", string(p[:total])) +} + +func TestSliceReadDiskBuffer(t *testing.T) { + bb := New(WithMaxMemBytes(7)) + defer bb.Close() + + bb.WriteString("line1\n") + bb.WriteString("line2") + + slice := bb.Slice(6, 100) + p := make([]byte, 100) + total, err := slice.Read(p) + assert.Equal(t, io.EOF, err) + assert.Equal(t, 5, total) + assert.Equal(t, "line2", string(p[:total])) + + slice = bb.Slice(6, 0) + total, err = slice.Read(p) + assert.Equal(t, io.EOF, err) + assert.Equal(t, 5, total) + assert.Equal(t, "line2", string(p[:total])) + + slice = bb.Slice(6, 4) + total, err = slice.Read(p) + assert.Equal(t, nil, err) + assert.Equal(t, 4, total) + assert.Equal(t, "line", string(p[:total])) +} + +func TestSliceWriteToDiskBuffer(t *testing.T) { + bb := New(WithMaxMemBytes(7)) + defer bb.Close() + + data := bytes.Buffer{} + for i := 0; i < 4000; i++ { + data.WriteString(fmt.Sprintf("line%04d\n", i)) + } + bb.Write(data.Bytes()) + + slice := bb.Slice(7, 100) + w := bytes.Buffer{} + total, err := slice.WriteTo(&w) + assert.Equal(t, nil, err) + assert.Equal(t, int64(100), total) + assert.Equal(t, data.String()[7:107], w.String()) + + slice = bb.Slice(7, 0) + w.Reset() + total, err = slice.WriteTo(&w) + assert.Equal(t, nil, err) + assert.Equal(t, int64(9*4000-7), total) + assert.Equal(t, data.String()[7:], w.String()) + + slice = bb.Slice(7, 50) + w.Reset() + total, err = slice.WriteTo(&w) + assert.Equal(t, nil, err) + assert.Equal(t, int64(50), total) + assert.Equal(t, data.String()[7:50+7], w.String()) +} + +func TestSliceReadString(t *testing.T) { + bb := New(WithMaxMemBytes(100)) + defer bb.Close() + + bb.WriteString("line1\n") + bb.WriteString("line2\n") + bb.WriteString("line3") + + slice := bb.Slice(6, 0) + line, err := slice.ReadString('\n') + assert.NoError(t, err) + assert.Equal(t, "line2\n", line) + + line, err = slice.ReadString('\n') + assert.Error(t, err) + assert.Equal(t, "line3", line) +} + +func TestSliceReadBytes(t *testing.T) { + bb := New(WithMaxMemBytes(100)) + defer bb.Close() + + bb.WriteString("line1\n") + bb.WriteString("line2\n") + bb.WriteString("line3") + + slice := bb.Slice(6, 0) + line, err := slice.ReadBytes('\n') + assert.NoError(t, err) + assert.Equal(t, []byte("line2\n"), line) + + line, err = slice.ReadBytes('\n') + assert.Error(t, err) + assert.Equal(t, []byte("line3"), line) +} + +func TestSlicePeek(t *testing.T) { + bb := New(WithMaxMemBytes(10)) + defer bb.Close() + + bb.WriteString("line1\n") + bb.WriteString("line2\n") + bb.WriteString("line3") + + slice := bb.Slice(6, 0) + peek, err := slice.Peek(5) + assert.NoError(t, err) + assert.Equal(t, []byte("line2"), peek) + + peek, err = slice.Peek(5) + assert.NoError(t, err) + assert.Equal(t, []byte("line2"), peek) + + line, err := slice.ReadBytes('\n') + assert.NoError(t, err) + assert.Equal(t, []byte("line2\n"), line) + + peek, err = slice.Peek(5) + assert.NoError(t, err) + assert.Equal(t, []byte("line3"), peek) + + line, err = slice.ReadBytes('\n') + assert.Error(t, err) + assert.Equal(t, []byte("line3"), line) +} + +func TestSliceSeek(t *testing.T) { + bb := New(WithMaxMemBytes(10)) + defer bb.Close() + + bb.WriteString("line1\n") + bb.WriteString("line2\n") + bb.WriteString("line3") + + slice := bb.Slice(6, 0) + + pos, err := slice.Seek(0, io.SeekStart) + assert.NoError(t, err) + assert.Equal(t, int64(0), pos) + peek, err := slice.Peek(5) + assert.NoError(t, err) + assert.Equal(t, []byte("line2"), peek) + + pos, err = slice.Seek(4, io.SeekStart) + assert.NoError(t, err) + assert.Equal(t, int64(4), pos) + peek, err = slice.Peek(5) + assert.NoError(t, err) + assert.Equal(t, []byte("2\nlin"), peek) + + pos, err = slice.Seek(2, io.SeekCurrent) + assert.NoError(t, err) + assert.Equal(t, int64(6), pos) + peek, err = slice.Peek(5) + assert.NoError(t, err) + assert.Equal(t, []byte("line3"), peek) + + pos, err = slice.Seek(4, io.SeekEnd) + assert.NoError(t, err) + assert.Equal(t, int64(7), pos) + peek, err = slice.Peek(4) + assert.NoError(t, err) + assert.Equal(t, []byte("ine3"), peek) +}