Skip to content

Commit

Permalink
new
Browse files Browse the repository at this point in the history
  • Loading branch information
谢小军 authored and 谢小军 committed Sep 15, 2019
1 parent 194fb42 commit 683c297
Show file tree
Hide file tree
Showing 4 changed files with 196 additions and 39 deletions.
38 changes: 20 additions & 18 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
## golang worker pool ,线程池 , 工作池
### [中文](README_cn.md)

- 并发限制goroutine池。
- 限制任务执行的并发性,而不是排队的任务数。
- 无论排队多少任务,都不会阻止提交任务。
- 通过队列支持
- Concurrency limiting goroutine pool.
- Limits the concurrency of task execution, not the number of tasks queued.
- Never blocks submitting tasks, no matter how many tasks are queued.
- Support through security queues [queue](https://github.com/xxjwxc/public/tree/master/myqueue)

- golang 工作池公共库
- golang workpool common library

### Support the maximum number of tasks, put them in the workpool and wait for them to be completed

### 支持最大任务数, 放到工作池里面 并等待全部完成
```
package main
Expand All @@ -19,7 +21,7 @@ import (
)
func main() {
wp := workerpool.New(10) //设置最大线程数
wp := workerpool.New(10) //Set the maximum number of threads,设置最大线程数
for i := 0; i < 20; i++ { //开启20个请求
ii := i
wp.Do(func() error {
Expand All @@ -37,7 +39,7 @@ func main() {
}
```

### 支持错误返回
### Support for error return
```
package main
Expand All @@ -49,8 +51,8 @@ import (
)
func main() {
wp := workerpool.New(10) //设置最大线程数
for i := 0; i < 20; i++ { //开启20个请求
wp := workerpool.New(10) //Set the maximum number of threads,设置最大线程数
for i := 0; i < 20; i++ {
ii := i
wp.Do(func() error {
for j := 0; j < 10; j++ { //每次打印0-10的值
Expand All @@ -73,7 +75,7 @@ func main() {
}
```

### 支持判断是否完成 (非阻塞)
### Supporting judgement of completion (non-blocking)

```
package main
Expand All @@ -86,11 +88,11 @@ import (
)
func main() {
wp := workerpool.New(5) //设置最大线程数
for i := 0; i < 10; i++ { //开启20个请求
wp := workerpool.New(5) //Set the maximum number of threads,设置最大线程数
for i := 0; i < 10; i++ {
// ii := i
wp.Do(func() error {
for j := 0; j < 5; j++ { //每次打印0-10的值
for j := 0; j < 5; j++ {
//fmt.Println(fmt.Sprintf("%v->\t%v", ii, j))
time.Sleep(1 * time.Second)
}
Expand All @@ -105,7 +107,7 @@ func main() {
}
```

### 支持同步等待结果
### Support synchronous waiting for results

```
package main
Expand All @@ -118,11 +120,11 @@ import (
)
func main() {
wp := workerpool.New(5) //设置最大线程数
for i := 0; i < 10; i++ { //开启20个请求
wp := workerpool.New(5) //Set the maximum number of threads,设置最大线程数
for i := 0; i < 10; i++ {
ii := i
wp.DoWait(func() error {
for j := 0; j < 5; j++ { //每次打印0-10的值
for j := 0; j < 5; j++ {
fmt.Println(fmt.Sprintf("%v->\t%v", ii, j))
// if ii == 1 {
// return errors.New("my test err")
Expand Down
145 changes: 145 additions & 0 deletions README_cn.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
## golang worker pool ,线程池 , 工作池
### [English](README_cn.md)
- 并发限制goroutine池。
- 限制任务执行的并发性,而不是排队的任务数。
- 无论排队多少任务,都不会阻止提交任务。
- 通过队列支持[queue](https://github.com/xxjwxc/public/tree/master/myqueue)

- golang 工作池公共库

### 支持最大任务数, 放到工作池里面 并等待全部完成
```
package main
import (
"fmt"
"time"
"github.com/xxjwxc/gowp/workerpool"
)
func main() {
wp := workerpool.New(10) //设置最大线程数
for i := 0; i < 20; i++ { //开启20个请求
ii := i
wp.Do(func() error {
for j := 0; j < 10; j++ { //每次打印0-10的值
fmt.Println(fmt.Sprintf("%v->\t%v", ii, j))
time.Sleep(1 * time.Second)
}
//time.Sleep(1 * time.Second)
return nil
})
}
wp.Wait()
fmt.Println("down")
}
```

### 支持错误返回
```
package main
import (
"fmt"
"time"
"github.com/xxjwxc/gowp/workerpool"
)
func main() {
wp := workerpool.New(10) //设置最大线程数
for i := 0; i < 20; i++ { //开启20个请求
ii := i
wp.Do(func() error {
for j := 0; j < 10; j++ { //每次打印0-10的值
fmt.Println(fmt.Sprintf("%v->\t%v", ii, j))
if ii == 1 {
return errors.Cause(errors.New("my test err")) //有err 立即返回
}
time.Sleep(1 * time.Second)
}
return nil
})
}
err := wp.Wait()
if err != nil {
fmt.Println(err)
}
fmt.Println("down")
}
```

### 支持判断是否完成 (非阻塞)

```
package main
import (
"fmt"
"time"
"github.com/xxjwxc/gowp/workerpool"
)
func main() {
wp := workerpool.New(5) //设置最大线程数
for i := 0; i < 10; i++ { //开启20个请求
// ii := i
wp.Do(func() error {
for j := 0; j < 5; j++ {
//fmt.Println(fmt.Sprintf("%v->\t%v", ii, j))
time.Sleep(1 * time.Second)
}
return nil
})
fmt.Println(wp.IsDone())//判断是否完成
}
wp.Wait()
fmt.Println(wp.IsDone())
fmt.Println("down")
}
```

### 支持同步等待结果

```
package main
import (
"fmt"
"time"
"github.com/xxjwxc/gowp/workerpool"
)
func main() {
wp := workerpool.New(5) //设置最大线程数
for i := 0; i < 10; i++ { //开启20个请求
ii := i
wp.DoWait(func() error {
for j := 0; j < 5; j++ {
fmt.Println(fmt.Sprintf("%v->\t%v", ii, j))
// if ii == 1 {
// return errors.New("my test err")
// }
time.Sleep(1 * time.Second)
}
return nil
//time.Sleep(1 * time.Second)
//return errors.New("my test err")
})
}
err := wp.Wait()
if err != nil {
fmt.Println(err)
}
fmt.Println("down")
}
```
25 changes: 15 additions & 10 deletions workerpool/workerpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ func (p *WorkerPool) SetTimeout(timeout time.Duration) {
p.timeout = timeout
}

//Add to the workpool and return immediately
//Do 添加到工作池,并立即返回
func (p *WorkerPool) Do(fn TaskHandler) {
if p.IsClosed() { // 已关闭
Expand All @@ -41,9 +42,10 @@ func (p *WorkerPool) Do(fn TaskHandler) {
//p.task <- fn
}

//Add to the workpool and wait for execution to complete before returning
//DoWait 添加到工作池,并等待执行完成之后再返回
func (p *WorkerPool) DoWait(task TaskHandler) {
if p.IsClosed() { // 已关闭
if p.IsClosed() { // closed
return
}

Expand All @@ -55,6 +57,7 @@ func (p *WorkerPool) DoWait(task TaskHandler) {
<-doneChan
}

//Waiting for the worker thread to finish executing
//Wait 等待工作线程执行结束
func (p *WorkerPool) Wait() error {
p.waitingQueue.Wait() //等待队列结束
Expand All @@ -68,6 +71,7 @@ func (p *WorkerPool) Wait() error {
}
}

//Determine whether it is complete (non-blocking)
//IsDone 判断是否完成 (非阻塞)
func (p *WorkerPool) IsDone() bool {
if p == nil || p.task == nil {
Expand All @@ -77,9 +81,10 @@ func (p *WorkerPool) IsDone() bool {
return len(p.task) == 0
}

//Has it been closed?
//IsClosed 是否已经关闭
func (p *WorkerPool) IsClosed() bool {
if atomic.LoadInt32(&p.closed) == 1 { // 已关闭
if atomic.LoadInt32(&p.closed) == 1 { // closed
return true
}
return false
Expand All @@ -88,7 +93,7 @@ func (p *WorkerPool) IsClosed() bool {
func (p *WorkerPool) startQueue() {
for {
fn := p.waitingQueue.Pop().(TaskHandler)
if p.IsClosed() { // 已关闭
if p.IsClosed() { // closed
p.waitingQueue.Close()
break
}
Expand All @@ -100,21 +105,21 @@ func (p *WorkerPool) startQueue() {
}

func (p *WorkerPool) loop(maxWorkersCount int) {
go p.startQueue() //启动队列
go p.startQueue() //Startup queue , 启动队列

p.wg.Add(maxWorkersCount) // 最大的工作协程数
// 启动max个worker
p.wg.Add(maxWorkersCount) // Maximum number of work cycles,最大的工作协程数
//Start Max workers, 启动max个worker
for i := 0; i < maxWorkersCount; i++ {
go func() {
defer p.wg.Done()
// worker 开始干活
for wt := range p.task {
if wt == nil || atomic.LoadInt32(&p.closed) == 1 { //有err 立即返回
continue //需要先消费完了之后再返回,
if wt == nil || atomic.LoadInt32(&p.closed) == 1 { //returns immediately,有err 立即返回
continue //It needs to be consumed before returning.需要先消费完了之后再返回,
}

closed := make(chan struct{}, 1)
// 有设置超时,优先task 的超时
// Set timeout, priority task timeout.有设置超时,优先task 的超时
if p.timeout > 0 {
ct, cancel := context.WithTimeout(context.Background(), p.timeout)
go func() {
Expand All @@ -130,7 +135,7 @@ func (p *WorkerPool) loop(maxWorkersCount int) {
}()
}

err := wt() //真正执行的点
err := wt() //Points of Execution.真正执行的点
close(closed)
if err != nil {
select {
Expand Down
Loading

0 comments on commit 683c297

Please sign in to comment.