Skip to content

Commit

Permalink
new
Browse files Browse the repository at this point in the history
  • Loading branch information
谢小军 authored and 谢小军 committed Sep 14, 2019
0 parents commit d13252a
Show file tree
Hide file tree
Showing 6 changed files with 361 additions and 0 deletions.
5 changes: 5 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
module gowp

go 1.13

require github.com/xxjwxc/public v0.0.0-20190914140823-729bb11f966c
35 changes: 35 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
github.com/ant0ine/go-json-rest v3.3.2+incompatible/go.mod h1:q6aCt0GfU6LhpBsnZ/2U+mwe+0XB5WStbmwyoPfc+sk=
github.com/axgle/mahonia v0.0.0-20180208002826-3358181d7394/go.mod h1:Q8n74mJTIgjX4RBBcHnJ05h//6/k6foqmgE45jTQtxg=
github.com/bitly/go-simplejson v0.5.0/go.mod h1:cXHtHw4XUPsvGaxgjIAn8PhEWG9NfngEKAMDJEczWVA=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/gookit/color v1.1.7 h1:WR5I/mhSHzemW2DzG54hTsUb7OzaREvkcmUG4/WST4Q=
github.com/gookit/color v1.1.7/go.mod h1:R3ogXq2B9rTbXoSHJ1HyUVAZ3poOJHpd9nQmyGZsfvQ=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/jander/golog v0.0.0-20150917071935-954a5be801fc/go.mod h1:uWhWXOR4dpfk9J8fegnMY7sP2GFXxe3PFI9Ps+TRXJs=
github.com/kardianos/service v1.0.0/go.mod h1:8CzDhVuCuugtsHyZoTvsOBuvonN/UDBvl0kH+BUxvbo=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ=
github.com/xxjwxc/public v0.0.0-20190911032541-5d814c6ef57d h1:JGx6eOr2X16pd/h8yZvXTL8qEsu6jPpMNjOOTlvBo1Y=
github.com/xxjwxc/public v0.0.0-20190911032541-5d814c6ef57d/go.mod h1:rgW5aGDrLzmJM8NtrtskcxerGFyvn1OK3+Ra52YeZS0=
github.com/xxjwxc/public v0.0.0-20190914140823-729bb11f966c h1:FQ+LOgzb3F30rn6zZwwZ3fAJGjaCtR7lymFySbjfazk=
github.com/xxjwxc/public v0.0.0-20190914140823-729bb11f966c/go.mod h1:rgW5aGDrLzmJM8NtrtskcxerGFyvn1OK3+Ra52YeZS0=
github.com/xxjwxc/public v1.0.4 h1:C16i87ODAsabmsun+7MEoK7goPjWNGTQOYg+53PzzCw=
github.com/xxjwxc/public v1.0.4/go.mod h1:rgW5aGDrLzmJM8NtrtskcxerGFyvn1OK3+Ra52YeZS0=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190204203706-41f3e6584952/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/eapache/queue.v1 v1.1.0/go.mod h1:wNtmx1/O7kZSR9zNT1TTOJ7GLpm3Vn7srzlfylFbQwU=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
29 changes: 29 additions & 0 deletions mian.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package main

import (
"fmt"
"gowp/workerpool"
"time"
)

func main() {
wp := workerpool.New(5) //设置最大线程数
for i := 0; i < 10; i++ { //开启20个请求
// ii := i
wp.Do(func() error {
for j := 0; j < 5; j++ { //每次打印0-10的值
//fmt.Println(fmt.Sprintf("%v->\t%v", ii, j))
time.Sleep(1 * time.Second)
}
return nil
})

fmt.Println(wp.IsDone())
}

wp.Wait()

fmt.Println(wp.IsDone())

fmt.Println("down")
}
31 changes: 31 additions & 0 deletions workerpool/def.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package workerpool

import (
"sync"
"time"
)

// // CallHandler process .定义调用回调体(可修改)
// type CallHandler func()

// TaskHandler process .定义函数回调体
type TaskHandler func() error

// ServeHandler must process tls.Config.NextProto negotiated requests.
//type ServeHandler func(c net.Conn) error

// workerPool serves incoming connections via a pool of workers
// in FILO order, i.e. the most recently stopped worker will serve the next
// incoming connection.
//
// Such a scheme keeps CPU caches hot (in theory).
type WorkerPool struct {
//sync.Mutex
maxWorkersCount int //最大的工作协程数
closed int32
errChan chan error //错误chan
timeout time.Duration //最大超时时间
wg sync.WaitGroup
task chan TaskHandler
start sync.Once
}
134 changes: 134 additions & 0 deletions workerpool/workerpool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package workerpool

import (
"context"
"sync/atomic"
"time"

"github.com/xxjwxc/public/mylog"
)

//New 注册工作池,并设置最大并发数
//new workpool and set the max number of concurrencies
func New(max int) *WorkerPool {
if max < 1 {
max = 1
}

return &WorkerPool{
maxWorkersCount: max,
task: make(chan TaskHandler, max),
errChan: make(chan error, 1),
}
}

//SetTimeout 设置超时时间
func (p *WorkerPool) SetTimeout(timeout time.Duration) {
p.timeout = timeout
}

//SingleCall 单程执行(排他)
// func (p *WorkerPool) SingleCall(fn TaskHandler) {
// p.Mutex.Lock()
// fn()
// p.Mutex.Unlock()
// }

