Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: switch to bufio.Reader for KubeTask output parsing #2641

Merged
merged 13 commits into from
Feb 15, 2024
8 changes: 8 additions & 0 deletions BUILD.md
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,14 @@ Kanister is using `check` library to extend go testing capabilities: https://git
It's recommended to write new tests using this library for consistency.

`make test` runs all tests in the repository.
It's possible to run a specific test with `TEST_FILTER` environment variable:

```
make tests TEST_FILTER=OutputTestSuite
```

This variable will be passed to `-check.f` flag and supports regex filters.

To run tests for specific package you can run `go test` in that package directory.
It's recommended to do that in build image shell, you can run it with `make shell`.

Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ deploy: release-controller .deploy-$(DOTFILE_IMAGE)
@kubectl apply -f .deploy-$(DOTFILE_IMAGE)

test: build-dirs
@$(MAKE) run CMD="./build/test.sh $(SRC_DIRS)"
@$(MAKE) run CMD="TEST_FILTER=$(TEST_FILTER) ./build/test.sh $(SRC_DIRS)"

helm-test: build-dirs
@$(MAKE) run CMD="./build/helm-test.sh $(SRC_DIRS)"
Expand Down
9 changes: 8 additions & 1 deletion build/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@ SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )"
export CGO_ENABLED=0
export GO111MODULE=on

TEST_FILTER="${TEST_FILTER:-}"
GOCHECK_FILTER=""
if [ -n "${TEST_FILTER}" ]; then
echo "Using test filter ${TEST_FILTER}"
GOCHECK_FILTER="-check.f ${TEST_FILTER}"
fi

TARGETS=$(for d in "$@"; do echo ./$d/...; done)

echo -n "Checking gofmt: "
Expand Down Expand Up @@ -79,7 +86,7 @@ check_dependencies

echo "Running tests:"
go test -v ${TARGETS} -list .
go test -v -installsuffix "static" ${TARGETS} -check.v
go test -v -installsuffix "static" ${TARGETS} -check.v ${GOCHECK_FILTER}
echo

echo "PASS"
Expand Down
25 changes: 9 additions & 16 deletions pkg/function/kube_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"context"
"io"
"os"
"regexp"
"strings"
"time"

kanister "github.com/kanisterio/kanister/pkg"
Expand Down Expand Up @@ -60,22 +60,15 @@ func parseLogAndCreateOutput(out string) (map[string]interface{}, error) {
if out == "" {
return nil, nil
}
var op map[string]interface{}
logs := regexp.MustCompile("[\n]").Split(out, -1)
for _, l := range logs {
opObj, err := output.Parse(l)
if err != nil {
return nil, err
}
if opObj == nil {
continue
}
if op == nil {
op = make(map[string]interface{})
}
op[opObj.Key] = opObj.Value

reader := io.NopCloser(strings.NewReader(out))
output, err := output.LogAndParse(context.Background(), reader)

// For some reason we expect empty output to be returned as nil here
if len(output) == 0 {
return nil, err
}
return op, nil
return output, err
}

func (kef *kubeExecFunc) Exec(ctx context.Context, tp param.TemplateParams, args map[string]interface{}) (map[string]interface{}, error) {
Expand Down
62 changes: 62 additions & 0 deletions pkg/function/kube_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ package function

import (
"context"
"fmt"
"os"
"strings"
"time"

. "gopkg.in/check.v1"
Expand Down Expand Up @@ -61,6 +63,24 @@ func (s *KubeTaskSuite) TearDownSuite(c *C) {
}
}

func bigOutputPhase(namespace string) crv1alpha1.BlueprintPhase {
longstring := strings.Repeat("a", 100000)
return crv1alpha1.BlueprintPhase{
Name: "testOutput",
Func: KubeTaskFuncName,
Args: map[string]interface{}{
KubeTaskNamespaceArg: namespace,
KubeTaskImageArg: consts.LatestKanisterToolsImage,
KubeTaskCommandArg: []string{
"sh",
"-c",
// We output a line for log only, and a line with output at the tail
fmt.Sprintf("echo -n %s > tmpfile; cat tmpfile; echo; cat tmpfile; kando output longstring $(cat tmpfile)", longstring),
},
},
}
}

func outputPhase(namespace string) crv1alpha1.BlueprintPhase {
return crv1alpha1.BlueprintPhase{
Name: "testOutput",
Expand Down Expand Up @@ -161,3 +181,45 @@ func (s *KubeTaskSuite) TestKubeTask(c *C) {
}
}
}

func (s *KubeTaskSuite) TestKubeTaskWithBigOutput(c *C) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()
tp := param.TemplateParams{
StatefulSet: &param.StatefulSetParams{
Namespace: s.namespace,
},
PodOverride: crv1alpha1.JSONMap{
"containers": []map[string]interface{}{
{
"name": "container",
"imagePullPolicy": "Always",
},
},
},
}
expectedOut := strings.Repeat("a", 100000)
action := "test"
for _, tc := range []struct {
bp *crv1alpha1.Blueprint
outs []map[string]interface{}
}{
{
bp: newTaskBlueprint(bigOutputPhase(s.namespace)),
outs: []map[string]interface{}{
{
"longstring": expectedOut,
},
},
},
} {
phases, err := kanister.GetPhases(*tc.bp, action, kanister.DefaultVersion, tp)
c.Assert(err, IsNil)
c.Assert(phases, HasLen, len(tc.outs))
for i, p := range phases {
out, err := p.Exec(ctx, *tc.bp, action, tp)
c.Assert(err, IsNil, Commentf("Phase %s failed", p.Name()))
c.Assert(out, DeepEquals, tc.outs[i])
}
}
}
20 changes: 1 addition & 19 deletions pkg/output/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"io"
"os"
"regexp"
"strings"

"github.com/pkg/errors"
)
Expand All @@ -47,7 +46,7 @@ func marshalOutput(key, value string) (string, error) {
}

