Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add reset #22

Merged
merged 7 commits into from
Feb 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
167 changes: 111 additions & 56 deletions min_heap.go
Original file line number Diff line number Diff line change
@@ -1,24 +1,29 @@
// Copyright 2020-2024 guonaihong, antlabs. All rights reserved.
//
// mit license
package timer

import (
"container/heap"
"context"
"math"
"sync"
"sync/atomic"
"time"
)

var _ Timer = (*minHeap)(nil)

var defaultTimeout = time.Hour

type minHeap struct {
mu sync.Mutex
minHeaps
chAdd chan struct{}
ctx context.Context
cancel context.CancelFunc
wait sync.WaitGroup
runCount uint32 // 测试时使用
tm *time.Timer
runCount int32 // 单元测试时使用
}

// 一次性定时器
Expand All @@ -44,9 +49,6 @@ func (m *minHeap) addCallback(expire time.Duration, n Next, callback func(), isS
default:
}

m.mu.Lock()
defer m.mu.Unlock()

node := minHeapNode{
callback: callback,
userExpire: expire,
Expand All @@ -60,7 +62,11 @@ func (m *minHeap) addCallback(expire time.Duration, n Next, callback func(), isS
node.absExpire = n.Next(time.Now())
}

m.mu.Lock()
heap.Push(&m.minHeaps, &node)
m.wait.Add(1)
m.mu.Unlock()

select {
case m.chAdd <- struct{}{}:
default:
Expand All @@ -71,71 +77,120 @@ func (m *minHeap) addCallback(expire time.Duration, n Next, callback func(), isS

func (m *minHeap) removeTimeNode(node *minHeapNode) {
m.mu.Lock()
if node.index < 0 || node.index > len(m.minHeaps) || len(m.minHeaps) == 0 {
if node.index < 0 || node.index > int32(len(m.minHeaps)) || int32(len(m.minHeaps)) == 0 {
m.mu.Unlock()
return
}

heap.Remove(&m.minHeaps, node.index)
heap.Remove(&m.minHeaps, int(node.index))
m.wait.Done()
m.mu.Unlock()
}

func (m *minHeap) resetTimeNode(node *minHeapNode, d time.Duration) {
m.mu.Lock()
node.userExpire = d
node.absExpire = time.Now().Add(d)
heap.Fix(&m.minHeaps, int(node.index))
select {
case m.chAdd <- struct{}{}:
default:
}
m.mu.Unlock()
}

func (m *minHeap) getNewSleepTime() time.Duration {
if m.minHeaps.Len() == 0 {
return time.Hour
}

timeout := time.Since(m.minHeaps[0].absExpire)
if timeout < 0 {
timeout = 0
}
return timeout
}

func (m *minHeap) process() {
for {
m.mu.Lock()
now := time.Now()
// 如果堆中没有元素,就等待
// 这时候设置一个相对长的时间,避免空转cpu
if m.minHeaps.Len() == 0 {
m.tm.Reset(time.Hour)
m.mu.Unlock()
return
}

for {
// 取出最小堆的第一个元素
first := m.minHeaps[0]

// 时间未到直接过滤掉
// 只是跳过最近的循环
if !now.After(first.absExpire) {
break
}

// 取出待执行的callback
callback := first.callback
// 如果是周期性任务
if first.isSchedule {
// 计算下次触发的绝对时间点
first.absExpire = first.Next(now)
// 修改下在堆中的位置
heap.Fix(&m.minHeaps, int(first.index))
} else {
// 从堆中删除
heap.Pop(&m.minHeaps)
m.wait.Done()
}

// 正在运行的任务数加1
atomic.AddInt32(&m.runCount, 1)
go func() {
callback()
// 对正在运行的任务数减1
atomic.AddInt32(&m.runCount, -1)
}()

// 如果堆中没有元素,就等待
if m.minHeaps.Len() == 0 {
m.tm.Reset(defaultTimeout)
m.mu.Unlock()
return
}
}

// 取出第一个元素
first := m.minHeaps[0]
// 如果第一个元素的时间还没到,就计算下次触发的时间
if time.Now().Before(first.absExpire) {
to := m.getNewSleepTime()
m.tm.Reset(to)
// fmt.Printf("### now=%v, to = %v, m.minHeaps[0].absExpire = %v\n", time.Now(), to, m.minHeaps[0].absExpire)
m.mu.Unlock()
return
}
m.mu.Unlock()
}
}

// 运行
// 为了避免空转cpu, 会等待一个chan, 只要AfterFunc或者ScheduleFunc被调用就会往这个chan里面写值
func (m *minHeap) Run() {
timeout := time.Hour
tm := time.NewTimer(timeout)
m.tm = time.NewTimer(time.Hour)
m.process()
for {
select {
case <-tm.C:
for {
m.mu.Lock()
now := time.Now()
if m.minHeaps.Len() == 0 {
tm.Reset(timeout)
m.mu.Unlock()
goto next
}

for {
first := m.minHeaps[0]

// 时间未到直接过滤掉
if !now.After(first.absExpire) {
break
}

callback := first.callback
if first.isSchedule {
first.absExpire = first.Next(now)
heap.Fix(&m.minHeaps, first.index)
} else {
heap.Pop(&m.minHeaps)
}
atomic.AddUint32(&m.runCount, 1)
go callback()

if m.minHeaps.Len() == 0 {
tm.Reset(timeout)
m.mu.Unlock()
goto next
}
}

first := m.minHeaps[0]
if time.Now().Before(first.absExpire) {
to := time.Duration(math.Abs(float64(time.Since(m.minHeaps[0].absExpire))))
tm.Reset(to)
m.mu.Unlock()
goto next
}
m.mu.Unlock()
}
case <-m.tm.C:
m.process()
case <-m.chAdd:
m.mu.Lock()
// 极端情况,加完任务立即给删除了, 判断下当前堆中是否有元素
if m.minHeaps.Len() > 0 {
tm.Reset(m.minHeaps[0].absExpire.Sub(time.Now()))
m.tm.Reset(m.getNewSleepTime())
}
m.mu.Unlock()
// 进入事件循环,如果为空就会从事件循环里面退出
Expand All @@ -144,7 +199,7 @@ func (m *minHeap) Run() {
m.wait.Wait()
return
}
next:

}
}

Expand All @@ -156,7 +211,7 @@ func (m *minHeap) Stop() {
func newMinHeap() (mh *minHeap) {
mh = &minHeap{}
heap.Init(&mh.minHeaps)
mh.chAdd = make(chan struct{}, 1)
mh.chAdd = make(chan struct{}, 1024)
mh.ctx, mh.cancel = context.WithCancel(context.TODO())
return
}
20 changes: 13 additions & 7 deletions min_heap_node.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// Copyright 2020-2024 guonaihong, antlabs. All rights reserved.
//
// mit license
package timer

import (
Expand All @@ -8,19 +11,22 @@ type minHeapNode struct {
callback func() // 用户的callback
absExpire time.Time // 绝对时间
userExpire time.Duration // 过期时间段
next Next // 自定义下个触发的时间点
root *minHeap // 指向最小堆
next Next // 自定义下个触发的时间点, cronex项目用到了
index int32 // 在min heap中的索引,方便删除或者重新推入堆中
isSchedule bool // 是否是周期性任务
index int // 在min heap中的索引,方便删除或者重新推入堆中
root *minHeap
}

func (m *minHeapNode) Stop() {
m.root.removeTimeNode(m)
}
func (m *minHeapNode) Reset(d time.Duration) {
m.root.resetTimeNode(m, d)
}

func (m *minHeapNode) Next(now time.Time) time.Time {
if m.next != nil {
return m.next.Next(now)
return (m.next).Next(now)
}
return now.Add(m.userExpire)
}
Expand All @@ -33,15 +39,15 @@ func (m minHeaps) Less(i, j int) bool { return m[i].absExpire.Before(m[j].absExp

func (m minHeaps) Swap(i, j int) {
m[i], m[j] = m[j], m[i]
m[i].index = i
m[j].index = j
m[i].index = int32(i)
m[j].index = int32(j)
}

func (m *minHeaps) Push(x any) {
// Push and Pop use pointer receivers because they modify the slice's length,
// not just its contents.
*m = append(*m, x.(*minHeapNode))
lastIndex := len(*m) - 1
lastIndex := int32(len(*m) - 1)
(*m)[lastIndex].index = lastIndex
}

Expand Down
9 changes: 9 additions & 0 deletions min_heap_node_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
// Copyright 2020-2024 guonaihong, antlabs. All rights reserved.
//
// mit license

package timer

import (
Expand All @@ -6,6 +10,11 @@ import (
"time"
)

func Test_NodeSizeof(t *testing.T) {
t.Run("输出最小堆node的sizeof", func(t *testing.T) {
// t.Logf("minHeapNode size: %d, %d\n", unsafe.Sizeof(minHeapNode{}), unsafe.Sizeof(time.Timer{}))
})
}
func Test_MinHeap(t *testing.T) {
t.Run("", func(t *testing.T) {
var mh minHeaps
Expand Down
Loading
Loading