Skip to content

Commit

Permalink
update conduit-commons
Browse files Browse the repository at this point in the history
  • Loading branch information
lovromazgon committed Sep 13, 2024
1 parent 3c67005 commit fd57ecd
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 9 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/Masterminds/sprig/v3 v3.3.0
github.com/NYTimes/gziphandler v1.1.1
github.com/bufbuild/buf v1.41.0
github.com/conduitio/conduit-commons v0.3.1-0.20240913105540-7ca383b14fff
github.com/conduitio/conduit-commons v0.3.1-0.20240913141354-f6dd6c835674
github.com/conduitio/conduit-connector-file v0.7.0
github.com/conduitio/conduit-connector-generator v0.7.0
github.com/conduitio/conduit-connector-kafka v0.9.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,8 @@ github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGX
github.com/colinmarc/hdfs/v2 v2.1.1/go.mod h1:M3x+k8UKKmxtFu++uAZ0OtDU8jR3jnaZIAc6yK4Ue0c=
github.com/conduitio/conduit-commons v0.3.1-0.20240913105540-7ca383b14fff h1:qsOCwSeqFn6bnOqijWnlmUG6osXpCibJRKpI5xq5zR4=
github.com/conduitio/conduit-commons v0.3.1-0.20240913105540-7ca383b14fff/go.mod h1:zeKc8PpXRzB94MhhpwdRI0Rd+Jn/uNs2tqxLhPA05y4=
github.com/conduitio/conduit-commons v0.3.1-0.20240913141354-f6dd6c835674 h1:coVk6aVsbP2u2vheM55GaCtu3+JYdIvOimQ0abUrmLA=
github.com/conduitio/conduit-commons v0.3.1-0.20240913141354-f6dd6c835674/go.mod h1:R/GNsw7iAUy5g2mFUVupAcrfWlGCUACwMvkDDrF39RI=
github.com/conduitio/conduit-connector-file v0.7.0 h1:lUfDdpRZleJ/DDXX3NCzHN6VUYKORU/b443mJH6PJU4=
github.com/conduitio/conduit-connector-file v0.7.0/go.mod h1:OXmcc1eAXmqmn9XoS/C3TdgZn0W1GMyqfNzUZRFmHNU=
github.com/conduitio/conduit-connector-generator v0.7.0 h1:Bqsh/ak7gw6k5E8m0PxXOib0zhNlKbrJcIoLLQ0+S08=
Expand Down
15 changes: 7 additions & 8 deletions pkg/lifecycle/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,10 +240,10 @@ func (s *Service) stopForceful(ctx context.Context, rp *runnablePipeline) error
// StopAll will ask all the running pipelines to stop gracefully
// (i.e. that existing messages get processed but not new messages get produced).
func (s *Service) StopAll(ctx context.Context, reason error) {
s.runningPipelines.Range(func(_ string, rp *runnablePipeline) bool {
for _, rp := range s.runningPipelines.All() {
p := rp.pipeline
if p.GetStatus() != pipeline.StatusRunning && p.GetStatus() != pipeline.StatusRecovering {
return true
continue
}
err := s.stopGraceful(ctx, rp, reason)
if err != nil {
Expand All @@ -252,8 +252,7 @@ func (s *Service) StopAll(ctx context.Context, reason error) {
Str(log.PipelineIDField, p.ID).
Msg("could not stop pipeline")
}
return true
})
}
// TODO stop pipelines forcefully after timeout if they are still running
}

Expand Down Expand Up @@ -286,18 +285,18 @@ func (s *Service) Wait(timeout time.Duration) error {
func (s *Service) waitInternal() error {
var errs []error

// copy pipelines to keep the map unlocked while we iterate it
pipelines := s.runningPipelines.Copy()

pipelines.Range(func(_ string, rp *runnablePipeline) bool {
for _, rp := range pipelines.All() {
if rp.t == nil {
return true
continue
}
err := rp.t.Wait()
if err != nil {
errs = append(errs, err)
}
return true
})
}
return cerrors.Join(errs...)
}

Expand Down

0 comments on commit fd57ecd

Please sign in to comment.