-
Notifications
You must be signed in to change notification settings - Fork 246
/
twitter.go
374 lines (328 loc) · 12 KB
/
twitter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
//Package anaconda provides structs and functions for accessing version 1.1
//of the Twitter API.
//
//Successful API queries return native Go structs that can be used immediately,
//with no need for type assertions.
//
//Authentication
//
//If you already have the access token (and secret) for your user (Twitter provides this for your own account on the developer portal), creating the client is simple:
//
// anaconda.SetConsumerKey("your-consumer-key")
// anaconda.SetConsumerSecret("your-consumer-secret")
// api := anaconda.NewTwitterApi("your-access-token", "your-access-token-secret")
//
//
//Queries
//
//Executing queries on an authenticated TwitterApi struct is simple.
//
// searchResult, _ := api.GetSearch("golang", nil)
// for _ , tweet := range searchResult.Statuses {
// fmt.Print(tweet.Text)
// }
//
//Certain endpoints allow separate optional parameter; if desired, these can be passed as the final parameter.
//
// v := url.Values{}
// v.Set("count", "30")
// result, err := api.GetSearch("golang", v)
//
//
//Endpoints
//
//Anaconda implements most of the endpoints defined in the Twitter API documentation: https://dev.twitter.com/docs/api/1.1.
//For clarity, in most cases, the function name is simply the name of the HTTP method and the endpoint (e.g., the endpoint `GET /friendships/incoming` is provided by the function `GetFriendshipsIncoming`).
//
//In a few cases, a shortened form has been chosen to make life easier (for example, retweeting is simply the function `Retweet`)
//
//More detailed information about the behavior of each particular endpoint can be found at the official Twitter API documentation.
package anaconda
import (
"compress/zlib"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"strings"
"time"
"github.com/ChimeraCoder/tokenbucket"
"github.com/garyburd/go-oauth/oauth"
)
const (
_GET = iota
_POST = iota
_DELETE = iota
_PUT = iota
ClientTimeout = 20
BaseUrlV1 = "https://api.twitter.com/1"
BaseUrl = "https://api.twitter.com/1.1"
UploadBaseUrl = "https://upload.twitter.com/1.1"
)
var (
oauthCredentials oauth.Credentials
)
type TwitterApi struct {
oauthClient oauth.Client
Credentials *oauth.Credentials
queryQueue chan query
bucket *tokenbucket.Bucket
returnRateLimitError bool
HttpClient *http.Client
// Currently used only for the streaming API
// and for checking rate-limiting headers
// Default logger is silent
Log Logger
// used for testing
// defaults to BaseUrl
baseUrl string
}
type query struct {
url string
form url.Values
data interface{}
method int
response_ch chan response
}
type response struct {
data interface{}
err error
}
const DEFAULT_DELAY = 0 * time.Second
const DEFAULT_CAPACITY = 5
//NewTwitterApi takes an user-specific access token and secret and returns a TwitterApi struct for that user.
//The TwitterApi struct can be used for accessing any of the endpoints available.
func NewTwitterApi(access_token string, access_token_secret string) *TwitterApi {
//TODO figure out how much to buffer this channel
//A non-buffered channel will cause blocking when multiple queries are made at the same time
queue := make(chan query)
c := &TwitterApi{
oauthClient: oauth.Client{
TemporaryCredentialRequestURI: "https://api.twitter.com/oauth/request_token",
ResourceOwnerAuthorizationURI: "https://api.twitter.com/oauth/authenticate",
TokenRequestURI: "https://api.twitter.com/oauth/access_token",
Credentials: oauthCredentials,
},
Credentials: &oauth.Credentials{
Token: access_token,
Secret: access_token_secret,
},
queryQueue: queue,
bucket: nil,
returnRateLimitError: false,
HttpClient: http.DefaultClient,
Log: silentLogger{},
baseUrl: BaseUrl,
}
//Configure a timeout to HTTP client (DefaultClient has no default timeout, which may deadlock Mutex-wrapped uses of the lib.)
c.HttpClient.Timeout = time.Duration(ClientTimeout * time.Second)
go c.throttledQuery()
return c
}
//NewTwitterApiWithCredentials takes an app-specific consumer key and secret, along with a user-specific access token and secret and returns a TwitterApi struct for that user.
//The TwitterApi struct can be used for accessing any of the endpoints available.
func NewTwitterApiWithCredentials(access_token string, access_token_secret string, consumer_key string, consumer_secret string) *TwitterApi {
api := NewTwitterApi(access_token, access_token_secret)
api.oauthClient.Credentials.Token = consumer_key
api.oauthClient.Credentials.Secret = consumer_secret
return api
}
//SetConsumerKey will set the application-specific consumer_key used in the initial OAuth process
//This key is listed on https://dev.twitter.com/apps/YOUR_APP_ID/show
func SetConsumerKey(consumer_key string) {
oauthCredentials.Token = consumer_key
}
//SetConsumerSecret will set the application-specific secret used in the initial OAuth process
//This secret is listed on https://dev.twitter.com/apps/YOUR_APP_ID/show
func SetConsumerSecret(consumer_secret string) {
oauthCredentials.Secret = consumer_secret
}
// ReturnRateLimitError specifies behavior when the Twitter API returns a rate-limit error.
// If set to true, the query will fail and return the error instead of automatically queuing and
// retrying the query when the rate limit expires
func (c *TwitterApi) ReturnRateLimitError(b bool) {
c.returnRateLimitError = b
}
// Enable query throttling using the tokenbucket algorithm
func (c *TwitterApi) EnableThrottling(rate time.Duration, bufferSize int64) {
c.bucket = tokenbucket.NewBucket(rate, bufferSize)
}
// Disable query throttling
func (c *TwitterApi) DisableThrottling() {
c.bucket = nil
}
// SetDelay will set the delay between throttled queries
// To turn of throttling, set it to 0 seconds
func (c *TwitterApi) SetDelay(t time.Duration) {
c.bucket.SetRate(t)
}
func (c *TwitterApi) GetDelay() time.Duration {
return c.bucket.GetRate()
}
// SetBaseUrl is experimental and may be removed in future releases.
func (c *TwitterApi) SetBaseUrl(baseUrl string) {
c.baseUrl = baseUrl
}
//AuthorizationURL generates the authorization URL for the first part of the OAuth handshake.
//Redirect the user to this URL.
//This assumes that the consumer key has already been set (using SetConsumerKey or NewTwitterApiWithCredentials).
func (c *TwitterApi) AuthorizationURL(callback string) (string, *oauth.Credentials, error) {
tempCred, err := c.oauthClient.RequestTemporaryCredentials(http.DefaultClient, callback, nil)
if err != nil {
return "", nil, err
}
return c.oauthClient.AuthorizationURL(tempCred, nil), tempCred, nil
}
// GetCredentials gets the access token using the verifier received with the callback URL and the
// credentials in the first part of the handshake. GetCredentials implements the third part of the OAuth handshake.
// The returned url.Values holds the access_token, the access_token_secret, the user_id and the screen_name.
func (c *TwitterApi) GetCredentials(tempCred *oauth.Credentials, verifier string) (*oauth.Credentials, url.Values, error) {
return c.oauthClient.RequestToken(http.DefaultClient, tempCred, verifier)
}
func defaultValues(v url.Values) url.Values {
if v == nil {
v = url.Values{}
}
v.Set("tweet_mode", "extended")
return v
}
func cleanValues(v url.Values) url.Values {
if v == nil {
return url.Values{}
}
return v
}
// apiGet issues a GET request to the Twitter API and decodes the response JSON to data.
func (c TwitterApi) apiGet(urlStr string, form url.Values, data interface{}) error {
form = defaultValues(form)
resp, err := c.oauthClient.Get(c.HttpClient, c.Credentials, urlStr, form)
if err != nil {
return err
}
defer resp.Body.Close()
return decodeResponse(resp, data)
}
// apiPost issues a POST request to the Twitter API and decodes the response JSON to data.
func (c TwitterApi) apiPost(urlStr string, form url.Values, data interface{}) error {
resp, err := c.oauthClient.Post(c.HttpClient, c.Credentials, urlStr, form)
if err != nil {
return err
}
defer resp.Body.Close()
return decodeResponse(resp, data)
}
// apiDel issues a DELETE request to the Twitter API and decodes the response JSON to data.
func (c TwitterApi) apiDel(urlStr string, form url.Values, data interface{}) error {
resp, err := c.oauthClient.Delete(c.HttpClient, c.Credentials, urlStr, form)
if err != nil {
return err
}
defer resp.Body.Close()
return decodeResponse(resp, data)
}
// apiPut issues a PUT request to the Twitter API and decodes the response JSON to data.
func (c TwitterApi) apiPut(urlStr string, form url.Values, data interface{}) error {
resp, err := c.oauthClient.Put(c.HttpClient, c.Credentials, urlStr, form)
if err != nil {
return err
}
defer resp.Body.Close()
return decodeResponse(resp, data)
}
// decodeResponse decodes the JSON response from the Twitter API.
func decodeResponse(resp *http.Response, data interface{}) error {
// Prevent memory leak in the case where the Response.Body is not used.
// As per the net/http package, Response.Body still needs to be closed.
defer resp.Body.Close()
// Twitter returns deflate data despite the client only requesting gzip
// data. net/http automatically handles the latter but not the former:
// https://github.com/golang/go/issues/18779
if resp.Header.Get("Content-Encoding") == "deflate" {
var err error
resp.Body, err = zlib.NewReader(resp.Body)
if err != nil {
return err
}
}
// according to dev.twitter.com, chunked upload append returns HTTP 2XX
// so we need a special case when decoding the response
if strings.HasSuffix(resp.Request.URL.String(), "upload.json") ||
strings.Contains(resp.Request.URL.String(), "webhooks") {
if resp.StatusCode == 204 {
// empty response, don't decode
return nil
}
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
return newApiError(resp)
}
} else if resp.StatusCode != 200 {
return newApiError(resp)
}
return json.NewDecoder(resp.Body).Decode(data)
}
func NewApiError(resp *http.Response) *ApiError {
body, _ := ioutil.ReadAll(resp.Body)
return &ApiError{
StatusCode: resp.StatusCode,
Header: resp.Header,
Body: string(body),
URL: resp.Request.URL,
}
}
//query executes a query to the specified url, sending the values specified by form, and decodes the response JSON to data
//method can be either _GET or _POST
func (c TwitterApi) execQuery(urlStr string, form url.Values, data interface{}, method int) error {
switch method {
case _GET:
return c.apiGet(urlStr, form, data)
case _POST:
return c.apiPost(urlStr, form, data)
case _DELETE:
return c.apiDel(urlStr, form, data)
case _PUT:
return c.apiPut(urlStr, form, data)
default:
return fmt.Errorf("HTTP method not yet supported")
}
}
// throttledQuery executes queries and automatically throttles them according to SECONDS_PER_QUERY
// It is the only function that reads from the queryQueue for a particular *TwitterApi struct
func (c *TwitterApi) throttledQuery() {
for q := range c.queryQueue {
url := q.url
form := q.form
data := q.data //This is where the actual response will be written
method := q.method
response_ch := q.response_ch
if c.bucket != nil {
<-c.bucket.SpendToken(1)
}
err := c.execQuery(url, form, data, method)
// Check if Twitter returned a rate-limiting error
if err != nil {
if apiErr, ok := err.(*ApiError); ok {
if isRateLimitError, nextWindow := apiErr.RateLimitCheck(); isRateLimitError && !c.returnRateLimitError {
c.Log.Info(apiErr.Error())
// If this is a rate-limiting error, re-add the job to the queue
// TODO it really should preserve order
go func(q query) {
c.queryQueue <- q
}(q)
delay := nextWindow.Sub(time.Now())
<-time.After(delay)
// Drain the bucket (start over fresh)
if c.bucket != nil {
c.bucket.Drain()
}
continue
}
}
}
response_ch <- response{data, err}
}
}
// Close query queue
func (c *TwitterApi) Close() {
close(c.queryQueue)
}