From d2e8c3bd0b9c55c1056632c6dad10f23817fbb11 Mon Sep 17 00:00:00 2001 From: Jozef Kralik Date: Fri, 8 Oct 2021 09:36:16 +0200 Subject: [PATCH] resource-directory: allow drop event when buffer of subscription is exhausted --- resource-directory/service/subscriptions.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/resource-directory/service/subscriptions.go b/resource-directory/service/subscriptions.go index 2d900f650..3e1c1cbfb 100644 --- a/resource-directory/service/subscriptions.go +++ b/resource-directory/service/subscriptions.go @@ -682,7 +682,7 @@ func (s *subscriptions) SubscribeForEvents(resourceProjection *Projection, srv p } }() - sendChan := make(chan pb.Event, 16) + sendChan := make(chan pb.Event, 1024) go func() { defer wg.Done() for { @@ -705,9 +705,11 @@ func (s *subscriptions) SubscribeForEvents(resourceProjection *Projection, srv p case sendChan <- e: return nil case <-ctx.Done(): - return fmt.Errorf("cannot send event: stream context returns error: %v", ctx.Err()) + return fmt.Errorf("cannot send event (%+v) for subscription(%v): stream context returns error: %v", e.GetType(), e.GetSubscriptionId(), ctx.Err()) case <-senderCtx.Done(): - return fmt.Errorf("cannot send event: sender context returns error: %v", ctx.Err()) + return fmt.Errorf("cannot send event (%+v) for subscription(%v): sender context returns error: %v", e.GetType(), e.GetSubscriptionId(), ctx.Err()) + default: + return fmt.Errorf("cannot send event (%+v) for subscription(%v): buffer it exhausted", e.GetType(), e.GetSubscriptionId()) } }