Bug fixes:
  * Default file length in WarcFileWriter
  * Multithreading issue in WarcFileWriter
  * Fixed autogeneration of PayloadDigest
johnerikhalse committed Sep 17, 2021
1 parent b92de0f, commit 7ef1209
Showing 5 changed files with 95 additions and 27 deletions.
record.go (6 changes: 2 additions & 4 deletions)
@@ -364,9 +364,8 @@ func (wr *warcRecord) ValidateDigest(validation *Validation) error {
     if blockDigest != nil {
         if blockDigest.hash == "" {
             // Missing digest header is allowed, so skip validation. But if fixDigest option is set, a header will be added.
-            if wr.opts.fixDigest {
+            if wr.opts.addMissingDigest {
                 wr.WarcHeader().Set(WarcBlockDigest, blockDigest.format())
-                return nil
             }
         } else {
             if err := blockDigest.validate(); err != nil {
@@ -393,9 +392,8 @@ func (wr *warcRecord) ValidateDigest(validation *Validation) error {
     if payloadDigest != nil {
         if payloadDigest.hash == "" {
             // Missing digest header is allowed, so skip validation. But if fixDigest option is set, a header will be added.
-            if wr.opts.fixDigest {
+            if wr.opts.addMissingDigest {
                 wr.WarcHeader().Set(WarcPayloadDigest, payloadDigest.format())
-                return nil
             }
         } else {
             if err := payloadDigest.validate(); err != nil {
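The removed `return nil` appears to be what fixes the PayloadDigest autogeneration mentioned in the commit message: with the old code, adding a missing WARC-Block-Digest returned from ValidateDigest immediately, so a missing WARC-Payload-Digest on the same record was never generated. A minimal, self-contained sketch of the restored control flow (a hypothetical helper working on a plain map, not the gowarc types):

```go
package main

import "fmt"

// addMissingDigests is a hypothetical stand-in for the logic in ValidateDigest:
// when a digest header is missing and the option is enabled, the header is added
// and processing continues, instead of returning early and skipping the payload digest.
func addMissingDigests(headers map[string]string, blockDigest, payloadDigest string, addMissing bool) {
    if _, ok := headers["WARC-Block-Digest"]; !ok && addMissing {
        headers["WARC-Block-Digest"] = blockDigest
        // No early return here; the payload digest below must still be handled.
    }
    if _, ok := headers["WARC-Payload-Digest"]; !ok && addMissing {
        headers["WARC-Payload-Digest"] = payloadDigest
    }
}

func main() {
    h := map[string]string{"WARC-Record-ID": "<urn:uuid:example>"}
    addMissingDigests(h, "sha1:BLOCKDIGEST", "sha1:PAYLOADDIGEST", true)
    fmt.Println(h["WARC-Block-Digest"], h["WARC-Payload-Digest"]) // both headers are now set
}
```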
record_test.go (3 changes: 2 additions & 1 deletion)
@@ -248,7 +248,8 @@ func Test_warcRecord_Merge(t *testing.T) {
 }
 
 func createRecord1(recordType RecordType, headers *WarcFields, data string) WarcRecord {
-    rb := NewRecordBuilder(recordType, WithSpecViolationPolicy(ErrFail), WithSyntaxErrorPolicy(ErrFail), WithUnknownRecordTypePolicy(ErrIgnore), WithFixDigest(false))
+    rb := NewRecordBuilder(recordType, WithSpecViolationPolicy(ErrFail), WithSyntaxErrorPolicy(ErrFail),
+        WithUnknownRecordTypePolicy(ErrIgnore), WithFixDigest(false), WithAddMissingDigest(false))
     for _, nv := range *headers {
         rb.AddWarcHeader(nv.Name, nv.Value)
     }
recordbuilder_test.go (2 changes: 1 addition & 1 deletion)
@@ -122,7 +122,7 @@ func TestRecordBuilder(t *testing.T) {
         {
             "valid request record",
             args{
-                []WarcRecordOption{WithSpecViolationPolicy(ErrFail), WithSyntaxErrorPolicy(ErrFail), WithUnknownRecordTypePolicy(ErrIgnore), WithFixDigest(false)},
+                []WarcRecordOption{WithSpecViolationPolicy(ErrFail), WithSyntaxErrorPolicy(ErrFail), WithUnknownRecordTypePolicy(ErrIgnore), WithFixDigest(false), WithAddMissingDigest(false)},
                 Request,
                 &WarcFields{
                     &nameValue{Name: WarcDate, Value: "2017-03-06T04:03:53Z"},
warcfile.go (109 changes: 89 additions & 20 deletions)
@@ -67,9 +67,17 @@ func (g *PatternNameGenerator) NewWarcfileName() (string, string) {
 }
 
 type WarcFileWriter struct {
-    opts    *warcFileWriterOptions
-    writers []*singleWarcFileWriter
-    jobs    chan *job
+    opts        *warcFileWriterOptions
+    writers     []*singleWarcFileWriter
+    shutWriters *sync.WaitGroup
+    jobs        chan *job
+    middleCh    chan *job
+    closing     chan struct{} // signal channel
+    closed      chan struct{}
 }
 
+func (w *WarcFileWriter) String() string {
+    return fmt.Sprintf("WarcFileWriter (%s)", w.opts)
+}
+
 // NewWarcFileWriter creates a new WarcFileWriter with the supplied options.
@@ -78,31 +86,67 @@ func NewWarcFileWriter(opts ...WarcFileWriterOption) *WarcFileWriter {
     for _, opt := range opts {
         opt.apply(&o)
     }
-    w := &WarcFileWriter{opts: &o}
-    w.jobs = make(chan *job)
+    w := &WarcFileWriter{opts: &o,
+        closing:     make(chan struct{}), // signal channel
+        closed:      make(chan struct{}),
+        middleCh:    make(chan *job),
+        jobs:        make(chan *job),
+        shutWriters: &sync.WaitGroup{},
+    }
+    w.shutWriters.Add(o.maxConcurrentWriters)
+
+    // the middle layer
+    go func() {
+        exit := func(v *job, needSend bool) {
+            close(w.closed)
+            if needSend {
+                w.jobs <- v
+            }
+            close(w.jobs)
+        }
+
+        for {
+            select {
+            case <-w.closing:
+                exit(nil, false)
+                return
+            case v := <-w.middleCh:
+                select {
+                case <-w.closing:
+                    exit(v, true)
+                    return
+                case w.jobs <- v:
+                }
+            }
+        }
+    }()
+
     for i := 0; i < o.maxConcurrentWriters; i++ {
-        writer := &singleWarcFileWriter{opts: &o}
+        writer := &singleWarcFileWriter{opts: &o, shutWriters: w.shutWriters}
         w.writers = append(w.writers, writer)
         go worker(writer, w.jobs)
     }
     return w
 }
 
 func worker(w *singleWarcFileWriter, jobs <-chan *job) {
+    defer func() {
+        w.Close()
+        w.shutWriters.Done()
+    }()
+
     for j := range jobs {
         res := make([]WriteResponse, len(j.records))
         for i, r := range j.records {
             res[i] = w.Write(r)
         }
-        j.responses = res
-        j.wg.Done()
+        j.responses <- res
     }
 }
 
 type job struct {
     records   []WarcRecord
-    responses []WriteResponse
-    wg        sync.WaitGroup
+    responses chan<- []WriteResponse
 }
 
 type WriteResponse struct {
@@ -119,6 +163,22 @@ type WriteResponse struct {
 //
 // Returns a slice with one WriteResponse for each record written.
 func (w *WarcFileWriter) Write(record ...WarcRecord) []WriteResponse {
+    select {
+    case <-w.closed:
+        return nil
+    default:
+    }
+
+    job, result := w.createWriteJob(record...)
+    select {
+    case <-w.closed:
+        return nil
+    case w.middleCh <- job:
+        return <-result
+    }
+}
+
+func (w *WarcFileWriter) createWriteJob(record ...WarcRecord) (*job, <-chan []WriteResponse) {
     if w.opts.addConcurrentHeader {
         for k, wr := range record {
             for k2, wr2 := range record {
@@ -130,14 +190,12 @@ func (w *WarcFileWriter) Write(record ...WarcRecord) []WriteResponse {
         }
     }
 
+    result := make(chan []WriteResponse)
     job := &job{
-        records: record,
-        wg:      sync.WaitGroup{},
+        records:   record,
+        responses: result,
     }
-    job.wg.Add(1)
-    w.jobs <- job
-    job.wg.Wait()
-    return job.responses
+    return job, result
 }
 
 // Close closes the current files beeing written to.
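Write results now travel back over a per-job channel instead of a sync.WaitGroup plus a shared responses slice: createWriteJob makes the channel, the worker sends the result into it, and Write blocks on <-result. A generic, self-contained sketch of this request/response-over-channels pattern (illustrative names, not the gowarc types):

```go
package main

import "fmt"

// writeJob carries its own response channel, so the worker can hand the result
// straight back to the caller that submitted the job.
type writeJob struct {
    records   []string
    responses chan<- []string
}

func worker(jobs <-chan *writeJob) {
    for j := range jobs {
        res := make([]string, len(j.records))
        for i, r := range j.records {
            res[i] = "wrote " + r
        }
        j.responses <- res // reply on the job's own channel
    }
}

func main() {
    jobs := make(chan *writeJob)
    go worker(jobs)

    result := make(chan []string)
    jobs <- &writeJob{records: []string{"a", "b"}, responses: result}
    fmt.Println(<-result) // [wrote a wrote b]
}
```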
@@ -155,11 +213,17 @@ func (w *WarcFileWriter) Close() error {
     return nil
 }
 
-// Shutdown closes the current file being written to and then releases all resources used by the WarcFileWriter.
+// Shutdown closes the current file(s) being written to and then releases all resources used by the WarcFileWriter.
 // Calling Write after Shutdown will panic.
 func (w *WarcFileWriter) Shutdown() error {
-    close(w.jobs)
-    return w.Close()
+    select {
+    case w.closing <- struct{}{}:
+        <-w.closed
+    case <-w.closed:
+    }
+
+    w.shutWriters.Wait()
+    return nil
 }
 
 type singleWarcFileWriter struct {
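Shutdown no longer calls close(w.jobs) directly, which could race with a concurrent Write and panic with "send on closed channel". Instead it signals the middle-layer goroutine shown further up, which is the only sender on, and closer of, w.jobs, and then waits for the workers via shutWriters. A self-contained sketch of that signal-channel ownership pattern with generic names (the WaitGroup for the workers is omitted):

```go
package main

import "fmt"

// safeQueue sketches the "middle layer" pattern above: producers send to in,
// a single forwarding goroutine is the only sender on out and the only one
// that closes it, and shutdown is requested through a signal channel, so no
// producer can ever send on a closed channel.
type safeQueue struct {
    in      chan int
    out     chan int
    closing chan struct{} // signal channel
    closed  chan struct{}
}

func newSafeQueue() *safeQueue {
    q := &safeQueue{
        in:      make(chan int),
        out:     make(chan int),
        closing: make(chan struct{}),
        closed:  make(chan struct{}),
    }
    go func() {
        exit := func(v *int) {
            close(q.closed)
            if v != nil {
                q.out <- *v // deliver a value that was already accepted
            }
            close(q.out)
        }
        for {
            select {
            case <-q.closing:
                exit(nil)
                return
            case v := <-q.in:
                select {
                case <-q.closing:
                    exit(&v)
                    return
                case q.out <- v:
                }
            }
        }
    }()
    return q
}

// send returns false instead of panicking if the queue is already shut down.
func (q *safeQueue) send(v int) bool {
    select {
    case <-q.closed:
        return false
    case q.in <- v:
        return true
    }
}

func (q *safeQueue) shutdown() {
    select {
    case q.closing <- struct{}{}:
        <-q.closed
    case <-q.closed: // shutdown already in progress or done
    }
}

func main() {
    q := newSafeQueue()
    done := make(chan struct{})
    go func() { // consumer, in the role of the worker goroutines
        for v := range q.out {
            fmt.Println("got", v)
        }
        close(done)
    }()
    q.send(1)
    q.send(2)
    q.shutdown()
    <-done
    fmt.Println("sent after shutdown:", q.send(3)) // false, no panic
}
```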
@@ -169,6 +233,7 @@ type singleWarcFileWriter struct {
     currentFileSize   int64
     currentWarcInfoId string
     writeLock         sync.Mutex
+    shutWriters       *sync.WaitGroup
 }
 
 func (w *singleWarcFileWriter) Write(record WarcRecord) (response WriteResponse) {
@@ -413,6 +478,10 @@ type warcFileWriterOptions struct {
     addConcurrentHeader bool
 }
 
+func (w *warcFileWriterOptions) String() string {
+    return fmt.Sprintf("File size: %d, Compressed: %v, Num writers: %d", w.maxFileSize, w.compress, w.maxConcurrentWriters)
+}
+
 // WarcFileWriterOption configures how to write WARC files.
 type WarcFileWriterOption interface {
     apply(*warcFileWriterOptions)
@@ -436,7 +505,7 @@ func newFuncWarcFileOption(f func(*warcFileWriterOptions)) *funcWarcFileWriterOp
 
 func defaultwarcFileWriterOptions() warcFileWriterOptions {
     return warcFileWriterOptions{
-        maxFileSize:              1024 ^ 3, // 1 GiB
+        maxFileSize:              1024 * 1024 * 1024, // 1 GiB
         compress:                 true,
         expectedCompressionRatio: .5,
         useSegmentation:          false,
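The old default was written as 1024 ^ 3, but in Go ^ is bitwise XOR, not exponentiation, so the default maxFileSize was 1027 bytes rather than the intended 1 GiB; hence the rewrite to 1024 * 1024 * 1024. A quick check:

```go
package main

import "fmt"

func main() {
    // In Go, ^ is bitwise XOR, not exponentiation.
    fmt.Println(1024 ^ 3)           // 1027: the accidental old default, roughly 1 KiB
    fmt.Println(1024 * 1024 * 1024) // 1073741824: 1 GiB, the intended default
}
```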
warcfile_test.go (2 changes: 1 addition & 1 deletion)
@@ -485,7 +485,7 @@ func TestWarcFileWriter_Write(t *testing.T) {
 }
 
 func createTestRecord() WarcRecord {
-    builder := NewRecordBuilder(Response, WithFixDigest(false), WithStrictValidation())
+    builder := NewRecordBuilder(Response, WithFixDigest(false), WithStrictValidation(), WithAddMissingDigest(false))
     _, err := builder.WriteString("HTTP/1.1 200 OK\nDate: Tue, 19 Sep 2016 17:18:40 GMT\nServer: Apache/2.0.54 (Ubuntu)\n" +
         "Last-Modified: Mon, 16 Jun 2013 22:28:51 GMT\nETag: \"3e45-67e-2ed02ec0\"\nAccept-Ranges: bytes\n" +
         "Content-Length: 19\nConnection: close\nContent-Type: text/plain\n\nThis is the content\n")
