Skip to content

Commit

Permalink
Merge pull request #15 from francoispqt/update/improve-io-reader
Browse files Browse the repository at this point in the history
improve io.Reader handling in decoder, handling EOF error and retrying
  • Loading branch information
francoispqt authored May 2, 2018
2 parents 616be90 + 55210b4 commit 0c193f6
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 9 deletions.
19 changes: 11 additions & 8 deletions decode.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,12 +291,16 @@ func (dec *Decoder) read() bool {
copy(Buf, dec.data)
dec.data = Buf
}
n, err := dec.r.Read(dec.data[dec.length:])
if err != nil {
dec.err = err
return false
} else if n == 0 {
return false
var n int
var err error
for n == 0 {
n, err = dec.r.Read(dec.data[dec.length:])
if err != nil {
if err != io.EOF {
dec.err = err
}
return false
}
}
dec.length = dec.length + n
return true
Expand All @@ -305,10 +309,9 @@ func (dec *Decoder) read() bool {
}

func (dec *Decoder) nextChar() byte {
for dec.cursor < dec.length || dec.read() {
for ; dec.cursor < dec.length || dec.read(); dec.cursor++ {
switch dec.data[dec.cursor] {
case ' ', '\n', '\t', '\r', ',':
dec.cursor = dec.cursor + 1
continue
}
d := dec.data[dec.cursor]
Expand Down
1 change: 1 addition & 0 deletions decode_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func (dec *StreamDecoder) DecodeStream(c UnmarshalerStream) error {
return nil
}
}
close(dec.done)
return InvalidJSONError("Invalid JSON while parsing line delimited JSON")
}

Expand Down
24 changes: 23 additions & 1 deletion decode_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package gojay

import (
"context"
"errors"
"io"
"testing"
"time"

Expand Down Expand Up @@ -310,6 +312,20 @@ loop:
testCase.expectations(dec.Err(), result, t)
}

func TestStreamDecodingErr(t *testing.T) {
testChan := ChannelStreamStrings(make(chan *string))
dec := Stream.NewDecoder(&StreamReaderErr{})
// start decoding (will block the goroutine until something is written to the ReadWriter)
go dec.DecodeStream(testChan)
select {
case <-dec.Done():
assert.NotNil(t, dec.Err(), "dec.Err() should not be nil")
case <-testChan:
assert.True(t, false, "should not be called")
}

}

type ChannelStreamStrings chan *string

func (c ChannelStreamStrings) UnmarshalStream(dec *StreamDecoder) error {
Expand Down Expand Up @@ -355,10 +371,16 @@ func (r *StreamReader) Read(b []byte) (int, error) {
n := copy(b, v)
return n, nil
case <-r.done:
return 0, nil
return 0, io.EOF
}
}

type StreamReaderErr struct{}

func (r *StreamReaderErr) Read(b []byte) (int, error) {
return 0, errors.New("Test Error")
}

// Deadline test
func TestStreamDecodingDeadline(t *testing.T) {
dec := Stream.NewDecoder(&StreamReader{})
Expand Down

0 comments on commit 0c193f6

Please sign in to comment.