diff --git a/README.md b/README.md index 8114749..578f0d1 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,102 @@ -# gowp -golang worker pool ,线程池 , 工作池 +## golang worker pool ,线程池 , 工作池 + +- 并发限制goroutine池。 +- 限制任务执行的并发性,而不是排队的任务数。 +- 无论排队多少任务,都不会阻止提交任务。 +- 通过队列支持 + +- golang 工作池公共库 + +### 支持最大任务数, 放到工作池里面 并等待全部完成 +``` + wp := workerpool.New(10) //设置最大线程数 + for i := 0; i < 20; i++ { //开启20个请求 + ii := i + wp.Do(func() error { + for j := 0; j < 10; j++ { //每次打印0-10的值 + fmt.Println(fmt.Sprintf("%v->\t%v", ii, j)) + time.Sleep(1 * time.Second) + } + //time.Sleep(1 * time.Second) + return nil + }) + } + + wp.Wait() + fmt.Println("down") +``` + +### 支持错误返回 +``` + wp := workerpool.New(10) //设置最大线程数 + for i := 0; i < 20; i++ { //开启20个请求 + ii := i + wp.Do(func() error { + for j := 0; j < 10; j++ { //每次打印0-10的值 + fmt.Println(fmt.Sprintf("%v->\t%v", ii, j)) + if ii == 1 { + return errors.Cause(errors.New("my test err")) //有err 立即返回 + } + time.Sleep(1 * time.Second) + } + + return nil + }) + } + + err := wp.Wait() + if err != nil { + fmt.Println(err) + } + fmt.Println("down") +``` + +### 支持判断是否完成 (非阻塞) + +``` + wp := workerpool.New(5) //设置最大线程数 + for i := 0; i < 10; i++ { //开启20个请求 + // ii := i + wp.Do(func() error { + for j := 0; j < 5; j++ { //每次打印0-10的值 + //fmt.Println(fmt.Sprintf("%v->\t%v", ii, j)) + time.Sleep(1 * time.Second) + } + return nil + }) + + fmt.Println(wp.IsDone()) + } + wp.Wait() + fmt.Println(wp.IsDone()) + fmt.Println("down") +``` + +### 支持同步等待结果 + +``` + wp := workerpool.New(5) //设置最大线程数 + for i := 0; i < 10; i++ { //开启20个请求 + ii := i + wp.DoWait(func() error { + for j := 0; j < 5; j++ { //每次打印0-10的值 + fmt.Println(fmt.Sprintf("%v->\t%v", ii, j)) + // if ii == 1 { + // return errors.New("my test err") + // } + time.Sleep(1 * time.Second) + } + + return nil + //time.Sleep(1 * time.Second) + //return errors.New("my test err") + }) + } + + err := wp.Wait() + if err != nil { + fmt.Println(err) + } + fmt.Println("down") + +``` \ No newline at end of file diff --git a/go.mod b/go.mod index 9986711..50afa43 100644 --- a/go.mod +++ b/go.mod @@ -2,4 +2,4 @@ module gowp go 1.13 -require github.com/xxjwxc/public v0.0.0-20190914140823-729bb11f966c +require github.com/xxjwxc/public v0.0.0-20190915122658-9831b23af2e1 diff --git a/go.sum b/go.sum index a757958..3cba325 100644 --- a/go.sum +++ b/go.sum @@ -17,18 +17,18 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ= -github.com/xxjwxc/public v0.0.0-20190911032541-5d814c6ef57d h1:JGx6eOr2X16pd/h8yZvXTL8qEsu6jPpMNjOOTlvBo1Y= -github.com/xxjwxc/public v0.0.0-20190911032541-5d814c6ef57d/go.mod h1:rgW5aGDrLzmJM8NtrtskcxerGFyvn1OK3+Ra52YeZS0= -github.com/xxjwxc/public v0.0.0-20190914140823-729bb11f966c h1:FQ+LOgzb3F30rn6zZwwZ3fAJGjaCtR7lymFySbjfazk= -github.com/xxjwxc/public v0.0.0-20190914140823-729bb11f966c/go.mod h1:rgW5aGDrLzmJM8NtrtskcxerGFyvn1OK3+Ra52YeZS0= +github.com/xxjwxc/public v0.0.0-20190915113632-d369d5187200 h1:Hi8KKUGp+xh+x6WA/3KNmvvEvjcHFKZ6NJOduAL92k4= +github.com/xxjwxc/public v0.0.0-20190915113632-d369d5187200/go.mod h1:rgW5aGDrLzmJM8NtrtskcxerGFyvn1OK3+Ra52YeZS0= +github.com/xxjwxc/public v0.0.0-20190915122658-9831b23af2e1 h1:nCgYwrp2ZbumdJvgkyJ3+KXBaLWipRSDUujE0GdNKAU= +github.com/xxjwxc/public v0.0.0-20190915122658-9831b23af2e1/go.mod h1:rgW5aGDrLzmJM8NtrtskcxerGFyvn1OK3+Ra52YeZS0= github.com/xxjwxc/public v1.0.4 h1:C16i87ODAsabmsun+7MEoK7goPjWNGTQOYg+53PzzCw= -github.com/xxjwxc/public v1.0.4/go.mod h1:rgW5aGDrLzmJM8NtrtskcxerGFyvn1OK3+Ra52YeZS0= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190204203706-41f3e6584952/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/eapache/queue.v1 v1.1.0 h1:EldqoJEGtXYiVCMRo2C9mePO2UUGnYn2+qLmlQSqPdc= gopkg.in/eapache/queue.v1 v1.1.0/go.mod h1:wNtmx1/O7kZSR9zNT1TTOJ7GLpm3Vn7srzlfylFbQwU= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= diff --git a/main.go b/main.go index 28763ed..e03227c 100644 --- a/main.go +++ b/main.go @@ -7,13 +7,13 @@ import ( ) func main() { - wp := workerpool.New(5) //设置最大线程数 - for i := 0; i < 10; i++ { //开启20个请求 - // ii := i + wp := workerpool.New(5) //设置最大线程数 + for i := 0; i < 100; i++ { //开启20个请求 + ii := i wp.Do(func() error { - for j := 0; j < 5; j++ { //每次打印0-10的值 - //fmt.Println(fmt.Sprintf("%v->\t%v", ii, j)) - time.Sleep(1 * time.Microsecond) + for j := 0; j < 50; j++ { //每次打印0-10的值 + fmt.Println(fmt.Sprintf("%v->\t%v", ii, j)) + time.Sleep(1 * time.Second) } return nil }) diff --git a/workerpool/def.go b/workerpool/def.go index 09630da..33cc2b0 100644 --- a/workerpool/def.go +++ b/workerpool/def.go @@ -3,6 +3,8 @@ package workerpool import ( "sync" "time" + + "github.com/xxjwxc/public/myqueue" ) // TaskHandler process .定义函数回调体 @@ -17,9 +19,10 @@ type WorkerPool struct { //sync.Mutex //maxWorkersCount int //最大的工作协程数 //start sync.Once - closed int32 - errChan chan error //错误chan - timeout time.Duration //最大超时时间 - wg sync.WaitGroup - task chan TaskHandler + closed int32 + errChan chan error //错误chan + timeout time.Duration //最大超时时间 + wg sync.WaitGroup + task chan TaskHandler + waitingQueue *myqueue.MyQueue } diff --git a/workerpool/err/err_2019-09-15_20.log b/workerpool/err/err_2019-09-15_20.log new file mode 100644 index 0000000..e100bb9 --- /dev/null +++ b/workerpool/err/err_2019-09-15_20.log @@ -0,0 +1,21 @@ +=========================2019-09-15 20:47:01 ========================= +my test err +gowp/workerpool.TestWorkerPoolError.func1 + /Users/xxj/work/workspace/github/xxjwxc/gowp/workerpool/workerpool_test.go:37 +gowp/workerpool.(*WorkerPool).loop.func1 + /Users/xxj/work/workspace/github/xxjwxc/gowp/workerpool/workerpool.go:133 +runtime.goexit + /usr/local/go/src/runtime/asm_amd64.s:1357 +goroutine 51 [running]: +runtime/debug.Stack(0x0, 0x0, 0x0) + /usr/local/go/src/runtime/debug/stack.go:24 +0xa1 +github.com/xxjwxc/public/mylog.SaveError(0xc0001883c0, 0x133, 0x11b92cb, 0x3) + /Users/xxj/work/path/pkg/mod/github.com/xxjwxc/public@v0.0.0-20190915122658-9831b23af2e1/mylog/mylog.go:100 +0x498 +github.com/xxjwxc/public/mylog.Error(0x11d7480, 0xc000172020) + /Users/xxj/work/path/pkg/mod/github.com/xxjwxc/public@v0.0.0-20190915122658-9831b23af2e1/mylog/mylog.go:62 +0x3d5 +gowp/workerpool.(*WorkerPool).loop.func1(0xc0000da780) + /Users/xxj/work/workspace/github/xxjwxc/gowp/workerpool/workerpool.go:139 +0x34f +created by gowp/workerpool.(*WorkerPool).loop + /Users/xxj/work/workspace/github/xxjwxc/gowp/workerpool/workerpool.go:108 +0xa1 + +=========================end========================= diff --git a/workerpool/workerpool.go b/workerpool/workerpool.go index ca1f387..9c976b5 100644 --- a/workerpool/workerpool.go +++ b/workerpool/workerpool.go @@ -5,6 +5,8 @@ import ( "sync/atomic" "time" + "github.com/xxjwxc/public/myqueue" + "github.com/xxjwxc/public/mylog" ) @@ -16,12 +18,12 @@ func New(max int) *WorkerPool { } p := &WorkerPool{ - task: make(chan TaskHandler, 2*max), - errChan: make(chan error, 1), + task: make(chan TaskHandler, 2*max), + errChan: make(chan error, 1), + waitingQueue: myqueue.New(), } go p.loop(max) - return p } @@ -35,7 +37,8 @@ func (p *WorkerPool) Do(fn TaskHandler) { if p.IsClosed() { // 已关闭 return } - p.task <- fn + p.waitingQueue.Push(fn) + //p.task <- fn } //DoWait 添加到工作池,并等待执行完成之后再返回 @@ -45,14 +48,60 @@ func (p *WorkerPool) DoWait(task TaskHandler) { } doneChan := make(chan struct{}) - p.task <- func() error { + p.waitingQueue.Push(TaskHandler(func() error { defer close(doneChan) return task() - } + })) <-doneChan } +//Wait 等待工作线程执行结束 +func (p *WorkerPool) Wait() error { + p.waitingQueue.Wait() //等待队列结束 + close(p.task) + p.wg.Wait() //等待结束 + select { + case err := <-p.errChan: + return err + default: + return nil + } +} + +//IsDone 判断是否完成 (非阻塞) +func (p *WorkerPool) IsDone() bool { + if p == nil || p.task == nil { + return true + } + + return len(p.task) == 0 +} + +//IsClosed 是否已经关闭 +func (p *WorkerPool) IsClosed() bool { + if atomic.LoadInt32(&p.closed) == 1 { // 已关闭 + return true + } + return false +} + +func (p *WorkerPool) startQueue() { + for { + fn := p.waitingQueue.Pop().(TaskHandler) + if p.IsClosed() { // 已关闭 + p.waitingQueue.Close() + break + } + + if fn != nil { + p.task <- fn + } + } +} + func (p *WorkerPool) loop(maxWorkersCount int) { + go p.startQueue() //启动队列 + p.wg.Add(maxWorkersCount) // 最大的工作协程数 // 启动max个worker for i := 0; i < maxWorkersCount; i++ { @@ -96,32 +145,3 @@ func (p *WorkerPool) loop(maxWorkersCount int) { }() } } - -//Wait 等待工作线程执行结束 -func (p *WorkerPool) Wait() error { - close(p.task) - p.wg.Wait() //等待结束 - select { - case err := <-p.errChan: - return err - default: - return nil - } -} - -//IsDone 判断是否完成 (非阻塞) -func (p *WorkerPool) IsDone() bool { - if p == nil || p.task == nil { - return true - } - - return len(p.task) == 0 -} - -//IsClosed 是否已经关闭 -func (p *WorkerPool) IsClosed() bool { - if atomic.LoadInt32(&p.closed) == 1 { // 已关闭 - return true - } - return false -}