diff --git a/go.mod b/go.mod index b30654b9..c33f28d9 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,6 @@ go 1.22 require ( github.com/benbjohnson/clock v1.3.5 github.com/bufbuild/protoyaml-go v0.1.9 - github.com/eapache/channels v1.1.0 github.com/frostbyte73/core v0.0.12 github.com/fsnotify/fsnotify v1.7.0 github.com/gammazero/deque v0.2.1 @@ -15,7 +14,7 @@ require ( github.com/jxskiss/base62 v1.1.0 github.com/lithammer/shortuuid/v4 v4.0.0 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 - github.com/livekit/psrpc v0.5.3-0.20240616012458-ac39c8549a0a + github.com/livekit/psrpc v0.6.0 github.com/mackerelio/go-osstat v0.2.4 github.com/maxbrunsfeld/counterfeiter/v6 v6.8.1 github.com/pion/logging v0.2.2 @@ -48,7 +47,6 @@ require ( github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect - github.com/eapache/queue v1.1.0 // indirect github.com/google/cel-go v0.20.1 // indirect github.com/google/uuid v1.6.0 // indirect github.com/hashicorp/go-cleanhttp v0.5.2 // indirect diff --git a/go.sum b/go.sum index 0c99c495..5f84071f 100644 --- a/go.sum +++ b/go.sum @@ -21,10 +21,6 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= -github.com/eapache/channels v1.1.0 h1:F1taHcn7/F0i8DYqKXJnyhJcVpp2kgFcNePxXtnyu4k= -github.com/eapache/channels v1.1.0/go.mod h1:jMm2qB5Ubtg9zLd+inMZd2/NUvXgzmWXsDaLyQIGfH0= -github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= -github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= github.com/envoyproxy/protoc-gen-validate v1.0.4 h1:gVPz/FMfvh57HdSJQyvBtF00j8JU4zdyUgIUNhlgg0A= github.com/envoyproxy/protoc-gen-validate v1.0.4/go.mod h1:qys6tmnRsYrQqIhm2bvKZH4Blx/1gTIZ2UKVY1M+Yew= github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM= @@ -88,8 +84,8 @@ github.com/lithammer/shortuuid/v4 v4.0.0 h1:QRbbVkfgNippHOS8PXDkti4NaWeyYfcBTHtw github.com/lithammer/shortuuid/v4 v4.0.0/go.mod h1:Zs8puNcrvf2rV9rTH51ZLLcj7ZXqQI3lv67aw4KiB1Y= github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkDaKb5iXdynYrzB84ErPPO4LbRASk58= github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= -github.com/livekit/psrpc v0.5.3-0.20240616012458-ac39c8549a0a h1:EQAHmcYEGlc6V517cQ3Iy0+jHgP6+tM/B4l2vGuLpQo= -github.com/livekit/psrpc v0.5.3-0.20240616012458-ac39c8549a0a/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0= +github.com/livekit/psrpc v0.6.0 h1:bLf39yD2IP92/C7104Q2ZYqauUpcn+fioBeVtfhXe+E= +github.com/livekit/psrpc v0.6.0/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0= github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs= github.com/mackerelio/go-osstat v0.2.4/go.mod h1:Zy+qzGdZs3A9cuIqmgbJvwbmLQH9dJvtio5ZjJTbdlQ= github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= diff --git a/replay/cloud_replay.pb.go b/replay/cloud_replay.pb.go index 7ff9a4b5..e9c71b39 100644 --- a/replay/cloud_replay.pb.go +++ b/replay/cloud_replay.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.34.2 -// protoc v5.28.0 +// protoc v5.27.3 // source: cloud_replay.proto package replay diff --git a/rpc/agent.psrpc.go b/rpc/agent.psrpc.go index c39e3b71..b658b3c3 100644 --- a/rpc/agent.psrpc.go +++ b/rpc/agent.psrpc.go @@ -1,4 +1,4 @@ -// Code generated by protoc-gen-psrpc v0.5.1, DO NOT EDIT. +// Code generated by protoc-gen-psrpc v0.6.0, DO NOT EDIT. // source: rpc/agent.proto package rpc @@ -16,7 +16,7 @@ import ( import google_protobuf "google.golang.org/protobuf/types/known/emptypb" import livekit2 "github.com/livekit/protocol/livekit" -var _ = version.PsrpcVersion_0_5 +var _ = version.PsrpcVersion_0_6 // ============================== // AgentInternal Client Interface @@ -30,6 +30,9 @@ type AgentInternalClient interface { JobTerminate(ctx context.Context, jobId string, req *JobTerminateRequest, opts ...psrpc.RequestOption) (*JobTerminateResponse, error) SubscribeWorkerRegistered(ctx context.Context, handlerNamespace string) (psrpc.Subscription[*google_protobuf.Empty], error) + + // Close immediately, without waiting for pending RPCs + Close() } // ================================== @@ -109,6 +112,10 @@ func (c *agentInternalClient) SubscribeWorkerRegistered(ctx context.Context, han return client.Join[*google_protobuf.Empty](ctx, c.client, "WorkerRegistered", []string{handlerNamespace}) } +func (s *agentInternalClient) Close() { + s.client.Close() +} + // ==================== // AgentInternal Server // ==================== diff --git a/rpc/agent_dispatch.psrpc.go b/rpc/agent_dispatch.psrpc.go index f0019ae6..f56fffc7 100644 --- a/rpc/agent_dispatch.psrpc.go +++ b/rpc/agent_dispatch.psrpc.go @@ -1,4 +1,4 @@ -// Code generated by protoc-gen-psrpc v0.5.1, DO NOT EDIT. +// Code generated by protoc-gen-psrpc v0.6.0, DO NOT EDIT. // source: rpc/agent_dispatch.proto package rpc @@ -15,7 +15,7 @@ import ( ) import livekit3 "github.com/livekit/protocol/livekit" -var _ = version.PsrpcVersion_0_5 +var _ = version.PsrpcVersion_0_6 // ====================================== // AgentDispatchInternal Client Interface @@ -27,6 +27,9 @@ type AgentDispatchInternalClient[RoomTopicType ~string] interface { DeleteDispatch(ctx context.Context, room RoomTopicType, req *livekit3.DeleteAgentDispatchRequest, opts ...psrpc.RequestOption) (*livekit3.AgentDispatch, error) ListDispatch(ctx context.Context, room RoomTopicType, req *livekit3.ListAgentDispatchRequest, opts ...psrpc.RequestOption) (*livekit3.ListAgentDispatchResponse, error) + + // Close immediately, without waiting for pending RPCs + Close() } // ========================================== @@ -103,6 +106,10 @@ func (c *agentDispatchInternalClient[RoomTopicType]) ListDispatch(ctx context.Co return client.RequestSingle[*livekit3.ListAgentDispatchResponse](ctx, c.client, "ListDispatch", []string{string(room)}, req, opts...) } +func (s *agentDispatchInternalClient[RoomTopicType]) Close() { + s.client.Close() +} + // ============================ // AgentDispatchInternal Server // ============================ diff --git a/rpc/egress.psrpc.go b/rpc/egress.psrpc.go index c54c01f4..e68930dc 100644 --- a/rpc/egress.psrpc.go +++ b/rpc/egress.psrpc.go @@ -1,4 +1,4 @@ -// Code generated by protoc-gen-psrpc v0.5.1, DO NOT EDIT. +// Code generated by protoc-gen-psrpc v0.6.0, DO NOT EDIT. // source: rpc/egress.proto package rpc @@ -15,7 +15,7 @@ import ( ) import livekit4 "github.com/livekit/protocol/livekit" -var _ = version.PsrpcVersion_0_5 +var _ = version.PsrpcVersion_0_6 // =============================== // EgressInternal Client Interface @@ -25,6 +25,9 @@ type EgressInternalClient interface { StartEgress(ctx context.Context, topic string, req *StartEgressRequest, opts ...psrpc.RequestOption) (*livekit4.EgressInfo, error) ListActiveEgress(ctx context.Context, topic string, req *ListActiveEgressRequest, opts ...psrpc.RequestOption) (<-chan *psrpc.Response[*ListActiveEgressResponse], error) + + // Close immediately, without waiting for pending RPCs + Close() } // =================================== @@ -91,6 +94,10 @@ func (c *egressInternalClient) ListActiveEgress(ctx context.Context, topic strin return client.RequestMulti[*ListActiveEgressResponse](ctx, c.client, "ListActiveEgress", []string{topic}, req, opts...) } +func (s *egressInternalClient) Close() { + s.client.Close() +} + // ===================== // EgressInternal Server // ===================== @@ -150,6 +157,9 @@ type EgressHandlerClient interface { UpdateStream(ctx context.Context, topic string, req *livekit4.UpdateStreamRequest, opts ...psrpc.RequestOption) (*livekit4.EgressInfo, error) StopEgress(ctx context.Context, topic string, req *livekit4.StopEgressRequest, opts ...psrpc.RequestOption) (*livekit4.EgressInfo, error) + + // Close immediately, without waiting for pending RPCs + Close() } // ================================== @@ -215,6 +225,10 @@ func (c *egressHandlerClient) StopEgress(ctx context.Context, topic string, req return client.RequestSingle[*livekit4.EgressInfo](ctx, c.client, "StopEgress", []string{topic}, req, opts...) } +func (s *egressHandlerClient) Close() { + s.client.Close() +} + // ==================== // EgressHandler Server // ==================== diff --git a/rpc/egress_client.go b/rpc/egress_client.go index db7707c9..7125d385 100644 --- a/rpc/egress_client.go +++ b/rpc/egress_client.go @@ -109,3 +109,8 @@ func (c *egressClient) StartEgress(ctx context.Context, topic string, req *Start }, opts...) return c.EgressInternalClient.StartEgress(ctx, topic, req, o...) } + +func (c *egressClient) Close() { + c.EgressInternalClient.Close() + c.EgressHandlerClient.Close() +} diff --git a/rpc/ingress.psrpc.go b/rpc/ingress.psrpc.go index 87f3fd56..f2af0fb0 100644 --- a/rpc/ingress.psrpc.go +++ b/rpc/ingress.psrpc.go @@ -1,4 +1,4 @@ -// Code generated by protoc-gen-psrpc v0.5.1, DO NOT EDIT. +// Code generated by protoc-gen-psrpc v0.6.0, DO NOT EDIT. // source: rpc/ingress.proto package rpc @@ -16,7 +16,7 @@ import ( import google_protobuf "google.golang.org/protobuf/types/known/emptypb" import livekit5 "github.com/livekit/protocol/livekit" -var _ = version.PsrpcVersion_0_5 +var _ = version.PsrpcVersion_0_6 // ================================ // IngressInternal Client Interface @@ -28,6 +28,9 @@ type IngressInternalClient interface { ListActiveIngress(ctx context.Context, topic string, req *ListActiveIngressRequest, opts ...psrpc.RequestOption) (<-chan *psrpc.Response[*ListActiveIngressResponse], error) KillIngressSession(ctx context.Context, ingressId string, resourceId string, req *KillIngressSessionRequest, opts ...psrpc.RequestOption) (*google_protobuf.Empty, error) + + // Close immediately, without waiting for pending RPCs + Close() } // ==================================== @@ -101,6 +104,10 @@ func (c *ingressInternalClient) KillIngressSession(ctx context.Context, ingressI return client.RequestSingle[*google_protobuf.Empty](ctx, c.client, "KillIngressSession", []string{ingressId, resourceId}, req, opts...) } +func (s *ingressInternalClient) Close() { + s.client.Close() +} + // ====================== // IngressInternal Server // ====================== @@ -172,6 +179,9 @@ type IngressHandlerClient interface { DeleteWHIPResource(ctx context.Context, topic string, req *DeleteWHIPResourceRequest, opts ...psrpc.RequestOption) (*google_protobuf.Empty, error) ICERestartWHIPResource(ctx context.Context, topic string, req *ICERestartWHIPResourceRequest, opts ...psrpc.RequestOption) (*ICERestartWHIPResourceResponse, error) + + // Close immediately, without waiting for pending RPCs + Close() } // =================================== @@ -255,6 +265,10 @@ func (c *ingressHandlerClient) ICERestartWHIPResource(ctx context.Context, topic return client.RequestSingle[*ICERestartWHIPResourceResponse](ctx, c.client, "ICERestartWHIPResource", []string{topic}, req, opts...) } +func (s *ingressHandlerClient) Close() { + s.client.Close() +} + // ===================== // IngressHandler Server // ===================== diff --git a/rpc/ingress_client.go b/rpc/ingress_client.go index 7e2c1688..88b20d66 100644 --- a/rpc/ingress_client.go +++ b/rpc/ingress_client.go @@ -43,3 +43,8 @@ func NewIngressClient(params ClientParams) (IngressClient, error) { IngressHandlerClient: handlerClient, }, nil } + +func (c *ingressClient) Close() { + c.IngressInternalClient.Close() + c.IngressHandlerClient.Close() +} diff --git a/rpc/io.psrpc.go b/rpc/io.psrpc.go index 693e0b9d..d14eb69e 100644 --- a/rpc/io.psrpc.go +++ b/rpc/io.psrpc.go @@ -1,4 +1,4 @@ -// Code generated by protoc-gen-psrpc v0.5.1, DO NOT EDIT. +// Code generated by protoc-gen-psrpc v0.6.0, DO NOT EDIT. // source: rpc/io.proto package rpc @@ -17,7 +17,7 @@ import google_protobuf "google.golang.org/protobuf/types/known/emptypb" import livekit4 "github.com/livekit/protocol/livekit" import livekit5 "github.com/livekit/protocol/livekit" -var _ = version.PsrpcVersion_0_5 +var _ = version.PsrpcVersion_0_6 // ======================= // IOInfo Client Interface @@ -46,6 +46,9 @@ type IOInfoClient interface { GetSIPTrunkAuthentication(ctx context.Context, req *GetSIPTrunkAuthenticationRequest, opts ...psrpc.RequestOption) (*GetSIPTrunkAuthenticationResponse, error) EvaluateSIPDispatchRules(ctx context.Context, req *EvaluateSIPDispatchRulesRequest, opts ...psrpc.RequestOption) (*EvaluateSIPDispatchRulesResponse, error) + + // Close immediately, without waiting for pending RPCs + Close() } // =========================== @@ -166,6 +169,10 @@ func (c *iOInfoClient) EvaluateSIPDispatchRules(ctx context.Context, req *Evalua return client.RequestSingle[*EvaluateSIPDispatchRulesResponse](ctx, c.client, "EvaluateSIPDispatchRules", nil, req, opts...) } +func (s *iOInfoClient) Close() { + s.client.Close() +} + // ============= // IOInfo Server // ============= diff --git a/rpc/keepalive.psrpc.go b/rpc/keepalive.psrpc.go index fb55cc79..36e69d21 100644 --- a/rpc/keepalive.psrpc.go +++ b/rpc/keepalive.psrpc.go @@ -1,4 +1,4 @@ -// Code generated by protoc-gen-psrpc v0.5.1, DO NOT EDIT. +// Code generated by protoc-gen-psrpc v0.6.0, DO NOT EDIT. // source: rpc/keepalive.proto package rpc @@ -14,7 +14,7 @@ import ( "github.com/livekit/psrpc/version" ) -var _ = version.PsrpcVersion_0_5 +var _ = version.PsrpcVersion_0_6 // ========================== // Keepalive Client Interface @@ -22,6 +22,9 @@ var _ = version.PsrpcVersion_0_5 type KeepaliveClient[NodeIDTopicType ~string] interface { SubscribePing(ctx context.Context, nodeID NodeIDTopicType) (psrpc.Subscription[*KeepalivePing], error) + + // Close immediately, without waiting for pending RPCs + Close() } // ============================== @@ -76,6 +79,10 @@ func (c *keepaliveClient[NodeIDTopicType]) SubscribePing(ctx context.Context, no return client.Join[*KeepalivePing](ctx, c.client, "Ping", []string{string(nodeID)}) } +func (s *keepaliveClient[NodeIDTopicType]) Close() { + s.client.Close() +} + // ================ // Keepalive Server // ================ diff --git a/rpc/participant.psrpc.go b/rpc/participant.psrpc.go index be1e599f..c59ad874 100644 --- a/rpc/participant.psrpc.go +++ b/rpc/participant.psrpc.go @@ -1,4 +1,4 @@ -// Code generated by protoc-gen-psrpc v0.5.1, DO NOT EDIT. +// Code generated by protoc-gen-psrpc v0.6.0, DO NOT EDIT. // source: rpc/participant.proto package rpc @@ -16,7 +16,7 @@ import ( import livekit1 "github.com/livekit/protocol/livekit" import livekit6 "github.com/livekit/protocol/livekit" -var _ = version.PsrpcVersion_0_5 +var _ = version.PsrpcVersion_0_6 // ============================ // Participant Client Interface @@ -30,6 +30,9 @@ type ParticipantClient[ParticipantTopicType ~string] interface { UpdateParticipant(ctx context.Context, participant ParticipantTopicType, req *livekit6.UpdateParticipantRequest, opts ...psrpc.RequestOption) (*livekit1.ParticipantInfo, error) UpdateSubscriptions(ctx context.Context, participant ParticipantTopicType, req *livekit6.UpdateSubscriptionsRequest, opts ...psrpc.RequestOption) (*livekit6.UpdateSubscriptionsResponse, error) + + // Close immediately, without waiting for pending RPCs + Close() } // ================================ @@ -115,6 +118,10 @@ func (c *participantClient[ParticipantTopicType]) UpdateSubscriptions(ctx contex return client.RequestSingle[*livekit6.UpdateSubscriptionsResponse](ctx, c.client, "UpdateSubscriptions", []string{string(participant)}, req, opts...) } +func (s *participantClient[ParticipantTopicType]) Close() { + s.client.Close() +} + // ================== // Participant Server // ================== diff --git a/rpc/room.psrpc.go b/rpc/room.psrpc.go index 0da44f91..a29ef514 100644 --- a/rpc/room.psrpc.go +++ b/rpc/room.psrpc.go @@ -1,4 +1,4 @@ -// Code generated by protoc-gen-psrpc v0.5.1, DO NOT EDIT. +// Code generated by protoc-gen-psrpc v0.6.0, DO NOT EDIT. // source: rpc/room.proto package rpc @@ -16,7 +16,7 @@ import ( import livekit1 "github.com/livekit/protocol/livekit" import livekit6 "github.com/livekit/protocol/livekit" -var _ = version.PsrpcVersion_0_5 +var _ = version.PsrpcVersion_0_6 // ===================== // Room Client Interface @@ -28,6 +28,9 @@ type RoomClient[RoomTopicType ~string] interface { SendData(ctx context.Context, room RoomTopicType, req *livekit6.SendDataRequest, opts ...psrpc.RequestOption) (*livekit6.SendDataResponse, error) UpdateRoomMetadata(ctx context.Context, room RoomTopicType, req *livekit6.UpdateRoomMetadataRequest, opts ...psrpc.RequestOption) (*livekit1.Room, error) + + // Close immediately, without waiting for pending RPCs + Close() } // ========================= @@ -104,6 +107,10 @@ func (c *roomClient[RoomTopicType]) UpdateRoomMetadata(ctx context.Context, room return client.RequestSingle[*livekit1.Room](ctx, c.client, "UpdateRoomMetadata", []string{string(room)}, req, opts...) } +func (s *roomClient[RoomTopicType]) Close() { + s.client.Close() +} + // =========== // Room Server // =========== diff --git a/rpc/roommanager.psrpc.go b/rpc/roommanager.psrpc.go index c8bbb9d9..4b532cbc 100644 --- a/rpc/roommanager.psrpc.go +++ b/rpc/roommanager.psrpc.go @@ -1,4 +1,4 @@ -// Code generated by protoc-gen-psrpc v0.5.1, DO NOT EDIT. +// Code generated by protoc-gen-psrpc v0.6.0, DO NOT EDIT. // source: rpc/roommanager.proto package rpc @@ -16,7 +16,7 @@ import ( import livekit1 "github.com/livekit/protocol/livekit" import livekit6 "github.com/livekit/protocol/livekit" -var _ = version.PsrpcVersion_0_5 +var _ = version.PsrpcVersion_0_6 // ============================ // RoomManager Client Interface @@ -24,6 +24,9 @@ var _ = version.PsrpcVersion_0_5 type RoomManagerClient[NodeIdTopicType ~string] interface { CreateRoom(ctx context.Context, nodeId NodeIdTopicType, req *livekit6.CreateRoomRequest, opts ...psrpc.RequestOption) (*livekit1.Room, error) + + // Close immediately, without waiting for pending RPCs + Close() } // ================================ @@ -82,6 +85,10 @@ func (c *roomManagerClient[NodeIdTopicType]) CreateRoom(ctx context.Context, nod return client.RequestSingle[*livekit1.Room](ctx, c.client, "CreateRoom", []string{string(nodeId)}, req, opts...) } +func (s *roomManagerClient[NodeIdTopicType]) Close() { + s.client.Close() +} + // ================== // RoomManager Server // ================== diff --git a/rpc/rpcfakes/fake_keepalive_pub_sub.go b/rpc/rpcfakes/fake_keepalive_pub_sub.go index a5a385f8..5e3eecfc 100644 --- a/rpc/rpcfakes/fake_keepalive_pub_sub.go +++ b/rpc/rpcfakes/fake_keepalive_pub_sub.go @@ -11,6 +11,10 @@ import ( ) type FakeKeepalivePubSub struct { + CloseStub func() + closeMutex sync.RWMutex + closeArgsForCall []struct { + } KillStub func() killMutex sync.RWMutex killArgsForCall []struct { @@ -50,6 +54,30 @@ type FakeKeepalivePubSub struct { invocationsMutex sync.RWMutex } +func (fake *FakeKeepalivePubSub) Close() { + fake.closeMutex.Lock() + fake.closeArgsForCall = append(fake.closeArgsForCall, struct { + }{}) + stub := fake.CloseStub + fake.recordInvocation("Close", []interface{}{}) + fake.closeMutex.Unlock() + if stub != nil { + fake.CloseStub() + } +} + +func (fake *FakeKeepalivePubSub) CloseCallCount() int { + fake.closeMutex.RLock() + defer fake.closeMutex.RUnlock() + return len(fake.closeArgsForCall) +} + +func (fake *FakeKeepalivePubSub) CloseCalls(stub func()) { + fake.closeMutex.Lock() + defer fake.closeMutex.Unlock() + fake.CloseStub = stub +} + func (fake *FakeKeepalivePubSub) Kill() { fake.killMutex.Lock() fake.killArgsForCall = append(fake.killArgsForCall, struct { @@ -229,6 +257,8 @@ func (fake *FakeKeepalivePubSub) SubscribePingReturnsOnCall(i int, result1 psrpc func (fake *FakeKeepalivePubSub) Invocations() map[string][][]interface{} { fake.invocationsMutex.RLock() defer fake.invocationsMutex.RUnlock() + fake.closeMutex.RLock() + defer fake.closeMutex.RUnlock() fake.killMutex.RLock() defer fake.killMutex.RUnlock() fake.publishPingMutex.RLock() diff --git a/rpc/rpcfakes/fake_typed_participant_client.go b/rpc/rpcfakes/fake_typed_participant_client.go index e15e543d..748eeb31 100644 --- a/rpc/rpcfakes/fake_typed_participant_client.go +++ b/rpc/rpcfakes/fake_typed_participant_client.go @@ -11,6 +11,10 @@ import ( ) type FakeTypedParticipantClient struct { + CloseStub func() + closeMutex sync.RWMutex + closeArgsForCall []struct { + } MutePublishedTrackStub func(context.Context, rpc.ParticipantTopic, *livekit.MuteRoomTrackRequest, ...psrpc.RequestOption) (*livekit.MuteRoomTrackResponse, error) mutePublishedTrackMutex sync.RWMutex mutePublishedTrackArgsForCall []struct { @@ -79,6 +83,30 @@ type FakeTypedParticipantClient struct { invocationsMutex sync.RWMutex } +func (fake *FakeTypedParticipantClient) Close() { + fake.closeMutex.Lock() + fake.closeArgsForCall = append(fake.closeArgsForCall, struct { + }{}) + stub := fake.CloseStub + fake.recordInvocation("Close", []interface{}{}) + fake.closeMutex.Unlock() + if stub != nil { + fake.CloseStub() + } +} + +func (fake *FakeTypedParticipantClient) CloseCallCount() int { + fake.closeMutex.RLock() + defer fake.closeMutex.RUnlock() + return len(fake.closeArgsForCall) +} + +func (fake *FakeTypedParticipantClient) CloseCalls(stub func()) { + fake.closeMutex.Lock() + defer fake.closeMutex.Unlock() + fake.CloseStub = stub +} + func (fake *FakeTypedParticipantClient) MutePublishedTrack(arg1 context.Context, arg2 rpc.ParticipantTopic, arg3 *livekit.MuteRoomTrackRequest, arg4 ...psrpc.RequestOption) (*livekit.MuteRoomTrackResponse, error) { fake.mutePublishedTrackMutex.Lock() ret, specificReturn := fake.mutePublishedTrackReturnsOnCall[len(fake.mutePublishedTrackArgsForCall)] @@ -350,6 +378,8 @@ func (fake *FakeTypedParticipantClient) UpdateSubscriptionsReturnsOnCall(i int, func (fake *FakeTypedParticipantClient) Invocations() map[string][][]interface{} { fake.invocationsMutex.RLock() defer fake.invocationsMutex.RUnlock() + fake.closeMutex.RLock() + defer fake.closeMutex.RUnlock() fake.mutePublishedTrackMutex.RLock() defer fake.mutePublishedTrackMutex.RUnlock() fake.removeParticipantMutex.RLock() diff --git a/rpc/rpcfakes/fake_typed_room_client.go b/rpc/rpcfakes/fake_typed_room_client.go index 0a8d0615..2539080b 100644 --- a/rpc/rpcfakes/fake_typed_room_client.go +++ b/rpc/rpcfakes/fake_typed_room_client.go @@ -11,6 +11,10 @@ import ( ) type FakeTypedRoomClient struct { + CloseStub func() + closeMutex sync.RWMutex + closeArgsForCall []struct { + } DeleteRoomStub func(context.Context, rpc.RoomTopic, *livekit.DeleteRoomRequest, ...psrpc.RequestOption) (*livekit.DeleteRoomResponse, error) deleteRoomMutex sync.RWMutex deleteRoomArgsForCall []struct { @@ -63,6 +67,30 @@ type FakeTypedRoomClient struct { invocationsMutex sync.RWMutex } +func (fake *FakeTypedRoomClient) Close() { + fake.closeMutex.Lock() + fake.closeArgsForCall = append(fake.closeArgsForCall, struct { + }{}) + stub := fake.CloseStub + fake.recordInvocation("Close", []interface{}{}) + fake.closeMutex.Unlock() + if stub != nil { + fake.CloseStub() + } +} + +func (fake *FakeTypedRoomClient) CloseCallCount() int { + fake.closeMutex.RLock() + defer fake.closeMutex.RUnlock() + return len(fake.closeArgsForCall) +} + +func (fake *FakeTypedRoomClient) CloseCalls(stub func()) { + fake.closeMutex.Lock() + defer fake.closeMutex.Unlock() + fake.CloseStub = stub +} + func (fake *FakeTypedRoomClient) DeleteRoom(arg1 context.Context, arg2 rpc.RoomTopic, arg3 *livekit.DeleteRoomRequest, arg4 ...psrpc.RequestOption) (*livekit.DeleteRoomResponse, error) { fake.deleteRoomMutex.Lock() ret, specificReturn := fake.deleteRoomReturnsOnCall[len(fake.deleteRoomArgsForCall)] @@ -267,6 +295,8 @@ func (fake *FakeTypedRoomClient) UpdateRoomMetadataReturnsOnCall(i int, result1 func (fake *FakeTypedRoomClient) Invocations() map[string][][]interface{} { fake.invocationsMutex.RLock() defer fake.invocationsMutex.RUnlock() + fake.closeMutex.RLock() + defer fake.closeMutex.RUnlock() fake.deleteRoomMutex.RLock() defer fake.deleteRoomMutex.RUnlock() fake.sendDataMutex.RLock() diff --git a/rpc/signal.psrpc.go b/rpc/signal.psrpc.go index 61b1109b..36e7c0ae 100644 --- a/rpc/signal.psrpc.go +++ b/rpc/signal.psrpc.go @@ -1,4 +1,4 @@ -// Code generated by protoc-gen-psrpc v0.5.1, DO NOT EDIT. +// Code generated by protoc-gen-psrpc v0.6.0, DO NOT EDIT. // source: rpc/signal.proto package rpc @@ -14,7 +14,7 @@ import ( "github.com/livekit/psrpc/version" ) -var _ = version.PsrpcVersion_0_5 +var _ = version.PsrpcVersion_0_6 // ======================= // Signal Client Interface @@ -22,6 +22,9 @@ var _ = version.PsrpcVersion_0_5 type SignalClient[NodeIdTopicType ~string] interface { RelaySignal(ctx context.Context, nodeId NodeIdTopicType, opts ...psrpc.RequestOption) (psrpc.ClientStream[*RelaySignalRequest, *RelaySignalResponse], error) + + // Close immediately, without waiting for pending RPCs + Close() } // =========================== @@ -80,6 +83,10 @@ func (c *signalClient[NodeIdTopicType]) RelaySignal(ctx context.Context, nodeId return client.OpenStream[*RelaySignalRequest, *RelaySignalResponse](ctx, c.client, "RelaySignal", []string{string(nodeId)}, opts...) } +func (s *signalClient[NodeIdTopicType]) Close() { + s.client.Close() +} + // ============= // Signal Server // ============= diff --git a/rpc/sip.psrpc.go b/rpc/sip.psrpc.go index 9f8ff41f..37b9f3c2 100644 --- a/rpc/sip.psrpc.go +++ b/rpc/sip.psrpc.go @@ -1,4 +1,4 @@ -// Code generated by protoc-gen-psrpc v0.5.1, DO NOT EDIT. +// Code generated by protoc-gen-psrpc v0.6.0, DO NOT EDIT. // source: rpc/sip.proto package rpc @@ -15,7 +15,7 @@ import ( ) import google_protobuf "google.golang.org/protobuf/types/known/emptypb" -var _ = version.PsrpcVersion_0_5 +var _ = version.PsrpcVersion_0_6 // ============================ // SIPInternal Client Interface @@ -25,6 +25,9 @@ type SIPInternalClient interface { CreateSIPParticipant(ctx context.Context, topic string, req *InternalCreateSIPParticipantRequest, opts ...psrpc.RequestOption) (*InternalCreateSIPParticipantResponse, error) TransferSIPParticipant(ctx context.Context, sipCallId string, req *InternalTransferSIPParticipantRequest, opts ...psrpc.RequestOption) (*google_protobuf.Empty, error) + + // Close immediately, without waiting for pending RPCs + Close() } // ================================ @@ -91,6 +94,10 @@ func (c *sIPInternalClient) TransferSIPParticipant(ctx context.Context, sipCallI return client.RequestSingle[*google_protobuf.Empty](ctx, c.client, "TransferSIPParticipant", []string{sipCallId}, req, opts...) } +func (s *sIPInternalClient) Close() { + s.client.Close() +} + // ================== // SIPInternal Server // ==================