Skip to content

Commit

Permalink
feat: agent collector statistics report (#3544)
Browse files Browse the repository at this point in the history
* feat: count spans

* agent: implement grpc stream client

* agent: implement send otlp connection result

* feat: send collector statistics to server when requested

* test: check if collector statistics are precise

* feat: reset statistics if flag is present

* fix issue with datastore connection test and add tests
  • Loading branch information
mathnogueira authored Jan 19, 2024
1 parent eac8b6d commit a53a0d5
Show file tree
Hide file tree
Showing 16 changed files with 1,214 additions and 525 deletions.
10 changes: 10 additions & 0 deletions agent/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type Client struct {
pollListener func(context.Context, *proto.PollingRequest) error
shutdownListener func(context.Context, *proto.ShutdownRequest) error
dataStoreConnectionListener func(context.Context, *proto.DataStoreConnectionTestRequest) error
otlpConnectionTestListener func(context.Context, *proto.OTLPConnectionTestRequest) error
}

func (c *Client) Start(ctx context.Context) error {
Expand Down Expand Up @@ -92,6 +93,11 @@ func (c *Client) Start(ctx context.Context) error {
return err
}

err = c.startOTLPConnectionTestListener(ctx)
if err != nil {
return err
}

err = c.startHearthBeat(ctx)
if err != nil {
return err
Expand Down Expand Up @@ -134,6 +140,10 @@ func (c *Client) OnPollingRequest(listener func(context.Context, *proto.PollingR
c.pollListener = listener
}

func (c *Client) OnOTLPConnectionTest(listener func(context.Context, *proto.OTLPConnectionTestRequest) error) {
c.otlpConnectionTestListener = listener
}

func (c *Client) OnConnectionClosed(listener func(context.Context, *proto.ShutdownRequest) error) {
c.shutdownListener = listener
}
Expand Down
76 changes: 64 additions & 12 deletions agent/client/mocks/grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,28 @@ import (

type GrpcServerMock struct {
proto.UnimplementedOrchestratorServer
port int
triggerChannel chan *proto.TriggerRequest
pollingChannel chan *proto.PollingRequest
terminationChannel chan *proto.ShutdownRequest
dataStoreTestChannel chan *proto.DataStoreConnectionTestRequest

lastTriggerResponse *proto.TriggerResponse
lastPollingResponse *proto.PollingResponse
port int
triggerChannel chan *proto.TriggerRequest
pollingChannel chan *proto.PollingRequest
otlpConnectionTestChannel chan *proto.OTLPConnectionTestRequest
terminationChannel chan *proto.ShutdownRequest
dataStoreTestChannel chan *proto.DataStoreConnectionTestRequest

lastTriggerResponse *proto.TriggerResponse
lastPollingResponse *proto.PollingResponse
lastOtlpConnectionResponse *proto.OTLPConnectionTestResponse
lastDataStoreConnectionResponse *proto.DataStoreConnectionTestResponse

server *grpc.Server
}

func NewGrpcServer() *GrpcServerMock {
server := &GrpcServerMock{
triggerChannel: make(chan *proto.TriggerRequest),
pollingChannel: make(chan *proto.PollingRequest),
terminationChannel: make(chan *proto.ShutdownRequest),
dataStoreTestChannel: make(chan *proto.DataStoreConnectionTestRequest),
triggerChannel: make(chan *proto.TriggerRequest),
pollingChannel: make(chan *proto.PollingRequest),
terminationChannel: make(chan *proto.ShutdownRequest),
dataStoreTestChannel: make(chan *proto.DataStoreConnectionTestRequest),
otlpConnectionTestChannel: make(chan *proto.OTLPConnectionTestRequest),
}
var wg sync.WaitGroup
wg.Add(1)
Expand Down Expand Up @@ -148,6 +152,38 @@ func (s *GrpcServerMock) RegisterDataStoreConnectionTestAgent(id *proto.AgentIde
}
}

func (s *GrpcServerMock) RegisterOTLPConnectionTestListener(id *proto.AgentIdentification, stream proto.Orchestrator_RegisterOTLPConnectionTestListenerServer) error {
if id.Token != "token" {
return fmt.Errorf("could not validate token")
}

for {
testRequest := <-s.otlpConnectionTestChannel
err := stream.Send(testRequest)
if err != nil {
log.Println("could not send polling request to agent: %w", err)
}
}
}

func (s *GrpcServerMock) SendOTLPConnectionTestResult(ctx context.Context, result *proto.OTLPConnectionTestResponse) (*proto.Empty, error) {
if result.AgentIdentification == nil || result.AgentIdentification.Token != "token" {
return nil, fmt.Errorf("could not validate token")
}

s.lastOtlpConnectionResponse = result
return &proto.Empty{}, nil
}

func (s *GrpcServerMock) SendDataStoreConnectionTestResult(ctx context.Context, result *proto.DataStoreConnectionTestResponse) (*proto.Empty, error) {
if result.AgentIdentification == nil || result.AgentIdentification.Token != "token" {
return nil, fmt.Errorf("could not validate token")
}

s.lastDataStoreConnectionResponse = result
return &proto.Empty{}, nil
}

func (s *GrpcServerMock) SendPolledSpans(ctx context.Context, result *proto.PollingResponse) (*proto.Empty, error) {
if result.AgentIdentification == nil || result.AgentIdentification.Token != "token" {
return nil, fmt.Errorf("could not validate token")
Expand Down Expand Up @@ -177,6 +213,14 @@ func (s *GrpcServerMock) SendPollingRequest(request *proto.PollingRequest) {
s.pollingChannel <- request
}

func (s *GrpcServerMock) SendDataStoreConnectionTestRequest(request *proto.DataStoreConnectionTestRequest) {
s.dataStoreTestChannel <- request
}

func (s *GrpcServerMock) SendOTLPConnectionTestRequest(request *proto.OTLPConnectionTestRequest) {
s.otlpConnectionTestChannel <- request
}

func (s *GrpcServerMock) GetLastTriggerResponse() *proto.TriggerResponse {
return s.lastTriggerResponse
}
Expand All @@ -185,6 +229,14 @@ func (s *GrpcServerMock) GetLastPollingResponse() *proto.PollingResponse {
return s.lastPollingResponse
}

func (s *GrpcServerMock) GetLastOTLPConnectionResponse() *proto.OTLPConnectionTestResponse {
return s.lastOtlpConnectionResponse
}

func (s *GrpcServerMock) GetLastDataStoreConnectionResponse() *proto.DataStoreConnectionTestResponse {
return s.lastDataStoreConnectionResponse
}

func (s *GrpcServerMock) TerminateConnection(reason string) {
s.terminationChannel <- &proto.ShutdownRequest{
Reason: reason,
Expand Down
42 changes: 42 additions & 0 deletions agent/client/workflow_listen_for_ds_connection_tests_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package client_test

import (
"context"
"testing"
"time"

"github.com/kubeshop/tracetest/agent/client"
"github.com/kubeshop/tracetest/agent/client/mocks"
"github.com/kubeshop/tracetest/agent/proto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestDataStoreConnectionTestWorkflow(t *testing.T) {
server := mocks.NewGrpcServer()
defer server.Stop()

client, err := client.Connect(context.Background(), server.Addr())
require.NoError(t, err)

var receivedConnectionTestRequest *proto.DataStoreConnectionTestRequest
client.OnDataStoreTestConnectionRequest(func(ctx context.Context, otr *proto.DataStoreConnectionTestRequest) error {
receivedConnectionTestRequest = otr
return nil
})

err = client.Start(context.Background())
require.NoError(t, err)

connectionTestRequest := &proto.DataStoreConnectionTestRequest{
RequestID: "request-id",
}

server.SendDataStoreConnectionTestRequest(connectionTestRequest)

// ensures there's enough time for networking between server and client
time.Sleep(1 * time.Second)

assert.NotNil(t, receivedConnectionTestRequest)
assert.Equal(t, connectionTestRequest.RequestID, "request-id")
}
47 changes: 47 additions & 0 deletions agent/client/workflow_listen_for_otlp_connection_tests.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package client

import (
"context"
"fmt"
"log"
"time"

"github.com/kubeshop/tracetest/agent/proto"
)

func (c *Client) startOTLPConnectionTestListener(ctx context.Context) error {
client := proto.NewOrchestratorClient(c.conn)

stream, err := client.RegisterOTLPConnectionTestListener(ctx, c.sessionConfig.AgentIdentification)
if err != nil {
return fmt.Errorf("could not open agent stream: %w", err)
}

go func() {
for {
req := proto.OTLPConnectionTestRequest{}
err := stream.RecvMsg(&req)
if isEndOfFileError(err) || isCancelledError(err) {
return
}

reconnected, err := c.handleDisconnectionError(err)
if reconnected {
return
}

if err != nil {
log.Println("could not get message from otlp connection stream: %w", err)
time.Sleep(1 * time.Second)
continue
}

// TODO: Get ctx from request
err = c.otlpConnectionTestListener(context.Background(), &req)
if err != nil {
fmt.Println(err.Error())
}
}
}()
return nil
}
42 changes: 42 additions & 0 deletions agent/client/workflow_listen_for_otlp_connection_tests_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package client_test

import (
"context"
"testing"
"time"

"github.com/kubeshop/tracetest/agent/client"
"github.com/kubeshop/tracetest/agent/client/mocks"
"github.com/kubeshop/tracetest/agent/proto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestOtlpConnectionTestWorkflow(t *testing.T) {
server := mocks.NewGrpcServer()
defer server.Stop()

client, err := client.Connect(context.Background(), server.Addr())
require.NoError(t, err)

var receivedConnectionTestRequest *proto.OTLPConnectionTestRequest
client.OnOTLPConnectionTest(func(ctx context.Context, otr *proto.OTLPConnectionTestRequest) error {
receivedConnectionTestRequest = otr
return nil
})

err = client.Start(context.Background())
require.NoError(t, err)

connectionTestRequest := &proto.OTLPConnectionTestRequest{
RequestID: "request-id",
}

server.SendOTLPConnectionTestRequest(connectionTestRequest)

// ensures there's enough time for networking between server and client
time.Sleep(1 * time.Second)

assert.NotNil(t, receivedConnectionTestRequest)
assert.Equal(t, connectionTestRequest.RequestID, "request-id")
}
2 changes: 1 addition & 1 deletion agent/client/workflow_send_ds_connection_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func (c *Client) SendDataStoreConnectionResult(ctx context.Context, response *pr

_, err := client.SendDataStoreConnectionTestResult(ctx, response)
if err != nil {
return fmt.Errorf("could not send data store connection result request: %w", err)
return fmt.Errorf("could not send otlp connection result request: %w", err)
}

return nil
Expand Down
43 changes: 43 additions & 0 deletions agent/client/workflow_send_ds_connection_result_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package client_test

import (
"context"
"testing"

"github.com/kubeshop/tracetest/agent/client"
"github.com/kubeshop/tracetest/agent/client/mocks"
"github.com/kubeshop/tracetest/agent/proto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestDataStoreConnectionResult(t *testing.T) {
server := mocks.NewGrpcServer()
defer server.Stop()

client, err := client.Connect(context.Background(), server.Addr())
require.NoError(t, err)

err = client.Start(context.Background())
require.NoError(t, err)

result := &proto.DataStoreConnectionTestResponse{
RequestID: "request-id",
AgentIdentification: &proto.AgentIdentification{},
Successful: true,
Steps: &proto.DataStoreConnectionTestSteps{
PortCheck: &proto.DataStoreConnectionTestStep{
Passed: true,
},
},
}

err = client.SendDataStoreConnectionResult(context.Background(), result)
require.NoError(t, err)

receivedResponse := server.GetLastDataStoreConnectionResponse()

assert.Equal(t, result.RequestID, receivedResponse.RequestID)
assert.True(t, result.Successful)
assert.True(t, result.Steps.PortCheck.Passed)
}
21 changes: 21 additions & 0 deletions agent/client/workflow_send_otlp_connection_result.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package client

import (
"context"
"fmt"

"github.com/kubeshop/tracetest/agent/proto"
)

func (c *Client) SendOTLPConnectionResult(ctx context.Context, response *proto.OTLPConnectionTestResponse) error {
client := proto.NewOrchestratorClient(c.conn)

response.AgentIdentification = c.sessionConfig.AgentIdentification

_, err := client.SendOTLPConnectionTestResult(ctx, response)
if err != nil {
return fmt.Errorf("could not send otlp connection result request: %w", err)
}

return nil
}
41 changes: 41 additions & 0 deletions agent/client/workflow_send_otlp_connection_result_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package client_test

import (
"context"
"testing"
"time"

"github.com/kubeshop/tracetest/agent/client"
"github.com/kubeshop/tracetest/agent/client/mocks"
"github.com/kubeshop/tracetest/agent/proto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestOTLPConnectionResultTrace(t *testing.T) {
server := mocks.NewGrpcServer()
defer server.Stop()

client, err := client.Connect(context.Background(), server.Addr())
require.NoError(t, err)

err = client.Start(context.Background())
require.NoError(t, err)

now := time.Now()
result := &proto.OTLPConnectionTestResponse{
RequestID: "request-id",
AgentIdentification: &proto.AgentIdentification{},
SpanCount: 10,
LastSpanTimestamp: now.UnixMilli(),
}

err = client.SendOTLPConnectionResult(context.Background(), result)
require.NoError(t, err)

receivedResponse := server.GetLastOTLPConnectionResponse()

assert.Equal(t, result.RequestID, receivedResponse.RequestID)
assert.Equal(t, result.SpanCount, receivedResponse.SpanCount)
assert.Equal(t, result.LastSpanTimestamp, receivedResponse.LastSpanTimestamp)
}
Loading

0 comments on commit a53a0d5

Please sign in to comment.