From f6f5320a7a3bd9b2b60b879ebfb557d1b06955fa Mon Sep 17 00:00:00 2001 From: Tedi Mitiku Date: Fri, 8 Nov 2024 18:51:07 -0800 Subject: [PATCH] only wait on err chan --- .../persistent_volume_logs_database_client_test.go | 12 ++++++------ .../engine/server/engine_connect_server_service.go | 9 +++------ 2 files changed, 9 insertions(+), 12 deletions(-) diff --git a/engine/server/engine/centralized_logs/client_implementations/persistent_volume/persistent_volume_logs_database_client_test.go b/engine/server/engine/centralized_logs/client_implementations/persistent_volume/persistent_volume_logs_database_client_test.go index 0f5197b1c5..6e3d77c2a6 100644 --- a/engine/server/engine/centralized_logs/client_implementations/persistent_volume/persistent_volume_logs_database_client_test.go +++ b/engine/server/engine/centralized_logs/client_implementations/persistent_volume/persistent_volume_logs_database_client_test.go @@ -721,15 +721,15 @@ func executeStreamCallAndGetReceivedServiceLogLines( case <-time.Tick(testTimeOut): return nil, stacktrace.NewError("Receiving stream logs in the test has reached the '%v' time out", testTimeOut) case streamErr, isChanOpen := <-errChan: - if !isChanOpen { - if len(userServiceLogsByUuidChan) == 0 { - shouldReceiveStream = false - } + if !isChanOpen && len(userServiceLogsByUuidChan) == 0 { + shouldReceiveStream = false break } - return nil, stacktrace.Propagate(streamErr, "Receiving streaming error.") + if isChanOpen && streamErr != nil { + return nil, stacktrace.Propagate(streamErr, "Receiving streaming error.") + } case userServiceLogsByUuid, isChanOpen := <-userServiceLogsByUuidChan: - if !isChanOpen && len(userServiceLogsByUuidChan) == 0 { + if !isChanOpen { shouldReceiveStream = false break } diff --git a/engine/server/engine/server/engine_connect_server_service.go b/engine/server/engine/server/engine_connect_server_service.go index 0003c5482d..281a80e9af 100644 --- a/engine/server/engine/server/engine_connect_server_service.go +++ b/engine/server/engine/server/engine_connect_server_service.go @@ -347,8 +347,7 @@ func (service *EngineConnectServerService) GetServiceLogs(ctx context.Context, c //stream case case serviceLogsByServiceUuid, isChanOpen := <-serviceLogsByServiceUuidChan: // If the channel is closed means that the logs database client won't continue sending streams - // but keep reading from it until the channel is empty - if !isChanOpen && len(serviceLogsByServiceUuidChan) == 0 { + if !isChanOpen { logrus.Debug("Exiting the stream loop after receiving a close signal from the service logs by service UUID channel") return nil } @@ -359,10 +358,8 @@ func (service *EngineConnectServerService) GetServiceLogs(ctx context.Context, c } //client cancel ctx case case <-contextWithCancel.Done(): - if len(serviceLogsByServiceUuidChan) == 0 { - logrus.Debug("The user service logs stream has done") - return nil - } + logrus.Debug("The user service logs stream is done.") + return nil //error from logs database case case err, isChanOpen := <-errChan: if isChanOpen {