From fd57ecd49631d2c318af4eb871c80296ffae3835 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lovro=20Ma=C5=BEgon?= Date: Fri, 13 Sep 2024 16:30:54 +0200 Subject: [PATCH] update conduit-commons --- go.mod | 2 +- go.sum | 2 ++ pkg/lifecycle/service.go | 15 +++++++-------- 3 files changed, 10 insertions(+), 9 deletions(-) diff --git a/go.mod b/go.mod index ce70fe7f9..e01f30e06 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( github.com/Masterminds/sprig/v3 v3.3.0 github.com/NYTimes/gziphandler v1.1.1 github.com/bufbuild/buf v1.41.0 - github.com/conduitio/conduit-commons v0.3.1-0.20240913105540-7ca383b14fff + github.com/conduitio/conduit-commons v0.3.1-0.20240913141354-f6dd6c835674 github.com/conduitio/conduit-connector-file v0.7.0 github.com/conduitio/conduit-connector-generator v0.7.0 github.com/conduitio/conduit-connector-kafka v0.9.0 diff --git a/go.sum b/go.sum index 0f0590d09..8235d86ac 100644 --- a/go.sum +++ b/go.sum @@ -220,6 +220,8 @@ github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGX github.com/colinmarc/hdfs/v2 v2.1.1/go.mod h1:M3x+k8UKKmxtFu++uAZ0OtDU8jR3jnaZIAc6yK4Ue0c= github.com/conduitio/conduit-commons v0.3.1-0.20240913105540-7ca383b14fff h1:qsOCwSeqFn6bnOqijWnlmUG6osXpCibJRKpI5xq5zR4= github.com/conduitio/conduit-commons v0.3.1-0.20240913105540-7ca383b14fff/go.mod h1:zeKc8PpXRzB94MhhpwdRI0Rd+Jn/uNs2tqxLhPA05y4= +github.com/conduitio/conduit-commons v0.3.1-0.20240913141354-f6dd6c835674 h1:coVk6aVsbP2u2vheM55GaCtu3+JYdIvOimQ0abUrmLA= +github.com/conduitio/conduit-commons v0.3.1-0.20240913141354-f6dd6c835674/go.mod h1:R/GNsw7iAUy5g2mFUVupAcrfWlGCUACwMvkDDrF39RI= github.com/conduitio/conduit-connector-file v0.7.0 h1:lUfDdpRZleJ/DDXX3NCzHN6VUYKORU/b443mJH6PJU4= github.com/conduitio/conduit-connector-file v0.7.0/go.mod h1:OXmcc1eAXmqmn9XoS/C3TdgZn0W1GMyqfNzUZRFmHNU= github.com/conduitio/conduit-connector-generator v0.7.0 h1:Bqsh/ak7gw6k5E8m0PxXOib0zhNlKbrJcIoLLQ0+S08= diff --git a/pkg/lifecycle/service.go b/pkg/lifecycle/service.go index 132379119..2849da9fe 100644 --- a/pkg/lifecycle/service.go +++ b/pkg/lifecycle/service.go @@ -240,10 +240,10 @@ func (s *Service) stopForceful(ctx context.Context, rp *runnablePipeline) error // StopAll will ask all the running pipelines to stop gracefully // (i.e. that existing messages get processed but not new messages get produced). func (s *Service) StopAll(ctx context.Context, reason error) { - s.runningPipelines.Range(func(_ string, rp *runnablePipeline) bool { + for _, rp := range s.runningPipelines.All() { p := rp.pipeline if p.GetStatus() != pipeline.StatusRunning && p.GetStatus() != pipeline.StatusRecovering { - return true + continue } err := s.stopGraceful(ctx, rp, reason) if err != nil { @@ -252,8 +252,7 @@ func (s *Service) StopAll(ctx context.Context, reason error) { Str(log.PipelineIDField, p.ID). Msg("could not stop pipeline") } - return true - }) + } // TODO stop pipelines forcefully after timeout if they are still running } @@ -286,18 +285,18 @@ func (s *Service) Wait(timeout time.Duration) error { func (s *Service) waitInternal() error { var errs []error + // copy pipelines to keep the map unlocked while we iterate it pipelines := s.runningPipelines.Copy() - pipelines.Range(func(_ string, rp *runnablePipeline) bool { + for _, rp := range pipelines.All() { if rp.t == nil { - return true + continue } err := rp.t.Wait() if err != nil { errs = append(errs, err) } - return true - }) + } return cerrors.Join(errs...) }