Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: agent collector statistics report #3544

Merged
merged 8 commits into from
Jan 19, 2024
10 changes: 10 additions & 0 deletions agent/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type Client struct {
pollListener func(context.Context, *proto.PollingRequest) error
shutdownListener func(context.Context, *proto.ShutdownRequest) error
dataStoreConnectionListener func(context.Context, *proto.DataStoreConnectionTestRequest) error
otlpConnectionTestListener func(context.Context, *proto.OTLPConnectionTestRequest) error
}

func (c *Client) Start(ctx context.Context) error {
Expand Down Expand Up @@ -92,6 +93,11 @@ func (c *Client) Start(ctx context.Context) error {
return err
}

err = c.startOTLPConnectionTestListener(ctx)
if err != nil {
return err
}

err = c.startHearthBeat(ctx)
if err != nil {
return err
Expand Down Expand Up @@ -134,6 +140,10 @@ func (c *Client) OnPollingRequest(listener func(context.Context, *proto.PollingR
c.pollListener = listener
}

func (c *Client) OnOTLPConnectionTest(listener func(context.Context, *proto.OTLPConnectionTestRequest) error) {
c.otlpConnectionTestListener = listener
}

func (c *Client) OnConnectionClosed(listener func(context.Context, *proto.ShutdownRequest) error) {
c.shutdownListener = listener
}
Expand Down
76 changes: 64 additions & 12 deletions agent/client/mocks/grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,28 @@ import (

type GrpcServerMock struct {
proto.UnimplementedOrchestratorServer
port int
triggerChannel chan *proto.TriggerRequest
pollingChannel chan *proto.PollingRequest
terminationChannel chan *proto.ShutdownRequest
dataStoreTestChannel chan *proto.DataStoreConnectionTestRequest

lastTriggerResponse *proto.TriggerResponse
lastPollingResponse *proto.PollingResponse
port int
triggerChannel chan *proto.TriggerRequest
pollingChannel chan *proto.PollingRequest
otlpConnectionTestChannel chan *proto.OTLPConnectionTestRequest
terminationChannel chan *proto.ShutdownRequest
dataStoreTestChannel chan *proto.DataStoreConnectionTestRequest

lastTriggerResponse *proto.TriggerResponse
lastPollingResponse *proto.PollingResponse
lastOtlpConnectionResponse *proto.OTLPConnectionTestResponse
lastDataStoreConnectionResponse *proto.DataStoreConnectionTestResponse

server *grpc.Server
}

func NewGrpcServer() *GrpcServerMock {
server := &GrpcServerMock{
triggerChannel: make(chan *proto.TriggerRequest),
pollingChannel: make(chan *proto.PollingRequest),
terminationChannel: make(chan *proto.ShutdownRequest),
dataStoreTestChannel: make(chan *proto.DataStoreConnectionTestRequest),
triggerChannel: make(chan *proto.TriggerRequest),
pollingChannel: make(chan *proto.PollingRequest),
terminationChannel: make(chan *proto.ShutdownRequest),
dataStoreTestChannel: make(chan *proto.DataStoreConnectionTestRequest),
otlpConnectionTestChannel: make(chan *proto.OTLPConnectionTestRequest),
}
var wg sync.WaitGroup
wg.Add(1)
Expand Down Expand Up @@ -148,6 +152,38 @@ func (s *GrpcServerMock) RegisterDataStoreConnectionTestAgent(id *proto.AgentIde
}
}

func (s *GrpcServerMock) RegisterOTLPConnectionTestListener(id *proto.AgentIdentification, stream proto.Orchestrator_RegisterOTLPConnectionTestListenerServer) error {
if id.Token != "token" {
return fmt.Errorf("could not validate token")
}

for {
testRequest := <-s.otlpConnectionTestChannel
err := stream.Send(testRequest)
if err != nil {
log.Println("could not send polling request to agent: %w", err)
}
}
}

func (s *GrpcServerMock) SendOTLPConnectionTestResult(ctx context.Context, result *proto.OTLPConnectionTestResponse) (*proto.Empty, error) {
if result.AgentIdentification == nil || result.AgentIdentification.Token != "token" {
return nil, fmt.Errorf("could not validate token")
}

s.lastOtlpConnectionResponse = result
return &proto.Empty{}, nil
}

func (s *GrpcServerMock) SendDataStoreConnectionTestResult(ctx context.Context, result *proto.DataStoreConnectionTestResponse) (*proto.Empty, error) {
if result.AgentIdentification == nil || result.AgentIdentification.Token != "token" {
return nil, fmt.Errorf("could not validate token")
}

s.lastDataStoreConnectionResponse = result
return &proto.Empty{}, nil
}

func (s *GrpcServerMock) SendPolledSpans(ctx context.Context, result *proto.PollingResponse) (*proto.Empty, error) {
if result.AgentIdentification == nil || result.AgentIdentification.Token != "token" {
return nil, fmt.Errorf("could not validate token")
Expand Down Expand Up @@ -177,6 +213,14 @@ func (s *GrpcServerMock) SendPollingRequest(request *proto.PollingRequest) {
s.pollingChannel <- request
}

func (s *GrpcServerMock) SendDataStoreConnectionTestRequest(request *proto.DataStoreConnectionTestRequest) {
s.dataStoreTestChannel <- request
}

func (s *GrpcServerMock) SendOTLPConnectionTestRequest(request *proto.OTLPConnectionTestRequest) {
s.otlpConnectionTestChannel <- request
}

func (s *GrpcServerMock) GetLastTriggerResponse() *proto.TriggerResponse {
return s.lastTriggerResponse
}
Expand All @@ -185,6 +229,14 @@ func (s *GrpcServerMock) GetLastPollingResponse() *proto.PollingResponse {
return s.lastPollingResponse
}

func (s *GrpcServerMock) GetLastOTLPConnectionResponse() *proto.OTLPConnectionTestResponse {
return s.lastOtlpConnectionResponse
}

func (s *GrpcServerMock) GetLastDataStoreConnectionResponse() *proto.DataStoreConnectionTestResponse {
return s.lastDataStoreConnectionResponse
}

func (s *GrpcServerMock) TerminateConnection(reason string) {
s.terminationChannel <- &proto.ShutdownRequest{
Reason: reason,
Expand Down
42 changes: 42 additions & 0 deletions agent/client/workflow_listen_for_ds_connection_tests_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package client_test

import (
"context"
"testing"
"time"

"github.com/kubeshop/tracetest/agent/client"
"github.com/kubeshop/tracetest/agent/client/mocks"
"github.com/kubeshop/tracetest/agent/proto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestDataStoreConnectionTestWorkflow(t *testing.T) {
server := mocks.NewGrpcServer()
defer server.Stop()

client, err := client.Connect(context.Background(), server.Addr())
require.NoError(t, err)

var receivedConnectionTestRequest *proto.DataStoreConnectionTestRequest
client.OnDataStoreTestConnectionRequest(func(ctx context.Context, otr *proto.DataStoreConnectionTestRequest) error {
receivedConnectionTestRequest = otr
return nil
})

err = client.Start(context.Background())
require.NoError(t, err)

connectionTestRequest := &proto.DataStoreConnectionTestRequest{
RequestID: "request-id",
}

server.SendDataStoreConnectionTestRequest(connectionTestRequest)

// ensures there's enough time for networking between server and client
time.Sleep(1 * time.Second)

assert.NotNil(t, receivedConnectionTestRequest)
assert.Equal(t, connectionTestRequest.RequestID, "request-id")
}
47 changes: 47 additions & 0 deletions agent/client/workflow_listen_for_otlp_connection_tests.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package client

import (
"context"
"fmt"
"log"
"time"

"github.com/kubeshop/tracetest/agent/proto"
)

func (c *Client) startOTLPConnectionTestListener(ctx context.Context) error {
client := proto.NewOrchestratorClient(c.conn)

stream, err := client.RegisterOTLPConnectionTestListener(ctx, c.sessionConfig.AgentIdentification)
if err != nil {
return fmt.Errorf("could not open agent stream: %w", err)
}

go func() {
for {
req := proto.OTLPConnectionTestRequest{}
err := stream.RecvMsg(&req)
if isEndOfFileError(err) || isCancelledError(err) {
return
}

reconnected, err := c.handleDisconnectionError(err)
if reconnected {
return
}

if err != nil {
log.Println("could not get message from otlp connection stream: %w", err)
time.Sleep(1 * time.Second)
continue
}

// TODO: Get ctx from request
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Philosophical question: is there a way to get the context from the request?
My impression is that there is no way to get these contexts due to the passive nature of getting the messages.

Could we create a context in the future and change it by monitoring the stream connection?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We would need to propagate headers through the messages to be able to create a context from the request (propagate the otel headers)

err = c.otlpConnectionTestListener(context.Background(), &req)
if err != nil {
fmt.Println(err.Error())
}
}
}()
return nil
}
42 changes: 42 additions & 0 deletions agent/client/workflow_listen_for_otlp_connection_tests_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package client_test

import (
"context"
"testing"
"time"

"github.com/kubeshop/tracetest/agent/client"
"github.com/kubeshop/tracetest/agent/client/mocks"
"github.com/kubeshop/tracetest/agent/proto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestOtlpConnectionTestWorkflow(t *testing.T) {
server := mocks.NewGrpcServer()
defer server.Stop()

client, err := client.Connect(context.Background(), server.Addr())
require.NoError(t, err)

var receivedConnectionTestRequest *proto.OTLPConnectionTestRequest
client.OnOTLPConnectionTest(func(ctx context.Context, otr *proto.OTLPConnectionTestRequest) error {
receivedConnectionTestRequest = otr
return nil
})

err = client.Start(context.Background())
require.NoError(t, err)

connectionTestRequest := &proto.OTLPConnectionTestRequest{
RequestID: "request-id",
}

server.SendOTLPConnectionTestRequest(connectionTestRequest)

// ensures there's enough time for networking between server and client
time.Sleep(1 * time.Second)

assert.NotNil(t, receivedConnectionTestRequest)
assert.Equal(t, connectionTestRequest.RequestID, "request-id")
}
2 changes: 1 addition & 1 deletion agent/client/workflow_send_ds_connection_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func (c *Client) SendDataStoreConnectionResult(ctx context.Context, response *pr

_, err := client.SendDataStoreConnectionTestResult(ctx, response)
if err != nil {
return fmt.Errorf("could not send data store connection result request: %w", err)
return fmt.Errorf("could not send otlp connection result request: %w", err)
}

return nil
Expand Down
43 changes: 43 additions & 0 deletions agent/client/workflow_send_ds_connection_result_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package client_test

import (
"context"
"testing"

"github.com/kubeshop/tracetest/agent/client"
"github.com/kubeshop/tracetest/agent/client/mocks"
"github.com/kubeshop/tracetest/agent/proto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestDataStoreConnectionResult(t *testing.T) {
server := mocks.NewGrpcServer()
defer server.Stop()

client, err := client.Connect(context.Background(), server.Addr())
require.NoError(t, err)

err = client.Start(context.Background())
require.NoError(t, err)

result := &proto.DataStoreConnectionTestResponse{
RequestID: "request-id",
AgentIdentification: &proto.AgentIdentification{},
Successful: true,
Steps: &proto.DataStoreConnectionTestSteps{
PortCheck: &proto.DataStoreConnectionTestStep{
Passed: true,
},
},
}

err = client.SendDataStoreConnectionResult(context.Background(), result)
require.NoError(t, err)

receivedResponse := server.GetLastDataStoreConnectionResponse()

assert.Equal(t, result.RequestID, receivedResponse.RequestID)
assert.True(t, result.Successful)
assert.True(t, result.Steps.PortCheck.Passed)
}
21 changes: 21 additions & 0 deletions agent/client/workflow_send_otlp_connection_result.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package client

import (
"context"
"fmt"

"github.com/kubeshop/tracetest/agent/proto"
)

func (c *Client) SendOTLPConnectionResult(ctx context.Context, response *proto.OTLPConnectionTestResponse) error {
client := proto.NewOrchestratorClient(c.conn)

response.AgentIdentification = c.sessionConfig.AgentIdentification

_, err := client.SendOTLPConnectionTestResult(ctx, response)
if err != nil {
return fmt.Errorf("could not send otlp connection result request: %w", err)
}

return nil
}
41 changes: 41 additions & 0 deletions agent/client/workflow_send_otlp_connection_result_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package client_test

import (
"context"
"testing"
"time"

"github.com/kubeshop/tracetest/agent/client"
"github.com/kubeshop/tracetest/agent/client/mocks"
"github.com/kubeshop/tracetest/agent/proto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestOTLPConnectionResultTrace(t *testing.T) {
server := mocks.NewGrpcServer()
defer server.Stop()

client, err := client.Connect(context.Background(), server.Addr())
require.NoError(t, err)

err = client.Start(context.Background())
require.NoError(t, err)

now := time.Now()
result := &proto.OTLPConnectionTestResponse{
RequestID: "request-id",
AgentIdentification: &proto.AgentIdentification{},
SpanCount: 10,
LastSpanTimestamp: now.UnixMilli(),
}

err = client.SendOTLPConnectionResult(context.Background(), result)
require.NoError(t, err)

receivedResponse := server.GetLastOTLPConnectionResponse()

assert.Equal(t, result.RequestID, receivedResponse.RequestID)
assert.Equal(t, result.SpanCount, receivedResponse.SpanCount)
assert.Equal(t, result.LastSpanTimestamp, receivedResponse.LastSpanTimestamp)
}
Loading
Loading