Skip to content

Commit

Permalink
Fixed error in file naming. Added support for writing several records…
Browse files Browse the repository at this point in the history
… at once with WARC-Concurrent-To headers.
  • Loading branch information
johnerikhalse committed Sep 13, 2021
1 parent 319f120 commit 0af90a9
Show file tree
Hide file tree
Showing 2 changed files with 218 additions and 78 deletions.
101 changes: 71 additions & 30 deletions warcfile.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ func (g *PatternNameGenerator) NewWarcfileName() (string, string) {
}

type WarcFileWriter struct {
opts *warcFileWriterOptions
writers []*singleWarcFileWriter
jobs chan *job
}
Expand All @@ -77,7 +78,7 @@ func NewWarcFileWriter(opts ...WarcFileWriterOption) *WarcFileWriter {
for _, opt := range opts {
opt.apply(&o)
}
w := &WarcFileWriter{}
w := &WarcFileWriter{opts: &o}
w.jobs = make(chan *job)
for i := 0; i < o.maxConcurrentWriters; i++ {
writer := &singleWarcFileWriter{opts: &o}
Expand All @@ -89,31 +90,54 @@ func NewWarcFileWriter(opts ...WarcFileWriterOption) *WarcFileWriter {

func worker(w *singleWarcFileWriter, jobs <-chan *job) {
for j := range jobs {
j.fileOffset, j.fileName, j.bytesWritten, j.err = w.Write(j.record)
res := make([]WriteResponse, len(j.records))
for i, r := range j.records {
res[i] = w.Write(r)
}
j.responses = res
j.wg.Done()
}
}

type job struct {
record WarcRecord
fileName string
fileOffset int64
bytesWritten int64
err error
wg sync.WaitGroup
records []WarcRecord
responses []WriteResponse
wg sync.WaitGroup
}

type WriteResponse struct {
fileName string // filename
fileOffset int64 // the offset in file
bytesWritten int64 // number of uncompressed bytes written
err error // eventual error
}

// Write marshals a WarcRecord to file.
// Returns the number of uncompressed bytes written.
func (w *WarcFileWriter) Write(record WarcRecord) (int64, string, int64, error) {
// Write marshals one or more WarcRecords to file.
//
// If more than one is written, then those will be written sequentially to the same file if size permits.
// If the writer was created with the WithAddWarcConcurrentToHeader option, each record will have cross-reference headers.
//
// Returns a slice with one WriteResponse for each record written.
func (w *WarcFileWriter) Write(record ...WarcRecord) []WriteResponse {
if w.opts.addConcurrentHeader {
for k, wr := range record {
for k2, wr2 := range record {
if k == k2 {
continue
}
wr.WarcHeader().Add(WarcConcurrentTo, wr2.WarcHeader().Get(WarcRecordID))
}
}
}

job := &job{
record: record,
wg: sync.WaitGroup{},
records: record,
wg: sync.WaitGroup{},
}
job.wg.Add(1)
w.jobs <- job
job.wg.Wait()
return job.fileOffset, job.fileName, job.bytesWritten, job.err
return job.responses
}

// Close closes the current files beeing written to.
Expand Down Expand Up @@ -147,7 +171,7 @@ type singleWarcFileWriter struct {
writeLock sync.Mutex
}

func (w *singleWarcFileWriter) Write(record WarcRecord) (int64, string, int64, error) {
func (w *singleWarcFileWriter) Write(record WarcRecord) (response WriteResponse) {
w.writeLock.Lock()
defer w.writeLock.Unlock()

Expand All @@ -171,13 +195,15 @@ func (w *singleWarcFileWriter) Write(record WarcRecord) (int64, string, int64, e
size = int64(float64(size) * w.opts.expectedCompressionRatio)
}
if err != nil {
return 0, "", 0, err
response.err = err
return
}
if w.currentFileSize > 0 && (w.currentFileSize+size) > w.opts.maxFileSize {
// Not enough space in file, close it so a new will be created
err = w.close()
if err != nil {
return 0, "", 0, err
response.err = err
return
}
}
}
Expand All @@ -186,25 +212,29 @@ func (w *singleWarcFileWriter) Write(record WarcRecord) (int64, string, int64, e
// Create new file if necessary
if w.currentFile == nil {
if err := w.createFile(); err != nil {
return 0, "", 0, err
response.err = err
return
}
}

offset := w.currentFileSize
size, err := w.writeRecord(w.currentFile, record, maxRecordSize)
if err != nil {
return 0, "", 0, err
response.fileOffset = w.currentFileSize
response.fileName = w.currentFileName
response.bytesWritten, response.err = w.writeRecord(w.currentFile, record, maxRecordSize)
if response.err != nil {
return
}
// sync file to reduce possibility of half written records in case of crash
if err := w.currentFile.Sync(); err != nil {
return 0, "", 0, err
if response.err = w.currentFile.Sync(); response.err != nil {
return
}
fi, err := w.currentFile.Stat()
if err != nil {
return 0, "", 0, err
response.err = err
return
}
w.currentFileSize = fi.Size()
return offset, w.currentFileName, size, err

return
}

func (w *singleWarcFileWriter) createFile() error {
Expand All @@ -218,7 +248,7 @@ func (w *singleWarcFileWriter) createFile() error {
if path != "" && !strings.HasSuffix(path, "/") {
path += "/"
}
path += fileName + suffix + w.opts.openFileSuffix
path += fileName + w.opts.openFileSuffix

file, err := os.OpenFile(path, os.O_CREATE|os.O_EXCL|os.O_RDWR, 0666)
if err != nil {
Expand Down Expand Up @@ -249,9 +279,9 @@ func (w *singleWarcFileWriter) writeRecord(writer io.Writer, record WarcRecord,
return size, err
}
if nextRec != nil {
_, _, s, e := w.Write(nextRec)
s += size
return s, e
res := w.Write(nextRec)
res.bytesWritten += size
return res.bytesWritten, res.err
}
return size, nil
}
Expand Down Expand Up @@ -380,6 +410,7 @@ type warcFileWriterOptions struct {
marshaler Marshaler
maxConcurrentWriters int
warcInfoFunc func(recordBuilder WarcRecordBuilder) error
addConcurrentHeader bool
}

// WarcFileWriterOption configures how to write WARC files.
Expand Down Expand Up @@ -414,6 +445,7 @@ func defaultwarcFileWriterOptions() warcFileWriterOptions {
nameGenerator: &PatternNameGenerator{},
marshaler: &defaultMarshaler{},
maxConcurrentWriters: 1,
addConcurrentHeader: false,
}
}

Expand Down Expand Up @@ -506,3 +538,12 @@ func WithWarcInfoFunc(f func(recordBuilder WarcRecordBuilder) error) WarcFileWri
o.warcInfoFunc = f
})
}

// WithAddWarcConcurrentToHeader configures if records written in the same call to Write should have WARC-Concurrent-To
// headers added for cross-reference.
// default false
func WithAddWarcConcurrentToHeader(addConcurrentHeader bool) WarcFileWriterOption {
return newFuncWarcFileOption(func(o *warcFileWriterOptions) {
o.addConcurrentHeader = addConcurrentHeader
})
}
Loading

0 comments on commit 0af90a9

Please sign in to comment.