Skip to content

Latest commit

 

History

History
421 lines (357 loc) · 6.43 KB

worker.md

File metadata and controls

421 lines (357 loc) · 6.43 KB

Basic worker with graceful shutdown

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	fmt.Println("Hello, playground")
	mgr := NewTaskManager()
	// Start 3 worker.
	for i := 0; i < 3; i++ {
		mgr.Start()
	}

	go func() {
		// Send message to workers.
		for i := 100; i < 200; i++ {
			msg := fmt.Sprintf("work %d", i)
			if sent := mgr.Send(msg); sent {
				fmt.Println("sent", msg)
			}
		}
	}()

	go func() {
		// Send message to workers.
		for i := 0; i < 100; i++ {
			msg := fmt.Sprintf("work %d", i)
			if sent := mgr.Send(msg); sent {
				fmt.Println("sent", msg)
			}
		}
	}()
	time.Sleep(2 * time.Second)
	mgr.Stop()
	fmt.Println("terminating")
}

type TaskManager struct {
	ch   chan interface{}
	done sync.WaitGroup
	quit chan interface{}
}

func NewTaskManager() *TaskManager {
	return &TaskManager{
		ch:   make(chan interface{}),
		quit: make(chan interface{}),
	}
}

func (t *TaskManager) Start() {
	t.done.Add(1)
	go func() {
		defer t.done.Done()
		for {
			select {
			case <-t.quit:
				fmt.Println("quitting", len(t.ch))
				return
			case evt, ok := <-t.ch:
				if !ok {
					return
				}
				time.Sleep(1 * time.Second)
				fmt.Println("received evt", evt)
			}
		}
	}()
}

func (t *TaskManager) Stop() {
	fmt.Println("stopping")

	close(t.quit)
	t.done.Wait()

	fmt.Println("stopped")
}

func (t *TaskManager) Send(evt interface{}) bool {
	// Add a consumer task to keep track of messages sent.

	select {
	case <-t.quit:
		return false
	case t.ch <- evt:
		return true
	case <-time.After(5 * time.Second):
		return false
	}
}

Multiple Background Worker implementation

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

type Response interface {
	Value() interface{}
	Error() error
}

type Task interface {
	Execute() Response
}

type Delay struct {
	Duration int
	Err      error
}

func (d *Delay) Value() interface{} {
	return d.Duration
}

func (d *Delay) Error() error {
	return d.Err
}

type DelayTask struct {
	value Delay
}

func (d *DelayTask) Execute() Response {
	duration := rand.Intn(500)
	time.Sleep(time.Duration(duration) * time.Millisecond)
	return &Delay{
		Duration: duration,
	}

}

type WorkerPool struct {
	taskCh chan Task
	quit   chan interface{}
	wg     *sync.WaitGroup
}

func NewWorkerPool() *WorkerPool {
	return &WorkerPool{
		// Adding a buffer allows the taskCh to take more than it can process.
		taskCh: make(chan Task, 10),
		quit:   make(chan interface{}),
		wg:     new(sync.WaitGroup),
	}
}

func (w *WorkerPool) Start(n int) func() {
	w.wg.Add(n)
	for i := 0; i < n; i++ {
		go w.loop(i)
	}
	return func() {
		w.wg.Wait()
	}
}

func (w *WorkerPool) loop(i int) {
	defer w.wg.Done()
	fmt.Println("started worker", i)
	for {
		select {
		case <-w.quit:
			return
		case task, ok := <-w.taskCh:
			if !ok {
				return
			}
			res := task.Execute()
			duration, ok := res.Value().(int)
			if ok {
				fmt.Println("worker", i, "took", duration, "ms")
			}
		}
	}
}

func (w *WorkerPool) Send(task Task) {
	select {
	case <-w.quit:
		return
	case w.taskCh <- task:
	}
}

func (w *WorkerPool) Stop() {
	close(w.quit)
}

func main() {

	pool := NewWorkerPool()
	wait := pool.Start(5)
	go func() {
    // We can listen for events from a message queue for example,
    // and stream in new Tasks to the background worker.
		for i := 0; i < 100; i++ {
			pool.Send(&DelayTask{})
		}
	}()

	go func() {
		time.Sleep(10 * time.Second)
		fmt.Println("stopping")
		pool.Stop()
	}()

	wait()
}

