diff --git a/grpc/taskrunner/manager.go b/grpc/taskrunner/manager.go index 77e725c..ba6df9b 100644 --- a/grpc/taskrunner/manager.go +++ b/grpc/taskrunner/manager.go @@ -1,5 +1,7 @@ package taskrunner +// copied and modified from https://github.com/zenthangplus/goccm + import "sync/atomic" type concurrencyManager struct { diff --git a/grpc/taskrunner/run.go b/grpc/taskrunner/run.go index 2dc8d7e..38abded 100644 --- a/grpc/taskrunner/run.go +++ b/grpc/taskrunner/run.go @@ -14,9 +14,9 @@ type orchestrator struct { manager *concurrencyManager workerIdleTimeout time.Duration fifoChan chan string - // perIDQueue is a map of hostID to a channel of tasks. - perIDQueue sync.Map - ingestChan chan Task + // perHostChan is a map of hostID to a channel of tasks. + perHostChan sync.Map + ingestChan chan Task } // ingest take a task off the ingestion queue and puts it on the perID queue @@ -32,8 +32,8 @@ func (r *Runner) ingest(ctx context.Context) { case t := <-r.orchestrator.ingestChan: // 2. enqueue to perID queue - ch := make(chan Task, 5000) - q, exists := r.orchestrator.perIDQueue.LoadOrStore(t.Host, ch) + ch := make(chan Task, 10) + q, exists := r.orchestrator.perHostChan.LoadOrStore(t.Host, ch) v, ok := q.(chan Task) if !ok { fmt.Println("bad type: IngestQueue") @@ -59,22 +59,22 @@ func (r *Runner) orchestrate(ctx context.Context) { // 1. dequeue from fcfs queue // 2. start workers for { - time.Sleep(time.Second * 2) + // time.Sleep(time.Second * 3) - this potential helps with ingestion r.orchestrator.workers.Range(func(key, value interface{}) bool { // if worker id exists in o.workers, then move on because the worker is already running. - if value.(bool) { + if value.(bool) { //nolint: forcetypeassert // values are always certain. return true } // wait for a worker to become available r.orchestrator.manager.Wait() - r.orchestrator.workers.Store(key.(string), true) - v, found := r.orchestrator.perIDQueue.Load(key.(string)) + r.orchestrator.workers.Store(key.(string), true) //nolint: forcetypeassert // values are always certain. + v, found := r.orchestrator.perHostChan.Load(key.(string)) if !found { return false } - go r.worker(ctx, key.(string), v.(chan Task)) + go r.worker(ctx, key.(string), v.(chan Task)) //nolint: forcetypeassert // values are always certain. return true }) } diff --git a/grpc/taskrunner/taskrunner.go b/grpc/taskrunner/taskrunner.go index c15269e..fce4975 100644 --- a/grpc/taskrunner/taskrunner.go +++ b/grpc/taskrunner/taskrunner.go @@ -44,8 +44,8 @@ func NewRunner(repo repository.Actions, maxWorkers int, workerIdleTimeout time.D o := &orchestrator{ workers: sync.Map{}, fifoChan: make(chan string, 10000), - // perIDQueue is a map of hostID to a channel of tasks. - perIDQueue: sync.Map{}, + // perHostChan is a map of hostID to a channel of tasks. + perHostChan: sync.Map{}, manager: newManager(maxWorkers), workerIdleTimeout: workerIdleTimeout, ingestChan: make(chan Task, 10000), diff --git a/pkg/http/http.go b/pkg/http/http.go index b05158e..c2f70e4 100644 --- a/pkg/http/http.go +++ b/pkg/http/http.go @@ -50,7 +50,11 @@ func (h *Server) Run(ctx context.Context) error { _ = svr.Shutdown(ctx) }() - return svr.ListenAndServe() + if err := svr.ListenAndServe(); err != nil && err != http.ErrServerClosed { + return err + } + + return nil } func NewServer(addr string) *Server {