Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

concurrent helmfile template #901

Closed
Closed
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ require (
github.com/blang/semver v3.5.1+incompatible // indirect
github.com/bluekeyes/go-gitdiff v0.4.0 // indirect
github.com/cenkalti/backoff v2.2.1+incompatible // indirect
github.com/cenkalti/backoff/v4 v4.1.3 // indirect
github.com/cespare/xxhash/v2 v2.1.1 // indirect
github.com/containerd/cgroups v0.0.0-20200531161412-0dbf7f05ba59 // indirect
github.com/containerd/containerd v1.4.4 // indirect
Expand Down Expand Up @@ -243,6 +244,8 @@ replace (
// override the go-scm from tekton
github.com/jenkins-x/go-scm => github.com/jenkins-x/go-scm v1.11.5

github.com/jenkins-x/jx-helpers => /Users/rochana/Code/Devops/repos/jx-helpers
rochana-atapattu marked this conversation as resolved.
Show resolved Hide resolved

// for the PipelineRun debug fix see: https://github.com/tektoncd/pipeline/pull/4145
github.com/tektoncd/pipeline => github.com/jstrachan/pipeline v0.21.1-0.20210811150720-45a86a5488af

Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,8 @@ github.com/c2h5oh/datasize v0.0.0-20171227191756-4eba002a5eae/go.mod h1:S/7n9cop
github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ=
github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4=
github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
github.com/cenkalti/backoff/v4 v4.1.3 h1:cFAlzYUlVYDysBEH2T5hyJZMh3+5+WCBvSnK6Q8UtC4=
github.com/cenkalti/backoff/v4 v4.1.3/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
github.com/census-instrumentation/opencensus-proto v0.2.0/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/census-instrumentation/opencensus-proto v0.3.0 h1:t/LhUZLVitR1Ow2YOnduCsavhwFUklBMoGVYUCqmCqk=
Expand Down
2 changes: 2 additions & 0 deletions pkg/cmd/helmfile/helmfile.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/jenkins-x-plugins/jx-gitops/pkg/cmd/helmfile/resolve"
"github.com/jenkins-x-plugins/jx-gitops/pkg/cmd/helmfile/status"
"github.com/jenkins-x-plugins/jx-gitops/pkg/cmd/helmfile/structure"
"github.com/jenkins-x-plugins/jx-gitops/pkg/cmd/helmfile/template"
"github.com/jenkins-x-plugins/jx-gitops/pkg/cmd/helmfile/validate"
"github.com/jenkins-x/jx-helpers/v3/pkg/cobras"
"github.com/jenkins-x/jx-logging/v3/pkg/log"
Expand All @@ -34,6 +35,7 @@ func NewCmdHelmfile() *cobra.Command {
command.AddCommand(cobras.SplitCommand(resolve.NewCmdHelmfileResolve()))
command.AddCommand(cobras.SplitCommand(status.NewCmdHelmfileStatus()))
command.AddCommand(cobras.SplitCommand(structure.NewCmdHelmfileStructure()))
command.AddCommand(cobras.SplitCommand(template.NewCmdHelmfileTemplate()))
command.AddCommand(cobras.SplitCommand(validate.NewCmdHelmfileValidate()))
return command
}
80 changes: 80 additions & 0 deletions pkg/cmd/helmfile/template/exec.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package template

import (
"context"
"fmt"
"sync"

"github.com/jenkins-x/jx-helpers/v3/pkg/cmdrunner"
)

type Result struct {
Attempts int
Value string
Err error
}

type CommandRunners struct {
CommandRunner cmdrunner.CommandRunner
runnersCount int
commands chan *cmdrunner.Command
results chan Result
Done chan struct{}
}

func NewCommandRunners(count int) CommandRunners {
return CommandRunners{
CommandRunner: cmdrunner.DefaultCommandRunner,
runnersCount: count,
commands: make(chan *cmdrunner.Command, count),
results: make(chan Result, count),
Done: make(chan struct{}),
}
}

func (cr CommandRunners) Run(ctx context.Context) {
var wg sync.WaitGroup

for i := 0; i < cr.runnersCount; i++ {
wg.Add(1)
// fan out worker goroutines
//reading from jobs channel and
//pushing calcs into results channel
go cr.worker(ctx, &wg, cr.commands, cr.results)
}

wg.Wait()
close(cr.Done)
close(cr.results)
}

func (cr CommandRunners) worker(ctx context.Context, wg *sync.WaitGroup, commands <-chan *cmdrunner.Command, results chan<- Result) {
defer wg.Done()
for {
select {
case command, ok := <-commands:
if !ok {
return
}
// fan-in job execution multiplexing results into the results channel
result, err := cr.CommandRunner(command)
results <- Result{command.Attempts(), result, err}
case <-ctx.Done():
fmt.Printf("cancelled worker. Error detail: %v\n", ctx.Err())
results <- Result{
Err: ctx.Err(),
}
return
}
}
}
func (cr CommandRunners) Results() <-chan Result {
return cr.results
}

func (cr CommandRunners) GenerateFrom(commands []*cmdrunner.Command) {
for i, _ := range commands {
cr.commands <- commands[i]
}
close(cr.commands)
}
109 changes: 109 additions & 0 deletions pkg/cmd/helmfile/template/exec_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package template

import (
"context"
"testing"

"github.com/jenkins-x/jx-helpers/v3/pkg/cmdrunner"
)

const (
jobsCount = 10
workerCount = 2
)

func TestWorkerPool(t *testing.T) {
cr := NewCommandRunners(workerCount)

ctx, cancel := context.WithCancel(context.TODO())
defer cancel()

commands := []*cmdrunner.Command{}

commands = append(commands, &cmdrunner.Command{
Name: "echo",
Args: []string{"hello"},
})
commands = append(commands, &cmdrunner.Command{
Name: "echo",
Args: []string{"world"},
})
go cr.GenerateFrom(commands)

go cr.Run(ctx)

for {
select {
case r, ok := <-cr.Results():
if !ok {
continue
}

val := r.Value
if val != "hello" {
t.Fatalf("wrong value %v; expected %v", val, "hello")
}
case <-cr.Done:
return
default:
}
}
}

// func TestWorkerPool_TimeOut(t *testing.T) {
// wp := New(workerCount)

// ctx, cancel := context.WithTimeout(context.TODO(), time.Nanosecond*10)
// defer cancel()

// go wp.Run(ctx)

// for {
// select {
// case r := <-wp.Results():
// if r.Err != nil && r.Err != context.DeadlineExceeded {
// t.Fatalf("expected error: %v; got: %v", context.DeadlineExceeded, r.Err)
// }
// case <-wp.Done:
// return
// default:
// }
// }
// }

// func TestWorkerPool_Cancel(t *testing.T) {
// wp := New(workerCount)

// ctx, cancel := context.WithCancel(context.TODO())

// go wp.Run(ctx)
// cancel()

// for {
// select {
// case r := <-wp.Results():
// if r.Err != nil && r.Err != context.Canceled {
// t.Fatalf("expected error: %v; got: %v", context.Canceled, r.Err)
// }
// case <-wp.Done:
// return
// default:
// }
// }
// }

// func testJobs() []Job {
// jobs := make([]Job, jobsCount)
// for i := 0; i < jobsCount; i++ {
// jobs[i] = Job{
// Descriptor: JobDescriptor{
// ID: JobID(fmt.Sprintf("%v", i)),
// JType: "anyType",
// Metadata: nil,
// },
// ExecFn: execFn,
// Args: i,
// }
// }
// return jobs
// }
Loading