-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathreader.go
171 lines (143 loc) · 4.11 KB
/
reader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
package plex
import (
"fmt"
"io"
"net"
"time"
"github.com/swxctx/plex/pack"
"github.com/swxctx/plex/plog"
)
// startReaderRoutine
func (c *plexConnection) startReaderRoutine() {
remoteAddr := c.storeInfo.conn.RemoteAddr().String()
c.remoteAddr = remoteAddr
plog.Infof("plex server accept conn-> %s", remoteAddr)
// set auth timeout
c.storeInfo.conn.SetReadDeadline(time.Now().Add(time.Duration(c.plexServer.cfg.AuthTimeout) * time.Second))
for {
// message logic
if err := c.handleMessage(); err != nil {
if err == io.EOF {
plog.Tracef("connection closed, remote-> %s, uid-> %s", c.remoteAddr, c.uid)
c.close()
break
}
// ReadDeadline timeout
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
plog.Warnf("connection timeout, remote-> %s", remoteAddr)
c.close()
break
}
plog.Errorf("error handling message: %s", err)
continue
}
}
}
// handleMessage
func (c *plexConnection) handleMessage() error {
message, err := pack.Unpack(c.storeInfo.conn)
if err != nil {
return err
}
if message == nil {
return nil
}
plog.Tracef("Server received message from remote-> %s, msg-> %#v", c.remoteAddr, message)
return c.receiveMsgHandler(message)
}
// receiveMsgHandler
func (c *plexConnection) receiveMsgHandler(message *pack.Message) error {
if !c.isAuthenticated && message.URI != auth_uri && message.URI != inner_auth_uri {
plog.Errorf("unauthorized access attempt, remote-> %s", c.remoteAddr)
// auth failed
err := c.responseOnlyUri(auth_failed_uri)
if err != nil {
return err
}
return fmt.Errorf("unauthorized access")
}
switch message.URI {
case auth_uri:
// auth
return c.handlerAuth(message)
case heartbeat_uri:
// heartbeat
return c.handleHeartbeat(message)
case inner_auth_uri:
// inner tcp
return c.handlerInnerAuth(message)
case send_msg_uri:
// send msg to client
return c.handleSendMessage(message)
}
return nil
}
// setReadDeadline
func (c *plexConnection) setReadDeadline(duration int64) {
c.storeInfo.conn.SetReadDeadline(time.Now().Add(time.Duration(duration) * time.Second))
}
// handlerAuth
func (c *plexConnection) handlerAuth(message *pack.Message) error {
// already auth
if c.isAuthenticated {
plog.Infof("already authenticated, remote-> %s", c.remoteAddr)
return nil
}
// auth func
authSuccess, uid := c.plexServer.authFunc(message.Body)
if !authSuccess {
plog.Errorf("auth failed, remote-> %s, body-> %s", c.remoteAddr, message.Body)
c.responseOnlyUri(auth_failed_uri)
return fmt.Errorf("auth failed")
}
c.isAuthenticated = true
plog.Infof("auth success, remote-> %s, body-> %s", c.remoteAddr, message.Body)
// save conn cache
c.plexServer.store.set(uid, c.storeInfo)
c.uid = uid
// heartbeat timeout
c.setReadDeadline(c.plexServer.cfg.HeartbeatTimeout)
// auth success
c.responseOnlyUri(auth_success_uri)
// online status change
c.plexServer.onlineStatusSubscribe(true, c.uid)
return nil
}
// handlerInnerAuth
func (c *plexConnection) handlerInnerAuth(message *pack.Message) error {
// already auth
if c.isAuthenticated {
plog.Infof("already authenticated, remote-> %s", c.remoteAddr)
return nil
}
// auth func
if message.Body != c.plexServer.cfg.InnerPassword {
plog.Errorf("inner auth failed, remote-> %s, body-> %d", c.remoteAddr, message.Body)
// auth failed
c.responseOnlyUri(auth_failed_uri)
return fmt.Errorf("inner auth failed")
}
c.isAuthenticated = true
plog.Infof("inner auth success, remote-> %s, body-> %s", c.remoteAddr, message.Body)
// heartbeat timeout
c.setReadDeadline(c.plexServer.cfg.HeartbeatTimeout)
// auth success
return c.responseOnlyUri(auth_success_uri)
}
// handlerAuth
func (c *plexConnection) handleHeartbeat(message *pack.Message) error {
// heartbeat
c.storeInfo.heartbeat = time.Now().Unix()
if len(c.uid) > 0 {
c.plexServer.store.set(c.uid, c.storeInfo)
}
// next heartbeat timeout
c.setReadDeadline(c.plexServer.cfg.HeartbeatTimeout)
// response success
return c.responseOnlyUri(heartbeat_uri)
}
// handlerAuth
func (c *plexConnection) handleSendMessage(message *pack.Message) error {
go c.plexServer.sendMessageOuterClient(message)
return nil
}