From 2ddb227d357501f3c9630cba1ce6ceb0e6051bdc Mon Sep 17 00:00:00 2001 From: Daniil Fedotov Date: Wed, 31 Jan 2024 18:05:24 -0500 Subject: [PATCH 01/10] fix: switch to bufio.Reader for KubeTask output parsing bufio.Scanner has buffer limit and can fail the function if output is too big fixes #2612 --- pkg/function/kube_task_test.go | 61 ++++++++++++++++++++++++++++++++++ pkg/output/stream.go | 28 +++++++++++----- 2 files changed, 80 insertions(+), 9 deletions(-) diff --git a/pkg/function/kube_task_test.go b/pkg/function/kube_task_test.go index f45b0383c3..e9bb1c78f9 100644 --- a/pkg/function/kube_task_test.go +++ b/pkg/function/kube_task_test.go @@ -16,7 +16,9 @@ package function import ( "context" + "fmt" "os" + "strings" "time" . "gopkg.in/check.v1" @@ -61,6 +63,23 @@ func (s *KubeTaskSuite) TearDownSuite(c *C) { } } +func bigOutputPhase(namespace string) crv1alpha1.BlueprintPhase { + longstring := strings.Repeat("a", 100000) + return crv1alpha1.BlueprintPhase{ + Name: "testOutput", + Func: KubeTaskFuncName, + Args: map[string]interface{}{ + KubeTaskNamespaceArg: namespace, + KubeTaskImageArg: consts.LatestKanisterToolsImage, + KubeTaskCommandArg: []string{ + "sh", + "-c", + fmt.Sprintf("kando output longstring %s", longstring), + }, + }, + } +} + func outputPhase(namespace string) crv1alpha1.BlueprintPhase { return crv1alpha1.BlueprintPhase{ Name: "testOutput", @@ -161,3 +180,45 @@ func (s *KubeTaskSuite) TestKubeTask(c *C) { } } } + +func (s *KubeTaskSuite) TestKubeTaskWithBigOutput(c *C) { + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) + defer cancel() + tp := param.TemplateParams{ + StatefulSet: ¶m.StatefulSetParams{ + Namespace: s.namespace, + }, + PodOverride: crv1alpha1.JSONMap{ + "containers": []map[string]interface{}{ + { + "name": "container", + "imagePullPolicy": "Always", + }, + }, + }, + } + expectedOut := strings.Repeat("a", 100000) + action := "test" + for _, tc := range []struct { + bp *crv1alpha1.Blueprint + outs []map[string]interface{} + }{ + { + bp: newTaskBlueprint(bigOutputPhase(s.namespace)), + outs: []map[string]interface{}{ + { + "longstring": expectedOut, + }, + }, + }, + } { + phases, err := kanister.GetPhases(*tc.bp, action, kanister.DefaultVersion, tp) + c.Assert(err, IsNil) + c.Assert(phases, HasLen, len(tc.outs)) + for i, p := range phases { + out, err := p.Exec(ctx, *tc.bp, action, tp) + c.Assert(err, IsNil, Commentf("Phase %s failed", p.Name())) + c.Assert(out, DeepEquals, tc.outs[i]) + } + } +} diff --git a/pkg/output/stream.go b/pkg/output/stream.go index a06d8fa92d..3d3d5bcbe6 100644 --- a/pkg/output/stream.go +++ b/pkg/output/stream.go @@ -36,18 +36,28 @@ func splitLines(ctx context.Context, r io.ReadCloser, f func(context.Context, st }() // Scan log lines when ready. - s := bufio.NewScanner(r) - for s.Scan() { - l := s.Text() - l = strings.TrimSpace(l) - if l == "" { - continue + // Don't use bufio.Scanner because it breaks if lines are too long + reader := bufio.NewReader(r) + // ReadString returns error AND a line for last line + line, err := reader.ReadString('\n') + for { + line = strings.TrimSpace(line) + + if line != "" { + if err := f(ctx, line); err != nil { + return err + } } - if err := f(ctx, l); err != nil { - return err + if err != nil { + break } + line, err = reader.ReadString('\n') } - return errors.Wrap(s.Err(), "Split lines failed") + if err != io.EOF { + return errors.Wrap(err, "Split lines failed") + } + + return nil } func LogAndParse(ctx context.Context, r io.ReadCloser) (map[string]interface{}, error) { From 331e367797300d59e15dea5916dbe5bf7d7d9e5d Mon Sep 17 00:00:00 2001 From: Daniil Fedotov Date: Wed, 7 Feb 2024 16:15:14 -0500 Subject: [PATCH 02/10] WIP: scan kube task output in chunks --- pkg/function/kube_exec.go | 41 +++++--- pkg/function/kube_task_test.go | 3 +- pkg/output/output.go | 16 +-- pkg/output/output_test.go | 21 +++- pkg/output/stream.go | 176 ++++++++++++++++++++++++++++----- 5 files changed, 197 insertions(+), 60 deletions(-) diff --git a/pkg/function/kube_exec.go b/pkg/function/kube_exec.go index ab47058a0b..ee09d70afe 100644 --- a/pkg/function/kube_exec.go +++ b/pkg/function/kube_exec.go @@ -19,7 +19,7 @@ import ( "context" "io" "os" - "regexp" + "strings" "time" kanister "github.com/kanisterio/kanister/pkg" @@ -60,22 +60,31 @@ func parseLogAndCreateOutput(out string) (map[string]interface{}, error) { if out == "" { return nil, nil } - var op map[string]interface{} - logs := regexp.MustCompile("[\n]").Split(out, -1) - for _, l := range logs { - opObj, err := output.Parse(l) - if err != nil { - return nil, err - } - if opObj == nil { - continue - } - if op == nil { - op = make(map[string]interface{}) - } - op[opObj.Key] = opObj.Value + + // var op map[string]interface{} + reader := io.NopCloser(strings.NewReader(out)) + output, err := output.LogAndParse(context.Background(), reader) + + // For some reason we expect empty output to be returned as nil here + if len(output) == 0 { + return nil, err } - return op, nil + return output, err + // logs := regexp.MustCompile("[\n]").Split(out, -1) + // for _, l := range logs { + // opObj, err := output.Parse(l) + // if err != nil { + // return nil, err + // } + // if opObj == nil { + // continue + // } + // if op == nil { + // op = make(map[string]interface{}) + // } + // op[opObj.Key] = opObj.Value + // } + // return op, nil } func (kef *kubeExecFunc) Exec(ctx context.Context, tp param.TemplateParams, args map[string]interface{}) (map[string]interface{}, error) { diff --git a/pkg/function/kube_task_test.go b/pkg/function/kube_task_test.go index e9bb1c78f9..e6acab62de 100644 --- a/pkg/function/kube_task_test.go +++ b/pkg/function/kube_task_test.go @@ -74,7 +74,8 @@ func bigOutputPhase(namespace string) crv1alpha1.BlueprintPhase { KubeTaskCommandArg: []string{ "sh", "-c", - fmt.Sprintf("kando output longstring %s", longstring), + // We output a line for log only, and a line with output at the tail + fmt.Sprintf("echo -n %s > tmpfile; cat tmpfile; echo; cat tmpfile; kando output longstring $(cat tmpfile)", longstring), }, }, } diff --git a/pkg/output/output.go b/pkg/output/output.go index 8608df9c8b..da2e7a1db8 100644 --- a/pkg/output/output.go +++ b/pkg/output/output.go @@ -20,7 +20,6 @@ import ( "io" "os" "regexp" - "strings" "github.com/pkg/errors" ) @@ -47,7 +46,7 @@ func marshalOutput(key, value string) (string, error) { } // UnmarshalOutput unmarshals output json into Output struct -func UnmarshalOutput(opString string) (*Output, error) { +func UnmarshalOutput(opString []byte) (*Output, error) { p := &Output{} err := json.Unmarshal([]byte(opString), p) return p, errors.Wrap(err, "Failed to unmarshal key-value pair") @@ -89,16 +88,3 @@ func fPrintOutput(w io.Writer, key, value string) error { const reStr = PhaseOpString + `(.*)$` var logRE = regexp.MustCompile(reStr) - -func Parse(l string) (*Output, error) { - l = strings.TrimSpace(l) - match := logRE.FindAllStringSubmatch(l, 1) - if len(match) == 0 { - return nil, nil - } - op, err := UnmarshalOutput(match[0][1]) - if err != nil { - return nil, err - } - return op, nil -} diff --git a/pkg/output/output_test.go b/pkg/output/output_test.go index 0f37a13698..52413f26be 100644 --- a/pkg/output/output_test.go +++ b/pkg/output/output_test.go @@ -16,6 +16,9 @@ package output import ( "bytes" + "context" + "io" + "strings" "testing" . "gopkg.in/check.v1" @@ -45,13 +48,27 @@ func (s *OutputSuite) TestValidateKey(c *C) { } } +// FIXME: replace this with TestLogAndParse +func parse(str string) (*Output, error) { + reader := io.NopCloser(strings.NewReader(str)) + output, err := LogAndParse(context.Background(), reader) + var o *Output + for k, v := range output { + o = &Output{ + Key: k, + Value: v.(string), + } + } + return o, err +} + func (s *OutputSuite) TestParseValid(c *C) { key, val := "foo", "bar" b := bytes.NewBuffer(nil) err := fPrintOutput(b, key, val) c.Check(err, IsNil) - o, err := Parse(b.String()) + o, err := parse(b.String()) c.Assert(err, IsNil) c.Assert(o, NotNil) c.Assert(o.Key, Equals, key) @@ -77,7 +94,7 @@ func (s *OutputSuite) TestParseNoOutput(c *C) { checker: IsNil, }, } { - o, err := Parse(tc.s) + o, err := parse(tc.s) c.Assert(err, tc.checker) c.Assert(o, IsNil) } diff --git a/pkg/output/stream.go b/pkg/output/stream.go index 3d3d5bcbe6..69577bf494 100644 --- a/pkg/output/stream.go +++ b/pkg/output/stream.go @@ -16,17 +16,22 @@ package output import ( "bufio" + "bytes" "context" + "fmt" "io" - "strings" - - "github.com/pkg/errors" "github.com/kanisterio/kanister/pkg/field" "github.com/kanisterio/kanister/pkg/log" ) -func splitLines(ctx context.Context, r io.ReadCloser, f func(context.Context, string) error) error { +type scanState struct { + outputBuf []byte + readingOutput bool + separatorSuffix []byte +} + +func splitLines(ctx context.Context, r io.ReadCloser, f func(context.Context, []byte) error) error { // Call r.Close() if the context is canceled or if s.Scan() == false. ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -35,41 +40,160 @@ func splitLines(ctx context.Context, r io.ReadCloser, f func(context.Context, st _ = r.Close() }() - // Scan log lines when ready. - // Don't use bufio.Scanner because it breaks if lines are too long + state := InitState() + reader := bufio.NewReader(r) - // ReadString returns error AND a line for last line - line, err := reader.ReadString('\n') - for { - line = strings.TrimSpace(line) - if line != "" { - if err := f(ctx, line); err != nil { + // Run a simple state machine loop + for { + line, isPrefix, err := reader.ReadLine() + if err == io.EOF { + // Terminal state + return nil + } + if err != nil { + return err + } + if state.readingOutput { + if state, err = handleOutput(state, line, isPrefix, ctx, f); err != nil { return err } + } else { + if len(state.separatorSuffix) > 0 { + if state, err = handleSeparatorSuffix(state, line, isPrefix, ctx, f); err != nil { + return err + } + } else { + if state, err = handleLog(line, isPrefix, ctx, f); err != nil { + return err + } + } } - if err != nil { - break + } +} + +func InitState() scanState { + return scanState{ + outputBuf: []byte(nil), + readingOutput: false, + separatorSuffix: []byte(nil), + } +} + +func ReadPhaseOutputState(outputBuf []byte) scanState { + return scanState{ + outputBuf: outputBuf, + readingOutput: true, + separatorSuffix: []byte(nil), + } +} + +func CheckSeparatorSuffixState(separatorSuffix []byte) scanState { + return scanState{ + outputBuf: []byte(nil), + readingOutput: false, + separatorSuffix: separatorSuffix, + } +} + +func handleOutput(state scanState, line []byte, isPrefix bool, ctx context.Context, f func(context.Context, []byte) error) (scanState, error) { + if isPrefix { + // Accumulate phase output + return ReadPhaseOutputState(append(state.outputBuf, line...)), nil + } else { + // Reached the end of the line while reading phase output + outputContent := append(state.outputBuf, line...) + + if err := f(ctx, outputContent); err != nil { + return state, err } - line, err = reader.ReadString('\n') + + // Transition out of readingOutput state + return InitState(), nil } - if err != io.EOF { - return errors.Wrap(err, "Split lines failed") +} + +func handleSeparatorSuffix(state scanState, line []byte, isPrefix bool, ctx context.Context, f func(context.Context, []byte) error) (scanState, error) { + if bytes.Index(line, state.separatorSuffix) == 0 { + return captureOutputContent(line, isPrefix, len(state.separatorSuffix), ctx, f) + } else { + // Read log like normal + return handleLog(line, isPrefix, ctx, f) } +} + +func handleLog(line []byte, isPrefix bool, ctx context.Context, f func(context.Context, []byte) error) (scanState, error) { + indexOfPOString := bytes.Index(line, []byte(PhaseOpString)) + if indexOfPOString == -1 { + // Log plain output, there is no phase output here + logOutput(ctx, line) + + // There is a corner case possible when PhaseOpString is split between chunks + splitSeparator, separatorSuffix := checkSplitSeparator(line) + if splitSeparator != -1 { + // Transition to separatorSuffix state to check next line + return CheckSeparatorSuffixState(separatorSuffix), nil + } + + return InitState(), nil + } else { + // Log everything before separator as plain output + prefix := line[0:indexOfPOString] + logOutput(ctx, prefix) - return nil + return captureOutputContent(line, isPrefix, indexOfPOString+len(PhaseOpString), ctx, f) + } +} + +func captureOutputContent(line []byte, isPrefix bool, startIndex int, ctx context.Context, f func(context.Context, []byte) error) (scanState, error) { + outputContent := line[startIndex:] + if !isPrefix { + if err := f(ctx, outputContent); err != nil { + return InitState(), err + } + return InitState(), nil + } else { + return ReadPhaseOutputState(append([]byte(nil), outputContent...)), nil + } } +func checkSplitSeparator(line []byte) (splitSeparator int, separatorSuffix []byte) { + lineLength := len(line) + phaseOpBytes := []byte(PhaseOpString) + for i := 1; i < len(phaseOpBytes); i++ { + if bytes.Equal(line[lineLength-i:], phaseOpBytes[0:i]) { + return lineLength - i, phaseOpBytes[i:] + } + } + return -1, nil +} + +func logOutput(ctx context.Context, out []byte) { + log.WithContext(ctx).Print("", field.M{"Pod_Out": string(out)}) +} + +// State machine +// init state: ignore and log output, until we reach ###Phase-output### +// Read output state: accumulate output in buffer +// Transitions: +// init state -> output state: on reaching ###Phase-output###: create and start accumulating output buffer +// output state -> output state: DONT DO THAT YET +// output state -> init state : on reaching \n: parse output json from output buffer and clean the buffer + func LogAndParse(ctx context.Context, r io.ReadCloser) (map[string]interface{}, error) { out := make(map[string]interface{}) - err := splitLines(ctx, r, func(ctx context.Context, l string) error { - log.WithContext(ctx).Print("", field.M{"Pod_Out": l}) - o, err := Parse(l) - if err != nil { - return err - } - if o != nil { - out[o.Key] = o.Value + err := splitLines(ctx, r, func(ctx context.Context, outputContent []byte) error { + outputContent = bytes.TrimSpace(outputContent) + if len(outputContent) != 0 { + log.WithContext(ctx).Print("", field.M{"Pod_Out": string(outputContent)}) + op, err := UnmarshalOutput(outputContent) + if err != nil { + return err + } + fmt.Printf("\nParsed output: %v\n", op) + if op != nil { + out[op.Key] = op.Value + } } return nil }) From dde88dd920965cbc61a9466ff70359789eb1df88 Mon Sep 17 00:00:00 2001 From: Eugen Sumin Date: Sat, 10 Feb 2024 02:01:50 +0100 Subject: [PATCH 03/10] Bugfix - empty line dumped to log redundantly --- pkg/output/stream.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/output/stream.go b/pkg/output/stream.go index 69577bf494..be3c537dea 100644 --- a/pkg/output/stream.go +++ b/pkg/output/stream.go @@ -139,7 +139,9 @@ func handleLog(line []byte, isPrefix bool, ctx context.Context, f func(context.C } else { // Log everything before separator as plain output prefix := line[0:indexOfPOString] - logOutput(ctx, prefix) + if len(prefix) > 0 { + logOutput(ctx, prefix) + } return captureOutputContent(line, isPrefix, indexOfPOString+len(PhaseOpString), ctx, f) } From 8c15e434255203dd2e5072903919a85e6ffc28d4 Mon Sep 17 00:00:00 2001 From: Eugen Sumin Date: Sat, 10 Feb 2024 02:02:09 +0100 Subject: [PATCH 04/10] Add unit tests draft --- pkg/output/stream_test.go | 168 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 168 insertions(+) create mode 100644 pkg/output/stream_test.go diff --git a/pkg/output/stream_test.go b/pkg/output/stream_test.go new file mode 100644 index 0000000000..f9f5d5aa77 --- /dev/null +++ b/pkg/output/stream_test.go @@ -0,0 +1,168 @@ +// Copyright 2024 The Kanister Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package output_test + +import ( + "context" + "io" + "math/rand" + "testing" + "time" + + "github.com/kanisterio/kanister/pkg/output" + . "gopkg.in/check.v1" + apirand "k8s.io/apimachinery/pkg/util/rand" +) + +type EndlinePolicy int + +const ( + NewlineEndline = '\n' + NoEndline = rune(0) + + EndlineRequired EndlinePolicy = iota + EndlineRandom +) + +// Hook up gocheck into the "go test" runner. +func Test(t *testing.T) { TestingT(t) } + +type OutputTestSuite struct{} + +var _ = Suite(&OutputTestSuite{}) + +type testCase struct { + prefixLength int + prefixWithEndline bool + key string + value []rune +} + +func generateLength(r *rand.Rand, avgLength int) int { + if avgLength == 0 { + return 0 + } + return r.Intn(avgLength/5) + avgLength // Return random length ±20% of avgLength +} + +var runes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_!=-+*/\\") + +func generateRandomRunes(r *rand.Rand, length int, endline rune) []rune { + totalLength := length + if endline != NoEndline { + totalLength += 1 + } + line := make([]rune, totalLength) + var last rune + for j := 0; j < length; j++ { + var current rune + for rpt := true; rpt; rpt = last == '\\' && (current == '\n' || current == '\r') { + current = runes[r.Intn(len(runes))] + } + + line[j] = current + last = current + } + + if endline != NoEndline { + line[length] = endline + } + return line +} + +func generateTestCases(numOfLines, avgPrefixLength, avgKeyLength, avgValueLength int, endlinePolicy EndlinePolicy) []testCase { + r := rand.New(rand.NewSource(time.Now().UnixNano())) + cases := make([]testCase, numOfLines) + for i := 0; i < numOfLines; i++ { + key := "" + value := []rune{} + if avgKeyLength != 0 { + key = apirand.String(generateLength(r, avgKeyLength)) + value = generateRandomRunes(r, avgValueLength, NoEndline) + } + cases[i] = testCase{ + prefixLength: generateLength(r, avgPrefixLength), + prefixWithEndline: endlinePolicy == EndlineRequired || rand.Intn(2) == 1, + key: key, + value: value, + } + } + + return cases +} + +func getTestReaderCloser(done chan struct{}, cases []testCase) io.ReadCloser { + pr, pw := io.Pipe() + r := rand.New(rand.NewSource(time.Now().UnixNano())) + + go func() { + defer pw.Close() + + for _, tc := range cases { + select { + case <-done: + return + default: + if tc.prefixLength != 0 { + endline := NoEndline + if tc.prefixWithEndline { + endline = NewlineEndline + } + prefixLine := generateRandomRunes(r, tc.prefixLength, endline) + _, err := pw.Write([]byte(string(prefixLine))) + if err != nil { + return + } + } + + if tc.key != "" { + err := output.PrintOutputTo(pw, tc.key, string(tc.value)) + if err != nil { + return + } + } + } + } + }() + + return pr +} + +func (s *OutputTestSuite) TestHugeStreamsWithoutPhaseOutput(c *C) { + done := make(chan struct{}) + defer func() { close(done) }() + + // e-sumin: Here I'm generating test case, when we have just random string around 10000 runes + // I expect that it will be logged as one line as it was before + // But in fact it is logged by chunks of ±4kb + // When we will fix the code behavior, numOfLines has to be set to 10, and avgPrefix len has to be set to 500000 + cases := generateTestCases(1, 10000, 0, 0, EndlineRequired) + r := getTestReaderCloser(done, cases) + m, e := output.LogAndParse(context.TODO(), r) + c.Check(e, IsNil) + c.Check(len(m), Equals, 0) +} + +func (s *OutputTestSuite) TestShortStreamsWithPhaseOutput(c *C) { + done := make(chan struct{}) + defer func() { close(done) }() + + cases := generateTestCases(1, 0, 10, 50, EndlineRequired) + r := getTestReaderCloser(done, cases) + m, e := output.LogAndParse(context.TODO(), r) + c.Check(e, IsNil) + c.Check(len(m), Equals, 1) + c.Check(m[cases[0].key], Equals, string(cases[0].value)) +} From dbdb9851fb74d0b0b360eb73625997bbe1fd38a8 Mon Sep 17 00:00:00 2001 From: Daniil Fedotov Date: Mon, 12 Feb 2024 14:21:03 -0500 Subject: [PATCH 05/10] fix: set read buffer size to 64kb to keep existing behaviour with shorter lines Fix some out of bounds errors Don't run the tests twice --- pkg/output/output_test.go | 4 ---- pkg/output/stream.go | 11 +++++++++-- pkg/output/stream_test.go | 6 +----- 3 files changed, 10 insertions(+), 11 deletions(-) diff --git a/pkg/output/output_test.go b/pkg/output/output_test.go index 52413f26be..2c507a70af 100644 --- a/pkg/output/output_test.go +++ b/pkg/output/output_test.go @@ -19,14 +19,10 @@ import ( "context" "io" "strings" - "testing" . "gopkg.in/check.v1" ) -// Hook up gocheck into the "go test" runner. -func Test(t *testing.T) { TestingT(t) } - type OutputSuite struct{} var _ = Suite(&OutputSuite{}) diff --git a/pkg/output/stream.go b/pkg/output/stream.go index be3c537dea..e8bfe740f0 100644 --- a/pkg/output/stream.go +++ b/pkg/output/stream.go @@ -31,6 +31,8 @@ type scanState struct { separatorSuffix []byte } +const bufferSize64k = 64 * 1024 + func splitLines(ctx context.Context, r io.ReadCloser, f func(context.Context, []byte) error) error { // Call r.Close() if the context is canceled or if s.Scan() == false. ctx, cancel := context.WithCancel(ctx) @@ -42,7 +44,8 @@ func splitLines(ctx context.Context, r io.ReadCloser, f func(context.Context, [] state := InitState() - reader := bufio.NewReader(r) + reader := bufio.NewReaderSize(r, bufferSize64k) + // reader := bufio.NewReader(r) // Run a simple state machine loop for { @@ -54,6 +57,10 @@ func splitLines(ctx context.Context, r io.ReadCloser, f func(context.Context, [] if err != nil { return err } + // Skip empty lines + if len(line) == 0 { + continue + } if state.readingOutput { if state, err = handleOutput(state, line, isPrefix, ctx, f); err != nil { return err @@ -162,7 +169,7 @@ func captureOutputContent(line []byte, isPrefix bool, startIndex int, ctx contex func checkSplitSeparator(line []byte) (splitSeparator int, separatorSuffix []byte) { lineLength := len(line) phaseOpBytes := []byte(PhaseOpString) - for i := 1; i < len(phaseOpBytes); i++ { + for i := 1; i < len(phaseOpBytes) && i <= lineLength; i++ { if bytes.Equal(line[lineLength-i:], phaseOpBytes[0:i]) { return lineLength - i, phaseOpBytes[i:] } diff --git a/pkg/output/stream_test.go b/pkg/output/stream_test.go index f9f5d5aa77..b14a83a58a 100644 --- a/pkg/output/stream_test.go +++ b/pkg/output/stream_test.go @@ -18,7 +18,6 @@ import ( "context" "io" "math/rand" - "testing" "time" "github.com/kanisterio/kanister/pkg/output" @@ -36,9 +35,6 @@ const ( EndlineRandom ) -// Hook up gocheck into the "go test" runner. -func Test(t *testing.T) { TestingT(t) } - type OutputTestSuite struct{} var _ = Suite(&OutputTestSuite{}) @@ -148,7 +144,7 @@ func (s *OutputTestSuite) TestHugeStreamsWithoutPhaseOutput(c *C) { // I expect that it will be logged as one line as it was before // But in fact it is logged by chunks of ±4kb // When we will fix the code behavior, numOfLines has to be set to 10, and avgPrefix len has to be set to 500000 - cases := generateTestCases(1, 10000, 0, 0, EndlineRequired) + cases := generateTestCases(10, 50000, 0, 0, EndlineRequired) r := getTestReaderCloser(done, cases) m, e := output.LogAndParse(context.TODO(), r) c.Check(e, IsNil) From 76c51eb9d459bcd33947e7ac00680d551d7cd615 Mon Sep 17 00:00:00 2001 From: Daniil Fedotov Date: Mon, 12 Feb 2024 14:27:08 -0500 Subject: [PATCH 06/10] test: support filters in `make test` command --- BUILD.md | 8 ++++++++ Makefile | 2 +- build/test.sh | 9 ++++++++- 3 files changed, 17 insertions(+), 2 deletions(-) diff --git a/BUILD.md b/BUILD.md index 6bcef3db5a..7e2cf11436 100644 --- a/BUILD.md +++ b/BUILD.md @@ -114,6 +114,14 @@ Kanister is using `check` library to extend go testing capabilities: https://git It's recommended to write new tests using this library for consistency. `make test` runs all tests in the repository. +It's possible to run a specific test with `TEST_FILTER` environment variable: + +``` +make tests TEST_FILTER=OutputTestSuite +``` + +This variable will be passed to `-check.f` flag and supports regex filters. + To run tests for specific package you can run `go test` in that package directory. It's recommended to do that in build image shell, you can run it with `make shell`. diff --git a/Makefile b/Makefile index 3972f55530..5c732bb09a 100644 --- a/Makefile +++ b/Makefile @@ -155,7 +155,7 @@ deploy: release-controller .deploy-$(DOTFILE_IMAGE) @kubectl apply -f .deploy-$(DOTFILE_IMAGE) test: build-dirs - @$(MAKE) run CMD="./build/test.sh $(SRC_DIRS)" + @$(MAKE) run CMD="TEST_FILTER=$(TEST_FILTER) ./build/test.sh $(SRC_DIRS)" helm-test: build-dirs @$(MAKE) run CMD="./build/helm-test.sh $(SRC_DIRS)" diff --git a/build/test.sh b/build/test.sh index 0cf570c8ee..58410d46b4 100755 --- a/build/test.sh +++ b/build/test.sh @@ -24,6 +24,13 @@ SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )" export CGO_ENABLED=0 export GO111MODULE=on +TEST_FILTER="${TEST_FILTER:-}" +GOCHECK_FILTER="" +if [ -n "${TEST_FILTER}" ]; then + echo "Using test filter ${TEST_FILTER}" + GOCHECK_FILTER="-check.f ${TEST_FILTER}" +fi + TARGETS=$(for d in "$@"; do echo ./$d/...; done) echo -n "Checking gofmt: " @@ -79,7 +86,7 @@ check_dependencies echo "Running tests:" go test -v ${TARGETS} -list . -go test -v -installsuffix "static" ${TARGETS} -check.v +go test -v -installsuffix "static" ${TARGETS} -check.v ${GOCHECK_FILTER} echo echo "PASS" From e24c134ec427481831396cdff4ecb242290e4073 Mon Sep 17 00:00:00 2001 From: Daniil Fedotov Date: Mon, 12 Feb 2024 16:15:16 -0500 Subject: [PATCH 07/10] test: test and lint issues --- pkg/output/output.go | 4 ---- pkg/output/output_test.go | 4 ++++ pkg/output/stream.go | 22 +++++++++++----------- pkg/output/stream_test.go | 24 ++++++++++++++++++++++++ 4 files changed, 39 insertions(+), 15 deletions(-) diff --git a/pkg/output/output.go b/pkg/output/output.go index da2e7a1db8..12c14fbe22 100644 --- a/pkg/output/output.go +++ b/pkg/output/output.go @@ -84,7 +84,3 @@ func fPrintOutput(w io.Writer, key, value string) error { fmt.Fprintln(w, PhaseOpString, outString) return nil } - -const reStr = PhaseOpString + `(.*)$` - -var logRE = regexp.MustCompile(reStr) diff --git a/pkg/output/output_test.go b/pkg/output/output_test.go index 2c507a70af..52413f26be 100644 --- a/pkg/output/output_test.go +++ b/pkg/output/output_test.go @@ -19,10 +19,14 @@ import ( "context" "io" "strings" + "testing" . "gopkg.in/check.v1" ) +// Hook up gocheck into the "go test" runner. +func Test(t *testing.T) { TestingT(t) } + type OutputSuite struct{} var _ = Suite(&OutputSuite{}) diff --git a/pkg/output/stream.go b/pkg/output/stream.go index e8bfe740f0..841b827329 100644 --- a/pkg/output/stream.go +++ b/pkg/output/stream.go @@ -61,19 +61,18 @@ func splitLines(ctx context.Context, r io.ReadCloser, f func(context.Context, [] if len(line) == 0 { continue } - if state.readingOutput { + switch { + case state.readingOutput: if state, err = handleOutput(state, line, isPrefix, ctx, f); err != nil { return err } - } else { - if len(state.separatorSuffix) > 0 { - if state, err = handleSeparatorSuffix(state, line, isPrefix, ctx, f); err != nil { - return err - } - } else { - if state, err = handleLog(line, isPrefix, ctx, f); err != nil { - return err - } + case len(state.separatorSuffix) > 0: + if state, err = handleSeparatorSuffix(state, line, isPrefix, ctx, f); err != nil { + return err + } + default: + if state, err = handleLog(line, isPrefix, ctx, f); err != nil { + return err } } } @@ -109,7 +108,8 @@ func handleOutput(state scanState, line []byte, isPrefix bool, ctx context.Conte return ReadPhaseOutputState(append(state.outputBuf, line...)), nil } else { // Reached the end of the line while reading phase output - outputContent := append(state.outputBuf, line...) + outputContent := state.outputBuf + outputContent = append(outputContent, line...) if err := f(ctx, outputContent); err != nil { return state, err diff --git a/pkg/output/stream_test.go b/pkg/output/stream_test.go index b14a83a58a..0e69c288d9 100644 --- a/pkg/output/stream_test.go +++ b/pkg/output/stream_test.go @@ -162,3 +162,27 @@ func (s *OutputTestSuite) TestShortStreamsWithPhaseOutput(c *C) { c.Check(len(m), Equals, 1) c.Check(m[cases[0].key], Equals, string(cases[0].value)) } + +func (s *OutputTestSuite) TestLongStreamsWithPhaseOutput(c *C) { + done := make(chan struct{}) + defer func() { close(done) }() + + cases := generateTestCases(10, 10000, 10, 50, EndlineRequired) + r := getTestReaderCloser(done, cases) + m, e := output.LogAndParse(context.TODO(), r) + c.Check(e, IsNil) + c.Check(len(m), Equals, 10) + c.Check(m[cases[0].key], Equals, string(cases[0].value)) +} + +func (s *OutputTestSuite) TestHugeStreamsWithPhaseOutput(c *C) { + done := make(chan struct{}) + defer func() { close(done) }() + + cases := generateTestCases(5, 100000, 10, 50, EndlineRequired) + r := getTestReaderCloser(done, cases) + m, e := output.LogAndParse(context.TODO(), r) + c.Check(e, IsNil) + c.Check(len(m), Equals, 5) + c.Check(m[cases[0].key], Equals, string(cases[0].value)) +} From 961bc55ebaa16d539f03972450561f7491bdd4ed Mon Sep 17 00:00:00 2001 From: Eugen Sumin Date: Tue, 13 Feb 2024 02:27:31 +0100 Subject: [PATCH 08/10] Couple additional tests --- pkg/output/stream_test.go | 69 +++++++++++++++++++++++++++++++++++---- 1 file changed, 62 insertions(+), 7 deletions(-) diff --git a/pkg/output/stream_test.go b/pkg/output/stream_test.go index 0e69c288d9..75cd713379 100644 --- a/pkg/output/stream_test.go +++ b/pkg/output/stream_test.go @@ -32,7 +32,7 @@ const ( NoEndline = rune(0) EndlineRequired EndlinePolicy = iota - EndlineRandom + EndlineProhibited ) type OutputTestSuite struct{} @@ -88,9 +88,12 @@ func generateTestCases(numOfLines, avgPrefixLength, avgKeyLength, avgValueLength key = apirand.String(generateLength(r, avgKeyLength)) value = generateRandomRunes(r, avgValueLength, NoEndline) } + + prefixWithEndLine := endlinePolicy == EndlineRequired + cases[i] = testCase{ prefixLength: generateLength(r, avgPrefixLength), - prefixWithEndline: endlinePolicy == EndlineRequired || rand.Intn(2) == 1, + prefixWithEndline: prefixWithEndLine, key: key, value: value, } @@ -136,14 +139,13 @@ func getTestReaderCloser(done chan struct{}, cases []testCase) io.ReadCloser { return pr } -func (s *OutputTestSuite) TestHugeStreamsWithoutPhaseOutput(c *C) { +// TestLongStreamsWithoutPhaseOutput Will produce 10 long lines +// each line will contain from 50Kb to 60Kb of random text +// there will be no phase output in lines +func (s *OutputTestSuite) TestLongStreamsWithoutPhaseOutput(c *C) { done := make(chan struct{}) defer func() { close(done) }() - // e-sumin: Here I'm generating test case, when we have just random string around 10000 runes - // I expect that it will be logged as one line as it was before - // But in fact it is logged by chunks of ±4kb - // When we will fix the code behavior, numOfLines has to be set to 10, and avgPrefix len has to be set to 500000 cases := generateTestCases(10, 50000, 0, 0, EndlineRequired) r := getTestReaderCloser(done, cases) m, e := output.LogAndParse(context.TODO(), r) @@ -151,6 +153,8 @@ func (s *OutputTestSuite) TestHugeStreamsWithoutPhaseOutput(c *C) { c.Check(len(m), Equals, 0) } +// TestShortStreamsWithPhaseOutput Will produce one short line +// which will contain ONLY phase output and nothing else func (s *OutputTestSuite) TestShortStreamsWithPhaseOutput(c *C) { done := make(chan struct{}) defer func() { close(done) }() @@ -163,6 +167,9 @@ func (s *OutputTestSuite) TestShortStreamsWithPhaseOutput(c *C) { c.Check(m[cases[0].key], Equals, string(cases[0].value)) } +// TestLongStreamsWithPhaseOutput Will produce 10 long lines +// each line will contain from 10Kb to 12Kb of random text and +// phase output preceded with newline func (s *OutputTestSuite) TestLongStreamsWithPhaseOutput(c *C) { done := make(chan struct{}) defer func() { close(done) }() @@ -175,6 +182,9 @@ func (s *OutputTestSuite) TestLongStreamsWithPhaseOutput(c *C) { c.Check(m[cases[0].key], Equals, string(cases[0].value)) } +// TestHugeStreamsWithHugePhaseOutput Will produce five huge lines +// each line will contain ±100Kb of random text WITH newline before Phase Output mark +// Phase output value will be very short func (s *OutputTestSuite) TestHugeStreamsWithPhaseOutput(c *C) { done := make(chan struct{}) defer func() { close(done) }() @@ -186,3 +196,48 @@ func (s *OutputTestSuite) TestHugeStreamsWithPhaseOutput(c *C) { c.Check(len(m), Equals, 5) c.Check(m[cases[0].key], Equals, string(cases[0].value)) } + +// TestHugeStreamsWithHugePhaseOutput Will produce five huge lines +// each line will contain ±500Kb of random text WITH newline before Phase Output mark +// Phase output value will be ±10Kb of random text +func (s *OutputTestSuite) TestHugeStreamsWithLongPhaseOutput(c *C) { + done := make(chan struct{}) + defer func() { close(done) }() + + cases := generateTestCases(5, 500000, 10, 10000, EndlineRequired) + r := getTestReaderCloser(done, cases) + m, e := output.LogAndParse(context.TODO(), r) + c.Check(e, IsNil) + c.Check(len(m), Equals, 5) + c.Check(m[cases[0].key], Equals, string(cases[0].value)) +} + +// TestHugeStreamsWithHugePhaseOutput Will produce one huge line +// which will contain ±500Kb of random text WITH newline before Phase Output mark +// Phase output value will also be ±500Kb +func (s *OutputTestSuite) TestHugeStreamsWithHugePhaseOutput(c *C) { + done := make(chan struct{}) + defer func() { close(done) }() + + cases := generateTestCases(1, 500000, 10, 500000, EndlineRequired) + r := getTestReaderCloser(done, cases) + m, e := output.LogAndParse(context.TODO(), r) + c.Check(e, IsNil) + c.Check(len(m), Equals, 1) + c.Check(m[cases[0].key], Equals, string(cases[0].value)) +} + +// TestHugeStreamsWithHugePhaseOutputWithoutNewlineDelimiter Will produce one huge line +// which will contain ±500Kb of random text WITHOUT newline before Phase Output mark +// Phase output value will also be ±500Kb +func (s *OutputTestSuite) TestHugeStreamsWithHugePhaseOutputWithoutNewlineDelimiter(c *C) { + done := make(chan struct{}) + defer func() { close(done) }() + + cases := generateTestCases(1, 500000, 10, 500000, EndlineProhibited) + r := getTestReaderCloser(done, cases) + m, e := output.LogAndParse(context.TODO(), r) + c.Check(e, IsNil) + c.Check(len(m), Equals, 1) + c.Check(m[cases[0].key], Equals, string(cases[0].value)) +} From 33cb34294e3a7bb1abb087ba73e068399f081889 Mon Sep 17 00:00:00 2001 From: Eugen Sumin Date: Tue, 13 Feb 2024 02:41:10 +0100 Subject: [PATCH 09/10] Remove commented code --- pkg/function/kube_exec.go | 16 ---------------- pkg/output/stream.go | 1 - 2 files changed, 17 deletions(-) diff --git a/pkg/function/kube_exec.go b/pkg/function/kube_exec.go index ee09d70afe..7c8bcfef41 100644 --- a/pkg/function/kube_exec.go +++ b/pkg/function/kube_exec.go @@ -61,7 +61,6 @@ func parseLogAndCreateOutput(out string) (map[string]interface{}, error) { return nil, nil } - // var op map[string]interface{} reader := io.NopCloser(strings.NewReader(out)) output, err := output.LogAndParse(context.Background(), reader) @@ -70,21 +69,6 @@ func parseLogAndCreateOutput(out string) (map[string]interface{}, error) { return nil, err } return output, err - // logs := regexp.MustCompile("[\n]").Split(out, -1) - // for _, l := range logs { - // opObj, err := output.Parse(l) - // if err != nil { - // return nil, err - // } - // if opObj == nil { - // continue - // } - // if op == nil { - // op = make(map[string]interface{}) - // } - // op[opObj.Key] = opObj.Value - // } - // return op, nil } func (kef *kubeExecFunc) Exec(ctx context.Context, tp param.TemplateParams, args map[string]interface{}) (map[string]interface{}, error) { diff --git a/pkg/output/stream.go b/pkg/output/stream.go index 841b827329..a57145e3b0 100644 --- a/pkg/output/stream.go +++ b/pkg/output/stream.go @@ -45,7 +45,6 @@ func splitLines(ctx context.Context, r io.ReadCloser, f func(context.Context, [] state := InitState() reader := bufio.NewReaderSize(r, bufferSize64k) - // reader := bufio.NewReader(r) // Run a simple state machine loop for { From 8a8aa99d5d4cb47de1f4ff8ffa5b0413844bbeda Mon Sep 17 00:00:00 2001 From: Daniil Fedotov Date: Wed, 14 Feb 2024 13:54:48 -0500 Subject: [PATCH 10/10] code style fixes --- pkg/output/output_test.go | 56 ------------------------------ pkg/output/stream.go | 73 +++++++++++++++++++++++---------------- 2 files changed, 43 insertions(+), 86 deletions(-) diff --git a/pkg/output/output_test.go b/pkg/output/output_test.go index 52413f26be..a63e681030 100644 --- a/pkg/output/output_test.go +++ b/pkg/output/output_test.go @@ -15,10 +15,6 @@ package output import ( - "bytes" - "context" - "io" - "strings" "testing" . "gopkg.in/check.v1" @@ -47,55 +43,3 @@ func (s *OutputSuite) TestValidateKey(c *C) { c.Check(err, tc.checker, Commentf("Key (%s) failed!", tc.key)) } } - -// FIXME: replace this with TestLogAndParse -func parse(str string) (*Output, error) { - reader := io.NopCloser(strings.NewReader(str)) - output, err := LogAndParse(context.Background(), reader) - var o *Output - for k, v := range output { - o = &Output{ - Key: k, - Value: v.(string), - } - } - return o, err -} - -func (s *OutputSuite) TestParseValid(c *C) { - key, val := "foo", "bar" - b := bytes.NewBuffer(nil) - err := fPrintOutput(b, key, val) - c.Check(err, IsNil) - - o, err := parse(b.String()) - c.Assert(err, IsNil) - c.Assert(o, NotNil) - c.Assert(o.Key, Equals, key) - c.Assert(o.Value, Equals, val) -} - -func (s *OutputSuite) TestParseNoOutput(c *C) { - key, val := "foo", "bar" - b := bytes.NewBuffer(nil) - err := fPrintOutput(b, key, val) - c.Check(err, IsNil) - valid := b.String() - for _, tc := range []struct { - s string - checker Checker - }{ - { - s: valid[0 : len(valid)-2], - checker: NotNil, - }, - { - s: valid[1 : len(valid)-1], - checker: IsNil, - }, - } { - o, err := parse(tc.s) - c.Assert(err, tc.checker) - c.Assert(o, IsNil) - } -} diff --git a/pkg/output/stream.go b/pkg/output/stream.go index a57145e3b0..7e15b9ff16 100644 --- a/pkg/output/stream.go +++ b/pkg/output/stream.go @@ -18,7 +18,6 @@ import ( "bufio" "bytes" "context" - "fmt" "io" "github.com/kanisterio/kanister/pkg/field" @@ -101,34 +100,45 @@ func CheckSeparatorSuffixState(separatorSuffix []byte) scanState { } } -func handleOutput(state scanState, line []byte, isPrefix bool, ctx context.Context, f func(context.Context, []byte) error) (scanState, error) { +func handleOutput( + state scanState, + line []byte, + isPrefix bool, + ctx context.Context, + f func(context.Context, []byte) error, +) (scanState, error) { if isPrefix { // Accumulate phase output return ReadPhaseOutputState(append(state.outputBuf, line...)), nil - } else { - // Reached the end of the line while reading phase output - outputContent := state.outputBuf - outputContent = append(outputContent, line...) - - if err := f(ctx, outputContent); err != nil { - return state, err - } - - // Transition out of readingOutput state - return InitState(), nil } + // Reached the end of the line while reading phase output + if err := f(ctx, append(state.outputBuf, line...)); err != nil { + return state, err + } + // Transition out of readingOutput state + return InitState(), nil } -func handleSeparatorSuffix(state scanState, line []byte, isPrefix bool, ctx context.Context, f func(context.Context, []byte) error) (scanState, error) { +func handleSeparatorSuffix( + state scanState, + line []byte, + isPrefix bool, + ctx context.Context, + f func(context.Context, []byte) error, +) (scanState, error) { if bytes.Index(line, state.separatorSuffix) == 0 { return captureOutputContent(line, isPrefix, len(state.separatorSuffix), ctx, f) - } else { - // Read log like normal - return handleLog(line, isPrefix, ctx, f) } + // Read log like normal + return handleLog(line, isPrefix, ctx, f) } -func handleLog(line []byte, isPrefix bool, ctx context.Context, f func(context.Context, []byte) error) (scanState, error) { +func handleLog( + line []byte, + isPrefix bool, + ctx context.Context, + f func(context.Context, []byte) error, +) (scanState, error) { indexOfPOString := bytes.Index(line, []byte(PhaseOpString)) if indexOfPOString == -1 { // Log plain output, there is no phase output here @@ -142,27 +152,31 @@ func handleLog(line []byte, isPrefix bool, ctx context.Context, f func(context.C } return InitState(), nil - } else { - // Log everything before separator as plain output - prefix := line[0:indexOfPOString] - if len(prefix) > 0 { - logOutput(ctx, prefix) - } - - return captureOutputContent(line, isPrefix, indexOfPOString+len(PhaseOpString), ctx, f) } + // Log everything before separator as plain output + prefix := line[0:indexOfPOString] + if len(prefix) > 0 { + logOutput(ctx, prefix) + } + + return captureOutputContent(line, isPrefix, indexOfPOString+len(PhaseOpString), ctx, f) } -func captureOutputContent(line []byte, isPrefix bool, startIndex int, ctx context.Context, f func(context.Context, []byte) error) (scanState, error) { +func captureOutputContent( + line []byte, + isPrefix bool, + startIndex int, + ctx context.Context, + f func(context.Context, []byte) error, +) (scanState, error) { outputContent := line[startIndex:] if !isPrefix { if err := f(ctx, outputContent); err != nil { return InitState(), err } return InitState(), nil - } else { - return ReadPhaseOutputState(append([]byte(nil), outputContent...)), nil } + return ReadPhaseOutputState(append([]byte(nil), outputContent...)), nil } func checkSplitSeparator(line []byte) (splitSeparator int, separatorSuffix []byte) { @@ -198,7 +212,6 @@ func LogAndParse(ctx context.Context, r io.ReadCloser) (map[string]interface{}, if err != nil { return err } - fmt.Printf("\nParsed output: %v\n", op) if op != nil { out[op.Key] = op.Value }