-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathapi.go
186 lines (161 loc) · 5.39 KB
/
api.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
package main
import (
"fmt"
lru "github.com/hashicorp/golang-lru"
"io/ioutil"
"net/http"
"os"
"sort"
"strconv"
"strings"
"time"
"github.com/buger/jsonparser"
)
// Request/response formats used when talking to Datadog.
const (
	// jsonAcceptType is the value sent in the Accept header.
	jsonAcceptType = "application/json"
	// datadogOutputTimeFormat is the layout used to parse message timestamps
	// found in a response.
	datadogOutputTimeFormat = "2006-01-02T15:04:05.000Z"
	// datadogInputTimeFormat is the layout used to render explicit start/end
	// dates in a request.
	datadogInputTimeFormat = "2006-01-02 15:04:05"
)

// Names of the JSON fields read from a Datadog response.
const (
	logsField      = "logs"
	statusField    = "status"
	idField        = "id"
	contentField   = "content"
	tagsField      = "tags"
	nextLogIdField = "nextLogId"
)

// Values of the response status field.
const (
	statusOk   = "ok"   // More messages.
	statusDone = "done" // No more messages.
)

// datadogUri is the log-query endpoint. The two %s verbs receive the API key
// and the application key, in that order.
const datadogUri = "https://api.datadoghq.com/api/v1/logs-queries/list?api_key=%s&application_key=%s"
// Remembers the ids of recently emitted log messages (at most 1024 entries).
// This is used when tailing to prevent an overlap of messages output:
// extractMessages skips ids already present here and records the ones it keeps.
// NOTE(review): the error returned by lru.New is discarded — presumably it can
// only fail for a non-positive size, which 1024 is not; confirm against the
// golang-lru documentation.
var msgCache, _ = lru.New(1024)
// logMessage is a single log entry extracted from a Datadog response.
type logMessage struct {
	// id is the value of the entry's "id" field.
	id string
	// timestamp is the parsed timestamp of the entry; messages are ordered by it.
	timestamp time.Time
	// fields holds the entry's "content" object as a flat string map.
	fields map[string]string
	// tags holds the strings of the "tags" array found inside "content".
	tags []string
}
// fetchMessages runs one log query against Datadog using the settings in opts
// and returns the messages it yielded.
//
// startingId is the paging cursor from a previous call ("" for the first page).
// The returned nextId is the cursor for the following page; it is "" when the
// response carries no (or a null) nextLogId, when the status is "done", or
// when the response status is not one of the known values. In that last case
// a diagnostic is written to stderr and an empty, non-nil slice is returned.
func fetchMessages(opts *options, startingId string) (result []logMessage, nextId string) {
	api := messageAPIURI(opts, startingId)
	jsonBytes := callDatadog(opts, api, jsonAcceptType)

	// Get the messages from the returned JSON.
	messages := getJSONArray(jsonBytes, logsField)

	// A missing or explicitly null cursor both mean "no next page".
	if _, valueType, err := getJSONValue(jsonBytes, nextLogIdField); err == nil && valueType != jsonparser.Null {
		nextId = getJSONString(jsonBytes, nextLogIdField)
	}

	status := getJSONString(jsonBytes, statusField)
	if status != statusOk && status != statusDone {
		// Terminate the line like every other diagnostic in this file so the
		// message does not run into whatever is printed next.
		_, _ = fmt.Fprintf(os.Stderr, "Error while retrieving logs, status was: %s\n", status)
		return []logMessage{}, ""
	}
	return extractMessages(status, nextId, messages, opts)
}
// extractMessages turns the raw JSON array of log entries into logMessage
// values sorted by ascending timestamp.
//
// status and nextId describe the response the array came from; nextId is
// passed back unchanged unless status is "done", in which case "" is returned.
// Entries whose timestamp cannot be parsed are reported on stderr and dropped.
// When opts.limit is positive, entries whose id is already in msgCache are
// removed from the result and the ids of the remaining ones are added to it.
func extractMessages(status string, nextId string, messages []byte, opts *options) (result []logMessage, nextIdResult string) {
	// "done" means there is nothing left to page through.
	if status == statusDone {
		nextId = ""
	}

	// collect converts one array element and appends it to result.
	collect := func(entry []byte, _ jsonparser.ValueType, _ int, _ error) {
		entryID := getJSONString(entry, idField)
		content := getJSONSimpleMap(entry, contentField)
		entryTags := getJSONArrayOfStrings(entry, contentField, tagsField)

		rawTS := content[timestampField] // e.g. 2019-10-03T13:22:52.882Z
		parsed, parseErr := time.Parse(datadogOutputTimeFormat, rawTS)
		if parseErr != nil {
			_, _ = fmt.Fprintf(os.Stderr, "Invalid json timestamp: %s - %s\n", rawTS, parseErr.Error())
			return
		}
		result = append(result, logMessage{
			id:        entryID,
			timestamp: parsed,
			fields:    content,
			tags:      entryTags,
		})
	}
	_, _ = jsonparser.ArrayEach(messages, collect)

	// Oldest first.
	sort.Slice(result, func(a, b int) bool {
		return result[a].timestamp.Before(result[b].timestamp)
	})

	if opts.limit <= 0 {
		return result, nextId
	}

	// Drop everything that was already emitted by an earlier call.
	var unseen []logMessage
	for i := range result {
		entry := result[i]
		if msgCache.Contains(entry.id) {
			continue
		}
		unseen = append(unseen, entry)
		msgCache.Add(entry.id, true)
	}
	return unseen, nextId
}
// messageAPIURI builds the JSON document that describes the log query, as
// determined by the command-line options. Despite its name the result is not
// a URI: fetchMessages passes it on as the body of the request.
//
//   - Time window: "now - <timeRange>s" to "now", unless both opts.startDate
//     and opts.endDate are set, in which case those dates are used.
//   - Limit: opts.limit when positive, otherwise 300.
//   - Query: opts.query when non-empty, otherwise "*". The query is escaped so
//     that quotes and backslashes in it cannot break the JSON document.
//   - startAt: nextId when non-empty, otherwise null. nextId is inserted
//     verbatim, so it has to be a valid JSON value already.
//     NOTE(review): confirm that callers pass the cursor in quoted form.
func messageAPIURI(opts *options, nextId string) (uri string) {
	from := "now - " + strconv.Itoa(opts.timeRange) + "s"
	to := "now"
	if opts.startDate != nil && opts.endDate != nil {
		from = opts.startDate.Format(datadogInputTimeFormat)
		to = opts.endDate.Format(datadogInputTimeFormat)
	}

	limit := "300"
	if opts.limit > 0 {
		limit = strconv.Itoa(opts.limit)
	}

	query := "*"
	if len(opts.query) > 0 {
		query = opts.query
	}

	startAt := "null"
	if len(nextId) > 0 {
		startAt = nextId
	}

	// Assemble the document in one go rather than by successive placeholder
	// substitution, so placeholder-like text inside the query is left alone.
	return `{"query": "` + escapeJSONString(query) +
		`","time": {"from": "` + from + `", "to": "` + to +
		`"}, "sort": "desc", "limit": ` + limit +
		`, "startAt": ` + startAt + `}`
}

