Skip to content

Commit

Permalink
fix: switch to bufio.Reader for KubeTask output parsing (#2641)
Browse files Browse the repository at this point in the history
* fix: switch to bufio.Reader for KubeTask output parsing

bufio.Scanner has buffer limit and can fail the function if output is too big
fixes #2612

* WIP: scan kube task output in chunks

* Bugfix - empty line dumped to log redundantly

* Add unit tests draft

* fix: set read buffer size to 64kb to keep existing behaviour with shorter lines

Fix some out of bounds errors
Don't run the tests twice

* test: support filters in `make test` command

* test: test and lint issues

* Couple additional tests

* Remove commented code

* code style fixes

---------

Co-authored-by: Eugen Sumin <[email protected]>
Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
3 people authored Feb 15, 2024
1 parent a641098 commit 1ae84d4
Show file tree
Hide file tree
Showing 9 changed files with 508 additions and 97 deletions.
8 changes: 8 additions & 0 deletions BUILD.md
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,14 @@ Kanister is using `check` library to extend go testing capabilities: https://git
It's recommended to write new tests using this library for consistency.

`make test` runs all tests in the repository.
It's possible to run a specific test with `TEST_FILTER` environment variable:

```
make tests TEST_FILTER=OutputTestSuite
```

This variable will be passed to `-check.f` flag and supports regex filters.

To run tests for specific package you can run `go test` in that package directory.
It's recommended to do that in build image shell, you can run it with `make shell`.

Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ deploy: release-controller .deploy-$(DOTFILE_IMAGE)
@kubectl apply -f .deploy-$(DOTFILE_IMAGE)

test: build-dirs
@$(MAKE) run CMD="./build/test.sh $(SRC_DIRS)"
@$(MAKE) run CMD="TEST_FILTER=$(TEST_FILTER) ./build/test.sh $(SRC_DIRS)"

helm-test: build-dirs
@$(MAKE) run CMD="./build/helm-test.sh $(SRC_DIRS)"
Expand Down
9 changes: 8 additions & 1 deletion build/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@ SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )"
export CGO_ENABLED=0
export GO111MODULE=on

TEST_FILTER="${TEST_FILTER:-}"
GOCHECK_FILTER=""
if [ -n "${TEST_FILTER}" ]; then
echo "Using test filter ${TEST_FILTER}"
GOCHECK_FILTER="-check.f ${TEST_FILTER}"
fi

TARGETS=$(for d in "$@"; do echo ./$d/...; done)

echo -n "Checking gofmt: "
Expand Down Expand Up @@ -79,7 +86,7 @@ check_dependencies

echo "Running tests:"
go test -v ${TARGETS} -list .
go test -v -installsuffix "static" ${TARGETS} -check.v
go test -v -installsuffix "static" ${TARGETS} -check.v ${GOCHECK_FILTER}
echo

echo "PASS"
Expand Down
25 changes: 9 additions & 16 deletions pkg/function/kube_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"context"
"io"
"os"
"regexp"
"strings"
"time"

