forked from xmidt-org/kratos
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathclientConfig.go
172 lines (141 loc) · 5.38 KB
/
clientConfig.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
package kratos
import (
"errors"
"net/http"
"strings"
"time"
"github.com/go-kit/kit/log"
"github.com/gorilla/websocket"
"github.com/xmidt-org/webpa-common/device"
"github.com/xmidt-org/webpa-common/logging"
)
const (
// Time allowed to write a message to the peer.
writeWait = time.Duration(10) * time.Second
// Time allowed to read the next pong message from the peer.
pongWait = 60 * time.Second
)
var (
errNilHandlePingMiss = errors.New("HandlePingMiss should not be nil")
)
// ClientConfig is the configuration to provide when making a new client.
type ClientConfig struct {
DeviceName string
FirmwareName string
ModelName string
Manufacturer string
DestinationURL string
OutboundQueue QueueConfig
WRPEncoderQueue QueueConfig
WRPDecoderQueue QueueConfig
HandlerRegistryQueue QueueConfig
HandleMsgQueue QueueConfig
Handlers []HandlerConfig
HandlePingMiss HandlePingMiss
ClientLogger log.Logger
PingConfig PingConfig
}
// QueueConfig is used to configure all the queues used to make kratos asynchronous.
type QueueConfig struct {
MaxWorkers int
Size int
}
type PingConfig struct {
PingWait time.Duration
MaxPingMiss int
}
// NewClient is used to create a new kratos Client from a ClientConfig.
func NewClient(config ClientConfig) (Client, error) {
if config.HandlePingMiss == nil {
return nil, errNilHandlePingMiss
}
inHeader := &clientHeader{
deviceName: config.DeviceName,
firmwareName: config.FirmwareName,
modelName: config.ModelName,
manufacturer: config.Manufacturer,
}
newConnection, connectionURL, err := createConnection(inHeader, config.DestinationURL)
if err != nil {
return nil, err
}
pinged := make(chan string)
newConnection.SetPingHandler(func(appData string) error {
pinged <- appData
err := newConnection.WriteControl(websocket.PongMessage, []byte(appData), time.Now().Add(writeWait))
return err
})
// at this point we know that the URL connection is legitimate, so we can do some string manipulation
// with the knowledge that `:` will be found in the string twice
connectionURL = connectionURL[len("ws://"):strings.LastIndex(connectionURL, ":")]
var logger log.Logger
if config.ClientLogger != nil {
logger = config.ClientLogger
} else {
logger = logging.DefaultLogger()
}
if config.PingConfig.MaxPingMiss <= 0 {
config.PingConfig.MaxPingMiss = 1
}
if config.PingConfig.PingWait == 0 {
config.PingConfig.PingWait = time.Minute
}
sender := NewSender(newConnection, config.OutboundQueue.MaxWorkers, config.OutboundQueue.Size, logger)
encoder := NewEncoderSender(sender, config.WRPEncoderQueue.MaxWorkers, config.WRPEncoderQueue.Size, logger)
newClient := &client{
deviceID: inHeader.deviceName,
userAgent: "WebPA-1.6(" + inHeader.firmwareName + ";" + inHeader.modelName + "/" + inHeader.manufacturer + ";)",
deviceProtocols: "TODO-what-to-put-here",
hostname: connectionURL,
handlePingMiss: config.HandlePingMiss,
encoderSender: encoder,
connection: newConnection,
headerInfo: inHeader,
done: make(chan struct{}, 1),
logger: logger,
pingConfig: config.PingConfig,
}
newClient.registry, err = NewHandlerRegistry(config.Handlers)
if err != nil {
logging.Warn(newClient.logger).Log(logging.MessageKey(), "failed to initialize all handlers for registry", logging.ErrorKey(), err.Error())
}
downstreamSender := NewDownstreamSender(newClient.Send, config.HandleMsgQueue.MaxWorkers, config.HandleMsgQueue.Size, logger)
registryHandler := NewRegistryHandler(newClient.Send, newClient.registry, downstreamSender, config.HandlerRegistryQueue.MaxWorkers, config.HandlerRegistryQueue.Size, newClient.deviceID, logger)
decoder := NewDecoderSender(registryHandler, config.WRPDecoderQueue.MaxWorkers, config.WRPDecoderQueue.Size, logger)
newClient.decoderSender = decoder
pingTimer := time.NewTimer(newClient.pingConfig.PingWait)
newClient.wg.Add(2)
go newClient.checkPing(pingTimer, pinged)
go newClient.read()
return newClient, nil
}
// private func used to generate the client that we're looking to produce
func createConnection(headerInfo *clientHeader, httpURL string) (connection *websocket.Conn, wsURL string, err error) {
_, err = device.ParseID(headerInfo.deviceName)
if err != nil {
return nil, "", err
}
// make a header and put some data in that (including MAC address)
// TODO: find special function for user agent
headers := make(http.Header)
headers.Add("X-Webpa-Device-Name", headerInfo.deviceName)
headers.Add("X-Webpa-Firmware-Name", headerInfo.firmwareName)
headers.Add("X-Webpa-Model-Name", headerInfo.modelName)
headers.Add("X-Webpa-Manufacturer", headerInfo.manufacturer)
// make sure destUrl's protocol is websocket (ws)
wsURL = strings.Replace(httpURL, "http", "ws", 1)
// creates a new client connection given the URL string
connection, resp, err := websocket.DefaultDialer.Dial(wsURL, headers)
for ;err == websocket.ErrBadHandshake && resp != nil && resp.StatusCode == http.StatusTemporaryRedirect; {
// Get url to which we are redirected and reconfigure it
wsURL = strings.Replace(resp.Header.Get("Location"), "http", "ws", 1)
connection, resp, err = websocket.DefaultDialer.Dial(wsURL, headers)
}
if err != nil {
if resp != nil {
err = createHTTPError(resp, err)
}
return nil, "", err
}
return connection, wsURL, nil
}