// escapeJSONString escapes s for use between the double quotes of a JSON
// string literal: '"' and '\' get a backslash prefix and control characters
// below U+0020 are written as \u00XX. Everything else is copied unchanged.
func escapeJSONString(s string) string {
	const hexDigits = "0123456789abcdef"

	var b strings.Builder
	b.Grow(len(s))
	for _, r := range s {
		switch {
		case r == '"' || r == '\\':
			b.WriteByte('\\')
			b.WriteRune(r)
		case r < 0x20:
			b.WriteString(`\u00`)
			b.WriteByte(hexDigits[r>>4])
			b.WriteByte(hexDigits[r&0xf])
		default:
			b.WriteRune(r)
		}
	}
	return b.String()
}
// callDatadog is the common entry-point for calls to Datadog. It builds the
// endpoint address from the API and application keys in opts.serverConfig and
// posts api to it, returning the raw response bytes. Only the JSON accept type
// is handled; for any other acceptType nothing is sent and nil is returned.
func callDatadog(opts *options, api string, acceptType string) []byte {
	serverCfg := opts.serverConfig
	key, appKey := serverCfg.ApiKey(), serverCfg.ApplicationKey()

	if acceptType != jsonAcceptType {
		return nil
	}
	return readBytes(fmt.Sprintf(datadogUri, key, appKey), api)
}
// readBytes sends body to uri, asking for a JSON response, and hands back the
// bytes Datadog returned without interpreting them.
func readBytes(uri string, body string) []byte {
	raw := fetch(uri, body, jsonAcceptType)
	return raw
}
// datadogHTTPClient is shared by every request so that connections can be
// reused. The timeout bounds the whole exchange (connect, send, read), so a
// stalled connection cannot hang the program forever.
var datadogHTTPClient = &http.Client{Timeout: 60 * time.Second}

// fetch performs the low-level HTTP call to Datadog: it POSTs api as the
// request body to uri, asking for acceptType in return, and returns the
// response body.
//
// Any failure to build the request, reach the server or read the response is
// reported on stderr and terminates the process with exit status 1. A non-2xx
// HTTP status is reported on stderr as well, but the body is still returned so
// the caller can inspect whatever the server sent.
func fetch(uri string, api string, acceptType string) []byte {
	req, err := http.NewRequest(http.MethodPost, uri, strings.NewReader(api))
	if err != nil {
		_, _ = fmt.Fprintf(os.Stderr, "Request is malformed: %s\n", err.Error())
		os.Exit(1)
	}
	req.Header.Set("Accept", acceptType)
	// The body is a JSON document; say so instead of leaving the server to guess.
	req.Header.Set("Content-Type", "application/json")

	resp, err := datadogHTTPClient.Do(req)
	if err != nil {
		_, _ = fmt.Fprintf(os.Stderr, "Unable to connect to Datadog: %s\n", err.Error())
		os.Exit(1)
	}
	//noinspection GoUnhandledErrorResult
	defer resp.Body.Close()

	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		_, _ = fmt.Fprintf(os.Stderr, "Datadog returned HTTP status %s\n", resp.Status)
	}

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		_, _ = fmt.Fprintf(os.Stderr, "Unable to read content from Datadog: %s\n", err.Error())
		os.Exit(1)
	}
	return body
}