From a3cb68b3eccdf11e68454b6b45c9db235c43b0cb Mon Sep 17 00:00:00 2001 From: Oscar Reyes Date: Wed, 13 Sep 2023 14:27:20 -0600 Subject: [PATCH] feat: Agent Ping --- agent/client/client.go | 2 ++ agent/client/workflow_ping.go | 23 ++++++++++++++++ agent/proto/orchestrator.pb.go | 36 ++++++++++++++----------- agent/proto/orchestrator.proto | 3 +++ agent/proto/orchestrator_grpc.pb.go | 41 ++++++++++++++++++++++++++++- 5 files changed, 89 insertions(+), 16 deletions(-) create mode 100644 agent/client/workflow_ping.go diff --git a/agent/client/client.go b/agent/client/client.go index a68028681c..c676cdcaac 100644 --- a/agent/client/client.go +++ b/agent/client/client.go @@ -61,6 +61,8 @@ func (c *Client) Start(ctx context.Context) error { return err } + c.startHearthBeat(ctx) + return nil } diff --git a/agent/client/workflow_ping.go b/agent/client/workflow_ping.go new file mode 100644 index 0000000000..923f0caaa0 --- /dev/null +++ b/agent/client/workflow_ping.go @@ -0,0 +1,23 @@ +package client + +import ( + "context" + "fmt" + "time" + + "github.com/kubeshop/tracetest/agent/proto" +) + +func (c *Client) startHearthBeat(ctx context.Context) error { + client := proto.NewOrchestratorClient(c.conn) + ticker := time.NewTicker(2 * time.Second) + + go func() { + for range ticker.C { + fmt.Println("@@PING") + client.Ping(ctx, c.sessionConfig.AgentIdentification) + } + }() + + return nil +} diff --git a/agent/proto/orchestrator.pb.go b/agent/proto/orchestrator.pb.go index 8c0b8d2cb0..a38d29ef10 100644 --- a/agent/proto/orchestrator.pb.go +++ b/agent/proto/orchestrator.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.30.0 -// protoc v3.21.12 +// protoc-gen-go v1.31.0 +// protoc v4.24.2 // source: proto/orchestrator.proto package proto @@ -3138,7 +3138,7 @@ var file_proto_orchestrator_proto_rawDesc = []byte{ 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x16, 0x0a, 0x06, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x6f, 0x66, 0x66, 0x73, - 0x65, 0x74, 0x32, 0xb6, 0x03, 0x0a, 0x0c, 0x4f, 0x72, 0x63, 0x68, 0x65, 0x73, 0x74, 0x72, 0x61, + 0x65, 0x74, 0x32, 0xea, 0x03, 0x0a, 0x0c, 0x4f, 0x72, 0x63, 0x68, 0x65, 0x73, 0x74, 0x72, 0x61, 0x74, 0x6f, 0x72, 0x12, 0x3d, 0x0a, 0x07, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x12, 0x15, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x19, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x41, 0x67, @@ -3165,10 +3165,14 @@ var file_proto_orchestrator_proto_rawDesc = []byte{ 0x6e, 0x65, 0x72, 0x12, 0x1a, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x49, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x66, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x1a, 0x16, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x53, 0x68, 0x75, 0x74, 0x64, 0x6f, 0x77, 0x6e, - 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x00, 0x30, 0x01, 0x42, 0x2b, 0x5a, 0x29, 0x67, - 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6b, 0x75, 0x62, 0x65, 0x73, 0x68, - 0x6f, 0x70, 0x2f, 0x74, 0x72, 0x61, 0x63, 0x65, 0x74, 0x65, 0x73, 0x74, 0x2f, 0x61, 0x67, 0x65, - 0x6e, 0x74, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x00, 0x30, 0x01, 0x12, 0x32, 0x0a, 0x04, 0x50, + 0x69, 0x6e, 0x67, 0x12, 0x1a, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x41, 0x67, 0x65, 0x6e, + 0x74, 0x49, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x66, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x1a, + 0x0c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x42, + 0x2b, 0x5a, 0x29, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6b, 0x75, + 0x62, 0x65, 0x73, 0x68, 0x6f, 0x70, 0x2f, 0x74, 0x72, 0x61, 0x63, 0x65, 0x74, 0x65, 0x73, 0x74, + 0x2f, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -3280,14 +3284,16 @@ var file_proto_orchestrator_proto_depIdxs = []int32{ 5, // 49: proto.Orchestrator.RegisterPollerAgent:input_type -> proto.AgentIdentification 34, // 50: proto.Orchestrator.SendPolledSpans:input_type -> proto.PollingResponse 5, // 51: proto.Orchestrator.RegisterShutdownListener:input_type -> proto.AgentIdentification - 2, // 52: proto.Orchestrator.Connect:output_type -> proto.AgentConfiguration - 6, // 53: proto.Orchestrator.RegisterTriggerAgent:output_type -> proto.TriggerRequest - 0, // 54: proto.Orchestrator.SendTriggerResult:output_type -> proto.Empty - 22, // 55: proto.Orchestrator.RegisterPollerAgent:output_type -> proto.PollingRequest - 0, // 56: proto.Orchestrator.SendPolledSpans:output_type -> proto.Empty - 4, // 57: proto.Orchestrator.RegisterShutdownListener:output_type -> proto.ShutdownRequest - 52, // [52:58] is the sub-list for method output_type - 46, // [46:52] is the sub-list for method input_type + 5, // 52: proto.Orchestrator.Ping:input_type -> proto.AgentIdentification + 2, // 53: proto.Orchestrator.Connect:output_type -> proto.AgentConfiguration + 6, // 54: proto.Orchestrator.RegisterTriggerAgent:output_type -> proto.TriggerRequest + 0, // 55: proto.Orchestrator.SendTriggerResult:output_type -> proto.Empty + 22, // 56: proto.Orchestrator.RegisterPollerAgent:output_type -> proto.PollingRequest + 0, // 57: proto.Orchestrator.SendPolledSpans:output_type -> proto.Empty + 4, // 58: proto.Orchestrator.RegisterShutdownListener:output_type -> proto.ShutdownRequest + 0, // 59: proto.Orchestrator.Ping:output_type -> proto.Empty + 53, // [53:60] is the sub-list for method output_type + 46, // [46:53] is the sub-list for method input_type 46, // [46:46] is the sub-list for extension type_name 46, // [46:46] is the sub-list for extension extendee 0, // [0:46] is the sub-list for field type_name diff --git a/agent/proto/orchestrator.proto b/agent/proto/orchestrator.proto index f07dd654b8..efa0f7b135 100644 --- a/agent/proto/orchestrator.proto +++ b/agent/proto/orchestrator.proto @@ -25,6 +25,9 @@ service Orchestrator { // Register an agent to listen for shutdown commands rpc RegisterShutdownListener(AgentIdentification) returns (stream ShutdownRequest) {} + + // Ping is used to check if the agent is still connected + rpc Ping(AgentIdentification) returns (Empty) {} } // Empty message for endpoints that don't return anything diff --git a/agent/proto/orchestrator_grpc.pb.go b/agent/proto/orchestrator_grpc.pb.go index 63c2b490dd..c42f870bf5 100644 --- a/agent/proto/orchestrator_grpc.pb.go +++ b/agent/proto/orchestrator_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.3.0 -// - protoc v3.21.12 +// - protoc v4.24.2 // source: proto/orchestrator.proto package proto @@ -25,6 +25,7 @@ const ( Orchestrator_RegisterPollerAgent_FullMethodName = "/proto.Orchestrator/RegisterPollerAgent" Orchestrator_SendPolledSpans_FullMethodName = "/proto.Orchestrator/SendPolledSpans" Orchestrator_RegisterShutdownListener_FullMethodName = "/proto.Orchestrator/RegisterShutdownListener" + Orchestrator_Ping_FullMethodName = "/proto.Orchestrator/Ping" ) // OrchestratorClient is the client API for Orchestrator service. @@ -45,6 +46,8 @@ type OrchestratorClient interface { SendPolledSpans(ctx context.Context, in *PollingResponse, opts ...grpc.CallOption) (*Empty, error) // Register an agent to listen for shutdown commands RegisterShutdownListener(ctx context.Context, in *AgentIdentification, opts ...grpc.CallOption) (Orchestrator_RegisterShutdownListenerClient, error) + // Ping is used to check if the agent is still connected + Ping(ctx context.Context, in *AgentIdentification, opts ...grpc.CallOption) (*Empty, error) } type orchestratorClient struct { @@ -178,6 +181,15 @@ func (x *orchestratorRegisterShutdownListenerClient) Recv() (*ShutdownRequest, e return m, nil } +func (c *orchestratorClient) Ping(ctx context.Context, in *AgentIdentification, opts ...grpc.CallOption) (*Empty, error) { + out := new(Empty) + err := c.cc.Invoke(ctx, Orchestrator_Ping_FullMethodName, in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + // OrchestratorServer is the server API for Orchestrator service. // All implementations must embed UnimplementedOrchestratorServer // for forward compatibility @@ -196,6 +208,8 @@ type OrchestratorServer interface { SendPolledSpans(context.Context, *PollingResponse) (*Empty, error) // Register an agent to listen for shutdown commands RegisterShutdownListener(*AgentIdentification, Orchestrator_RegisterShutdownListenerServer) error + // Ping is used to check if the agent is still connected + Ping(context.Context, *AgentIdentification) (*Empty, error) mustEmbedUnimplementedOrchestratorServer() } @@ -221,6 +235,9 @@ func (UnimplementedOrchestratorServer) SendPolledSpans(context.Context, *Polling func (UnimplementedOrchestratorServer) RegisterShutdownListener(*AgentIdentification, Orchestrator_RegisterShutdownListenerServer) error { return status.Errorf(codes.Unimplemented, "method RegisterShutdownListener not implemented") } +func (UnimplementedOrchestratorServer) Ping(context.Context, *AgentIdentification) (*Empty, error) { + return nil, status.Errorf(codes.Unimplemented, "method Ping not implemented") +} func (UnimplementedOrchestratorServer) mustEmbedUnimplementedOrchestratorServer() {} // UnsafeOrchestratorServer may be embedded to opt out of forward compatibility for this service. @@ -351,6 +368,24 @@ func (x *orchestratorRegisterShutdownListenerServer) Send(m *ShutdownRequest) er return x.ServerStream.SendMsg(m) } +func _Orchestrator_Ping_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(AgentIdentification) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(OrchestratorServer).Ping(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: Orchestrator_Ping_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(OrchestratorServer).Ping(ctx, req.(*AgentIdentification)) + } + return interceptor(ctx, in, info, handler) +} + // Orchestrator_ServiceDesc is the grpc.ServiceDesc for Orchestrator service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -370,6 +405,10 @@ var Orchestrator_ServiceDesc = grpc.ServiceDesc{ MethodName: "SendPolledSpans", Handler: _Orchestrator_SendPolledSpans_Handler, }, + { + MethodName: "Ping", + Handler: _Orchestrator_Ping_Handler, + }, }, Streams: []grpc.StreamDesc{ {