Skip to content

Commit

Permalink
new
Browse files Browse the repository at this point in the history
  • Loading branch information
谢小军 authored and 谢小军 committed Sep 15, 2019
1 parent 414a6d5 commit f44adb4
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 75 deletions.
3 changes: 1 addition & 2 deletions mian.go → main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ func main() {
wp.Do(func() error {
for j := 0; j < 5; j++ { //每次打印0-10的值
//fmt.Println(fmt.Sprintf("%v->\t%v", ii, j))
time.Sleep(1 * time.Second)
time.Sleep(1 * time.Microsecond)
}
return nil
})
Expand All @@ -22,7 +22,6 @@ func main() {
}

wp.Wait()

fmt.Println(wp.IsDone())

fmt.Println("down")
Expand Down
20 changes: 7 additions & 13 deletions workerpool/def.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,21 @@ import (
"time"
)

// // CallHandler process .定义调用回调体(可修改)
// type CallHandler func()

// TaskHandler process .定义函数回调体
type TaskHandler func() error

// ServeHandler must process tls.Config.NextProto negotiated requests.
//type ServeHandler func(c net.Conn) error

// workerPool serves incoming connections via a pool of workers
// in FILO order, i.e. the most recently stopped worker will serve the next
// incoming connection.
//
// Such a scheme keeps CPU caches hot (in theory).
type WorkerPool struct {
//sync.Mutex
maxWorkersCount int //最大的工作协程数
closed int32
errChan chan error //错误chan
timeout time.Duration //最大超时时间
wg sync.WaitGroup
task chan TaskHandler
start sync.Once
//maxWorkersCount int //最大的工作协程数
//start sync.Once
closed int32
errChan chan error //错误chan
timeout time.Duration //最大超时时间
wg sync.WaitGroup
task chan TaskHandler
}
53 changes: 23 additions & 30 deletions workerpool/workerpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,62 +15,47 @@ func New(max int) *WorkerPool {
max = 1
}

return &WorkerPool{
maxWorkersCount: max,
task: make(chan TaskHandler, max),
errChan: make(chan error, 1),
p := &WorkerPool{
task: make(chan TaskHandler, 2*max),
errChan: make(chan error, 1),
}

go p.loop(max)

return p
}

//SetTimeout 设置超时时间
func (p *WorkerPool) SetTimeout(timeout time.Duration) {
p.timeout = timeout
}

//SingleCall 单程执行(排他)
// func (p *WorkerPool) SingleCall(fn TaskHandler) {
// p.Mutex.Lock()
// fn()
// p.Mutex.Unlock()
// }

//Do 添加到工作池,并立即返回
func (p *WorkerPool) Do(fn TaskHandler) {
p.start.Do(func() { //once
p.wg.Add(p.maxWorkersCount)
go p.loop()
})

if atomic.LoadInt32(&p.closed) == 1 {
// 已关闭
if p.IsClosed() { // 已关闭
return
}
p.task <- fn
}

//DoWait 添加到工作池,并等待执行完成之后再返回
func (p *WorkerPool) DoWait(task TaskHandler) {
p.start.Do(func() { //once
p.wg.Add(p.maxWorkersCount)
go p.loop()
})

if atomic.LoadInt32(&p.closed) == 1 { // 已关闭
if p.IsClosed() { // 已关闭
return
}

doneChan := make(chan struct{})
p.task <- func() error {
err := task()
close(doneChan)
return err
defer close(doneChan)
return task()
}
<-doneChan
}

func (p *WorkerPool) loop() {
// 启动n个worker
for i := 0; i < p.maxWorkersCount; i++ {
func (p *WorkerPool) loop(maxWorkersCount int) {
p.wg.Add(maxWorkersCount) // 最大的工作协程数
// 启动max个worker
for i := 0; i < maxWorkersCount; i++ {
go func() {
defer p.wg.Done()
// worker 开始干活
Expand Down Expand Up @@ -132,3 +117,11 @@ func (p *WorkerPool) IsDone() bool {

return len(p.task) == 0
}

//IsClosed 是否已经关闭
func (p *WorkerPool) IsClosed() bool {
if atomic.LoadInt32(&p.closed) == 1 { // 已关闭
return true
}
return false
}
33 changes: 3 additions & 30 deletions workerpool/workerpool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,33 +52,6 @@ func TestWorkerPoolError(t *testing.T) {
fmt.Println("down")
}

//测试排他执行(到单线程模式)
// func TestWorkerPoolSingleCall(t *testing.T) {
// wp := New(2) //设置最大线程数
// for i := 0; i < 4; i++ { //开启20个请求
// ii := i
// wp.SingleCall(func() error {
// for j := 0; j < 2; j++ { //每次打印0-10的值
// fmt.Println(fmt.Sprintf("%v->\t%v", ii, j))
// if ii == 1 {
// return errors.New("my test err")
// }
// time.Sleep(1 * time.Second)
// }

// return nil
// //time.Sleep(1 * time.Second)
// //return errors.New("my test err")
// })
// }

// err := wp.Wait()
// if err != nil {
// fmt.Println(err)
// }
// fmt.Println("down")
// }

//放到工作池里面 且等待执行结果
func TestWorkerPoolDoWait(t *testing.T) {
wp := New(5) //设置最大线程数
Expand All @@ -87,9 +60,9 @@ func TestWorkerPoolDoWait(t *testing.T) {
wp.DoWait(func() error {
for j := 0; j < 5; j++ { //每次打印0-10的值
fmt.Println(fmt.Sprintf("%v->\t%v", ii, j))
if ii == 1 {
return errors.New("my test err")
}
// if ii == 1 {
// return errors.New("my test err")
// }
time.Sleep(1 * time.Second)
}

Expand Down

0 comments on commit f44adb4

Please sign in to comment.