-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathwheel_timer.go
139 lines (127 loc) · 2.68 KB
/
wheel_timer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package mylib
import (
"errors"
"fmt"
"runtime/debug"
"sync"
"time"
)
// WheelTimer tracks int64 ids and hands an id to a callback once it has
// not been refreshed through Update for roughly the configured timeout.
// Inspired by HashedWheelTimer in Netty.
type WheelTimer struct {
step time.Duration // tick interval: how often the wheel advances one slot, e.g. 1s
timeout time.Duration // expiry duration, e.g. 1min; Init creates timeout/step slots
doAction func(int64) // callback invoked with each expired id, e.g. logout
id2slot map[int64]int // id --> index of the slot whose set the id was last added to
sync.Mutex // held by Update while it touches id2slot and the slot sets
circle *cycleList // ring of slots plus the index of the current slot
stop chan struct{} // Stop sends on it; the goroutine launched by Start returns on receipt
}
// cycleList is the ring of slots behind WheelTimer: body holds one id set
// per slot and currentSlot is the index of the slot the wheel points at.
type cycleList struct {
currentSlot int // index into body; takeStep moves it forward and wraps to 0 after the last slot
body []*Int64Set // one set of ids per slot
}
// getPreviousSlot returns the index of the slot just before the current
// one, wrapping from slot 0 to the last slot. An empty ring yields 0.
func (cl *cycleList) getPreviousSlot() int {
	switch size := len(cl.body); {
	case size == 0:
		return 0
	case cl.currentSlot == 0:
		// Stepping back from the first slot lands on the last one.
		return size - 1
	default:
		return cl.currentSlot - 1
	}
}
// getNextSlot returns the index of the slot that follows the current one,
// wrapping from the last slot back to 0. An empty ring yields 0.
func (cl *cycleList) getNextSlot() int {
	size := len(cl.body)
	if size == 0 || cl.currentSlot == size-1 {
		return 0
	}
	return cl.currentSlot + 1
}
// takeStep advances the ring by one slot, wrapping around at the end.
func (cl *cycleList) takeStep() {
	next := cl.getNextSlot()
	cl.currentSlot = next
}
// Init configures the timer. Update and Start rely on the state set up
// here, so call Init first.
//
// step is the tick interval and timeout the expiry duration; the wheel
// gets timeout/step slots. doWhenExpired is stored as the callback that
// receives each expired id.
//
// An error is returned, and the timer is left untouched, when step or
// timeout is not positive or when timeout is not a whole multiple of step.
func (wt *WheelTimer) Init(step, timeout time.Duration, doWhenExpired func(int64)) error {
	// A zero step would divide by zero below; a negative one makes no sense.
	if step <= 0 {
		return errors.New("step must be positive")
	}
	// A zero timeout would give a wheel with no slots (Update would index
	// out of range) and a negative one a negative slice length.
	if timeout <= 0 {
		return errors.New("timeout must be positive")
	}
	if timeout%step != 0 {
		// timeout would not work,
		// actually (timeout/step)*step works.
		// e.g. Init(3s, 10s, logout):
		// there would be 3 slots, and 3s * 3 == 9s.
		return errors.New("timeout%step must be zero")
	}

	ring := &cycleList{body: make([]*Int64Set, timeout/step)}
	for i := range ring.body {
		s := &Int64Set{}
		s.Init()
		ring.body[i] = s
	}

	wt.step = step
	wt.timeout = timeout
	wt.doAction = doWhenExpired
	wt.stop = make(chan struct{})
	wt.id2slot = make(map[int64]int)
	wt.circle = ring
	return nil
}
// Update registers id with the timer, or refreshes it if it is already
// tracked, by moving it into the slot the wheel currently points at.
//
// An id that is not refreshed again within the expiry duration given to
// Init is considered expired, and doWhenExpired(id) is called for it.
func (wt *WheelTimer) Update(id int64) {
	wt.Lock()
	defer wt.Unlock()

	ring := wt.circle

	// Drop the id from whichever slot it was filed under before.
	if prev, tracked := wt.id2slot[id]; tracked {
		ring.body[prev].Remove(id)
	}

	// File it under the current slot and remember where it went.
	cur := ring.currentSlot
	ring.body[cur].Add(id)
	wt.id2slot[id] = cur
}
// Stop tells the goroutine launched by Start to exit by sending a signal
// on the stop channel. The call blocks until the send completes.
func (wt *WheelTimer) Stop() {
	var signal struct{}
	wt.stop <- signal
}
// Start launches the background goroutine that drives the wheel: on every
// tick of length step it advances to the next slot and expires the ids
// found there. The goroutine runs until Stop is called.
//
// If step is not positive no ticks are produced and the goroutine simply
// waits for Stop.
func (wt *WheelTimer) Start() {
	go func() {
		// One ticker for the whole lifetime of the goroutine. Calling
		// time.Tick inside the loop would create a fresh ticker on every
		// iteration and stretch the period by the processing time.
		var tick <-chan time.Time // nil channel: never ready when step <= 0
		if wt.step > 0 {
			ticker := time.NewTicker(wt.step)
			defer ticker.Stop()
			tick = ticker.C
		}

		for {
			select {
			case <-wt.stop:
				return
			case <-tick:
				// Update reads the current slot under the mutex, so the
				// slot must be advanced under it as well.
				wt.Lock()
				wt.circle.takeStep()
				wt.Unlock()
				wt.process()
			}
		}
	}()
}
// process expires every id filed under the current slot: the ids are
// removed from the slot and from id2slot, and doAction is then called for
// each of them on a separate goroutine.
//
// A panic raised here or by doAction is recovered and reported on standard
// output together with a stack trace, so it cannot take the program down.
func (wt *WheelTimer) process() {
	defer func() {
		if r := recover(); r != nil {
			fmt.Printf("wheel timer: process panicked: %v\n%s\n", r, debug.Stack())
		}
	}()

	// Update mutates the slot sets and id2slot under this mutex, so the
	// snapshot-and-clear below has to hold it too.
	wt.Lock()
	defer wt.Unlock()

	expired := wt.circle.body[wt.circle.currentSlot]
	ids := expired.ToSlice()
	expired.RemoveAll()
	// Forget where the expired ids lived; otherwise id2slot keeps an entry
	// for every id ever seen.
	for _, id := range ids {
		delete(wt.id2slot, id)
	}
	if len(ids) == 0 {
		return
	}

	// Callbacks run outside the lock on their own goroutine so that a slow
	// callback does not hold up the wheel or callers of Update.
	go func() {
		for _, id := range ids {
			func() {
				// recover only works in the goroutine that panicked, and
				// doing it per id keeps one bad callback from dropping the
				// rest of the batch.
				defer func() {
					if r := recover(); r != nil {
						fmt.Printf("wheel timer: action for id %d panicked: %v\n%s\n", id, r, debug.Stack())
					}
				}()
				wt.doAction(id)
			}()
		}
	}()
}