Skip to content

Commit

Permalink
can't just use seek because of how the ipc reader works
Browse files Browse the repository at this point in the history
  • Loading branch information
thorfour committed Oct 3, 2024
1 parent 7bad9c2 commit 68a9790
Showing 1 changed file with 17 additions and 19 deletions.
36 changes: 17 additions & 19 deletions reporter/offline.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,10 +256,10 @@ func (o *OfflineReporter) GetMetrics() reporter.Metrics {
}

type ArrowLogReader struct {
lastReader bool
currentReader *ipc.FileReader
currentRecord int
f *os.File
offset int64
}

type ArrowLogger struct {
Expand Down Expand Up @@ -312,50 +312,48 @@ func (a *ArrowLogger) Write(mem memory.Allocator, rec arrow.Record) error {
}

func OpenArrowLog(fname string) (*ArrowLogReader, error) {
f, err := os.Open(fname)
info, err := os.Stat(fname)
if err != nil {
return nil, err
}

// Seek to the last 8 bytes
_, err = f.Seek(-8, io.SeekEnd)
f, err := os.Open(fname)
if err != nil {
return nil, err
}

return &ArrowLogReader{
f: f,
f: f,
offset: info.Size(),
}, nil
}

func (a *ArrowLogReader) initNextReader() error {
if a.lastReader {
if a.offset == 0 {
return io.EOF
}

// Read the last 8 bytes
size := make([]byte, 8)
_, err := a.f.Read(size)
// Read the 8 bytes of size
_, err := a.f.Seek(a.offset-8, io.SeekStart)
if err != nil {
return err
}

// Read the size of the record
recordSize := binary.LittleEndian.Uint64(size)

// Seek to the start of the record
offset, err := a.f.Seek(-int64(recordSize+8), io.SeekCurrent)
size := make([]byte, 8)
_, err = a.f.Read(size)
if err != nil {
return err
}

// We've reached the last record
if offset == 0 {
a.lastReader = true
}
// Read the size of the record
recordSize := binary.LittleEndian.Uint64(size)

// Set the offset to the start of the record
a.offset -= int64(recordSize + 8)

// Start a new reader
a.currentReader, err = ipc.NewFileReader(a.f, ipc.WithAllocator(memory.NewGoAllocator()))
section := io.NewSectionReader(a.f, a.offset, int64(recordSize))
a.currentReader, err = ipc.NewFileReader(section, ipc.WithAllocator(memory.NewGoAllocator()))
if err != nil {
return err
}
Expand Down

0 comments on commit 68a9790

Please sign in to comment.