diff --git a/pkg/output/output_test.go b/pkg/output/output_test.go index 52413f26be..a63e681030 100644 --- a/pkg/output/output_test.go +++ b/pkg/output/output_test.go @@ -15,10 +15,6 @@ package output import ( - "bytes" - "context" - "io" - "strings" "testing" . "gopkg.in/check.v1" @@ -47,55 +43,3 @@ func (s *OutputSuite) TestValidateKey(c *C) { c.Check(err, tc.checker, Commentf("Key (%s) failed!", tc.key)) } } - -// FIXME: replace this with TestLogAndParse -func parse(str string) (*Output, error) { - reader := io.NopCloser(strings.NewReader(str)) - output, err := LogAndParse(context.Background(), reader) - var o *Output - for k, v := range output { - o = &Output{ - Key: k, - Value: v.(string), - } - } - return o, err -} - -func (s *OutputSuite) TestParseValid(c *C) { - key, val := "foo", "bar" - b := bytes.NewBuffer(nil) - err := fPrintOutput(b, key, val) - c.Check(err, IsNil) - - o, err := parse(b.String()) - c.Assert(err, IsNil) - c.Assert(o, NotNil) - c.Assert(o.Key, Equals, key) - c.Assert(o.Value, Equals, val) -} - -func (s *OutputSuite) TestParseNoOutput(c *C) { - key, val := "foo", "bar" - b := bytes.NewBuffer(nil) - err := fPrintOutput(b, key, val) - c.Check(err, IsNil) - valid := b.String() - for _, tc := range []struct { - s string - checker Checker - }{ - { - s: valid[0 : len(valid)-2], - checker: NotNil, - }, - { - s: valid[1 : len(valid)-1], - checker: IsNil, - }, - } { - o, err := parse(tc.s) - c.Assert(err, tc.checker) - c.Assert(o, IsNil) - } -} diff --git a/pkg/output/stream.go b/pkg/output/stream.go index a57145e3b0..7e15b9ff16 100644 --- a/pkg/output/stream.go +++ b/pkg/output/stream.go @@ -18,7 +18,6 @@ import ( "bufio" "bytes" "context" - "fmt" "io" "github.com/kanisterio/kanister/pkg/field" @@ -101,34 +100,45 @@ func CheckSeparatorSuffixState(separatorSuffix []byte) scanState { } } -func handleOutput(state scanState, line []byte, isPrefix bool, ctx context.Context, f func(context.Context, []byte) error) (scanState, error) { +func handleOutput( + state scanState, + line []byte, + isPrefix bool, + ctx context.Context, + f func(context.Context, []byte) error, +) (scanState, error) { if isPrefix { // Accumulate phase output return ReadPhaseOutputState(append(state.outputBuf, line...)), nil - } else { - // Reached the end of the line while reading phase output - outputContent := state.outputBuf - outputContent = append(outputContent, line...) - - if err := f(ctx, outputContent); err != nil { - return state, err - } - - // Transition out of readingOutput state - return InitState(), nil } + // Reached the end of the line while reading phase output + if err := f(ctx, append(state.outputBuf, line...)); err != nil { + return state, err + } + // Transition out of readingOutput state + return InitState(), nil } -func handleSeparatorSuffix(state scanState, line []byte, isPrefix bool, ctx context.Context, f func(context.Context, []byte) error) (scanState, error) { +func handleSeparatorSuffix( + state scanState, + line []byte, + isPrefix bool, + ctx context.Context, + f func(context.Context, []byte) error, +) (scanState, error) { if bytes.Index(line, state.separatorSuffix) == 0 { return captureOutputContent(line, isPrefix, len(state.separatorSuffix), ctx, f) - } else { - // Read log like normal - return handleLog(line, isPrefix, ctx, f) } + // Read log like normal + return handleLog(line, isPrefix, ctx, f) } -func handleLog(line []byte, isPrefix bool, ctx context.Context, f func(context.Context, []byte) error) (scanState, error) { +func handleLog( + line []byte, + isPrefix bool, + ctx context.Context, + f func(context.Context, []byte) error, +) (scanState, error) { indexOfPOString := bytes.Index(line, []byte(PhaseOpString)) if indexOfPOString == -1 { // Log plain output, there is no phase output here @@ -142,27 +152,31 @@ func handleLog(line []byte, isPrefix bool, ctx context.Context, f func(context.C } return InitState(), nil - } else { - // Log everything before separator as plain output - prefix := line[0:indexOfPOString] - if len(prefix) > 0 { - logOutput(ctx, prefix) - } - - return captureOutputContent(line, isPrefix, indexOfPOString+len(PhaseOpString), ctx, f) } + // Log everything before separator as plain output + prefix := line[0:indexOfPOString] + if len(prefix) > 0 { + logOutput(ctx, prefix) + } + + return captureOutputContent(line, isPrefix, indexOfPOString+len(PhaseOpString), ctx, f) } -func captureOutputContent(line []byte, isPrefix bool, startIndex int, ctx context.Context, f func(context.Context, []byte) error) (scanState, error) { +func captureOutputContent( + line []byte, + isPrefix bool, + startIndex int, + ctx context.Context, + f func(context.Context, []byte) error, +) (scanState, error) { outputContent := line[startIndex:] if !isPrefix { if err := f(ctx, outputContent); err != nil { return InitState(), err } return InitState(), nil - } else { - return ReadPhaseOutputState(append([]byte(nil), outputContent...)), nil } + return ReadPhaseOutputState(append([]byte(nil), outputContent...)), nil } func checkSplitSeparator(line []byte) (splitSeparator int, separatorSuffix []byte) { @@ -198,7 +212,6 @@ func LogAndParse(ctx context.Context, r io.ReadCloser) (map[string]interface{}, if err != nil { return err } - fmt.Printf("\nParsed output: %v\n", op) if op != nil { out[op.Key] = op.Value }