diff --git a/pkg/kgo/record_formatter.go b/pkg/kgo/record_formatter.go index 06e4fec9..932cc5ce 100644 --- a/pkg/kgo/record_formatter.go +++ b/pkg/kgo/record_formatter.go @@ -6,6 +6,7 @@ import ( "encoding/base64" "encoding/binary" "encoding/hex" + "encoding/json" "errors" "fmt" "io" @@ -366,6 +367,9 @@ func NewRecordFormatter(layout string) (*RecordFormatter, error) { var appendFn func([]byte, []byte) []byte if handledBrace = isOpenBrace; handledBrace { switch { + case strings.HasPrefix(layout, "}"): + layout = layout[len("}"):] + appendFn = appendPlain case strings.HasPrefix(layout, "base64}"): appendFn = appendBase64 layout = layout[len("base64}"):] @@ -1050,12 +1054,14 @@ type RecordReader struct { // // # Text // -// Topics, keys, and values can be decoded uding "base64" and "hex" formatting -// options. Any size specification is the size of the encoded value actually -// being read. +// Topics, keys, and values can be decoded using "base64", "hex", and "json" +// formatting options. Any size specification is the size of the encoded value +// actually being read (i.e., size as seen, not size when decoded). JSON values +// are compacted after being read. // // %T%t{hex} - 4abcd reads four hex characters "abcd" // %V%v{base64} - 2z9 reads two base64 characters "z9" +// %v{json} %k - {"foo" : "bar"} foo reads a JSON object and then "foo" // // As well, these text options can be parsed with regular expressions: // @@ -1292,14 +1298,25 @@ func (r *RecordReader) parseReadLayout(layout string) error { case 't', 'k', 'v': var decodeFn func([]byte) ([]byte, error) var re *regexp.Regexp + var isJson bool if handledBrace = isOpenBrace; handledBrace { switch { + case strings.HasPrefix(layout, "}"): + layout = layout[len("}"):] case strings.HasPrefix(layout, "base64}"): decodeFn = decodeBase64 layout = layout[len("base64}"):] case strings.HasPrefix(layout, "hex}"): decodeFn = decodeHex layout = layout[len("hex}"):] + case strings.HasPrefix(layout, "json}"): + isJson = true + decodeFn = func(b []byte) ([]byte, error) { + var buf bytes.Buffer + err := json.Compact(&buf, b) + return buf.Bytes(), err + } + layout = layout[len("json}"):] case strings.HasPrefix(layout, "re"): restr, rem, err := nomOpenClose(layout[len("re"):]) if err != nil { @@ -1353,9 +1370,14 @@ func (r *RecordReader) parseReadLayout(layout string) error { if re != nil { return errors.New("cannot specify exact size and regular expression") } + if isJson { + return errors.New("cannot specify exact size and json") + } fn.read = readKind{sizefn: func() int { return int(*size) }} } else if re != nil { fn.read = readKind{re: re} + } else if isJson { + fn.read = readKind{condition: new(jsonReader).read} } r.fns = append(r.fns, fn) @@ -1640,7 +1662,7 @@ func decodeHex(b []byte) ([]byte, error) { type readKind struct { noread bool exact []byte - condition func(byte) int8 // -1: stop, do not consume input; 0: stop, consume input; 1: keep going, consume input, 2: keep going, consume input, can EOF + condition func(byte) int8 // -2: error, -1: stop, do not consume input; 0: stop, consume input; 1: keep going, consume input, 2: keep going, consume input, can EOF size int sizefn func() int handoff func(*RecordReader, *Record) error @@ -1738,6 +1760,8 @@ func (r *RecordReader) readCondition(fn func(byte) int8) error { ignoreEOF = false c := peek[0] switch fn(c) { + case -2: + return fmt.Errorf("invalid input %q", c) case -1: return nil case 0: @@ -1840,6 +1864,356 @@ func (r *RecordReader) readDelim(d []byte) error { } } +type jsonReader struct { + state int8 + n int8 // misc. + nexts []int8 +} + +func (*jsonReader) isHex(c byte) bool { + switch c { + case '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', + 'a', 'b', 'c', 'd', 'e', 'f', + 'A', 'B', 'C', 'D', 'E', 'F': + return true + default: + return false + } +} + +func (*jsonReader) isNum(c byte) bool { + switch c { + case '0', '1', '2', '3', '4', '5', '6', '7', '8', '9': + return true + } + return false +} + +func (*jsonReader) isNat(c byte) bool { + switch c { + case '1', '2', '3', '4', '5', '6', '7', '8', '9': + return true + } + return false +} + +func (*jsonReader) isE(c byte) bool { + return c == 'e' || c == 'E' +} + +const ( + jrstAny int8 = iota + jrstObj + jrstObjSep + jrstObjFin + jrstArr + jrstArrFin + jrstStrBegin + jrstStr + jrstStrEsc + jrstStrEscU + jrstTrue + jrstFalse + jrstNull + jrstNeg + jrstOne + jrstDotOrE + jrstDot + jrstE +) + +func (r *jsonReader) read(c byte) (rr int8) { +start: + switch r.state { + case jrstAny: + switch c { + case ' ', '\t', '\n', '\r': + return 1 // skip whitespace, need more + case '{': + r.state = jrstObj + return 1 // object open, need more + case '[': + r.state = jrstArr + return 1 // array open, need more + case '"': + r.state = jrstStr + return 1 // string open, need more + case 't': + r.state = jrstTrue + r.n = 0 + return 1 // beginning of true, need more + case 'f': + r.state = jrstFalse + r.n = 0 + return 1 // beginning of false, need more + case 'n': + r.state = jrstNull + r.n = 0 + return 1 // beginning of null, need more + case '-': + r.state = jrstNeg + return 1 // beginning of negative number, need more + case '0': + r.state = jrstDotOrE + return 1 // beginning of 0e or 0., need more + case '1', '2', '3', '4', '5', '6', '7', '8', '9': + r.state = jrstOne + return 1 // beginning of number, need more + default: + return -2 // invalid json + } + + case jrstObj: + switch c { + case ' ', '\t', '\n', '\r': + return 1 // skip whitespace in json object, need more + case '"': + r.pushState(jrstStr, jrstObjSep) + return 1 // beginning of object key, need to finish, transition to obj sep + case '}': + return r.popState() // end of object, this is valid json end, pop state + default: + return -2 // invalid json: expected object key + } + case jrstObjSep: + switch c { + case ' ', '\t', '\n', '\r': + return 1 // skip whitespace in json object, need more + case ':': + r.pushState(jrstAny, jrstObjFin) + return 1 // beginning of object value, need to finish, transition to obj fin + default: + return -2 // invalid json: expected object separator + } + case jrstObjFin: + switch c { + case ' ', '\r', '\t', '\n': + return 1 // skip whitespace in json object, need more + case ',': + r.pushState(jrstStrBegin, jrstObjSep) + return 1 // beginning of new object key, need to finish, transition to obj sep + case '}': + return r.popState() // end of object, this is valid json end, pop state + default: + return -2 // invalid json + } + + case jrstArr: + switch c { + case ' ', '\r', '\t', '\n': + return 1 // skip whitespace in json array, need more + case ']': + return r.popState() // end of array, this is valid json end, pop state + default: + r.pushState(jrstAny, jrstArrFin) + goto start // array value began: immediately transition to it + } + case jrstArrFin: + switch c { + case ' ', '\r', '\t', '\n': + return 1 // skip whitespace in json array, need more + case ',': + r.state = jrstArr + return 1 // beginning of new array value, need more + case ']': + return r.popState() // end of array, this is valid json end, pop state + default: + return -2 // invalid json + } + + case jrstStrBegin: + switch c { + case ' ', '\r', '\t', '\n': + return 1 // skip whitespace in json object (before beginning of key), need more + case '"': + r.state = jrstStr + return 1 // beginning of object key, need more + default: + return -2 // invalid json + } + + case jrstStr: + switch c { + case 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, + 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31: + return -2 // invalid json: control characters not allowed in string + case '"': + return r.popState() // end of string, this is valid json end, pop state + case '\\': + r.state = jrstStrEsc + return 1 // beginning of escape sequence, need more + default: + return 1 // continue string, need more + } + case jrstStrEsc: + switch c { + case 'b', 'f', 'n', 'r', 't', '\\', '/', '"': + r.state = jrstStr + return 1 // end of escape sequence, still need to finish string + case 'u': + r.state = jrstStrEscU + r.n = 0 + return 1 // beginning of unicode escape sequence, need more + default: + return -2 // invalid json: invalid escape sequence + } + case jrstStrEscU: + if !r.isHex(c) { + return -2 // invalid json: invalid unicode escape sequence + } + r.n++ + if r.n == 4 { + r.state = jrstStr + } + return 1 // end of unicode escape sequence, still need to finish string + + case jrstTrue: + switch { + case r.n == 0 && c == 'r': + r.n++ + return 1 + case r.n == 1 && c == 'u': + r.n++ + return 1 + case r.n == 2 && c == 'e': + return r.popState() // end of true, this is valid json end, pop state + } + case jrstFalse: + switch { + case r.n == 0 && c == 'a': + r.n++ + return 1 + case r.n == 1 && c == 'l': + r.n++ + return 1 + case r.n == 2 && c == 's': + r.n++ + return 1 + case r.n == 3 && c == 'e': + return r.popState() // end of false, this is valid json end, pop state + } + case jrstNull: + switch { + case r.n == 0 && c == 'u': + r.n++ + return 1 + case r.n == 1 && c == 'l': + r.n++ + return 1 + case r.n == 2 && c == 'l': + return r.popState() // end of null, this is valid json end, pop state + } + + case jrstNeg: + if c == '0' { + r.state = jrstDotOrE + return r.oneOrTwo() // beginning of -0, need to see if there is more (potentially end) + } else if r.isNat(c) { + r.state = jrstOne + return r.oneOrTwo() // beginning of -1 (or 2,3,..9), need to see if there is more (potentially end) + } + return -2 // invalid, -a or something + case jrstOne: + if r.isNum(c) { + return r.oneOrTwo() // continue the number (potentially end) + } + fallthrough // not a number, check if e or . + case jrstDotOrE: + if r.isE(c) { + r.state = jrstE + return 1 // beginning of exponent, need more + } + if c == '.' { + r.state = jrstDot + r.n = 0 + return 1 // beginning of dot, need more + } + if r.popStateToStart() { + goto start + } + return -1 // done with number, no more state to bubble to: we are done + + case jrstDot: + switch r.n { + case 0: + if !r.isNum(c) { + return -2 // first char after dot must be a number + } + r.n = 1 + return r.oneOrTwo() // saw number, keep and continue (potentially end) + case 1: + if r.isNum(c) { + return r.oneOrTwo() // more number, keep and continue (potentially end) + } + if r.isE(c) { + r.state = jrstE + r.n = 0 + return 1 // beginning of exponent (-0.1e), need more + } + if r.popStateToStart() { + goto start + } + return -1 // done with number, no more state to bubble to: we are done + } + case jrstE: + switch r.n { + case 0: + if c == '+' || c == '-' { + r.n = 1 + return 1 // beginning of exponent sign, need more + } + fallthrough + case 1: + if !r.isNum(c) { + return -2 // first char after exponent must be sign or number + } + r.n = 2 + return r.oneOrTwo() // saw number, keep and continue (potentially end) + case 2: + if r.isNum(c) { + return r.oneOrTwo() // more number, keep and continue (potentially end) + } + if r.popStateToStart() { + goto start + } + return -1 // done with number, no more state to bubble to: we are done + } + } + return -2 // unknown state +} + +func (r *jsonReader) pushState(next, next2 int8) { + r.nexts = append(r.nexts, next2) + r.state = next +} + +func (r *jsonReader) popState() int8 { + if len(r.nexts) == 0 { + r.state = jrstAny + return 0 + } + r.state = r.nexts[len(r.nexts)-1] + r.nexts = r.nexts[:len(r.nexts)-1] + return 1 +} + +func (r *jsonReader) popStateToStart() bool { + if len(r.nexts) == 0 { + r.state = jrstAny + return false + } + r.state = r.nexts[len(r.nexts)-1] + r.nexts = r.nexts[:len(r.nexts)-1] + return true +} + +func (r *jsonReader) oneOrTwo() int8 { + if len(r.nexts) > 0 { + return 1 + } + return 2 +} + //////////// // COMMON // //////////// diff --git a/pkg/kgo/record_formatter_test.go b/pkg/kgo/record_formatter_test.go index 802d4869..895d6e26 100644 --- a/pkg/kgo/record_formatter_test.go +++ b/pkg/kgo/record_formatter_test.go @@ -1,9 +1,12 @@ package kgo import ( + "bytes" + "encoding/json" "errors" "io" "reflect" + "strconv" "strings" "testing" "time" @@ -41,6 +44,10 @@ func TestRecordFormatter(t *testing.T) { layout: "%v", expR: "value", }, + { + layout: "%v{}", + expR: "value", + }, { layout: "%T{hex16}%t %V{ascii} %v %V{little16} %k %K{big32} %o", @@ -230,6 +237,11 @@ func TestRecordReader(t *testing.T) { in: "foo bar biz\nbaz", exp: []*Record{StringRecord("foo bar biz\nbaz")}, }, + { + layout: "%v{}", + in: "foo bar biz\nbaz", + exp: []*Record{StringRecord("foo bar biz\nbaz")}, + }, { layout: "%k %v", @@ -576,6 +588,29 @@ func TestRecordReader(t *testing.T) { expErr: true, }, + // some json tests -- more below + { + layout: `%v{json} %k{json}`, + in: `{"foo": "bar"} true`, + exp: []*Record{ + KeyStringRecord("true", `{"foo":"bar"}`), + }, + }, + { + layout: `%v{json}%k{json}`, + in: `3null`, + exp: []*Record{ + KeyStringRecord("null", "3"), + }, + }, + { + layout: `%k{json}\t%v{json}`, + in: `true { "foo" : "bar", "biz": ["\n", 4.4 , 3,4] }`, + exp: []*Record{ + KeyStringRecord("true", `{"foo":"bar","biz":["\n",4.4,3,4]}`), + }, + }, + // } { t.Run(test.layout, func(t *testing.T) { @@ -610,6 +645,142 @@ func TestRecordReader(t *testing.T) { } } +func TestRecordReaderJson(t *testing.T) { + tests := []string{ + "", + " ", + " z", + " 1 ", + " {}", + " []", + " true", + " null", + " \"n\"", + + // string + "\"\xe2", // begin line-sep but invalid finish + "\"\xe2\x79\"", // begin line-sep but not actually line sep + `"foo"`, + "\"\xe2\x80\xa8\xe2\x80\xa9\"", // line-sep and paragraph-sep + ` "\uaaaa" `, + ` "\uaaaa\uaaaa" `, + ` "\`, + ` "\z`, + " \"f\x00o\"", + ` "foo`, + ` "\uazaa" `, + + // number + "1", + " 0 ", + " 0e1 ", + " 0.1 ", + "1.", + "1", + " 0e+0 ", + " -0e+0 ", + "-0", + "1e6", + "1e+6", + "-1e+6", + "-0e+6", + " -103e+1 ", + "-0.01e+006", + "-z", + "-", + "1e", + "1e+", + " 0.3e+1 ", + " 1e.1 ", + " 0 ", + "1.e3", + "0.1e+6", + "-0.01e+06", + "0e+01", + + // object + "{}", + `{"foo": 3}`, + ` {} `, + strings.Repeat(`{"f":`, 1000) + "{}" + strings.Repeat("}", 1000), + `{"f":{}}`, + `{"":3,"":2}`, + `{"foo": [{"":3, "4": "3"}, 4, {}], "t_wo": 1}`, + ` {"foo": 2,"fudge}`, + `{{"foo": }}`, + `{"foo": true, f "a": true}`, + `{{"foo": [{"":3, 4: "3"}, 4, "5": {4}]}, "t_wo": 1}`, + `{"\uaaaa\uaaaa" : true}`, + "{\"\xe2\x80\xa8\xe2\x80\xa9\": true}", // line-sep and paragraph-sep + "{", + `{"foo"`, + `{"foo",f}`, + `{"foo",`, + `{"foo"f`, + "{}", + `{"":[4.4]}`, + `{"":[4.4e4]}`, + `{"":d}`, + `{"":{}d}`, + + // array + `[]`, + `[[]]`, + `[ ]`, + `[ 1, {}]`, + strings.Repeat("[", 1000) + strings.Repeat("]", 1000), + `[1, 2, 3, 4, {}]`, + `[`, + `[1,`, + `[1a`, + `[1a]`, + `[]`, + + // boolean + "true", + " true ", + "tru", + "false", + " true ", + "fals", + "false", + + // null + "null ", + " null ", + "nul", + " null ", + } + + for i, test := range tests { + t.Run(strconv.Itoa(i), func(t *testing.T) { + r, err := NewRecordReader(strings.NewReader(test), "%v{json}") + if err != nil { + t.Errorf("unexpected err: %v", err) + return + } + + in := []byte(test) + valid := json.Valid(in) + expErr := !valid + rec, err := r.ReadRecord() + gotErr := err != nil + + if expErr != gotErr { + t.Errorf("got err? %v, exp err? %v", gotErr, expErr) + return + } + if expErr || !gotErr { + return + } + + if !bytes.Equal(rec.Value, in) { + t.Errorf("got %q, exp %q", rec.Value, in) + } + }) + } +} + func BenchmarkFormatter(b *testing.B) { buf := make([]byte, 1024) r := &Record{