//Do 添加到工作池,并立即返回
func (p *WorkerPool) Do(fn TaskHandler) {
p.start.Do(func() { //once
p.wg.Add(p.maxWorkersCount)
go p.loop()
})

if atomic.LoadInt32(&p.closed) == 1 {
// 已关闭
return
}
p.task <- fn
}

//DoWait 添加到工作池,并等待执行完成之后再返回
func (p *WorkerPool) DoWait(task TaskHandler) {
p.start.Do(func() { //once
p.wg.Add(p.maxWorkersCount)
go p.loop()
})

if atomic.LoadInt32(&p.closed) == 1 { // 已关闭
return
}

doneChan := make(chan struct{})
p.task <- func() error {
err := task()
close(doneChan)
return err
}
<-doneChan
}

func (p *WorkerPool) loop() {
// 启动n个worker
for i := 0; i < p.maxWorkersCount; i++ {
go func() {
defer p.wg.Done()
// worker 开始干活
for wt := range p.task {
if wt == nil || atomic.LoadInt32(&p.closed) == 1 { //有err 立即返回
continue //需要先消费完了之后再返回,
}

closed := make(chan struct{}, 1)
// 有设置超时,优先task 的超时
if p.timeout > 0 {
ct, cancel := context.WithTimeout(context.Background(), p.timeout)
go func() {
select {
case <-ct.Done():
p.errChan <- ct.Err()
//if atomic.LoadInt32(&p.closed) != 1 {
mylog.Error(ct.Err())
atomic.StoreInt32(&p.closed, 1)
cancel()
case <-closed:
}
}()
}

err := wt() //真正执行的点
close(closed)
if err != nil {
select {
case p.errChan <- err:
//if atomic.LoadInt32(&p.closed) != 1 {
mylog.Error(err)
atomic.StoreInt32(&p.closed, 1)
default:
}
}
}
}()
}
}

//Wait 等待工作线程执行结束
func (p *WorkerPool) Wait() error {
close(p.task)
p.wg.Wait() //等待结束
select {
case err := <-p.errChan:
return err
default:
return nil
}
}

//IsDone 判断是否完成 (非阻塞)
func (p *WorkerPool) IsDone() bool {
if p == nil || p.task == nil {
return true
}

return len(p.task) == 0
}
127 changes: 127 additions & 0 deletions workerpool/workerpool_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package workerpool

import (
"fmt"
"testing"
"time"

"github.com/xxjwxc/public/errors"
)

func TestWorkerPoolStart(t *testing.T) {
wp := New(10) //设置最大线程数
for i := 0; i < 20; i++ { //开启20个请求
ii := i
wp.Do(func() error {
for j := 0; j < 10; j++ { //每次打印0-10的值
fmt.Println(fmt.Sprintf("%v->\t%v", ii, j))
time.Sleep(1 * time.Second)
}
//time.Sleep(1 * time.Second)
return nil
})
}

wp.Wait()
fmt.Println("down")
}

func TestWorkerPoolError(t *testing.T) {
wp := New(10) //设置最大线程数
for i := 0; i < 20; i++ { //开启20个请求
ii := i
wp.Do(func() error {
for j := 0; j < 10; j++ { //每次打印0-10的值
fmt.Println(fmt.Sprintf("%v->\t%v", ii, j))
if ii == 1 {
return errors.Cause(errors.New("my test err")) //有err 立即返回
}
time.Sleep(1 * time.Second)
}

return nil
//time.Sleep(1 * time.Second)
//return errors.New("my test err")
})
}

err := wp.Wait()
if err != nil {
fmt.Println(err)
}
fmt.Println("down")
}

//测试排他执行(到单线程模式)
// func TestWorkerPoolSingleCall(t *testing.T) {
// wp := New(2) //设置最大线程数
// for i := 0; i < 4; i++ { //开启20个请求
// ii := i
// wp.SingleCall(func() error {
// for j := 0; j < 2; j++ { //每次打印0-10的值
// fmt.Println(fmt.Sprintf("%v->\t%v", ii, j))
// if ii == 1 {
// return errors.New("my test err")
// }
// time.Sleep(1 * time.Second)
// }

// return nil
// //time.Sleep(1 * time.Second)
// //return errors.New("my test err")
// })
// }

// err := wp.Wait()
// if err != nil {
// fmt.Println(err)
// }
// fmt.Println("down")
// }

//放到工作池里面 且等待执行结果
func TestWorkerPoolDoWait(t *testing.T) {
wp := New(5) //设置最大线程数
for i := 0; i < 10; i++ { //开启20个请求
ii := i
wp.DoWait(func() error {
for j := 0; j < 5; j++ { //每次打印0-10的值
fmt.Println(fmt.Sprintf("%v->\t%v", ii, j))
if ii == 1 {
return errors.New("my test err")
}
time.Sleep(1 * time.Second)
}

return nil
//time.Sleep(1 * time.Second)
//return errors.New("my test err")
})
}

err := wp.Wait()
if err != nil {
fmt.Println(err)
}
fmt.Println("down")
}

//判断是否完成 (非阻塞)
func TestWorkerPoolIsDone(t *testing.T) {
wp := New(5) //设置最大线程数
for i := 0; i < 10; i++ { //开启20个请求
// ii := i
wp.Do(func() error {
for j := 0; j < 5; j++ { //每次打印0-10的值
//fmt.Println(fmt.Sprintf("%v->\t%v", ii, j))
time.Sleep(1 * time.Second)
}
return nil
})

fmt.Println(wp.IsDone())
}
wp.Wait()
fmt.Println(wp.IsDone())
fmt.Println("down")
}

0 comments on commit d13252a

Please sign in to comment.