Skip to content

Commit

Permalink
Merge pull request #46 from nlnwa/fixes
Browse files Browse the repository at this point in the history
Fixed reading of gzipped records + several small enhancements
  • Loading branch information
maeb authored Oct 21, 2021
2 parents 5f214ee + 6c548fa commit 7b48182
Show file tree
Hide file tree
Showing 11 changed files with 183 additions and 41 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.16
require (
github.com/go-logfmt/logfmt v0.4.0 // indirect
github.com/google/uuid v1.2.0
github.com/klauspost/compress v1.13.6
github.com/kr/text v0.2.0 // indirect
github.com/nlnwa/whatwg-url v0.0.0-20200306110950-d1a95e2e8fc3
github.com/pkg/errors v0.8.1 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7a
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs=
github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/klauspost/compress v1.13.6 h1:P76CopJELS0TiO2mebmnzgWaajssP/EszplttgQxcgc=
github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515 h1:T+h1c/A9Gawja4Y9mFVWj2vyii2bbUNDw3kt9VxK2EY=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI=
Expand Down
7 changes: 2 additions & 5 deletions internal/countingreader/countingreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,9 @@ func (r *Reader) Read(p []byte) (n int, err error) {
if int64(len(p)) > remaining {
p = p[:remaining]
}
n, err = r.ioReader.Read(p)
atomic.AddInt64(&r.bytesRead, int64(n))
} else {
n, err = r.ioReader.Read(p)
atomic.AddInt64(&r.bytesRead, int64(n))
}
n, err = r.ioReader.Read(p)
atomic.AddInt64(&r.bytesRead, int64(n))
return
}

Expand Down
44 changes: 38 additions & 6 deletions internal/ipaddress.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,49 @@

package internal

import "net"
import (
"net"
"os"
)

