diff --git a/BUILD.md b/BUILD.md index 6bcef3db5a..7e2cf11436 100644 --- a/BUILD.md +++ b/BUILD.md @@ -114,6 +114,14 @@ Kanister is using `check` library to extend go testing capabilities: https://git It's recommended to write new tests using this library for consistency. `make test` runs all tests in the repository. +It's possible to run a specific test with the `TEST_FILTER` environment variable: + +``` +make test TEST_FILTER=OutputTestSuite +``` + +This variable will be passed to the `-check.f` flag and supports regex filters. + To run tests for specific package you can run `go test` in that package directory. It's recommended to do that in build image shell, you can run it with `make shell`. diff --git a/Makefile b/Makefile index 3972f55530..5c732bb09a 100644 --- a/Makefile +++ b/Makefile @@ -155,7 +155,7 @@ deploy: release-controller .deploy-$(DOTFILE_IMAGE) @kubectl apply -f .deploy-$(DOTFILE_IMAGE) test: build-dirs - @$(MAKE) run CMD="./build/test.sh $(SRC_DIRS)" + @$(MAKE) run CMD="TEST_FILTER=$(TEST_FILTER) ./build/test.sh $(SRC_DIRS)" helm-test: build-dirs @$(MAKE) run CMD="./build/helm-test.sh $(SRC_DIRS)" diff --git a/build/test.sh b/build/test.sh index 0cf570c8ee..58410d46b4 100755 --- a/build/test.sh +++ b/build/test.sh @@ -24,6 +24,13 @@ SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )" export CGO_ENABLED=0 export GO111MODULE=on +TEST_FILTER="${TEST_FILTER:-}" +GOCHECK_FILTER="" +if [ -n "${TEST_FILTER}" ]; then + echo "Using test filter ${TEST_FILTER}" + GOCHECK_FILTER="-check.f ${TEST_FILTER}" +fi + TARGETS=$(for d in "$@"; do echo ./$d/...; done) echo -n "Checking gofmt: " @@ -79,7 +86,7 @@ check_dependencies echo "Running tests:" go test -v ${TARGETS} -list . 
-go test -v -installsuffix "static" ${TARGETS} -check.v +go test -v -installsuffix "static" ${TARGETS} -check.v ${GOCHECK_FILTER} echo echo "PASS" diff --git a/pkg/function/kube_exec.go b/pkg/function/kube_exec.go index ab47058a0b..7c8bcfef41 100644 --- a/pkg/function/kube_exec.go +++ b/pkg/function/kube_exec.go @@ -19,7 +19,7 @@ import ( "context" "io" "os" - "regexp" + "strings" "time" kanister "github.com/kanisterio/kanister/pkg" @@ -60,22 +60,15 @@ func parseLogAndCreateOutput(out string) (map[string]interface{}, error) { if out == "" { return nil, nil } - var op map[string]interface{} - logs := regexp.MustCompile("[\n]").Split(out, -1) - for _, l := range logs { - opObj, err := output.Parse(l) - if err != nil { - return nil, err - } - if opObj == nil { - continue - } - if op == nil { - op = make(map[string]interface{}) - } - op[opObj.Key] = opObj.Value + + reader := io.NopCloser(strings.NewReader(out)) + output, err := output.LogAndParse(context.Background(), reader) + + // For some reason we expect empty output to be returned as nil here + if len(output) == 0 { + return nil, err } - return op, nil + return output, err } func (kef *kubeExecFunc) Exec(ctx context.Context, tp param.TemplateParams, args map[string]interface{}) (map[string]interface{}, error) { diff --git a/pkg/function/kube_task_test.go b/pkg/function/kube_task_test.go index f45b0383c3..e6acab62de 100644 --- a/pkg/function/kube_task_test.go +++ b/pkg/function/kube_task_test.go @@ -16,7 +16,9 @@ package function import ( "context" + "fmt" "os" + "strings" "time" . 
"gopkg.in/check.v1" @@ -61,6 +63,24 @@ func (s *KubeTaskSuite) TearDownSuite(c *C) { } } +func bigOutputPhase(namespace string) crv1alpha1.BlueprintPhase { + longstring := strings.Repeat("a", 100000) + return crv1alpha1.BlueprintPhase{ + Name: "testOutput", + Func: KubeTaskFuncName, + Args: map[string]interface{}{ + KubeTaskNamespaceArg: namespace, + KubeTaskImageArg: consts.LatestKanisterToolsImage, + KubeTaskCommandArg: []string{ + "sh", + "-c", + // We output a line for log only, and a line with output at the tail + fmt.Sprintf("echo -n %s > tmpfile; cat tmpfile; echo; cat tmpfile; kando output longstring $(cat tmpfile)", longstring), + }, + }, + } +} + func outputPhase(namespace string) crv1alpha1.BlueprintPhase { return crv1alpha1.BlueprintPhase{ Name: "testOutput", @@ -161,3 +181,45 @@ func (s *KubeTaskSuite) TestKubeTask(c *C) { } } } + +func (s *KubeTaskSuite) TestKubeTaskWithBigOutput(c *C) { + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) + defer cancel() + tp := param.TemplateParams{ + StatefulSet: ¶m.StatefulSetParams{ + Namespace: s.namespace, + }, + PodOverride: crv1alpha1.JSONMap{ + "containers": []map[string]interface{}{ + { + "name": "container", + "imagePullPolicy": "Always", + }, + }, + }, + } + expectedOut := strings.Repeat("a", 100000) + action := "test" + for _, tc := range []struct { + bp *crv1alpha1.Blueprint + outs []map[string]interface{} + }{ + { + bp: newTaskBlueprint(bigOutputPhase(s.namespace)), + outs: []map[string]interface{}{ + { + "longstring": expectedOut, + }, + }, + }, + } { + phases, err := kanister.GetPhases(*tc.bp, action, kanister.DefaultVersion, tp) + c.Assert(err, IsNil) + c.Assert(phases, HasLen, len(tc.outs)) + for i, p := range phases { + out, err := p.Exec(ctx, *tc.bp, action, tp) + c.Assert(err, IsNil, Commentf("Phase %s failed", p.Name())) + c.Assert(out, DeepEquals, tc.outs[i]) + } + } +} diff --git a/pkg/output/output.go b/pkg/output/output.go index 8608df9c8b..12c14fbe22 100644 --- 
a/pkg/output/output.go +++ b/pkg/output/output.go @@ -20,7 +20,6 @@ import ( "io" "os" "regexp" - "strings" "github.com/pkg/errors" ) @@ -47,7 +46,7 @@ func marshalOutput(key, value string) (string, error) { } // UnmarshalOutput unmarshals output json into Output struct -func UnmarshalOutput(opString string) (*Output, error) { +func UnmarshalOutput(opString []byte) (*Output, error) { p := &Output{} err := json.Unmarshal([]byte(opString), p) return p, errors.Wrap(err, "Failed to unmarshal key-value pair") @@ -85,20 +84,3 @@ func fPrintOutput(w io.Writer, key, value string) error { fmt.Fprintln(w, PhaseOpString, outString) return nil } - -const reStr = PhaseOpString + `(.*)$` - -var logRE = regexp.MustCompile(reStr) - -func Parse(l string) (*Output, error) { - l = strings.TrimSpace(l) - match := logRE.FindAllStringSubmatch(l, 1) - if len(match) == 0 { - return nil, nil - } - op, err := UnmarshalOutput(match[0][1]) - if err != nil { - return nil, err - } - return op, nil -} diff --git a/pkg/output/output_test.go b/pkg/output/output_test.go index 0f37a13698..a63e681030 100644 --- a/pkg/output/output_test.go +++ b/pkg/output/output_test.go @@ -15,7 +15,6 @@ package output import ( - "bytes" "testing" . 
"gopkg.in/check.v1" @@ -44,41 +43,3 @@ func (s *OutputSuite) TestValidateKey(c *C) { c.Check(err, tc.checker, Commentf("Key (%s) failed!", tc.key)) } } - -func (s *OutputSuite) TestParseValid(c *C) { - key, val := "foo", "bar" - b := bytes.NewBuffer(nil) - err := fPrintOutput(b, key, val) - c.Check(err, IsNil) - - o, err := Parse(b.String()) - c.Assert(err, IsNil) - c.Assert(o, NotNil) - c.Assert(o.Key, Equals, key) - c.Assert(o.Value, Equals, val) -} - -func (s *OutputSuite) TestParseNoOutput(c *C) { - key, val := "foo", "bar" - b := bytes.NewBuffer(nil) - err := fPrintOutput(b, key, val) - c.Check(err, IsNil) - valid := b.String() - for _, tc := range []struct { - s string - checker Checker - }{ - { - s: valid[0 : len(valid)-2], - checker: NotNil, - }, - { - s: valid[1 : len(valid)-1], - checker: IsNil, - }, - } { - o, err := Parse(tc.s) - c.Assert(err, tc.checker) - c.Assert(o, IsNil) - } -} diff --git a/pkg/output/stream.go b/pkg/output/stream.go index a06d8fa92d..7e15b9ff16 100644 --- a/pkg/output/stream.go +++ b/pkg/output/stream.go @@ -16,17 +16,23 @@ package output import ( "bufio" + "bytes" "context" "io" - "strings" - - "github.com/pkg/errors" "github.com/kanisterio/kanister/pkg/field" "github.com/kanisterio/kanister/pkg/log" ) -func splitLines(ctx context.Context, r io.ReadCloser, f func(context.Context, string) error) error { +type scanState struct { + outputBuf []byte + readingOutput bool + separatorSuffix []byte +} + +const bufferSize64k = 64 * 1024 + +func splitLines(ctx context.Context, r io.ReadCloser, f func(context.Context, []byte) error) error { // Call r.Close() if the context is canceled or if s.Scan() == false. ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -35,31 +41,180 @@ func splitLines(ctx context.Context, r io.ReadCloser, f func(context.Context, st _ = r.Close() }() - // Scan log lines when ready. 
- s := bufio.NewScanner(r) - for s.Scan() { - l := s.Text() - l = strings.TrimSpace(l) - if l == "" { - continue + state := InitState() + + reader := bufio.NewReaderSize(r, bufferSize64k) + + // Run a simple state machine loop + for { + line, isPrefix, err := reader.ReadLine() + if err == io.EOF { + // Terminal state + return nil } - if err := f(ctx, l); err != nil { + if err != nil { return err } + // Skip empty lines + if len(line) == 0 { + continue + } + switch { + case state.readingOutput: + if state, err = handleOutput(state, line, isPrefix, ctx, f); err != nil { + return err + } + case len(state.separatorSuffix) > 0: + if state, err = handleSeparatorSuffix(state, line, isPrefix, ctx, f); err != nil { + return err + } + default: + if state, err = handleLog(line, isPrefix, ctx, f); err != nil { + return err + } + } } - return errors.Wrap(s.Err(), "Split lines failed") } +func InitState() scanState { + return scanState{ + outputBuf: []byte(nil), + readingOutput: false, + separatorSuffix: []byte(nil), + } +} + +func ReadPhaseOutputState(outputBuf []byte) scanState { + return scanState{ + outputBuf: outputBuf, + readingOutput: true, + separatorSuffix: []byte(nil), + } +} + +func CheckSeparatorSuffixState(separatorSuffix []byte) scanState { + return scanState{ + outputBuf: []byte(nil), + readingOutput: false, + separatorSuffix: separatorSuffix, + } +} + +func handleOutput( + state scanState, + line []byte, + isPrefix bool, + ctx context.Context, + f func(context.Context, []byte) error, +) (scanState, error) { + if isPrefix { + // Accumulate phase output + return ReadPhaseOutputState(append(state.outputBuf, line...)), nil + } + // Reached the end of the line while reading phase output + if err := f(ctx, append(state.outputBuf, line...)); err != nil { + return state, err + } + // Transition out of readingOutput state + return InitState(), nil +} + +func handleSeparatorSuffix( + state scanState, + line []byte, + isPrefix bool, + ctx context.Context, + f 
func(context.Context, []byte) error, +) (scanState, error) { + if bytes.Index(line, state.separatorSuffix) == 0 { + return captureOutputContent(line, isPrefix, len(state.separatorSuffix), ctx, f) + } + // Read log like normal + return handleLog(line, isPrefix, ctx, f) +} + +func handleLog( + line []byte, + isPrefix bool, + ctx context.Context, + f func(context.Context, []byte) error, +) (scanState, error) { + indexOfPOString := bytes.Index(line, []byte(PhaseOpString)) + if indexOfPOString == -1 { + // Log plain output, there is no phase output here + logOutput(ctx, line) + + // There is a corner case possible when PhaseOpString is split between chunks + splitSeparator, separatorSuffix := checkSplitSeparator(line) + if splitSeparator != -1 { + // Transition to separatorSuffix state to check next line + return CheckSeparatorSuffixState(separatorSuffix), nil + } + + return InitState(), nil + } + // Log everything before separator as plain output + prefix := line[0:indexOfPOString] + if len(prefix) > 0 { + logOutput(ctx, prefix) + } + + return captureOutputContent(line, isPrefix, indexOfPOString+len(PhaseOpString), ctx, f) +} + +func captureOutputContent( + line []byte, + isPrefix bool, + startIndex int, + ctx context.Context, + f func(context.Context, []byte) error, +) (scanState, error) { + outputContent := line[startIndex:] + if !isPrefix { + if err := f(ctx, outputContent); err != nil { + return InitState(), err + } + return InitState(), nil + } + return ReadPhaseOutputState(append([]byte(nil), outputContent...)), nil +} + +func checkSplitSeparator(line []byte) (splitSeparator int, separatorSuffix []byte) { + lineLength := len(line) + phaseOpBytes := []byte(PhaseOpString) + for i := 1; i < len(phaseOpBytes) && i <= lineLength; i++ { + if bytes.Equal(line[lineLength-i:], phaseOpBytes[0:i]) { + return lineLength - i, phaseOpBytes[i:] + } + } + return -1, nil +} + +func logOutput(ctx context.Context, out []byte) { + log.WithContext(ctx).Print("", field.M{"Pod_Out": 
string(out)}) +} + +// State machine +// init state: ignore and log output, until we reach ###Phase-output### +// Read output state: accumulate output in buffer +// Transitions: +// init state -> output state: on reaching ###Phase-output###: create and start accumulating output buffer +// output state -> output state: DONT DO THAT YET +// output state -> init state : on reaching \n: parse output json from output buffer and clean the buffer + func LogAndParse(ctx context.Context, r io.ReadCloser) (map[string]interface{}, error) { out := make(map[string]interface{}) - err := splitLines(ctx, r, func(ctx context.Context, l string) error { - log.WithContext(ctx).Print("", field.M{"Pod_Out": l}) - o, err := Parse(l) - if err != nil { - return err - } - if o != nil { - out[o.Key] = o.Value + err := splitLines(ctx, r, func(ctx context.Context, outputContent []byte) error { + outputContent = bytes.TrimSpace(outputContent) + if len(outputContent) != 0 { + log.WithContext(ctx).Print("", field.M{"Pod_Out": string(outputContent)}) + op, err := UnmarshalOutput(outputContent) + if err != nil { + return err + } + if op != nil { + out[op.Key] = op.Value + } } return nil }) diff --git a/pkg/output/stream_test.go b/pkg/output/stream_test.go new file mode 100644 index 0000000000..75cd713379 --- /dev/null +++ b/pkg/output/stream_test.go @@ -0,0 +1,243 @@ +// Copyright 2024 The Kanister Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package output_test + +import ( + "context" + "io" + "math/rand" + "time" + + "github.com/kanisterio/kanister/pkg/output" + . "gopkg.in/check.v1" + apirand "k8s.io/apimachinery/pkg/util/rand" +) + +type EndlinePolicy int + +const ( + NewlineEndline = '\n' + NoEndline = rune(0) + + EndlineRequired EndlinePolicy = iota + EndlineProhibited +) + +type OutputTestSuite struct{} + +var _ = Suite(&OutputTestSuite{}) + +type testCase struct { + prefixLength int + prefixWithEndline bool + key string + value []rune +} + +func generateLength(r *rand.Rand, avgLength int) int { + if avgLength == 0 { + return 0 + } + return r.Intn(avgLength/5) + avgLength // Return random length ±20% of avgLength +} + +var runes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_!=-+*/\\") + +func generateRandomRunes(r *rand.Rand, length int, endline rune) []rune { + totalLength := length + if endline != NoEndline { + totalLength += 1 + } + line := make([]rune, totalLength) + var last rune + for j := 0; j < length; j++ { + var current rune + for rpt := true; rpt; rpt = last == '\\' && (current == '\n' || current == '\r') { + current = runes[r.Intn(len(runes))] + } + + line[j] = current + last = current + } + + if endline != NoEndline { + line[length] = endline + } + return line +} + +func generateTestCases(numOfLines, avgPrefixLength, avgKeyLength, avgValueLength int, endlinePolicy EndlinePolicy) []testCase { + r := rand.New(rand.NewSource(time.Now().UnixNano())) + cases := make([]testCase, numOfLines) + for i := 0; i < numOfLines; i++ { + key := "" + value := []rune{} + if avgKeyLength != 0 { + key = apirand.String(generateLength(r, avgKeyLength)) + value = generateRandomRunes(r, avgValueLength, NoEndline) + } + + prefixWithEndLine := endlinePolicy == EndlineRequired + + cases[i] = testCase{ + prefixLength: generateLength(r, avgPrefixLength), + prefixWithEndline: prefixWithEndLine, + key: key, + value: value, + } + } + + return cases +} + +func getTestReaderCloser(done 
chan struct{}, cases []testCase) io.ReadCloser { + pr, pw := io.Pipe() + r := rand.New(rand.NewSource(time.Now().UnixNano())) + + go func() { + defer pw.Close() + + for _, tc := range cases { + select { + case <-done: + return + default: + if tc.prefixLength != 0 { + endline := NoEndline + if tc.prefixWithEndline { + endline = NewlineEndline + } + prefixLine := generateRandomRunes(r, tc.prefixLength, endline) + _, err := pw.Write([]byte(string(prefixLine))) + if err != nil { + return + } + } + + if tc.key != "" { + err := output.PrintOutputTo(pw, tc.key, string(tc.value)) + if err != nil { + return + } + } + } + } + }() + + return pr +} + +// TestLongStreamsWithoutPhaseOutput Will produce 10 long lines +// each line will contain from 50Kb to 60Kb of random text +// there will be no phase output in lines +func (s *OutputTestSuite) TestLongStreamsWithoutPhaseOutput(c *C) { + done := make(chan struct{}) + defer func() { close(done) }() + + cases := generateTestCases(10, 50000, 0, 0, EndlineRequired) + r := getTestReaderCloser(done, cases) + m, e := output.LogAndParse(context.TODO(), r) + c.Check(e, IsNil) + c.Check(len(m), Equals, 0) +} + +// TestShortStreamsWithPhaseOutput Will produce one short line +// which will contain ONLY phase output and nothing else +func (s *OutputTestSuite) TestShortStreamsWithPhaseOutput(c *C) { + done := make(chan struct{}) + defer func() { close(done) }() + + cases := generateTestCases(1, 0, 10, 50, EndlineRequired) + r := getTestReaderCloser(done, cases) + m, e := output.LogAndParse(context.TODO(), r) + c.Check(e, IsNil) + c.Check(len(m), Equals, 1) + c.Check(m[cases[0].key], Equals, string(cases[0].value)) +} + +// TestLongStreamsWithPhaseOutput Will produce 10 long lines +// each line will contain from 10Kb to 12Kb of random text and +// phase output preceded with newline +func (s *OutputTestSuite) TestLongStreamsWithPhaseOutput(c *C) { + done := make(chan struct{}) + defer func() { close(done) }() + + cases := generateTestCases(10, 
10000, 10, 50, EndlineRequired) + r := getTestReaderCloser(done, cases) + m, e := output.LogAndParse(context.TODO(), r) + c.Check(e, IsNil) + c.Check(len(m), Equals, 10) + c.Check(m[cases[0].key], Equals, string(cases[0].value)) +} + +// TestHugeStreamsWithPhaseOutput Will produce five huge lines +// each line will contain ±100Kb of random text WITH newline before Phase Output mark +// Phase output value will be very short +func (s *OutputTestSuite) TestHugeStreamsWithPhaseOutput(c *C) { + done := make(chan struct{}) + defer func() { close(done) }() + + cases := generateTestCases(5, 100000, 10, 50, EndlineRequired) + r := getTestReaderCloser(done, cases) + m, e := output.LogAndParse(context.TODO(), r) + c.Check(e, IsNil) + c.Check(len(m), Equals, 5) + c.Check(m[cases[0].key], Equals, string(cases[0].value)) +} + +// TestHugeStreamsWithLongPhaseOutput Will produce five huge lines +// each line will contain ±500Kb of random text WITH newline before Phase Output mark +// Phase output value will be ±10Kb of random text +func (s *OutputTestSuite) TestHugeStreamsWithLongPhaseOutput(c *C) { + done := make(chan struct{}) + defer func() { close(done) }() + + cases := generateTestCases(5, 500000, 10, 10000, EndlineRequired) + r := getTestReaderCloser(done, cases) + m, e := output.LogAndParse(context.TODO(), r) + c.Check(e, IsNil) + c.Check(len(m), Equals, 5) + c.Check(m[cases[0].key], Equals, string(cases[0].value)) +} + +// TestHugeStreamsWithHugePhaseOutput Will produce one huge line +// which will contain ±500Kb of random text WITH newline before Phase Output mark +// Phase output value will also be ±500Kb +func (s *OutputTestSuite) TestHugeStreamsWithHugePhaseOutput(c *C) { + done := make(chan struct{}) + defer func() { close(done) }() + + cases := generateTestCases(1, 500000, 10, 500000, EndlineRequired) + r := getTestReaderCloser(done, cases) + m, e := output.LogAndParse(context.TODO(), r) + c.Check(e, IsNil) + c.Check(len(m), Equals, 1) + c.Check(m[cases[0].key], 
Equals, string(cases[0].value)) +} + +// TestHugeStreamsWithHugePhaseOutputWithoutNewlineDelimiter Will produce one huge line +// which will contain ±500Kb of random text WITHOUT newline before Phase Output mark +// Phase output value will also be ±500Kb +func (s *OutputTestSuite) TestHugeStreamsWithHugePhaseOutputWithoutNewlineDelimiter(c *C) { + done := make(chan struct{}) + defer func() { close(done) }() + + cases := generateTestCases(1, 500000, 10, 500000, EndlineProhibited) + r := getTestReaderCloser(done, cases) + m, e := output.LogAndParse(context.TODO(), r) + c.Check(e, IsNil) + c.Check(len(m), Equals, 1) + c.Check(m[cases[0].key], Equals, string(cases[0].value)) +}