From 781575aed3a47c6a43bcc87be610d5f6319605d8 Mon Sep 17 00:00:00 2001 From: guonaihong Date: Fri, 9 Feb 2024 20:49:08 +0800 Subject: [PATCH 1/7] =?UTF-8?q?=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- min_heap.go | 140 +++++++++++++++++++++++++++--------------- min_heap_node.go | 20 +++--- min_heap_node_test.go | 6 ++ min_heap_test.go | 13 ++-- time_wheel_node.go | 10 ++- timer.go | 1 + 6 files changed, 126 insertions(+), 64 deletions(-) diff --git a/min_heap.go b/min_heap.go index 1a8a9b3..d1c992d 100644 --- a/min_heap.go +++ b/min_heap.go @@ -4,6 +4,7 @@ import ( "container/heap" "context" "math" + "runtime" "sync" "sync/atomic" "time" @@ -18,7 +19,9 @@ type minHeap struct { ctx context.Context cancel context.CancelFunc wait sync.WaitGroup - runCount uint32 // 测试时使用 + tm *time.Timer + timeout time.Duration + runCount int32 // 单元测试时使用 } // 一次性定时器 @@ -61,6 +64,7 @@ func (m *minHeap) addCallback(expire time.Duration, n Next, callback func(), isS } heap.Push(&m.minHeaps, &node) + m.wait.Add(1) select { case m.chAdd <- struct{}{}: default: @@ -71,71 +75,105 @@ func (m *minHeap) addCallback(expire time.Duration, n Next, callback func(), isS func (m *minHeap) removeTimeNode(node *minHeapNode) { m.mu.Lock() - if node.index < 0 || node.index > len(m.minHeaps) || len(m.minHeaps) == 0 { + if node.index < 0 || node.index > int32(len(m.minHeaps)) || int32(len(m.minHeaps)) == 0 { m.mu.Unlock() return } - heap.Remove(&m.minHeaps, node.index) + heap.Remove(&m.minHeaps, int(node.index)) + m.wait.Done() m.mu.Unlock() } +func (m *minHeap) resetTimeNode(node *minHeapNode, d time.Duration) { + m.mu.Lock() + heap.Push(&m.minHeaps, node) + select { + case m.chAdd <- struct{}{}: + default: + } + m.mu.Unlock() +} + +func (m *minHeap) process() { + for { + m.mu.Lock() + now := time.Now() + // 如果堆中没有元素,就等待 + // 这时候设置一个相对长的时间,避免空转cpu + if m.minHeaps.Len() == 0 { + m.tm.Reset(m.timeout) + m.mu.Unlock() + return + } + + for { + // 取出最小堆的第一个元素 + first := m.minHeaps[0] + + // 时间未到直接过滤掉 + if !now.After(first.absExpire) { + break + } + + // 取出待执行的callback + callback := first.callback + // 如果是周期性任务 + if first.isSchedule { + // 计算下次触发的绝对时间点 + first.absExpire = first.Next(now) + // 修改下在堆中的位置 + heap.Fix(&m.minHeaps, int(first.index)) + } else { + // 从堆中删除 + heap.Pop(&m.minHeaps) + m.wait.Done() + } + + // 正在运行的任务数加1 + atomic.AddInt32(&m.runCount, 1) + go func() { + callback() + // 对正在运行的任务数减1 + atomic.AddInt32(&m.runCount, -1) + }() + + // 如果堆中没有元素,就等待 + if m.minHeaps.Len() == 0 { + m.tm.Reset(m.timeout) + m.mu.Unlock() + return + } + } + + // 取出第一个元素 + first := m.minHeaps[0] + // 如果第一个元素的时间还没到,就计算下次触发的时间 + if time.Now().Before(first.absExpire) { + to := time.Duration(math.Abs(float64(time.Since(m.minHeaps[0].absExpire)))) + m.tm.Reset(to) + m.mu.Unlock() + return + } + m.mu.Unlock() + } +} + // 运行 // 为了避免空转cpu, 会等待一个chan, 只要AfterFunc或者ScheduleFunc被调用就会往这个chan里面写值 func (m *minHeap) Run() { timeout := time.Hour - tm := time.NewTimer(timeout) + m.tm = time.NewTimer(timeout) + m.process() for { select { - case <-tm.C: - for { - m.mu.Lock() - now := time.Now() - if m.minHeaps.Len() == 0 { - tm.Reset(timeout) - m.mu.Unlock() - goto next - } - - for { - first := m.minHeaps[0] - - // 时间未到直接过滤掉 - if !now.After(first.absExpire) { - break - } - - callback := first.callback - if first.isSchedule { - first.absExpire = first.Next(now) - heap.Fix(&m.minHeaps, first.index) - } else { - heap.Pop(&m.minHeaps) - } - atomic.AddUint32(&m.runCount, 1) - go callback() - - if m.minHeaps.Len() == 0 { - tm.Reset(timeout) - m.mu.Unlock() - goto next - } - } - - first := m.minHeaps[0] - if time.Now().Before(first.absExpire) { - to := time.Duration(math.Abs(float64(time.Since(m.minHeaps[0].absExpire)))) - tm.Reset(to) - m.mu.Unlock() - goto next - } - m.mu.Unlock() - } + case <-m.tm.C: + m.process() case <-m.chAdd: m.mu.Lock() // 极端情况,加完任务立即给删除了, 判断下当前堆中是否有元素 if m.minHeaps.Len() > 0 { - tm.Reset(m.minHeaps[0].absExpire.Sub(time.Now())) + m.tm.Reset(m.minHeaps[0].absExpire.Sub(time.Now())) } m.mu.Unlock() // 进入事件循环,如果为空就会从事件循环里面退出 @@ -144,7 +182,7 @@ func (m *minHeap) Run() { m.wait.Wait() return } - next: + } } @@ -156,7 +194,7 @@ func (m *minHeap) Stop() { func newMinHeap() (mh *minHeap) { mh = &minHeap{} heap.Init(&mh.minHeaps) - mh.chAdd = make(chan struct{}, 1) + mh.chAdd = make(chan struct{}, runtime.NumCPU()) mh.ctx, mh.cancel = context.WithCancel(context.TODO()) return } diff --git a/min_heap_node.go b/min_heap_node.go index 3a434c3..49aeaaa 100644 --- a/min_heap_node.go +++ b/min_heap_node.go @@ -8,19 +8,25 @@ type minHeapNode struct { callback func() // 用户的callback absExpire time.Time // 绝对时间 userExpire time.Duration // 过期时间段 - next Next // 自定义下个触发的时间点 + root *minHeap // 指向最小堆 + next Next // 自定义下个触发的时间点, cronex项目用到了 + index int32 // 在min heap中的索引,方便删除或者重新推入堆中 isSchedule bool // 是否是周期性任务 - index int // 在min heap中的索引,方便删除或者重新推入堆中 - root *minHeap } func (m *minHeapNode) Stop() { m.root.removeTimeNode(m) } +func (m *minHeapNode) Reset(d time.Duration) { + // m.root.removeTimeNode(m) + // m.userExpire = d + // m.absExpire = time.Now().Add(d) + // // m.root.addTimeNode(m) +} func (m *minHeapNode) Next(now time.Time) time.Time { if m.next != nil { - return m.next.Next(now) + return (m.next).Next(now) } return now.Add(m.userExpire) } @@ -33,15 +39,15 @@ func (m minHeaps) Less(i, j int) bool { return m[i].absExpire.Before(m[j].absExp func (m minHeaps) Swap(i, j int) { m[i], m[j] = m[j], m[i] - m[i].index = i - m[j].index = j + m[i].index = int32(i) + m[j].index = int32(j) } func (m *minHeaps) Push(x any) { // Push and Pop use pointer receivers because they modify the slice's length, // not just its contents. *m = append(*m, x.(*minHeapNode)) - lastIndex := len(*m) - 1 + lastIndex := int32(len(*m) - 1) (*m)[lastIndex].index = lastIndex } diff --git a/min_heap_node_test.go b/min_heap_node_test.go index 2776c0b..27c9c1d 100644 --- a/min_heap_node_test.go +++ b/min_heap_node_test.go @@ -4,8 +4,14 @@ import ( "container/heap" "testing" "time" + "unsafe" ) +func Test_NodeSizeof(t *testing.T) { + t.Run("输出最小堆node的sizeof", func(t *testing.T) { + t.Logf("minHeapNode size: %d, %d\n", unsafe.Sizeof(minHeapNode{}), unsafe.Sizeof(time.Timer{})) + }) +} func Test_MinHeap(t *testing.T) { t.Run("", func(t *testing.T) { var mh minHeaps diff --git a/min_heap_test.go b/min_heap_test.go index 0349692..c3d47f8 100644 --- a/min_heap_test.go +++ b/min_heap_test.go @@ -208,17 +208,21 @@ func (c *curstomTest) Next(now time.Time) (rv time.Time) { func Test_CustomFunc(t *testing.T) { t.Run("custom", func(t *testing.T) { tm := NewTimer(WithMinHeap()) - mh := tm.(*minHeap) + // mh := tm.(*minHeap) // 最小堆 tc := make(chan time.Duration, 2) now := time.Now() count := uint32(1) stop := make(chan bool, 1) + // 自定义函数 node := tm.CustomFunc(&curstomTest{count: 1}, func() { + if atomic.LoadUint32(&count) == 2 { return } + // 计算运行次数 atomic.AddUint32(&count, 1) tc <- time.Since(now) + // 关闭这个任务 close(stop) }) @@ -243,9 +247,10 @@ func Test_CustomFunc(t *testing.T) { t.Errorf("count != 2") } - if mh.runCount != uint32(1) { - t.Errorf("mh.runCount != 1") - } + // 正在运行的任务是比较短暂的,所以外部很难 + // if mh.runCount != int32(1) { + // t.Errorf("mh.runCount:%d != 1", mh.runCount) + // } }) } diff --git a/time_wheel_node.go b/time_wheel_node.go index 343c405..3bb712e 100644 --- a/time_wheel_node.go +++ b/time_wheel_node.go @@ -67,8 +67,10 @@ type timeNode struct { // 1.存在于初始化链表中 // 2.被移动到tmp链表 // 3.1 和 3.2是if else的状态 -// 3.1被移动到new链表 -// 3.2直接执行 +// +// 3.1被移动到new链表 +// 3.2直接执行 +// // 1和3.1状态是没有问题的 // 2和3.2状态会是没有锁保护下的操作,会有数据竞争 func (t *timeNode) Stop() { @@ -87,3 +89,7 @@ func (t *timeNode) Stop() { cpyList.Del(&t.Head) } + +func (t *timeNode) Reset(d time.Duration) { + +} diff --git a/timer.go b/timer.go index 8d0bea7..6cc6f68 100644 --- a/timer.go +++ b/timer.go @@ -27,6 +27,7 @@ type Timer interface { // 停止单个定时器 type TimeNoder interface { Stop() + Reset(d time.Duration) } // 定时器构造函数 From bee8cc6156c7727cf48563930ffbc229d3155fd0 Mon Sep 17 00:00:00 2001 From: guonaihong Date: Fri, 9 Feb 2024 20:54:30 +0800 Subject: [PATCH 2/7] =?UTF-8?q?=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- min_heap_node_test.go | 3 +-- min_heap_test.go | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/min_heap_node_test.go b/min_heap_node_test.go index 27c9c1d..1c88f22 100644 --- a/min_heap_node_test.go +++ b/min_heap_node_test.go @@ -4,12 +4,11 @@ import ( "container/heap" "testing" "time" - "unsafe" ) func Test_NodeSizeof(t *testing.T) { t.Run("输出最小堆node的sizeof", func(t *testing.T) { - t.Logf("minHeapNode size: %d, %d\n", unsafe.Sizeof(minHeapNode{}), unsafe.Sizeof(time.Timer{})) + // t.Logf("minHeapNode size: %d, %d\n", unsafe.Sizeof(minHeapNode{}), unsafe.Sizeof(time.Timer{})) }) } func Test_MinHeap(t *testing.T) { diff --git a/min_heap_test.go b/min_heap_test.go index c3d47f8..39bbca5 100644 --- a/min_heap_test.go +++ b/min_heap_test.go @@ -146,7 +146,7 @@ func Test_MinHeap_ScheduleFunc_Run(t *testing.T) { } if atomic.LoadInt32(&count) != 2 { - t.Errorf("count != 2") + t.Errorf("count:%d != 2", atomic.LoadInt32(&count)) } }) From 764be3769964de00315cf745cdc272180c455a5d Mon Sep 17 00:00:00 2001 From: guonaihong Date: Fri, 9 Feb 2024 20:55:22 +0800 Subject: [PATCH 3/7] up --- min_heap_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/min_heap_test.go b/min_heap_test.go index 39bbca5..415db85 100644 --- a/min_heap_test.go +++ b/min_heap_test.go @@ -133,7 +133,7 @@ func Test_MinHeap_ScheduleFunc_Run(t *testing.T) { node.Stop() }() - time.Sleep(time.Millisecond * 30) + time.Sleep(time.Millisecond * 40) close(tc) cnt := 1 for tv := range tc { From f3e81a4c3ecb406398d4dc79dc2dd1909ff500b8 Mon Sep 17 00:00:00 2001 From: guonaihong Date: Sat, 10 Feb 2024 16:23:11 +0800 Subject: [PATCH 4/7] =?UTF-8?q?=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- min_heap_node.go | 5 +-- min_heap_test.go | 101 +++++++++++++++++++++++++++++++++++------------ 2 files changed, 76 insertions(+), 30 deletions(-) diff --git a/min_heap_node.go b/min_heap_node.go index 49aeaaa..da31a88 100644 --- a/min_heap_node.go +++ b/min_heap_node.go @@ -18,10 +18,7 @@ func (m *minHeapNode) Stop() { m.root.removeTimeNode(m) } func (m *minHeapNode) Reset(d time.Duration) { - // m.root.removeTimeNode(m) - // m.userExpire = d - // m.absExpire = time.Now().Add(d) - // // m.root.addTimeNode(m) + } func (m *minHeapNode) Next(now time.Time) time.Time { diff --git a/min_heap_test.go b/min_heap_test.go index 415db85..37e43e9 100644 --- a/min_heap_test.go +++ b/min_heap_test.go @@ -1,6 +1,8 @@ package timer import ( + "log" + "sync" "sync/atomic" "testing" "time" @@ -10,23 +12,41 @@ import ( func Test_MinHeap_AfterFunc_Run(t *testing.T) { t.Run("1ms", func(t *testing.T) { tm := NewTimer(WithMinHeap()) - now := time.Now() + go tm.Run() count := int32(0) tc := make(chan time.Duration, 2) - tm.AfterFunc(time.Millisecond, func() { + var mu sync.Mutex + isClose := false + now := time.Now() + node1 := tm.AfterFunc(time.Millisecond, func() { + + mu.Lock() atomic.AddInt32(&count, 1) - tc <- time.Since(now) + if atomic.LoadInt32(&count) <= 2 && !isClose { + tc <- time.Since(now) + } + mu.Unlock() }) - tm.AfterFunc(time.Millisecond, func() { + + node2 := tm.AfterFunc(time.Millisecond, func() { + mu.Lock() atomic.AddInt32(&count, 1) - tc <- time.Since(now) + if atomic.LoadInt32(&count) <= 2 && !isClose { + tc <- time.Since(now) + } + mu.Unlock() }) - time.Sleep(time.Millisecond * 2) + time.Sleep(time.Millisecond * 3) + mu.Lock() + isClose = true close(tc) + node1.Stop() + node2.Stop() + mu.Unlock() for tv := range tc { if tv < time.Millisecond || tv > 2*time.Millisecond { t.Errorf("tc < time.Millisecond tc > 2*time.Millisecond") @@ -34,28 +54,52 @@ func Test_MinHeap_AfterFunc_Run(t *testing.T) { } } if atomic.LoadInt32(&count) != 2 { - t.Errorf("count != 2") + t.Errorf("count:%d != 2", atomic.LoadInt32(&count)) } }) t.Run("10ms", func(t *testing.T) { tm := NewTimer(WithMinHeap()) - now := time.Now() - go tm.Run() + + go tm.Run() // 运行事件循环 count := int32(0) tc := make(chan time.Duration, 2) - tm.AfterFunc(time.Millisecond*10, func() { + + var mu sync.Mutex + isClosed := false + now := time.Now() + node1 := tm.AfterFunc(time.Millisecond*10, func() { + now2 := time.Now() + mu.Lock() atomic.AddInt32(&count, 1) - tc <- time.Since(now) + if atomic.LoadInt32(&count) <= 2 && !isClosed { + tc <- time.Since(now) + } + mu.Unlock() + log.Printf("node1.Lock:%v\n", time.Since(now2)) }) - tm.AfterFunc(time.Millisecond*10, func() { + node2 := tm.AfterFunc(time.Millisecond*10, func() { + now2 := time.Now() + mu.Lock() atomic.AddInt32(&count, 1) - tc <- time.Since(now) + if atomic.LoadInt32(&count) <= 2 && !isClosed { + tc <- time.Since(now) + } + mu.Unlock() + log.Printf("node2.Lock:%v\n", time.Since(now2)) }) - time.Sleep(time.Millisecond * 20) + time.Sleep(time.Millisecond * 24) + now3 := time.Now() + mu.Lock() + node1.Stop() + node2.Stop() + isClosed = true close(tc) + mu.Unlock() + + log.Printf("node1.Stop:%v\n", time.Since(now3)) cnt := 1 for tv := range tc { left := time.Millisecond * 10 * time.Duration(cnt) @@ -66,7 +110,7 @@ func Test_MinHeap_AfterFunc_Run(t *testing.T) { // cnt++ } if atomic.LoadInt32(&count) != 2 { - t.Errorf("count != 2") + t.Errorf("count:%d != 2", atomic.LoadInt32(&count)) } }) @@ -108,7 +152,7 @@ func Test_MinHeap_ScheduleFunc_Run(t *testing.T) { time.Sleep(time.Millisecond * 5) if atomic.LoadInt32(&count) != 2 { - t.Errorf("count != 2") + t.Errorf("count:%d != 2", atomic.LoadInt32(&count)) } }) @@ -117,24 +161,29 @@ func Test_MinHeap_ScheduleFunc_Run(t *testing.T) { tm := NewTimer(WithMinHeap()) go tm.Run() count := int32(0) - c := make(chan bool, 1) tc := make(chan time.Duration, 2) + var mu sync.Mutex + isClosed := false now := time.Now() + node := tm.ScheduleFunc(time.Millisecond*10, func() { + mu.Lock() atomic.AddInt32(&count, 1) - tc <- time.Since(now) - if atomic.LoadInt32(&count) >= 2 { - c <- true + + if atomic.LoadInt32(&count) <= 2 && !isClosed { + tc <- time.Since(now) } + mu.Unlock() }) - go func() { - <-c - node.Stop() - }() + time.Sleep(time.Millisecond * 25) - time.Sleep(time.Millisecond * 40) + mu.Lock() close(tc) + isClosed = true + node.Stop() + mu.Unlock() + cnt := 1 for tv := range tc { left := time.Millisecond * 10 * time.Duration(cnt) @@ -170,7 +219,7 @@ func Test_MinHeap_ScheduleFunc_Run(t *testing.T) { time.Sleep(time.Millisecond * 70) if atomic.LoadInt32(&count) != 2 { - t.Errorf("count != 2") + t.Errorf("count:%d != 2", atomic.LoadInt32(&count)) } }) From ff3d299325703b03eb2e3e15dd26945e059a4bf7 Mon Sep 17 00:00:00 2001 From: guonaihong Date: Sat, 10 Feb 2024 17:11:55 +0800 Subject: [PATCH 5/7] =?UTF-8?q?=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- min_heap.go | 38 +++++++++++++++++++++++++------------- min_heap_test.go | 16 ++++++---------- 2 files changed, 31 insertions(+), 23 deletions(-) diff --git a/min_heap.go b/min_heap.go index d1c992d..9eb88f8 100644 --- a/min_heap.go +++ b/min_heap.go @@ -3,8 +3,6 @@ package timer import ( "container/heap" "context" - "math" - "runtime" "sync" "sync/atomic" "time" @@ -12,6 +10,8 @@ import ( var _ Timer = (*minHeap)(nil) +var defaultTimeout = time.Hour + type minHeap struct { mu sync.Mutex minHeaps @@ -20,7 +20,6 @@ type minHeap struct { cancel context.CancelFunc wait sync.WaitGroup tm *time.Timer - timeout time.Duration runCount int32 // 单元测试时使用 } @@ -47,9 +46,6 @@ func (m *minHeap) addCallback(expire time.Duration, n Next, callback func(), isS default: } - m.mu.Lock() - defer m.mu.Unlock() - node := minHeapNode{ callback: callback, userExpire: expire, @@ -63,8 +59,11 @@ func (m *minHeap) addCallback(expire time.Duration, n Next, callback func(), isS node.absExpire = n.Next(time.Now()) } + m.mu.Lock() heap.Push(&m.minHeaps, &node) m.wait.Add(1) + m.mu.Unlock() + select { case m.chAdd <- struct{}{}: default: @@ -95,6 +94,18 @@ func (m *minHeap) resetTimeNode(node *minHeapNode, d time.Duration) { m.mu.Unlock() } +func (m *minHeap) getNewSleepTime() time.Duration { + if m.minHeaps.Len() == 0 { + return time.Hour + } + + timeout := time.Since(m.minHeaps[0].absExpire) + if timeout < 0 { + timeout = 0 + } + return timeout +} + func (m *minHeap) process() { for { m.mu.Lock() @@ -102,7 +113,7 @@ func (m *minHeap) process() { // 如果堆中没有元素,就等待 // 这时候设置一个相对长的时间,避免空转cpu if m.minHeaps.Len() == 0 { - m.tm.Reset(m.timeout) + m.tm.Reset(time.Hour) m.mu.Unlock() return } @@ -112,6 +123,7 @@ func (m *minHeap) process() { first := m.minHeaps[0] // 时间未到直接过滤掉 + // 只是跳过最近的循环 if !now.After(first.absExpire) { break } @@ -140,7 +152,7 @@ func (m *minHeap) process() { // 如果堆中没有元素,就等待 if m.minHeaps.Len() == 0 { - m.tm.Reset(m.timeout) + m.tm.Reset(defaultTimeout) m.mu.Unlock() return } @@ -150,8 +162,9 @@ func (m *minHeap) process() { first := m.minHeaps[0] // 如果第一个元素的时间还没到,就计算下次触发的时间 if time.Now().Before(first.absExpire) { - to := time.Duration(math.Abs(float64(time.Since(m.minHeaps[0].absExpire)))) + to := m.getNewSleepTime() m.tm.Reset(to) + // fmt.Printf("### now=%v, to = %v, m.minHeaps[0].absExpire = %v\n", time.Now(), to, m.minHeaps[0].absExpire) m.mu.Unlock() return } @@ -162,8 +175,7 @@ func (m *minHeap) process() { // 运行 // 为了避免空转cpu, 会等待一个chan, 只要AfterFunc或者ScheduleFunc被调用就会往这个chan里面写值 func (m *minHeap) Run() { - timeout := time.Hour - m.tm = time.NewTimer(timeout) + m.tm = time.NewTimer(time.Hour) m.process() for { select { @@ -173,7 +185,7 @@ func (m *minHeap) Run() { m.mu.Lock() // 极端情况,加完任务立即给删除了, 判断下当前堆中是否有元素 if m.minHeaps.Len() > 0 { - m.tm.Reset(m.minHeaps[0].absExpire.Sub(time.Now())) + m.tm.Reset(m.getNewSleepTime()) } m.mu.Unlock() // 进入事件循环,如果为空就会从事件循环里面退出 @@ -194,7 +206,7 @@ func (m *minHeap) Stop() { func newMinHeap() (mh *minHeap) { mh = &minHeap{} heap.Init(&mh.minHeaps) - mh.chAdd = make(chan struct{}, runtime.NumCPU()) + mh.chAdd = make(chan struct{}, 1024) mh.ctx, mh.cancel = context.WithCancel(context.TODO()) return } diff --git a/min_heap_test.go b/min_heap_test.go index 37e43e9..792a749 100644 --- a/min_heap_test.go +++ b/min_heap_test.go @@ -110,6 +110,7 @@ func Test_MinHeap_AfterFunc_Run(t *testing.T) { // cnt++ } if atomic.LoadInt32(&count) != 2 { + t.Errorf("count:%d != 2", atomic.LoadInt32(&count)) } @@ -136,20 +137,15 @@ func Test_MinHeap_ScheduleFunc_Run(t *testing.T) { tm := NewTimer(WithMinHeap()) go tm.Run() count := int32(0) - c := make(chan bool, 1) - node := tm.ScheduleFunc(time.Millisecond, func() { + + _ = tm.ScheduleFunc(2*time.Millisecond, func() { + log.Printf("%v\n", time.Now()) atomic.AddInt32(&count, 1) if atomic.LoadInt32(&count) == 2 { - c <- true + tm.Stop() } }) - go func() { - <-c - node.Stop() - node.Stop() - }() - time.Sleep(time.Millisecond * 5) if atomic.LoadInt32(&count) != 2 { t.Errorf("count:%d != 2", atomic.LoadInt32(&count)) @@ -323,7 +319,7 @@ func Test_RunCount(t *testing.T) { time.Sleep(time.Millisecond * 15) tm.Stop() if count != uint32(max) { - t.Errorf("count != %d", max) + t.Errorf("count:%d != %d", count, max) } }) From cdf2966bbdec4acb0becb62f4efe11d44f61819a Mon Sep 17 00:00:00 2001 From: guonaihong Date: Sat, 10 Feb 2024 17:51:47 +0800 Subject: [PATCH 6/7] =?UTF-8?q?=E6=96=B0=E5=A2=9Ereset=E5=87=BD=E6=95=B0?= =?UTF-8?q?=EF=BC=8CTODO=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- min_heap.go | 4 +++- min_heap_node.go | 2 +- time_wheel.go | 2 ++ time_wheel_node.go | 18 ++++++++++++++++-- 4 files changed, 22 insertions(+), 4 deletions(-) diff --git a/min_heap.go b/min_heap.go index 9eb88f8..1b31aed 100644 --- a/min_heap.go +++ b/min_heap.go @@ -86,7 +86,9 @@ func (m *minHeap) removeTimeNode(node *minHeapNode) { func (m *minHeap) resetTimeNode(node *minHeapNode, d time.Duration) { m.mu.Lock() - heap.Push(&m.minHeaps, node) + node.userExpire = d + node.absExpire = time.Now().Add(d) + heap.Fix(&m.minHeaps, int(node.index)) select { case m.chAdd <- struct{}{}: default: diff --git a/min_heap_node.go b/min_heap_node.go index da31a88..3fd922c 100644 --- a/min_heap_node.go +++ b/min_heap_node.go @@ -18,7 +18,7 @@ func (m *minHeapNode) Stop() { m.root.removeTimeNode(m) } func (m *minHeapNode) Reset(d time.Duration) { - + m.root.resetTimeNode(m, d) } func (m *minHeapNode) Next(now time.Time) time.Time { diff --git a/time_wheel.go b/time_wheel.go index 7adfe75..b6d0580 100644 --- a/time_wheel.go +++ b/time_wheel.go @@ -129,6 +129,7 @@ func (t *timeWheel) AfterFunc(expire time.Duration, callback func()) TimeNoder { node := &timeNode{ expire: uint64(expire), callback: callback, + root: t, } return t.add(node, jiffies) @@ -148,6 +149,7 @@ func (t *timeWheel) ScheduleFunc(userExpire time.Duration, callback func()) Time expire: uint64(expire), callback: callback, isSchedule: true, + root: t, } return t.add(node, jiffies) diff --git a/time_wheel_node.go b/time_wheel_node.go index 3bb712e..b7aea4d 100644 --- a/time_wheel_node.go +++ b/time_wheel_node.go @@ -59,7 +59,7 @@ type timeNode struct { list unsafe.Pointer //存放表头信息 version uint64 //保存节点版本信息 isSchedule bool - + root *timeWheel list.Head } @@ -90,6 +90,20 @@ func (t *timeNode) Stop() { cpyList.Del(&t.Head) } -func (t *timeNode) Reset(d time.Duration) { +// warning: 该函数目前没有稳定 +func (t *timeNode) Reset(expire time.Duration) { + cpyList := (*Time)(atomic.LoadPointer(&t.list)) + cpyList.Lock() + defer cpyList.Unlock() + // TODO: 这里有一个问题,如果在执行Reset的时候,这个节点已经被移动到tmp链表 + // if atomic.LoadUint64(&t.version) != atomic.LoadUint64(&cpyList.version) { + // return + // } + cpyList.Del(&t.Head) + jiffies := atomic.LoadUint64(&t.root.jiffies) + + expire = expire/(time.Millisecond*10) + time.Duration(jiffies) + t.expire = uint64(expire) + t.root.add(t, atomic.LoadUint64(&t.root.jiffies)) } From 842da421cba3b92b57ec167f1be21edbcab7249f Mon Sep 17 00:00:00 2001 From: guonaihong Date: Fri, 16 Feb 2024 14:15:57 +0800 Subject: [PATCH 7/7] =?UTF-8?q?=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: guonaihong --- min_heap.go | 3 + min_heap_node.go | 3 + min_heap_node_test.go | 4 ++ min_heap_test.go | 3 + option.go | 7 ++- t_test.go | 3 + time_wheel.go | 3 + time_wheel_node.go | 5 +- time_wheel_test.go | 3 + time_wheel_utils.go | 3 + timer.go | 6 +- timer_test.go | 122 ++++++++++++++++++++++++++++++++++++-- timer_wheel_utils_test.go | 3 + 13 files changed, 158 insertions(+), 10 deletions(-) diff --git a/min_heap.go b/min_heap.go index 1b31aed..5f79ff3 100644 --- a/min_heap.go +++ b/min_heap.go @@ -1,3 +1,6 @@ +// Copyright 2020-2024 guonaihong, antlabs. All rights reserved. +// +// mit license package timer import ( diff --git a/min_heap_node.go b/min_heap_node.go index 3fd922c..760d645 100644 --- a/min_heap_node.go +++ b/min_heap_node.go @@ -1,3 +1,6 @@ +// Copyright 2020-2024 guonaihong, antlabs. All rights reserved. +// +// mit license package timer import ( diff --git a/min_heap_node_test.go b/min_heap_node_test.go index 1c88f22..6489d43 100644 --- a/min_heap_node_test.go +++ b/min_heap_node_test.go @@ -1,3 +1,7 @@ +// Copyright 2020-2024 guonaihong, antlabs. All rights reserved. +// +// mit license + package timer import ( diff --git a/min_heap_test.go b/min_heap_test.go index 792a749..e0788c0 100644 --- a/min_heap_test.go +++ b/min_heap_test.go @@ -1,3 +1,6 @@ +// Copyright 2020-2024 guonaihong, antlabs. All rights reserved. +// +// mit license package timer import ( diff --git a/option.go b/option.go index a70db74..b2606b6 100644 --- a/option.go +++ b/option.go @@ -1,3 +1,6 @@ +// Copyright 2020-2024 guonaihong, antlabs. All rights reserved. +// +// mit license package timer type option struct { @@ -21,14 +24,14 @@ func WithMinHeap() Option { } } -//TODO +// TODO func WithSkipList() Option { return func(o *option) { o.skiplist = true } } -//TODO +// TODO func WithRbtree() Option { return func(o *option) { o.rbtree = true diff --git a/t_test.go b/t_test.go index 2dbdcf6..90b7b9a 100644 --- a/t_test.go +++ b/t_test.go @@ -1,3 +1,6 @@ +// Copyright 2020-2024 guonaihong, antlabs. All rights reserved. +// +// mit license package timer import ( diff --git a/time_wheel.go b/time_wheel.go index b6d0580..4d6e01a 100644 --- a/time_wheel.go +++ b/time_wheel.go @@ -1,3 +1,6 @@ +// Copyright 2020-2024 guonaihong, antlabs. All rights reserved. +// +// mit license package timer import ( diff --git a/time_wheel_node.go b/time_wheel_node.go index b7aea4d..55e33c9 100644 --- a/time_wheel_node.go +++ b/time_wheel_node.go @@ -1,3 +1,6 @@ +// Copyright 2020-2024 guonaihong, antlabs. All rights reserved. +// +// mit license package timer import ( @@ -105,5 +108,5 @@ func (t *timeNode) Reset(expire time.Duration) { expire = expire/(time.Millisecond*10) + time.Duration(jiffies) t.expire = uint64(expire) - t.root.add(t, atomic.LoadUint64(&t.root.jiffies)) + t.root.add(t, jiffies) } diff --git a/time_wheel_test.go b/time_wheel_test.go index e254383..520f04f 100644 --- a/time_wheel_test.go +++ b/time_wheel_test.go @@ -1,3 +1,6 @@ +// Copyright 2020-2024 guonaihong, antlabs. All rights reserved. +// +// mit license package timer import ( diff --git a/time_wheel_utils.go b/time_wheel_utils.go index cc97794..66f15bc 100644 --- a/time_wheel_utils.go +++ b/time_wheel_utils.go @@ -1,3 +1,6 @@ +// Copyright 2020-2024 guonaihong, antlabs. All rights reserved. +// +// mit license package timer import "time" diff --git a/timer.go b/timer.go index 6cc6f68..f36d622 100644 --- a/timer.go +++ b/timer.go @@ -1,3 +1,6 @@ +// Copyright 2020-2024 guonaihong, antlabs. All rights reserved. +// +// mit license package timer import "time" @@ -27,7 +30,8 @@ type Timer interface { // 停止单个定时器 type TimeNoder interface { Stop() - Reset(d time.Duration) + // 重置时间器 + Reset(expire time.Duration) } // 定时器构造函数 diff --git a/timer_test.go b/timer_test.go index a44947f..7936bc6 100644 --- a/timer_test.go +++ b/timer_test.go @@ -1,7 +1,11 @@ +// Copyright 2020-2024 guonaihong, antlabs. All rights reserved. +// +// mit license package timer import ( "log" + "sync" "sync/atomic" "testing" "time" @@ -34,7 +38,7 @@ func Test_ScheduleFunc(t *testing.T) { func Test_AfterFunc(t *testing.T) { tm := NewTimer() - + go tm.Run() log.Printf("start\n") count := uint32(0) @@ -60,11 +64,9 @@ func Test_AfterFunc(t *testing.T) { tm.AfterFunc(time.Hour*24*365*12, nil) */ - go func() { - time.Sleep(time.Second + time.Millisecond*100) - tm.Stop() - }() - tm.Run() + time.Sleep(time.Second + time.Millisecond*100) + tm.Stop() + if count != 2 { t.Errorf("count:%d != 2\n", count) } @@ -107,3 +109,111 @@ func Test_Node_Stop(t *testing.T) { } } + +// 测试重置定时器 +func Test_Reset(t *testing.T) { + t.Run("min heap reset", func(t *testing.T) { + + tm := NewTimer(WithMinHeap()) + + go tm.Run() + count := int32(0) + + tc := make(chan time.Duration, 2) + + var mu sync.Mutex + isClose := false + now := time.Now() + node1 := tm.AfterFunc(time.Millisecond*100, func() { + + mu.Lock() + atomic.AddInt32(&count, 1) + if atomic.LoadInt32(&count) <= 2 && !isClose { + tc <- time.Since(now) + } + mu.Unlock() + }) + + node2 := tm.AfterFunc(time.Millisecond*100, func() { + mu.Lock() + atomic.AddInt32(&count, 1) + if atomic.LoadInt32(&count) <= 2 && !isClose { + tc <- time.Since(now) + } + mu.Unlock() + }) + node1.Reset(time.Millisecond) + node2.Reset(time.Millisecond) + + time.Sleep(time.Millisecond * 3) + mu.Lock() + isClose = true + close(tc) + node1.Stop() + node2.Stop() + mu.Unlock() + for tv := range tc { + if tv < time.Millisecond || tv > 2*time.Millisecond { + t.Errorf("tc < time.Millisecond tc > 2*time.Millisecond") + + } + } + if atomic.LoadInt32(&count) != 2 { + t.Errorf("count:%d != 2", atomic.LoadInt32(&count)) + } + + }) + + t.Run("time wheel reset", func(t *testing.T) { + tm := NewTimer() + + go func() { + tm.Run() + }() + + count := int32(0) + + tc := make(chan time.Duration, 2) + + var mu sync.Mutex + isClose := false + now := time.Now() + node1 := tm.AfterFunc(time.Millisecond*10, func() { + + mu.Lock() + atomic.AddInt32(&count, 1) + if atomic.LoadInt32(&count) <= 2 && !isClose { + tc <- time.Since(now) + } + mu.Unlock() + }) + + node2 := tm.AfterFunc(time.Millisecond*10, func() { + mu.Lock() + atomic.AddInt32(&count, 1) + if atomic.LoadInt32(&count) <= 2 && !isClose { + tc <- time.Since(now) + } + mu.Unlock() + }) + + node1.Reset(time.Millisecond * 20) + node2.Reset(time.Millisecond * 20) + + time.Sleep(time.Millisecond * 40) + mu.Lock() + isClose = true + close(tc) + node1.Stop() + node2.Stop() + mu.Unlock() + for tv := range tc { + if tv < time.Millisecond*20 || tv > 2*time.Millisecond*20 { + t.Errorf("tc < time.Millisecond tc > 2*time.Millisecond") + } + } + if atomic.LoadInt32(&count) != 2 { + t.Errorf("count:%d != 2", atomic.LoadInt32(&count)) + } + }) +} diff --git a/timer_wheel_utils_test.go b/timer_wheel_utils_test.go index 69744c7..0c4cdd6 100644 --- a/timer_wheel_utils_test.go +++ b/timer_wheel_utils_test.go @@ -1,3 +1,6 @@ +// Copyright 2020-2024 guonaihong, antlabs. All rights reserved. +// +// mit license package timer import (