// UnmarshalOutput unmarshals output json into Output struct
func UnmarshalOutput(opString string) (*Output, error) {
func UnmarshalOutput(opString []byte) (*Output, error) {
p := &Output{}
err := json.Unmarshal([]byte(opString), p)
return p, errors.Wrap(err, "Failed to unmarshal key-value pair")
Expand Down Expand Up @@ -85,20 +84,3 @@ func fPrintOutput(w io.Writer, key, value string) error {
fmt.Fprintln(w, PhaseOpString, outString)
return nil
}

const reStr = PhaseOpString + `(.*)$`

var logRE = regexp.MustCompile(reStr)

func Parse(l string) (*Output, error) {
l = strings.TrimSpace(l)
match := logRE.FindAllStringSubmatch(l, 1)
if len(match) == 0 {
return nil, nil
}
op, err := UnmarshalOutput(match[0][1])
if err != nil {
return nil, err
}
return op, nil
}
39 changes: 0 additions & 39 deletions pkg/output/output_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package output

import (
"bytes"
"testing"

. "gopkg.in/check.v1"
Expand Down Expand Up @@ -44,41 +43,3 @@ func (s *OutputSuite) TestValidateKey(c *C) {
c.Check(err, tc.checker, Commentf("Key (%s) failed!", tc.key))
}
}

func (s *OutputSuite) TestParseValid(c *C) {
key, val := "foo", "bar"
b := bytes.NewBuffer(nil)
err := fPrintOutput(b, key, val)
c.Check(err, IsNil)

o, err := Parse(b.String())
c.Assert(err, IsNil)
c.Assert(o, NotNil)
c.Assert(o.Key, Equals, key)
c.Assert(o.Value, Equals, val)
}

func (s *OutputSuite) TestParseNoOutput(c *C) {
key, val := "foo", "bar"
b := bytes.NewBuffer(nil)
err := fPrintOutput(b, key, val)
c.Check(err, IsNil)
valid := b.String()
for _, tc := range []struct {
s string
checker Checker
}{
{
s: valid[0 : len(valid)-2],
checker: NotNil,
},
{
s: valid[1 : len(valid)-1],
checker: IsNil,
},
} {
o, err := Parse(tc.s)
c.Assert(err, tc.checker)
c.Assert(o, IsNil)
}
}
Loading