Worker with Pipeline

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

type Result struct {
	Response interface{}
	Err      error
}

type Task interface {
	Execute() Result
}

type WorkerPool struct {
	wg     *sync.WaitGroup
	mu     *sync.RWMutex
	once   *sync.Once
	quit   chan interface{} // Own quit channel.
	busCh  chan interface{} // Global quit channel.
	taskCh chan Task

	outCh chan Result

	counter int
}

func NewWorkerPool(busCh chan interface{}, taskLimit int) *WorkerPool {
	return &WorkerPool{
		busCh:  busCh,
		mu:     new(sync.RWMutex),
		quit:   make(chan interface{}),
		taskCh: make(chan Task, taskLimit),
		once:   new(sync.Once),
		wg:     new(sync.WaitGroup),
		outCh:  make(chan Result),
	}
}

func (w *WorkerPool) Start(n int) *sync.WaitGroup {
	w.wg.Add(n)
	for i := 0; i < n; i++ {
		go w.loop()
	}
	fmt.Printf("started %d workers\n", n)
	return w.wg
}

func (w *WorkerPool) AddWorker() {
	w.mu.Lock()
	w.wg.Add(1)
	go w.loop()
	w.mu.Unlock()
	fmt.Println("add 1 worker")
}

func (w *WorkerPool) RemoveWorker() {
	w.quit <- struct{}{}
}

func (w *WorkerPool) AddTask(tasks ...Task) {
	for _, task := range tasks {
		select {
		case <-w.busCh:
			return
		case <-w.quit:
			return
		case w.taskCh <- task:
		}
	}
}

func (w *WorkerPool) loop() {
	defer func() {
		fmt.Println("worker stopped")
		w.wg.Done()
	}()
	for {
		select {
		case <-w.busCh:
			return
		case <-w.quit:
			return
		case task, ok := <-w.taskCh:
			if !ok {
				return
			}
			res := task.Execute()

			w.mu.Lock()
			w.counter++
			w.mu.Unlock()

			fmt.Println("task:", res.Response)
			select {
			case <-w.quit:
				return
			case w.outCh <- res:
			}
		}
	}

}
func (w *WorkerPool) Stop() {
	w.once.Do(func() {
		close(w.quit)
	})
}

func main() {
	done := make(chan interface{})
	pool := NewWorkerPool(done, 100)

	numWorkers := 1

	// Create a new worker pool with n workers.
	job := pool.Start(numWorkers)

	go generator(pool, 100)

	go func() {
		time.Sleep(1 * time.Second)

		for i := 0; i < 5; i++ {
			pool.AddWorker()
		}
		pool.RemoveWorker()
		pool.RemoveWorker()
	}()
	
	reader := func() {
		// We can chain the pipeline.
		for res := range multiplier(done, pool.outCh, 2) {
			fmt.Println("output", res)
		}
	}
	go reader()
	go func() {
		time.Sleep(5 * time.Second)
		close(done)
	}()
	job.Wait()
	fmt.Println("exiting", pool.counter)
}

type DelayTask struct{}

func (d *DelayTask) Execute() Result {
	time.Sleep(time.Duration(rand.Intn(300)) * time.Millisecond)
	return Result{
		Response: 2,
	}
}

// Pipelines
func multiplier(
	done chan interface{},
	inCh <-chan Result,
	m int,
) <-chan interface{} {
	outCh := make(chan interface{})
	go func() {
		defer close(outCh)
		for v := range inCh {
			i, ok := v.Response.(int)
			if !ok {
				continue
			}
			select {
			case <-done:
				return
			case outCh <- i * 2:
			}
		}
	}()
	return outCh
}

func generator(pool *WorkerPool, n int) {
	for i := 0; i < n; i++ {
		go func() {
			time.Sleep(time.Duration(rand.Intn(500)+250) * time.Millisecond)
			pool.AddTask(&DelayTask{})
		}()
	}
}