Skip to content

Commit

Permalink
feat(agent): add debug logs (#3675)
Browse files Browse the repository at this point in the history
  • Loading branch information
schoren authored Feb 22, 2024
1 parent ed7540c commit f698242
Show file tree
Hide file tree
Showing 14 changed files with 182 additions and 2 deletions.
12 changes: 12 additions & 0 deletions agent/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,18 @@ type Client struct {
}

func (c *Client) Start(ctx context.Context) error {
c.logger.Debug("Starting controlPlane client")
err := c.startup(ctx)
if err != nil {
c.logger.Error("Failed to start controlPlane client", zap.Error(err))
return err
}

c.done = make(chan bool)
ctx, cancel := context.WithCancel(ctx)
go func() {
<-c.done
c.logger.Debug("Stopping controlPlane client")
// We cannot `defer cancel()` here: each start*Listener function spawns its own
// goroutine and returns immediately instead of blocking this function. A deferred
// cancel would therefore fire as soon as Start returns and make all of those goroutines fail.
Expand All @@ -73,39 +76,48 @@ func (c *Client) Start(ctx context.Context) error {

err = c.startStopListener(ctx)
if err != nil {
c.logger.Error("Failed to start stop listener", zap.Error(err))
return err
}

err = c.startTriggerListener(ctx)
if err != nil {
c.logger.Error("Failed to start trigger listener", zap.Error(err))
return err
}

err = c.startPollerListener(ctx)
if err != nil {
c.logger.Error("Failed to start poller listener", zap.Error(err))
return err
}

err = c.startShutdownListener(ctx)
if err != nil {
c.logger.Error("Failed to start shutdown listener", zap.Error(err))
return err
}

err = c.startDataStoreConnectionTestListener(ctx)
if err != nil {
c.logger.Error("Failed to start data store connection test listener", zap.Error(err))
return err
}

err = c.startOTLPConnectionTestListener(ctx)
if err != nil {
c.logger.Error("Failed to start OTLP connection test listener", zap.Error(err))
return err
}

err = c.startHeartBeat(ctx)
if err != nil {
c.logger.Error("Failed to start heart beat", zap.Error(err))
return err
}

c.logger.Debug("ControlPlane client started")

return nil
}

Expand Down
13 changes: 13 additions & 0 deletions agent/client/workflow_listen_for_ds_connection_tests.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,42 +8,55 @@ import (

"github.com/kubeshop/tracetest/agent/proto"
"github.com/kubeshop/tracetest/server/telemetry"
"go.uber.org/zap"
)

func (c *Client) startDataStoreConnectionTestListener(ctx context.Context) error {
logger := c.logger.Named("dataStoreConnectionTestListener")
logger.Debug("Starting")

client := proto.NewOrchestratorClient(c.conn)

stream, err := client.RegisterDataStoreConnectionTestAgent(ctx, c.sessionConfig.AgentIdentification)
if err != nil {
logger.Error("could not open agent stream", zap.Error(err))
return fmt.Errorf("could not open agent stream: %w", err)
}

go func() {
for {
req := proto.DataStoreConnectionTestRequest{}
err := stream.RecvMsg(&req)
if err != nil {
logger.Error("could not get message from data store connection stream", zap.Error(err))
}
if isEndOfFileError(err) || isCancelledError(err) {
logger.Debug("data store connection stream closed")
return
}

reconnected, err := c.handleDisconnectionError(err)
if reconnected {
logger.Warn("reconnected to data store connection stream")
return
}

if err != nil {
logger.Error("could not get message from data store connection stream", zap.Error(err))
log.Println("could not get message from data store connection stream: %w", err)
time.Sleep(1 * time.Second)
continue
}

ctx, err := telemetry.ExtractContextFromStream(stream)
if err != nil {
logger.Error("could not extract context from stream", zap.Error(err))
log.Println("could not extract context from stream %w", err)
}

err = c.dataStoreConnectionListener(ctx, &req)
if err != nil {
logger.Error("could not handle data store connection test request", zap.Error(err))
fmt.Println(err.Error())
}
}
Expand Down
13 changes: 13 additions & 0 deletions agent/client/workflow_listen_for_otlp_connection_tests.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,42 +8,55 @@ import (

"github.com/kubeshop/tracetest/agent/proto"
"github.com/kubeshop/tracetest/server/telemetry"
"go.uber.org/zap"
)

func (c *Client) startOTLPConnectionTestListener(ctx context.Context) error {
logger := c.logger.Named("otlpConnectionTestListener")
logger.Debug("Starting")

client := proto.NewOrchestratorClient(c.conn)

stream, err := client.RegisterOTLPConnectionTestListener(ctx, c.sessionConfig.AgentIdentification)
if err != nil {
logger.Error("could not open agent stream", zap.Error(err))
return fmt.Errorf("could not open agent stream: %w", err)
}

go func() {
for {
req := proto.OTLPConnectionTestRequest{}
err := stream.RecvMsg(&req)
if err != nil {
logger.Error("could not get message from otlp connection stream", zap.Error(err))
}
if isEndOfFileError(err) || isCancelledError(err) {
logger.Debug("otlp connection stream closed")
return
}

reconnected, err := c.handleDisconnectionError(err)
if reconnected {
logger.Warn("reconnected to otlp connection stream")
return
}

if err != nil {
logger.Error("could not get message from otlp connection stream", zap.Error(err))
log.Println("could not get message from otlp connection stream: %w", err)
time.Sleep(1 * time.Second)
continue
}

ctx, err := telemetry.ExtractContextFromStream(stream)
if err != nil {
logger.Error("could not extract context from stream", zap.Error(err))
log.Println("could not extract context from stream %w", err)
}

err = c.otlpConnectionTestListener(ctx, &req)
if err != nil {
logger.Error("could not handle otlp connection test request", zap.Error(err))
fmt.Println(err.Error())
}
}
Expand Down
12 changes: 12 additions & 0 deletions agent/client/workflow_listen_for_poll_requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,13 @@ import (

"github.com/kubeshop/tracetest/agent/proto"
"github.com/kubeshop/tracetest/server/telemetry"
"go.uber.org/zap"
)

func (c *Client) startPollerListener(ctx context.Context) error {
logger := c.logger.Named("pollerListener")
logger.Debug("Starting")

client := proto.NewOrchestratorClient(c.conn)

stream, err := client.RegisterPollerAgent(ctx, c.sessionConfig.AgentIdentification)
Expand All @@ -22,28 +26,36 @@ func (c *Client) startPollerListener(ctx context.Context) error {
for {
resp := proto.PollingRequest{}
err := stream.RecvMsg(&resp)
if err != nil {
logger.Error("could not get message from poller stream", zap.Error(err))
}
if isEndOfFileError(err) || isCancelledError(err) {
logger.Debug("poller stream closed")
return
}

reconnected, err := c.handleDisconnectionError(err)
if reconnected {
logger.Warn("reconnected to poller stream")
return
}

if err != nil {
logger.Error("could not get message from poller stream", zap.Error(err))
log.Println("could not get message from poller stream: %w", err)
time.Sleep(1 * time.Second)
continue
}

ctx, err := telemetry.ExtractContextFromStream(stream)
if err != nil {
logger.Error("could not extract context from stream", zap.Error(err))
log.Println("could not extract context from stream %w", err)
}

err = c.pollListener(ctx, &resp)
if err != nil {
logger.Error("could not handle poll request", zap.Error(err))
fmt.Println(err.Error())
}
}
Expand Down
12 changes: 12 additions & 0 deletions agent/client/workflow_listen_for_stop_requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,31 +7,42 @@ import (
"time"

"github.com/kubeshop/tracetest/agent/proto"
"go.uber.org/zap"
)

// TODO: fix this and add test
func (c *Client) startStopListener(ctx context.Context) error {
logger := c.logger.Named("stopListener")
logger.Debug("Starting")

client := proto.NewOrchestratorClient(c.conn)

stream, err := client.RegisterStopRequestAgent(ctx, c.sessionConfig.AgentIdentification)
if err != nil {
logger.Error("could not open agent stream", zap.Error(err))
return fmt.Errorf("could not open agent stream: %w", err)
}

go func() {
for {
resp := proto.StopRequest{}
err := stream.RecvMsg(&resp)
if err != nil {
logger.Error("could not get message from stop stream", zap.Error(err))
}
if isEndOfFileError(err) || isCancelledError(err) {
logger.Debug("stop stream closed")
return
}

reconnected, err := c.handleDisconnectionError(err)
if reconnected {
logger.Warn("reconnected to stop stream")
return
}

if err != nil {
logger.Error("could not get message from stop stream", zap.Error(err))
log.Println("could not get message from trigger stream: %w", err)
time.Sleep(1 * time.Second)
continue
Expand All @@ -40,6 +51,7 @@ func (c *Client) startStopListener(ctx context.Context) error {
// TODO: get context from request
err = c.stopListener(context.Background(), &resp)
if err != nil {
logger.Error("could not handle stop request", zap.Error(err))
fmt.Println(err.Error())
}
}
Expand Down
13 changes: 13 additions & 0 deletions agent/client/workflow_listen_for_trigger_requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,42 +8,55 @@ import (

"github.com/kubeshop/tracetest/agent/proto"
"github.com/kubeshop/tracetest/server/telemetry"
"go.uber.org/zap"
)

func (c *Client) startTriggerListener(ctx context.Context) error {
logger := c.logger.Named("triggerListener")
logger.Debug("Starting")

client := proto.NewOrchestratorClient(c.conn)

stream, err := client.RegisterTriggerAgent(ctx, c.sessionConfig.AgentIdentification)
if err != nil {
logger.Error("could not open agent stream", zap.Error(err))
return fmt.Errorf("could not open agent stream: %w", err)
}

go func() {
for {
resp := proto.TriggerRequest{}
err := stream.RecvMsg(&resp)
if err != nil {
logger.Error("could not get message from stop stream", zap.Error(err))
}
if isEndOfFileError(err) || isCancelledError(err) {
logger.Debug("stop stream closed")
return
}

reconnected, err := c.handleDisconnectionError(err)
if reconnected {
logger.Warn("reconnected to stop stream")
return
}

if err != nil {
logger.Error("could not get message from stop stream", zap.Error(err))
log.Println("could not get message from trigger stream: %w", err)
time.Sleep(1 * time.Second)
continue
}

ctx, err := telemetry.ExtractContextFromStream(stream)
if err != nil {
logger.Error("could not extract context from stream", zap.Error(err))
log.Println("could not extract context from stream %w", err)
}

err = c.triggerListener(ctx, &resp)
if err != nil {
logger.Error("could not handle trigger request", zap.Error(err))
fmt.Println(err.Error())
}
}
Expand Down
10 changes: 10 additions & 0 deletions agent/client/workflow_ping.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,35 @@ import (
"time"

"github.com/kubeshop/tracetest/agent/proto"
"go.uber.org/zap"
)

func (c *Client) startHeartBeat(ctx context.Context) error {
logger := c.logger.Named("pingListener")
logger.Debug("Starting")

client := proto.NewOrchestratorClient(c.conn)
ticker := time.NewTicker(c.config.PingPeriod)

go func() {
for range ticker.C {
_, err := client.Ping(ctx, c.sessionConfig.AgentIdentification)
if err != nil {
log.Println("could not send ping: %w", err)
}
if isEndOfFileError(err) || isCancelledError(err) {
log.Println("ping stream closed")
return
}

reconnected, err := c.handleDisconnectionError(err)
if reconnected {
log.Println("reconnected to ping stream")
return
}

if err != nil {
logger.Error("could not get message from ping stream", zap.Error(err))
log.Println("could not get message from ping stream: %w", err)
time.Sleep(1 * time.Second)
continue
Expand Down
Loading

0 comments on commit f698242

Please sign in to comment.