Skip to content

Commit

Permalink
code style fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
hairyhum committed Feb 14, 2024
1 parent 33cb342 commit 8a8aa99
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 86 deletions.
56 changes: 0 additions & 56 deletions pkg/output/output_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,6 @@
package output

import (
"bytes"
"context"
"io"
"strings"
"testing"

. "gopkg.in/check.v1"
Expand Down Expand Up @@ -47,55 +43,3 @@ func (s *OutputSuite) TestValidateKey(c *C) {
c.Check(err, tc.checker, Commentf("Key (%s) failed!", tc.key))
}
}

// FIXME: replace this with TestLogAndParse
func parse(str string) (*Output, error) {
reader := io.NopCloser(strings.NewReader(str))
output, err := LogAndParse(context.Background(), reader)
var o *Output
for k, v := range output {
o = &Output{
Key: k,
Value: v.(string),
}
}
return o, err
}

func (s *OutputSuite) TestParseValid(c *C) {
key, val := "foo", "bar"
b := bytes.NewBuffer(nil)
err := fPrintOutput(b, key, val)
c.Check(err, IsNil)

o, err := parse(b.String())
c.Assert(err, IsNil)
c.Assert(o, NotNil)
c.Assert(o.Key, Equals, key)
c.Assert(o.Value, Equals, val)
}

func (s *OutputSuite) TestParseNoOutput(c *C) {
key, val := "foo", "bar"
b := bytes.NewBuffer(nil)
err := fPrintOutput(b, key, val)
c.Check(err, IsNil)
valid := b.String()
for _, tc := range []struct {
s string
checker Checker
}{
{
s: valid[0 : len(valid)-2],
checker: NotNil,
},
{
s: valid[1 : len(valid)-1],
checker: IsNil,
},
} {
o, err := parse(tc.s)
c.Assert(err, tc.checker)
c.Assert(o, IsNil)
}
}
73 changes: 43 additions & 30 deletions pkg/output/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"bufio"
"bytes"
"context"
"fmt"
"io"

"github.com/kanisterio/kanister/pkg/field"
Expand Down Expand Up @@ -101,34 +100,45 @@ func CheckSeparatorSuffixState(separatorSuffix []byte) scanState {
}
}

func handleOutput(state scanState, line []byte, isPrefix bool, ctx context.Context, f func(context.Context, []byte) error) (scanState, error) {
func handleOutput(
state scanState,
line []byte,
isPrefix bool,
ctx context.Context,
f func(context.Context, []byte) error,
) (scanState, error) {
if isPrefix {
// Accumulate phase output
return ReadPhaseOutputState(append(state.outputBuf, line...)), nil
} else {
// Reached the end of the line while reading phase output
outputContent := state.outputBuf
outputContent = append(outputContent, line...)

if err := f(ctx, outputContent); err != nil {
return state, err
}

// Transition out of readingOutput state
return InitState(), nil
}
// Reached the end of the line while reading phase output
if err := f(ctx, append(state.outputBuf, line...)); err != nil {
return state, err
}
// Transition out of readingOutput state
return InitState(), nil
}

func handleSeparatorSuffix(state scanState, line []byte, isPrefix bool, ctx context.Context, f func(context.Context, []byte) error) (scanState, error) {
func handleSeparatorSuffix(
state scanState,
line []byte,
isPrefix bool,
ctx context.Context,
f func(context.Context, []byte) error,
) (scanState, error) {
if bytes.Index(line, state.separatorSuffix) == 0 {
return captureOutputContent(line, isPrefix, len(state.separatorSuffix), ctx, f)
} else {
// Read log like normal
return handleLog(line, isPrefix, ctx, f)
}
// Read log like normal
return handleLog(line, isPrefix, ctx, f)
}

func handleLog(line []byte, isPrefix bool, ctx context.Context, f func(context.Context, []byte) error) (scanState, error) {
func handleLog(
line []byte,
isPrefix bool,
ctx context.Context,
f func(context.Context, []byte) error,
) (scanState, error) {
indexOfPOString := bytes.Index(line, []byte(PhaseOpString))
if indexOfPOString == -1 {
// Log plain output, there is no phase output here
Expand All @@ -142,27 +152,31 @@ func handleLog(line []byte, isPrefix bool, ctx context.Context, f func(context.C
}

return InitState(), nil
} else {
// Log everything before separator as plain output
prefix := line[0:indexOfPOString]
if len(prefix) > 0 {
logOutput(ctx, prefix)
}

return captureOutputContent(line, isPrefix, indexOfPOString+len(PhaseOpString), ctx, f)
}
// Log everything before separator as plain output
prefix := line[0:indexOfPOString]
if len(prefix) > 0 {
logOutput(ctx, prefix)
}

return captureOutputContent(line, isPrefix, indexOfPOString+len(PhaseOpString), ctx, f)
}

func captureOutputContent(line []byte, isPrefix bool, startIndex int, ctx context.Context, f func(context.Context, []byte) error) (scanState, error) {
func captureOutputContent(
line []byte,
isPrefix bool,
startIndex int,
ctx context.Context,
f func(context.Context, []byte) error,
) (scanState, error) {
outputContent := line[startIndex:]
if !isPrefix {
if err := f(ctx, outputContent); err != nil {
return InitState(), err
}
return InitState(), nil
} else {
return ReadPhaseOutputState(append([]byte(nil), outputContent...)), nil
}
return ReadPhaseOutputState(append([]byte(nil), outputContent...)), nil
}

func checkSplitSeparator(line []byte) (splitSeparator int, separatorSuffix []byte) {
Expand Down Expand Up @@ -198,7 +212,6 @@ func LogAndParse(ctx context.Context, r io.ReadCloser) (map[string]interface{},
if err != nil {
return err
}
fmt.Printf("\nParsed output: %v\n", op)
if op != nil {
out[op.Key] = op.Value
}
Expand Down

0 comments on commit 8a8aa99

Please sign in to comment.