forked from onflow/flow-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
request_heap.go
73 lines (61 loc) · 2.11 KB
/
request_heap.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package synchronization
import (
"sync"
"github.com/onflow/flow-go/engine"
"github.com/onflow/flow-go/model/flow"
)
// RequestHeap is a message store, intended to satisfy the engine.MessageStore
// interface, that indexes requests by their originator. A new request from an
// originator that already has one stored replaces the old request, so, unlike
// the default FIFO queue, it holds at most one request per origin ID.
// Both retrieval and eviction pick an arbitrary entry, relying on Go's
// unspecified map iteration order.
type RequestHeap struct {
// lock guards requests; Put and Get acquire it before touching the map.
lock sync.Mutex
// limit is the capacity bound consulted by reduce when room is needed.
limit uint
// requests holds the stored message for each originator, keyed by OriginID.
requests map[flow.Identifier]*engine.Message
}
// NewRequestHeap returns an empty RequestHeap. limit is the capacity bound
// consulted when Put has to make room for a message from a new originator.
func NewRequestHeap(limit uint) *RequestHeap {
	heap := new(RequestHeap)
	heap.limit = limit
	heap.requests = map[flow.Identifier]*engine.Message{}
	return heap
}
// Put stores message keyed by its OriginID, overwriting any message already
// held for that originator. It always returns true.
func (q *RequestHeap) Put(message *engine.Message) bool {
	q.lock.Lock()
	defer q.lock.Unlock()

	origin := message.OriginID

	// Only a previously unseen originator can grow the map, so only then do we
	// ask reduce to make room. Doing it before the insert guarantees that the
	// message being added is never the one that gets ejected.
	_, known := q.requests[origin]
	if !known {
		q.reduce()
	}

	// Either the slot for this originator already exists or room was made above.
	q.requests[origin] = message
	return true
}
// Get removes and returns an arbitrary stored message. Which one is chosen
// depends on Go's map iteration order. The boolean is false, and the message
// nil, when nothing is stored.
func (q *RequestHeap) Get() (*engine.Message, bool) {
	q.lock.Lock()
	defer q.lock.Unlock()

	// The first entry the map iteration yields is the one handed out; the loop
	// body never runs when the map is empty.
	for id, picked := range q.requests {
		delete(q.requests, id)
		return picked, true
	}
	return nil, false
}
// reduce frees capacity ahead of an insertion from a new originator. While
// fewer than limit entries are stored it does nothing. Otherwise it evicts
// just enough arbitrary entries (chosen by map iteration order) to bring the
// size below limit, which is exactly one entry when called at max capacity.
// With a limit of 0 the map is simply emptied.
// It does not lock; the caller must already hold q.lock.
func (q *RequestHeap) reduce() {
	size := uint(len(q.requests))
	if size < q.limit {
		return // a free slot is already available
	}

	// Work in uint so a limit above the maximum int cannot wrap negative and
	// trigger a spurious eviction. size >= limit here, so this cannot underflow.
	surplus := size - q.limit + 1

	// Deleting entries while ranging over a map is permitted in Go. Stop as
	// soon as the surplus is gone so the remaining requests are kept.
	for originID := range q.requests {
		if surplus == 0 {
			break
		}
		delete(q.requests, originID)
		surplus--
	}
}