// Get preferred outbound ip of this machine
func GetOutboundIP() net.IP {
func getOutboundIP() (net.IP, error) {
conn, err := net.Dial("udp", "8.8.8.8:80")
if err != nil {
panic(err)
return nil, err
}
defer conn.Close()
defer func() { _ = conn.Close() }()

localAddr := conn.LocalAddr().(*net.UDPAddr)

return localAddr.IP
return localAddr.IP, nil
}

// GetOutboundIP returns the preferred outbound ip of this node
// If resolution fails, 'unknown' is returned.
func GetOutboundIP() string {
if ip, err := getOutboundIP(); err == nil {
return ip.String()
}
return "unknown"
}

// GetHostName returns the hostname reported by the kernel.
// If resolution fails, 'unknown' is returned.
func GetHostName() string {
if host, err := os.Hostname(); err == nil {
return host
}
return "unknown"
}

// GetHostNameOrIP returns the hostname reported by the kernel falling back to outbound ip if hostname could not be resolved
// If resolution fails, 'unknown' is returned.
func GetHostNameOrIP() string {
if host, err := os.Hostname(); err == nil {
return host
}
if ip, err := getOutboundIP(); err == nil {
return ip.String()
}
return "unknown"
}
2 changes: 1 addition & 1 deletion marshaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
//
// Marshal converts a WARC record to its serialized form and returns the size of the marshalled record or any error encountered.
//
// Depending on implementation, Marshal might return a WarcRecord which is the continuation of the record beeing written.
// Depending on implementation, Marshal might return a WarcRecord which is the continuation of the record being written.
// See the description of record segmentation at https://iipc.github.io/warc-specifications/specifications/warc-format/warc-1.1/#record-segmentation
type Marshaler interface {
Marshal(w io.Writer, record WarcRecord, maxSize int64) (WarcRecord, int64, error)
Expand Down
4 changes: 3 additions & 1 deletion record.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,9 @@ func (wr *warcRecord) String() string {

func (wr *warcRecord) Close() error {
if wr.closer != nil {
return wr.closer()
err := wr.closer()
wr.closer = nil
return err
}
return nil
}
Expand Down
26 changes: 20 additions & 6 deletions unmarshaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ package gowarc
import (
"bufio"
"bytes"
"compress/gzip"
"fmt"
"github.com/klauspost/compress/gzip"
"github.com/nlnwa/gowarc/internal/countingreader"
"io"
"io/ioutil"
Expand Down Expand Up @@ -52,6 +52,7 @@ func (u *unmarshaler) Unmarshal(b *bufio.Reader) (WarcRecord, int64, *Validation
var r *bufio.Reader
var offset int64
validation := &Validation{}
isGzip := false

magic, err := b.Peek(5)
if err != nil {
Expand Down Expand Up @@ -79,6 +80,7 @@ func (u *unmarshaler) Unmarshal(b *bufio.Reader) (WarcRecord, int64, *Validation
}

if magic[0] == 0x1f && magic[1] == 0x8b {
isGzip = true
if u.gz == nil {
u.gz, err = gzip.NewReader(b)
} else {
Expand Down Expand Up @@ -142,19 +144,31 @@ func (u *unmarshaler) Unmarshal(b *bufio.Reader) (WarcRecord, int64, *Validation

length, _ := strconv.ParseInt(record.headers.Get(ContentLength), 10, 64)

c2 := countingreader.NewLimited(r, length)
content := countingreader.NewLimited(r, length)
record.closer = func() error {
_, err := io.Copy(ioutil.Discard, c2)
_, err := io.Copy(ioutil.Discard, content)

// Discarding 2 bytes which makes up the end of record marker (\r\n)
// TODO: validate that record ends with correct marker
_, _ = r.Discard(2)
if u.gz != nil {
_ = u.gz.Close()
if isGzip {
// Empty gzip reader to ensure gzip checksum is validated
b := make([]byte, 10)
var err error
for err == nil {
_, err = u.gz.Read(b)
}
if err != io.EOF {
panic(err)
}
if err := u.gz.Close(); err != nil {
panic(err)
}
}
return err
}

err = record.parseBlock(bufio.NewReader(c2), validation)
err = record.parseBlock(bufio.NewReader(content), validation)

return record, offset, validation, err
}
Expand Down
2 changes: 1 addition & 1 deletion unmarshaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package gowarc
import (
"bufio"
"bytes"
"compress/gzip"
"github.com/klauspost/compress/gzip"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"io/ioutil"
Expand Down
8 changes: 7 additions & 1 deletion validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ import (
// Validation contain validation results.
type Validation []error

func (v *Validation) Error() string {
return v.String()
}

func (v *Validation) String() string {
if len(*v) == 0 {
return ""
Expand All @@ -32,11 +36,13 @@ func (v *Validation) String() string {
sb := strings.Builder{}
sb.WriteString("gowarc: Validation errors:\n")
for i, e := range *v {
if i > 0 {
sb.WriteByte('\n')
}
sb.WriteString(" ")
sb.WriteString(strconv.Itoa(i + 1))
sb.WriteString(": ")
sb.WriteString(e.Error())
sb.WriteByte('\n')
}
return sb.String()
}
Expand Down
80 changes: 70 additions & 10 deletions warcfile.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ package gowarc

import (
"bufio"
"compress/gzip"
"fmt"
"github.com/klauspost/compress/gzip"
"github.com/nlnwa/gowarc/internal"
"github.com/nlnwa/gowarc/internal/countingreader"
"github.com/nlnwa/gowarc/internal/timestamp"
Expand All @@ -41,29 +41,64 @@ type WarcFileNameGenerator interface {
}

// PatternNameGenerator implements the WarcFileNameGenerator.
// New filenames are generated based on a pattern which defaults to the recommendation in the WARC 1.1 standard
// (https://iipc.github.io/warc-specifications/specifications/warc-format/warc-1.1/#annex-c-informative-warc-file-size-and-name-recommendations).
// The pattern is like golangs fmt package (https://pkg.go.dev/fmt), but allows for named fields in curly braces.
// The available predefined names are:
// * prefix - content of the Prefix field
// * ext - content of the Extension field
// * ts - current time as 14-digit GMT Time-stamp
// * serial - atomically increased serial number for every generated file name. Initial value is 0 if Serial field is not set
// * ip - primary IP address of the node
// * host - host name of the node
// * hostOrIp - host name of the node, falling back to IP address if host name could not be resolved
type PatternNameGenerator struct {
Directory string // Directory to store warcfiles. Defaults to the empty string
Prefix string // Prefix available to be used in pattern. Defaults to the empty string
Serial int32 // Serial number available for use in pattern. It is atomically increased with every generated file name.
Pattern string // Pattern for generated file name. Defaults to: "%{prefix}s%{ts}s-%04{serial}d-%{ip}s.warc"
Pattern string // Pattern for generated file name. Defaults to: "%{prefix}s%{ts}s-%04{serial}d-%{hostOrIp}s.%{ext}s"
Extension string // Extension for file name. Defaults to: "warc"
params map[string]interface{}
}

const defaultPattern = "%{prefix}s%{ts}s-%04{serial}d-%{ip}s.warc"
const (
defaultPattern = "%{prefix}s%{ts}s-%04{serial}d-%{hostOrIp}s.%{ext}s"
defaultExtension = "warc"
)

// Allow overriding of time.Now for tests
var now = time.Now
var ip = internal.GetOutboundIP
var host = internal.GetHostName
var hostOrIp = internal.GetHostNameOrIP

// NewWarcfileName returns a directory (might be the empty string for current directory) and a file name
func (g *PatternNameGenerator) NewWarcfileName() (string, string) {
if g.Pattern == "" {
g.Pattern = defaultPattern
}
params := map[string]interface{}{
"prefix": g.Prefix,
if g.Extension == "" {
g.Extension = defaultExtension
}
if g.params == nil {
g.params = map[string]interface{}{
"prefix": g.Prefix,
"ext": g.Extension,
"ip": ip(),
"host": host(),
"hostOrIp": hostOrIp(),
}
}

p := map[string]interface{}{
"ts": timestamp.UTC14(now()),
"serial": atomic.AddInt32(&g.Serial, 1),
"ip": internal.GetOutboundIP()}
}
for k, v := range g.params {
p[k] = v
}

name := internal.Sprintt(g.Pattern, params)
name := internal.Sprintt(g.Pattern, p)
return g.Directory, name
}

Expand Down Expand Up @@ -125,7 +160,7 @@ func NewWarcFileWriter(opts ...WarcFileWriterOption) *WarcFileWriter {
for i := 0; i < o.maxConcurrentWriters; i++ {
writer := &singleWarcFileWriter{opts: &o, shutWriters: w.shutWriters}
if o.compress {
writer.gz = gzip.NewWriter(nil)
writer.gz, _ = gzip.NewWriterLevel(nil, o.gzipLevel)
}
w.writers = append(w.writers, writer)
go worker(writer, w.jobs)
Expand Down Expand Up @@ -429,6 +464,12 @@ type WarcFileReader struct {
currentRecord WarcRecord
}

var inputBufPool = sync.Pool{
New: func() interface{} {
return bufio.NewReaderSize(nil, 1024*1024)
},
}

func NewWarcFileReader(filename string, offset int64, opts ...WarcRecordOption) (*WarcFileReader, error) {
file, err := os.Open(filename) // For read access.
if err != nil {
Expand All @@ -447,7 +488,9 @@ func NewWarcFileReader(filename string, offset int64, opts ...WarcRecordOption)

wf.countingReader = countingreader.New(file)
wf.initialOffset = offset
wf.bufferedReader = bufio.NewReaderSize(wf.countingReader, 4*1024)
buf := inputBufPool.Get().(*bufio.Reader)
buf.Reset(wf.countingReader)
wf.bufferedReader = buf
return wf, nil
}

Expand Down Expand Up @@ -490,13 +533,15 @@ func (wf *WarcFileReader) Next() (WarcRecord, int64, *Validation, error) {

// Close closes the WarcFileReader.
func (wf *WarcFileReader) Close() error {
inputBufPool.Put(wf.bufferedReader)
return wf.file.Close()
}

// Options for Warc file writer
type warcFileWriterOptions struct {
maxFileSize int64
compress bool
gzipLevel int
expectedCompressionRatio float64
useSegmentation bool
compressSuffix string
Expand Down Expand Up @@ -539,6 +584,7 @@ func defaultwarcFileWriterOptions() warcFileWriterOptions {
return warcFileWriterOptions{
maxFileSize: 1024 * 1024 * 1024, // 1 GiB
compress: true,
gzipLevel: gzip.DefaultCompression,
expectedCompressionRatio: .5,
useSegmentation: false,
compressSuffix: ".gz",
Expand All @@ -559,14 +605,28 @@ func WithMaxFileSize(size int64) WarcFileWriterOption {
})
}

// WithCompression sets if writer should write compressed WARC files.
// WithCompression sets if writer should write gzip compressed WARC files.
// defaults to true
func WithCompression(compress bool) WarcFileWriterOption {
return newFuncWarcFileOption(func(o *warcFileWriterOptions) {
o.compress = compress
})
}

// WithCompressionLevel sets the gzip level (1-9) to use for compression.
// defaults to 5
func WithCompressionLevel(gzipLevel int) WarcFileWriterOption {
return newFuncWarcFileOption(func(o *warcFileWriterOptions) {
if gzipLevel == gzip.DefaultCompression {
gzipLevel = 5
}
if gzipLevel < gzip.BestSpeed || gzipLevel > gzip.BestCompression {
panic("illegal compression level " + strconv.Itoa(gzipLevel) + ", must be between 1 and 9")
}
o.gzipLevel = gzipLevel
})
}

// WithFlush sets if writer should commit each record to stable storage.
// defaults to false
func WithFlush(flush bool) WarcFileWriterOption {
Expand Down
Loading

0 comments on commit 7b48182

Please sign in to comment.