From 89a65753c3b5d2020576411829bb6144a0cd4cfc Mon Sep 17 00:00:00 2001 From: Tom Manville Date: Mon, 14 Oct 2024 11:48:43 -0700 Subject: [PATCH] Log kanx child out/err to parent stdout/err (#3183) --- pkg/kanx/logger.go | 41 +++++++++++++++++++++++++ pkg/kanx/logger_test.go | 66 +++++++++++++++++++++++++++++++++++++++++ pkg/kanx/server.go | 16 ++++++---- 3 files changed, 118 insertions(+), 5 deletions(-) create mode 100644 pkg/kanx/logger.go create mode 100644 pkg/kanx/logger_test.go diff --git a/pkg/kanx/logger.go b/pkg/kanx/logger.go new file mode 100644 index 0000000000..f2d75062c0 --- /dev/null +++ b/pkg/kanx/logger.go @@ -0,0 +1,41 @@ +package kanx + +import ( + "io" + "sync" + + "github.com/kanisterio/kanister/pkg/field" + "github.com/kanisterio/kanister/pkg/log" +) + +var _ io.Writer = (*logWriter)(nil) + +type logWriter struct { + logger log.Logger + writer io.Writer + fields field.M + mutex *sync.Mutex +} + +func newLogWriter(l log.Logger, w io.Writer) *logWriter { + return &logWriter{ + logger: l, + writer: w, + fields: nil, + mutex: &sync.Mutex{}, + } +} + +func (lw *logWriter) SetFields(m field.M) { + lw.mutex.Lock() + defer lw.mutex.Unlock() + lw.fields = m +} + +func (lw *logWriter) Write(buf []byte) (int, error) { + lw.mutex.Lock() + f := lw.fields + lw.mutex.Unlock() + lw.logger.PrintTo(lw.writer, string(buf), f) + return len(buf), nil +} diff --git a/pkg/kanx/logger_test.go b/pkg/kanx/logger_test.go new file mode 100644 index 0000000000..b402f81d7a --- /dev/null +++ b/pkg/kanx/logger_test.go @@ -0,0 +1,66 @@ +package kanx + +import ( + "bytes" + "encoding/json" + + "gopkg.in/check.v1" + + "github.com/kanisterio/kanister/pkg/field" + "github.com/kanisterio/kanister/pkg/log" +) + +type LoggerSuite struct{} + +var _ = check.Suite(&LoggerSuite{}) + +type Log struct { + File *string `json:"File,omitempty"` + Function *string `json:"Function,omitempty"` + Line *int `json:"Line,omitempty"` + Level *string `json:"level,omitempty"` + Msg *string `json:"msg,omitempty"` + Time *string `json:"time,omitempty"` + Boo *string `json:"boo,omitempty"` +} + +func (s *LoggerSuite) TestLogger(c *check.C) { + buf := bytes.NewBuffer(nil) + msg := []byte("hello!") + + lw := newLogWriter(log.Info(), buf) + + n, err := lw.Write(msg) + c.Assert(err, check.IsNil) + c.Assert(n, check.Equals, len(msg)) + + l := Log{} + err = json.Unmarshal(buf.Bytes(), &l) + c.Assert(err, check.IsNil) + + c.Assert(l.File, check.NotNil) + c.Assert(l.Function, check.NotNil) + c.Assert(l.Line, check.Not(check.Equals), 0) + c.Assert(l.Level, check.NotNil) + c.Assert(*l.Msg, check.Equals, string(msg)) + c.Assert(l.Time, check.NotNil) + c.Assert(l.Boo, check.IsNil) + + buf.Reset() + lw.SetFields(field.M{"boo": "far"}) + n, err = lw.Write(msg) + c.Assert(err, check.IsNil) + c.Assert(n, check.Equals, len(msg)) + + l = Log{} + err = json.Unmarshal(buf.Bytes(), &l) + c.Assert(err, check.IsNil) + + c.Assert(l.File, check.NotNil) + c.Assert(l.Function, check.NotNil) + c.Assert(l.Line, check.Not(check.Equals), 0) + c.Assert(l.Level, check.NotNil) + c.Assert(*l.Msg, check.Equals, string(msg)) + c.Assert(l.Time, check.NotNil) + c.Assert(*l.Boo, check.Equals, "far") +} diff --git a/pkg/kanx/server.go b/pkg/kanx/server.go index 208c5115cb..50883ff938 100644 --- a/pkg/kanx/server.go +++ b/pkg/kanx/server.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "fmt" + "io" "net" "os" "os/exec" @@ -68,15 +69,20 @@ func (s *processServiceServer) CreateProcesses(_ context.Context, cpr *CreatePro stderr: stderr, cancel: can, } - cmd.Stdout = p.stdout - cmd.Stderr = p.stderr + stdoutLogWriter := newLogWriter(log.Info(), os.Stdout) + stderrLogWriter := newLogWriter(log.Info(), os.Stderr) + cmd.Stdout = io.MultiWriter(p.stdout, stdoutLogWriter) + cmd.Stderr = io.MultiWriter(p.stderr, stderrLogWriter) err = cmd.Start() if err != nil { return nil, err } s.processes[int64(cmd.Process.Pid)] = p - log.Info().Print(processToProto(p).String(), field.M{"stdout": stdout.Name(), "stderr": stderr.Name()}) + fields := field.M{"pid": cmd.Process.Pid, "stdout": stdout.Name(), "stderr": stderr.Name()} + stdoutLogWriter.SetFields(fields) + stderrLogWriter.SetFields(fields) + log.Info().Print(processToProto(p).String(), fields) go func() { err := p.cmd.Wait() p.err = err @@ -85,11 +91,11 @@ func (s *processServiceServer) CreateProcesses(_ context.Context, cpr *CreatePro } err = stdout.Close() if err != nil { - log.Error().WithError(err).Print("Failed to close stdout", field.M{"pid": cmd.Process.Pid}) + log.Error().WithError(err).Print("Failed to close stdout", fields) } err = stderr.Close() if err != nil { - log.Error().WithError(err).Print("Failed to close stderr", field.M{"pid": cmd.Process.Pid}) + log.Error().WithError(err).Print("Failed to close stderr", fields) } close(p.doneCh) log.Info().Print(processToProto(p).String())