Skip to content

Commit

Permalink
* Validation of payload digest
Browse files Browse the repository at this point in the history
* Reduced disk access
* Config option for default digest algorithm
  • Loading branch information
johnerikhalse committed Aug 26, 2021
1 parent a9aea0b commit 8682f71
Show file tree
Hide file tree
Showing 13 changed files with 309 additions and 232 deletions.
89 changes: 35 additions & 54 deletions block.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"github.com/nlnwa/gowarc/internal/diskbuffer"
"io"
"io/ioutil"
"sync"
)

// Block is the interface used to represent the content of a WARC record as specified by the WARC specification:
Expand Down Expand Up @@ -49,91 +48,73 @@ type PayloadBlock interface {
}

type genericBlock struct {
opts *warcRecordOptions
rawBytes io.Reader
blockDigest *digest
digestOnce sync.Once
readOp readOp
cached bool
opts *warcRecordOptions
rawBytes io.Reader
blockDigest *digest
filterReader *digestFilterReader
blockDigestString string
}

func newGenericBlock(opts *warcRecordOptions, r io.Reader, d *digest) *genericBlock {
b := &genericBlock{opts: opts, rawBytes: r, blockDigest: d}
if _, ok := r.(io.Seeker); ok {
b.cached = true
}
return b
return &genericBlock{opts: opts, rawBytes: r, blockDigest: d}
}

func (block *genericBlock) IsCached() bool {
return block.cached
_, ok := block.rawBytes.(io.Seeker)
return ok
}

func (block *genericBlock) Cache() error {
if block.cached {
if block.IsCached() {
return nil
}
if block.readOp != opInitial {
return errContentReAccessed

r, err := block.RawBytes()
if err != nil {
return err
}

buf := diskbuffer.New(block.opts.bufferOptions...)
if _, err := buf.ReadFrom(block.rawBytes); err != nil {
if _, err := buf.ReadFrom(r); err != nil {
return err
}
if c, ok := block.rawBytes.(io.Closer); ok {
_ = c.Close()
}
block.blockDigestString = block.blockDigest.format()
block.rawBytes = buf
block.cached = true
return nil
}

func (block *genericBlock) RawBytes() (io.Reader, error) {
if block.cached {
if _, err := block.rawBytes.(io.Seeker).Seek(0, io.SeekStart); err != nil {
return nil, err
}
return block.rawBytes, nil
if block.filterReader == nil {
block.filterReader = newDigestFilterReader(block.rawBytes, block.blockDigest)
return block.filterReader, nil
}

if block.blockDigestString == "" {
block.BlockDigest()
}

// Block is not cached. Guard against calling more than once
if block.readOp != opInitial {
if !block.IsCached() {
return nil, errContentReAccessed
}
block.readOp = opRawBytes

block.rawBytes = io.TeeReader(block.rawBytes, block.blockDigest)
return block.rawBytes, nil
if _, err := block.rawBytes.(io.Seeker).Seek(0, io.SeekStart); err != nil {
return nil, err
}
return newDigestFilterReader(block.rawBytes), nil
}

func (block *genericBlock) BlockDigest() string {
block.digestOnce.Do(func() {
if block.cached {
if _, err := block.rawBytes.(io.Seeker).Seek(0, io.SeekStart); err != nil {
panic(err)
}
block.blockDigest.Reset()
_, _ = io.Copy(block.blockDigest, block.rawBytes)
return
}

if block.readOp == opInitial {
_, _ = block.RawBytes()
if block.blockDigestString == "" {
if block.filterReader == nil {
block.filterReader = newDigestFilterReader(block.rawBytes, block.blockDigest)
}
block.readOp = opRawBytes
_, _ = io.Copy(ioutil.Discard, block.rawBytes)
})
return block.blockDigest.format()
_, _ = io.Copy(ioutil.Discard, block.filterReader)
block.blockDigestString = block.blockDigest.format()
}
return block.blockDigestString
}

// The readOp constants describe access to RawBytes() or PayloadBytes() on a PayloadBlock(),
// so that RawBytes and PayloadBytes() can check for invalid usage.
type readOp int8

const (
opInitial readOp = 0 // Initial value.
opRawBytes readOp = 1
opPayloadBytes readOp = 2
)

var errContentReAccessed = errors.New("gowarc.Block: tried to access content twice")
4 changes: 2 additions & 2 deletions block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ func Test_httpRequestBlock_IsCached(t *testing.T) {
{
"diskbuffer.Buffer",
func() io.Reader { d := diskbuffer.New(); _, _ = d.WriteString(content); return d }(),
false,
true,
},
{
"iotest.HalfReader",
Expand Down Expand Up @@ -527,7 +527,7 @@ func Test_httpResponseBlock_IsCached(t *testing.T) {
{
"diskbuffer.Buffer",
func() io.Reader { d := diskbuffer.New(); _, _ = d.WriteString(content); return d }(),
false,
true,
},
{
"iotest.HalfReader",
Expand Down
30 changes: 30 additions & 0 deletions digest.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"crypto/sha512"
"fmt"
"hash"
"io"
"strings"
)

Expand Down Expand Up @@ -67,3 +68,32 @@ func newDigest(digestString string) (*digest, error) {
return nil, fmt.Errorf("unsupported digest algorithm '%s'", algorithm)
}
}

func newDigestFromField(wr *warcRecord, warcDigestField string) (d *digest, err error) {
if wr.WarcHeader().Has(warcDigestField) {
d, err = newDigest(wr.WarcHeader().Get(warcDigestField))
} else {
d, err = newDigest(wr.opts.defaultDigestAlgorithm)
}
return
}

type digestFilterReader struct {
src io.Reader
digests []*digest
}

func newDigestFilterReader(src io.Reader, digests ...*digest) *digestFilterReader {
return &digestFilterReader{src: src, digests: digests}
}

func (d digestFilterReader) Read(p []byte) (n int, err error) {
n, err = d.src.Read(p)
if n > 0 {
pp := p[:n]
for _, dd := range d.digests {
dd.Write(pp)
}
}
return
}
Loading

0 comments on commit 8682f71

Please sign in to comment.