Skip to content

Commit

Permalink
Merge pull request #45 from nlnwa/gzip_performance
Browse files Browse the repository at this point in the history
Gzip performance
  • Loading branch information
maeb authored Oct 12, 2021
2 parents 74cc4f8 + 65af504 commit 5f214ee
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 9 deletions.
16 changes: 10 additions & 6 deletions unmarshaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type unmarshaler struct {
opts *warcRecordOptions
warcFieldsParser *warcfieldsParser
LastOffset int64
gz *gzip.Reader // Holds gzip reader for enabling reuse
}

func NewUnmarshaler(opts ...WarcRecordOption) Unmarshaler {
Expand Down Expand Up @@ -77,14 +78,17 @@ func (u *unmarshaler) Unmarshal(b *bufio.Reader) (WarcRecord, int64, *Validation
expectedRecordStartOffset, offset), &position{}))
}

var g *gzip.Reader
if magic[0] == 0x1f && magic[1] == 0x8b {
g, err = gzip.NewReader(b)
if u.gz == nil {
u.gz, err = gzip.NewReader(b)
} else {
err = u.gz.Reset(b)
}
if err != nil {
return nil, offset, validation, err
}
g.Multistream(false)
r = bufio.NewReader(g)
u.gz.Multistream(false)
r = bufio.NewReader(u.gz)
} else {
r = b
}
Expand Down Expand Up @@ -144,8 +148,8 @@ func (u *unmarshaler) Unmarshal(b *bufio.Reader) (WarcRecord, int64, *Validation
// Discarding 2 bytes which makes up the end of record marker (\r\n)
// TODO: validate that record ends with correct marker
_, _ = r.Discard(2)
if g != nil {
_ = g.Close()
if u.gz != nil {
_ = u.gz.Close()
}
return err
}
Expand Down
32 changes: 32 additions & 0 deletions unmarshaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package gowarc

import (
"bufio"
"bytes"
"compress/gzip"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"io/ioutil"
Expand Down Expand Up @@ -450,3 +452,33 @@ func Test_unmarshaler_Unmarshal(t *testing.T) {
})
}
}

var unmarshallerBenchmarkResult interface{}

func BenchmarkUnmarshaler_Unmarshal_compressed(b *testing.B) {
record := "WARC/1.0\r\n" +
"WARC-Date: 2017-03-06T04:03:53Z\r\n" +
"WARC-Record-ID: <urn:uuid:e9a0cecc-0221-11e7-adb1-0242ac120008>\r\n" +
"WARC-Type: response\r\n" +
"Content-Type: application/http;msgtype=response\r\n" +
"Warc-Block-Digest: sha1:B285747AD7CC57AA74BCE2E30B453C8D1CB71BA4\r\n" +
"Content-Length: 257\r\n" +
"\r\n" +
"HTTP/1.1 200 OK\nDate: Tue, 19 Sep 2016 17:18:40 GMT\nServer: Apache/2.0.54 (Ubuntu)\n" +
"Last-Modified: Mon, 16 Jun 2013 22:28:51 GMT\nETag: \"3e45-67e-2ed02ec0\"\nAccept-Ranges: bytes\n" +
"Content-Length: 19\nConnection: close\nContent-Type: text/plain\n\nThis is the content"

recordCompressed := &bytes.Buffer{}

z := gzip.NewWriter(recordCompressed)
_, _ = z.Write([]byte(record))
_ = z.Close()

u := NewUnmarshaler(WithNoValidation())

for n := 0; n < b.N; n++ {
data := bufio.NewReader(bytes.NewReader(recordCompressed.Bytes()))
gotRecord, _, _, _ := u.Unmarshal(data)
unmarshallerBenchmarkResult = gotRecord.Close()
}
}
10 changes: 7 additions & 3 deletions warcfile.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,9 @@ func NewWarcFileWriter(opts ...WarcFileWriterOption) *WarcFileWriter {

for i := 0; i < o.maxConcurrentWriters; i++ {
writer := &singleWarcFileWriter{opts: &o, shutWriters: w.shutWriters}
if o.compress {
writer.gz = gzip.NewWriter(nil)
}
w.writers = append(w.writers, writer)
go worker(writer, w.jobs)
}
Expand Down Expand Up @@ -237,6 +240,7 @@ type singleWarcFileWriter struct {
currentWarcInfoId string
writeLock sync.Mutex
shutWriters *sync.WaitGroup
gz *gzip.Writer // Holds gzip writer, enabling reuse
}

func (w *singleWarcFileWriter) Write(record WarcRecord) (response WriteResponse) {
Expand Down Expand Up @@ -337,9 +341,9 @@ func (w *singleWarcFileWriter) createFile() error {

func (w *singleWarcFileWriter) writeRecord(writer io.Writer, record WarcRecord, maxRecordSize int64) (int64, error) {
if w.opts.compress {
gz := gzip.NewWriter(writer)
defer func() { _ = gz.Close() }()
writer = gz
w.gz.Reset(writer)
defer func() { _ = w.gz.Close() }()
writer = w.gz
}
if w.currentWarcInfoId != "" {
record.WarcHeader().Set(WarcWarcinfoID, w.currentWarcInfoId)
Expand Down
25 changes: 25 additions & 0 deletions warcfile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -567,3 +567,28 @@ func TestDefaultNameGenerator_NewWarcfileName(t *testing.T) {
})
}
}

var warcFileWriterBenchmarkResult interface{}

func BenchmarkWarcFileWriter_Write_compressed(b *testing.B) {
now = func() time.Time {
return time.Date(2001, 9, 12, 5, 30, 20, 0, time.UTC)
}
assert := assert.New(b)

testdir := "tmp-test"
nameGenerator := &PatternNameGenerator{Prefix: "bench-", Directory: testdir}
assert.NoError(os.Mkdir(testdir, 0755))
w := NewWarcFileWriter(
WithCompression(true),
WithFileNameGenerator(nameGenerator),
WithMaxFileSize(0),
WithMaxConcurrentWriters(1))
defer func() { assert.NoError(w.Close()) }()
defer func() { assert.NoError(os.RemoveAll(testdir)) }()

for n := 0; n < b.N; n++ {
res := w.Write(createTestRecord())
warcFileWriterBenchmarkResult = res
}
}

0 comments on commit 5f214ee

Please sign in to comment.