kanister "github.com/kanisterio/kanister/pkg"
Expand Down Expand Up @@ -60,22 +60,15 @@ func parseLogAndCreateOutput(out string) (map[string]interface{}, error) {
if out == "" {
return nil, nil
}
var op map[string]interface{}
logs := regexp.MustCompile("[\n]").Split(out, -1)
for _, l := range logs {
opObj, err := output.Parse(l)
if err != nil {
return nil, err
}
if opObj == nil {
continue
}
if op == nil {
op = make(map[string]interface{})
}
op[opObj.Key] = opObj.Value

reader := io.NopCloser(strings.NewReader(out))
output, err := output.LogAndParse(context.Background(), reader)

// For some reason we expect empty output to be returned as nil here
if len(output) == 0 {
return nil, err
}
return op, nil
return output, err
}

func (kef *kubeExecFunc) Exec(ctx context.Context, tp param.TemplateParams, args map[string]interface{}) (map[string]interface{}, error) {
Expand Down
62 changes: 62 additions & 0 deletions pkg/function/kube_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ package function

import (
"context"
"fmt"
"os"
"strings"
"time"

. "gopkg.in/check.v1"
Expand Down Expand Up @@ -61,6 +63,24 @@ func (s *KubeTaskSuite) TearDownSuite(c *C) {
}
}

func bigOutputPhase(namespace string) crv1alpha1.BlueprintPhase {
longstring := strings.Repeat("a", 100000)
return crv1alpha1.BlueprintPhase{
Name: "testOutput",
Func: KubeTaskFuncName,
Args: map[string]interface{}{
KubeTaskNamespaceArg: namespace,
KubeTaskImageArg: consts.LatestKanisterToolsImage,
KubeTaskCommandArg: []string{
"sh",
"-c",
// We output a line for log only, and a line with output at the tail
fmt.Sprintf("echo -n %s > tmpfile; cat tmpfile; echo; cat tmpfile; kando output longstring $(cat tmpfile)", longstring),
},
},
}
}

func outputPhase(namespace string) crv1alpha1.BlueprintPhase {
return crv1alpha1.BlueprintPhase{
Name: "testOutput",
Expand Down Expand Up @@ -161,3 +181,45 @@ func (s *KubeTaskSuite) TestKubeTask(c *C) {
}
}
}

func (s *KubeTaskSuite) TestKubeTaskWithBigOutput(c *C) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()
tp := param.TemplateParams{
StatefulSet: &param.StatefulSetParams{
Namespace: s.namespace,
},
PodOverride: crv1alpha1.JSONMap{
"containers": []map[string]interface{}{
{
"name": "container",
"imagePullPolicy": "Always",
},
},
},
}
expectedOut := strings.Repeat("a", 100000)
action := "test"
for _, tc := range []struct {
bp *crv1alpha1.Blueprint
outs []map[string]interface{}
}{
{
bp: newTaskBlueprint(bigOutputPhase(s.namespace)),
outs: []map[string]interface{}{
{
"longstring": expectedOut,
},
},
},
} {
phases, err := kanister.GetPhases(*tc.bp, action, kanister.DefaultVersion, tp)
c.Assert(err, IsNil)
c.Assert(phases, HasLen, len(tc.outs))
for i, p := range phases {
out, err := p.Exec(ctx, *tc.bp, action, tp)
c.Assert(err, IsNil, Commentf("Phase %s failed", p.Name()))
c.Assert(out, DeepEquals, tc.outs[i])
}
}
}
20 changes: 1 addition & 19 deletions pkg/output/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"io"
"os"
"regexp"
"strings"

"github.com/pkg/errors"
)
Expand All @@ -47,7 +46,7 @@ func marshalOutput(key, value string) (string, error) {
}

// UnmarshalOutput unmarshals output json into Output struct
func UnmarshalOutput(opString string) (*Output, error) {
func UnmarshalOutput(opString []byte) (*Output, error) {
p := &Output{}
err := json.Unmarshal([]byte(opString), p)
return p, errors.Wrap(err, "Failed to unmarshal key-value pair")
Expand Down Expand Up @@ -85,20 +84,3 @@ func fPrintOutput(w io.Writer, key, value string) error {
fmt.Fprintln(w, PhaseOpString, outString)
return nil
}

const reStr = PhaseOpString + `(.*)$`

var logRE = regexp.MustCompile(reStr)

func Parse(l string) (*Output, error) {
l = strings.TrimSpace(l)
match := logRE.FindAllStringSubmatch(l, 1)
if len(match) == 0 {
return nil, nil
}
op, err := UnmarshalOutput(match[0][1])
if err != nil {
return nil, err
}
return op, nil
}
39 changes: 0 additions & 39 deletions pkg/output/output_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package output

import (
"bytes"
"testing"

. "gopkg.in/check.v1"
Expand Down Expand Up @@ -44,41 +43,3 @@ func (s *OutputSuite) TestValidateKey(c *C) {
c.Check(err, tc.checker, Commentf("Key (%s) failed!", tc.key))
}
}

func (s *OutputSuite) TestParseValid(c *C) {
key, val := "foo", "bar"
b := bytes.NewBuffer(nil)
err := fPrintOutput(b, key, val)
c.Check(err, IsNil)

o, err := Parse(b.String())
c.Assert(err, IsNil)
c.Assert(o, NotNil)
c.Assert(o.Key, Equals, key)
c.Assert(o.Value, Equals, val)
}

func (s *OutputSuite) TestParseNoOutput(c *C) {
key, val := "foo", "bar"
b := bytes.NewBuffer(nil)
err := fPrintOutput(b, key, val)
c.Check(err, IsNil)
valid := b.String()
for _, tc := range []struct {
s string
checker Checker
}{
{
s: valid[0 : len(valid)-2],
checker: NotNil,
},
{
s: valid[1 : len(valid)-1],
checker: IsNil,
},
} {
o, err := Parse(tc.s)
c.Assert(err, tc.checker)
c.Assert(o, IsNil)
}
}
Loading

0 comments on commit 1ae84d4

Please sign in to comment.