-
Notifications
You must be signed in to change notification settings - Fork 2
/
segmenter.go
90 lines (74 loc) · 2.03 KB
/
segmenter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
package dedup
import (
"bufio"
"io"
"github.com/kch42/buzhash"
"github.com/pkg/errors"
)
// SegmentHandler is a callback that processes one segment of input. Within
// this file it is invoked by Segmenter.SegmentFile once per segment found; a
// non-nil return value stops segmentation and is returned to the caller.
//
// SegmentFile reuses the backing array of the slice between calls, so a
// handler that needs the bytes after it returns must copy them.
type SegmentHandler func([]byte) error
// Segmenter holds the parameters used by SegmentFile to segment a file or
// stream.
type Segmenter struct {
// WindowSize is passed (converted to uint32) to buzhash.NewBuzHash and is
// also used as the minimum segment length. SegmentFile rejects zero.
WindowSize uint64
// Mask selects cut points: a segment ends after a byte for which
// (hash & Mask) == 0. SegmentFile rejects zero.
Mask uint64
// MaxSegmentLength ends a segment once it has grown to this many bytes (and
// to at least WindowSize bytes). Zero selects a default of (Mask + 1) * 8.
MaxSegmentLength uint64
}
// SegmentFile reads file until EOF, splits the byte stream into segments
// according to the parameters configured on the Segmenter, and calls handler
// once for every segment, in stream order.
//
// A segment ends after a byte for which the rolling hash ANDed with s.Mask is
// zero, or once the segment has grown to the maximum segment length, whichever
// comes first. In both cases a segment is never ended before it holds at least
// s.WindowSize bytes. If s.MaxSegmentLength is zero, (s.Mask + 1) * 8 is used
// as the maximum. Bytes left over at EOF are delivered as a final, possibly
// shorter, segment; handler is never called with an empty segment.
//
// The slice passed to handler shares one backing buffer across calls and is
// overwritten as soon as handler returns, so a handler that keeps the data
// must copy it.
//
// SegmentFile returns an error if handler is nil, if s.Mask or s.WindowSize is
// zero, if reading from file fails with anything other than io.EOF, or if
// handler returns an error (which stops processing immediately).
func (s Segmenter) SegmentFile(file io.Reader, handler SegmentHandler) error {
	if handler == nil {
		return errors.Errorf("No segment handler specified")
	}
	if s.Mask == 0 {
		return errors.Errorf("Invalid mask specified (0)")
	}
	if s.WindowSize == 0 {
		return errors.Errorf("Invalid windows size specified")
	}

	maxSegLen := s.MaxSegmentLength
	if maxSegLen == 0 {
		maxSegLen = (s.Mask + 1) * 8 // arbitrary default
	}

	var (
		reader     = bufio.NewReader(file)
		roller     = buzhash.NewBuzHash(uint32(s.WindowSize))
		curSegment = make([]byte, 0, maxSegLen)
		minSegLen  = s.WindowSize
	)

	// Walk the input one byte at a time. The rolling hash is fed every byte
	// and is deliberately not reset at segment boundaries.
	for {
		b, err := reader.ReadByte()
		if err == io.EOF {
			break
		}
		if err != nil {
			return err
		}

		curSegment = append(curSegment, b)
		sum := roller.HashByte(b)

		// Never emit a segment shorter than minSegLen, even at a cut point.
		segLen := uint64(len(curSegment))
		if segLen < minSegLen {
			continue
		}

		// Emit on a hash cut point or when the segment has hit its size cap.
		if uint64(sum)&s.Mask == 0 || segLen >= maxSegLen {
			if err := handler(curSegment); err != nil {
				return err
			}
			curSegment = curSegment[:0] // reuse the buffer for the next segment
		}
	}

	// Flush whatever is left after EOF, but do not hand out an empty segment
	// (empty input, or the last byte already closed a segment).
	if len(curSegment) == 0 {
		return nil
	}
	return handler(curSegment)
}