-
Notifications
You must be signed in to change notification settings - Fork 19
/
async_write.go
109 lines (95 loc) · 2.37 KB
/
async_write.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
package goev
import (
"errors"
"fmt"
"os"
"sync"
"sync/atomic"
"syscall"
"unsafe"
"golang.org/x/sys/unix"
)
// asyncWriteItem is one asynchronously written data block.
// The framework will ensure that it is sent out in the order in which it was enqueued.
type asyncWriteItem struct {
// fd is carried along with the item; it is not read by the code in this file.
// NOTE(review): presumably the descriptor the data is destined for - confirm against callers.
fd int
// eh is the handler whose asyncOrderedWrite method is invoked with abf
// when the item is dequeued in asyncWrite.OnRead.
eh EvHandler
// abf is the payload passed to eh.asyncOrderedWrite.
abf asyncWriteBuf
}
// asyncWrite hands queued write items over to the poller via an eventfd.
//
// Using a double buffer queue, the 'writeq' is only responsible for receiving data blocks.
// When it is time to send, the 'writeq' and 'readq' are swapped,
// and the 'readq' (previously the 'writeq') is responsible for sending.
type asyncWrite struct {
IOHandle
// efd is the eventfd created in newAsyncWrite; push writes to it and OnRead reads from it.
efd int
// notified is flipped from 0 to 1 by the push call that signals the eventfd and is
// reset to 0 by OnRead after the eventfd has been read, so that concurrent pushes
// signal the eventfd only once per wake-up.
notified atomic.Int32 // used to avoid duplicate call evHandler
// readq is drained by OnRead; it is accessed there without holding mtx.
readq *RingBuffer[asyncWriteItem]
// writeq receives items from push.
writeq *RingBuffer[asyncWriteItem]
// mtx guards pushes onto writeq and the readq/writeq swap.
mtx sync.Mutex
}
// newAsyncWrite builds an asyncWrite with two 256-slot queues, creates a
// non-blocking, close-on-exec eventfd and registers it with ep for EvEventfd
// events, using the new asyncWrite as the handler.
//
// It returns an error if the eventfd cannot be created or if the registration
// fails; in the latter case the eventfd is closed before returning.
func newAsyncWrite(ep *evPoll) (*asyncWrite, error) {
	const queueCap = 256
	aw := &asyncWrite{
		readq:  NewRingBuffer[asyncWriteItem](queueCap),
		writeq: NewRingBuffer[asyncWriteItem](queueCap),
	}

	efd, err := unix.Eventfd(0, unix.EFD_NONBLOCK|unix.EFD_CLOEXEC)
	if err != nil {
		return nil, errors.New("goev: eventfd " + err.Error())
	}

	if addErr := ep.add(efd, EvEventfd, aw); addErr != nil {
		// Registration failed: do not leak the descriptor.
		syscall.Close(efd)
		return nil, errors.New("goev: asyncWrite add to evpoll fail! " + addErr.Error())
	}

	aw.efd = efd
	return aw, nil
}
// push appends awi to the write queue and, unless a wake-up is already
// pending, signals the eventfd so that OnRead gets invoked.
func (aw *asyncWrite) push(awi asyncWriteItem) {
	aw.mtx.Lock()
	aw.writeq.PushBack(awi)
	aw.mtx.Unlock()

	// Only the caller that flips notified from 0 to 1 signals the eventfd.
	if alreadySignalled := !aw.notified.CompareAndSwap(0, 1); alreadySignalled {
		return
	}

	// The eventfd is written with the 8 raw bytes of an int64 holding 1 (man 2 eventfd).
	one := int64(1)
	payload := (*(*[8]byte)(unsafe.Pointer(&one)))[:]
	for {
		if _, err := syscall.Write(aw.efd, payload); err != syscall.EINTR {
			return
		}
		// EINTR: retry the write.
	}
}
// OnRead drains queued write items; it is the handler registered for the
// eventfd, which push signals when writeq has data.
//
// If readq is empty it is swapped with writeq, then at most 256 items are
// popped and handed to their handler's asyncOrderedWrite. When items remain
// in readq the eventfd is left unread and the method returns immediately
// (NOTE(review): this relies on the poller reporting the still-readable
// eventfd again - confirm the registration is level-triggered).
// Otherwise the eventfd is read, the notified flag is cleared, and the
// eventfd is signalled again if items were pushed in the meantime.
//
// It always returns true.
func (aw *asyncWrite) OnRead() bool {
	if aw.readq.IsEmpty() {
		aw.mtx.Lock()
		aw.writeq, aw.readq = aw.readq, aw.writeq // Swap read/write queues
		aw.mtx.Unlock()
	}

	for i := 0; i < 256; i++ { // Don't process too many at once
		item, ok := aw.readq.PopFront()
		if !ok {
			break
		}
		item.eh.asyncOrderedWrite(item.eh, item.abf)
	}
	if !aw.readq.IsEmpty() { // Ignore readable eventfd, continue
		return true
	}

	var bf [8]byte
	for {
		_, err := syscall.Read(aw.efd, bf[:])
		if err == syscall.EINTR {
			continue
		}
		if err == syscall.EAGAIN {
			// Nothing to read: leave the notified flag untouched.
			return true
		}
		if err != nil {
			// Fprint rather than Fprintf: the message is not a format string.
			fmt.Fprint(os.Stderr, "goev: eventfd read fail! "+err.Error())
			// return false // TODO add evOptions.debug? panic("Notify: read eventfd failed!")
		}
		break
	}

	aw.notified.Store(0)
	// While notified was 1, push only enqueued without signalling the eventfd.
	// Items pushed after the swap above would otherwise stay in writeq until
	// some later push happens to signal again.
	aw.renotifyIfPending()
	return true
}

// renotifyIfPending signals the eventfd again when writeq is non-empty and no
// other goroutine has already claimed the notification.
func (aw *asyncWrite) renotifyIfPending() {
	aw.mtx.Lock()
	pending := !aw.writeq.IsEmpty()
	aw.mtx.Unlock()
	if !pending || !aw.notified.CompareAndSwap(0, 1) {
		return
	}

	var v int64 = 1
	for {
		_, err := syscall.Write(aw.efd, (*(*[8]byte)(unsafe.Pointer(&v)))[:]) // man 2 eventfd
		if err != syscall.EINTR {
			return
		}
	}
}