diff --git a/README.md b/README.md index 52bb6e6..1fdd9a9 100644 --- a/README.md +++ b/README.md @@ -5,12 +5,11 @@ [![Go Report Card](https://goreportcard.com/badge/github.com/fallenstedt/twitter-stream)](https://goreportcard.com/report/github.com/fallenstedt/twitter-stream) [![Go Reference](https://pkg.go.dev/badge/github.com/fallenstedt/twitter-stream.svg)](https://pkg.go.dev/github.com/fallenstedt/twitter-stream) -TwitStream is a Go library for streaming tweets with [Twitter's v2 Filtered Streaming API](https://developer.twitter.com/en/docs/twitter-api/tweets/filtered-stream/introduction). +TwitStream is a Go library for creating streaming rules and streaming tweets with [Twitter's v2 Filtered Streaming API](https://developer.twitter.com/en/docs/twitter-api/tweets/filtered-stream/introduction). +See [examples](https://github.com/fallenstedt/twitter-stream/tree/master/example) to start adding your own rules and start streaming. -![example of twit stream](./example.gif) -This project is not production ready. There are several things I need to do: -- [ ] This package streams strings. I need to convert json into go structs with [these possible response fields](https://developer.twitter.com/en/docs/twitter-api/tweets/filtered-stream/api-reference/get-tweets-search-stream) +![example of twit stream](./example.gif) ## Installation @@ -18,164 +17,133 @@ This project is not production ready. There are several things I need to do: `go get github.com/fallenstedt/twitter-stream` + + ## Examples #### Starting a stream -Once you obtain an Access Token, you can create a TwitterStream instance with `NewTwitterStream(accesToken)` -Then, you can invoke `StartStream` to begin the streaming process. - -To read messages from your stream, start a loop with `GetMessages`. The messages that is returned could be -a tweet, or an error. +##### Obtain an Access Token using your Twitter Access Key and Secret. +You need an access token to do any streaming. `twitterstream` provides an easy way to fetch an access token. +```go + tok, err := twitterstream.NewTokenGenerator().SetApiKeyAndSecret("key", "secret").RequestBearerToken() -The possible errors that can be returned are -* `io.EOF`: An error that you have reached the end of the stream. -* `non io.EOF errors`: This could be errors that are returned from Twitter during your stream + if err != nil { + panic(err) + } +``` -[You can learn more about processing data by reading Twitter's documentation here](https://dev.twitter.com/streaming/overview/processing) +##### Create a streaming api +Create a twitterstream instance with your access token from above. ```go -// Starting a stream assuming you already have -// stream rules set in place -func startStreaming() { - // Obtain an AccessToken - // You can use the token generator and provide your api key and secret - // or provide an access token you already have - token, err := twitterstream.NewTokenGenerator().SetApiKeyAndSecret( - "your_twitter_api_key", - "your_twitter_api_secret", - ).RequestBearerToken() + api := twitterstream.NewTwitterStream(tok.AccessToken) +``` - if err != nil { - panic("No token found!") - } +##### Start Stream +Start your stream. This is a long-running HTTP GET request. +You can get specific data you want by adding [query params](https://developer.twitter.com/en/docs/twitter-api/tweets/filtered-stream/api-reference/get-tweets-search-stream). +Additionally, [view an example of query params here](https://developer.twitter.com/en/docs/twitter-api/expansions). + +```go + err := api.Stream.StartStream("") - // With an access token, you can create a new twitterstream and start streaming - api := twitterstream.NewTwitterStream(token.AccessToken) - err := api.Stream.StartStream() if err != nil { panic(err) } +``` - // If you do not put this in a go routine, you will stream forever +4. Consume Messages from the Stream +Handle any `io.EOF` and other errors that arise first, then unmarshal your bytes into your favorite struct. Below is an example with strings +```go go func() { - // Range over the messages channel to get a message, or an error. - for message := range *api.Stream.GetMessages() { - fmt.Println(message) + for message := range api.Stream.GetMessages() { + if message.Err != nil { + panic(message.Err) + } + // Will print something like: + //{"data":{"id":"1356479201000","text":"Look at this cat picture"},"matching_rules":[{"id":12345,"tag":"cat tweets with images"}]} + fmt.Println(string(message.Data)) } }() - // After 30 seconds, stop the stream time.Sleep(time.Second * 30) - api.Stream.StopStream() -} ``` #### Creating, Deleting, and Getting Rules +##### Obtain an Access Token using your Twitter Access Key and Secret. +You need an access token to do anything. `twitterstream` provides an easy way to fetch an access token. ```go + tok, err := twitterstream.NewTokenGenerator().SetApiKeyAndSecret("key", "secret").RequestBearerToken() + + if err != nil { + panic(err) + } +``` +##### Create a streaming api +Create a twitterstream instance with your access token from above. -func addRules() { - // Obtain an AccessToken - // You can use the token generator and provide your api key and secret - // or provide an access token you already have - token, err := twitterstream.NewTokenGenerator().SetApiKeyAndSecret( - "your_twitter_api_key", - "your_twitter_api_secret", - ).RequestBearerToken() +```go + api := twitterstream.NewTwitterStream(tok.AccessToken) +``` + +##### Get Rules +Use the `Rules` struct to access different Rules endpoints as defined in [Twitter's API Reference](https://developer.twitter.com/en/docs/twitter-api/tweets/filtered-stream/api-reference) +```go + res, err := api.Rules.GetRules() if err != nil { - panic("No token found!") + panic(err) } - // With an access token, you can create a new twitterstream and start adding rules - api := twitterstream.NewTwitterStream(token.AccessToken) + if res.Errors != nil && len(res.Errors) > 0 { + //https://developer.twitter.com/en/support/twitter-api/error-troubleshooting + panic(fmt.Sprintf("Received an error from twiiter: %v", res.Errors)) + } + fmt.Println(res.Data) +``` - // You can add rules by passing in stringified JSON with the rules you want to add - // You can learn more about building rules here: https://developer.twitter.com/en/docs/twitter-api/tweets/filtered-stream/integrate/build-a-rule - // Or here: https://developer.twitter.com/en/docs/twitter-api/tweets/filtered-stream/api-reference/post-tweets-search-stream-rules - // The response are the rules you created - // The 2nd argument will perform a dry run if set to true. +##### Add Rules +```go res, err := api.Rules.AddRules(`{ "add": [ {"value": "cat has:images", "tag": "cat tweets with images"} ] - }`, false) + }`, true) // dryRun is set to true if err != nil { panic(err) } - fmt.Println(res.Data, res.Meta) -} - -func deleteRules() { - // Obtain an AccessToken - // You can use the token generator and provide your api key and secret - // or provide an access token you already have - token, err := twitterstream.NewTokenGenerator().SetApiKeyAndSecret( - "your_twitter_api_key", - "your_twitter_api_secret", - ).RequestBearerToken() - - if err != nil { - panic("No token found!") + if res.Errors != nil && len(res.Errors) > 0 { + //https://developer.twitter.com/en/support/twitter-api/error-troubleshooting + panic(fmt.Sprintf("Received an error from twiiter: %v", res.Errors)) } - - // With an access token, you can create a new twitterstream and start deleting rules - api := twitterstream.NewTwitterStream(token.AccessToken) - - // You can delete rules by passing in stringified JSON with the rules you want to delete - // Learn more about deleting rules here: https://developer.twitter.com/en/docs/twitter-api/tweets/filtered-stream/api-reference/post-tweets-search-stream-rules - // The ids are the rule's ids you want to delete. You can find out how to get your ids in the below example - // The response are the rules you have. - // The 2nd argument will perform a dry run if set to true. +``` +##### Delete Rules +```go +// use api.Rules.GetRules to find the ID number for an existing rule res, err := api.Rules.AddRules(`{ "delete": { - "ids": ["1340894899986579457"] + "ids": ["1234567890"] } - }`, false) + }`, true) if err != nil { panic(err) } - fmt.Println(res.Data, res.Meta) -} - -func getRules() { - // Obtain an AccessToken - // You can use the token generator and provide your api key and secret - // or provide an access token you already have - token, err := twitterstream.NewTokenGenerator().SetApiKeyAndSecret( - "your_twitter_api_key", - "your_twitter_api_secret", - ).RequestBearerToken() - - if err != nil { - panic("No token found!") - } - - // With an access token, you can create a new twitterstream and start getting your rules - api := twitterstream.NewTwitterStream(token.AccessToken) - - // You can get your rules by invoking GetRules - // Learn more about getting rules here: https://developer.twitter.com/en/docs/twitter-api/tweets/filtered-stream/api-reference/get-tweets-search-stream-rules - res, err := api.Rules.GetRules() - - if err != nil { - panic(err) + if res.Errors != nil && len(res.Errors) > 0 { + //https://developer.twitter.com/en/support/twitter-api/error-troubleshooting + panic(fmt.Sprintf("Received an error from twiiter: %v", res.Errors)) } - fmt.Println(res.Data, res.Meta) -} - ``` - ## Contributing Pull requests are always welcome. Please accompany a pull request with tests. diff --git a/VERSION b/VERSION index 7dff5b8..9325c3c 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.2.1 \ No newline at end of file +0.3.0 \ No newline at end of file diff --git a/example/go.mod b/example/go.mod new file mode 100644 index 0000000..dbf8fd3 --- /dev/null +++ b/example/go.mod @@ -0,0 +1,7 @@ +module github.com/fallenstedt/twitter-stream/example + +replace github.com/fallenstedt/twitter-stream => /Users/alex/Projects/twitter-stream/ + +go 1.15 + +require github.com/fallenstedt/twitter-stream v0.2.1 diff --git a/example/go.sum b/example/go.sum new file mode 100644 index 0000000..3d086fc --- /dev/null +++ b/example/go.sum @@ -0,0 +1,2 @@ +github.com/fallenstedt/twitter-stream v0.2.1 h1:lhnQDj1R9od8ZpiHya5tgbBon1eQH4Iqwlj0hqqLnK8= +github.com/fallenstedt/twitter-stream v0.2.1/go.mod h1:e3GVow5/CaCeacD7kMH7ubyKHUNVSNntzFddzmzwP/8= diff --git a/example/main.go b/example/main.go new file mode 100644 index 0000000..1ae5a6b --- /dev/null +++ b/example/main.go @@ -0,0 +1,119 @@ +package main + +import ( + "fmt" + twitterstream "github.com/fallenstedt/twitter-stream" + "time" +) + +const key = "YOUR_KEY" +const secret = "YOUR_SECRET" + + +func main() { + // Use your favorite function from below here + startStream() + //addRules() + //getRules() + //deleteRules() +} + + +func startStream() { + tok, err := twitterstream.NewTokenGenerator().SetApiKeyAndSecret(key, secret).RequestBearerToken() + + if err != nil { + panic(err) + } + + api := twitterstream.NewTwitterStream(tok.AccessToken) + + err = api.Stream.StartStream("") + + if err != nil { + panic(err) + } + + go func() { + for message := range api.Stream.GetMessages() { + if message.Err != nil { + panic(message.Err) + } + fmt.Println(string(message.Data)) + } + }() + + time.Sleep(time.Second * 30) +} + +func addRules() { + tok, err := twitterstream.NewTokenGenerator().SetApiKeyAndSecret(key, secret).RequestBearerToken() + if err != nil { + panic(err) + } + api := twitterstream.NewTwitterStream(tok.AccessToken) + res, err := api.Rules.AddRules(`{ + "add": [ + {"value": "cat has:images", "tag": "cat tweets with images"} + ] + }`, true) // dryRun is set to true + + if err != nil { + panic(err) + } + + if res.Errors != nil && len(res.Errors) > 0 { + //https://developer.twitter.com/en/support/twitter-api/error-troubleshooting + panic(fmt.Sprintf("Received an error from twiiter: %v", res.Errors)) + } + + fmt.Println("I have created this many rules: ") + fmt.Println(res.Meta.Summary.Created) +} + +func getRules() { + tok, err := twitterstream.NewTokenGenerator().SetApiKeyAndSecret(key, secret).RequestBearerToken() + if err != nil { + panic(err) + } + api := twitterstream.NewTwitterStream(tok.AccessToken) + res, err := api.Rules.GetRules() + + if err != nil { + panic(err) + } + + if res.Errors != nil && len(res.Errors) > 0 { + //https://developer.twitter.com/en/support/twitter-api/error-troubleshooting + panic(fmt.Sprintf("Received an error from twiiter: %v", res.Errors)) + } + + fmt.Println(res.Data) +} + + +func deleteRules() { + tok, err := twitterstream.NewTokenGenerator().SetApiKeyAndSecret(key, secret).RequestBearerToken() + if err != nil { + panic(err) + } + api := twitterstream.NewTwitterStream(tok.AccessToken) + + // use api.Rules.GetRules to find the ID number for an existing rule + res, err := api.Rules.AddRules(`{ + "delete": { + "ids": ["1234567890"] + } + }`, true) + + if err != nil { + panic(err) + } + + if res.Errors != nil && len(res.Errors) > 0 { + //https://developer.twitter.com/en/support/twitter-api/error-troubleshooting + panic(fmt.Sprintf("Received an error from twiiter: %v", res.Errors)) + } + + fmt.Println(res) +} \ No newline at end of file diff --git a/http_client.go b/http_client.go index 0e6af57..e8a355a 100644 --- a/http_client.go +++ b/http_client.go @@ -6,14 +6,21 @@ import ( "io/ioutil" "log" "net/http" + "strings" ) -var endpoints = make(map[string]string) +type twitterEndpoints map[string]string +var endpoints = make(twitterEndpoints) type ( // IHttpClient is the interface the httpClient struct implements. IHttpClient interface { + newHttpRequest(opts *requestOpts) (*http.Response, error) + getRules() (*http.Response, error) + getSearchStream(queryParams string) (*http.Response, error) + addRules(queryParams string, body string) (*http.Response, error) + generateUrl(name string, queryParams string) (string, error) } httpClient struct { @@ -31,7 +38,6 @@ type ( } ) - func newHttpClient(token string) *httpClient { endpoints["rules"] = "https://api.twitter.com/2/tweets/search/stream/rules" endpoints["stream"] = "https://api.twitter.com/2/tweets/search/stream" @@ -39,6 +45,71 @@ func newHttpClient(token string) *httpClient { return &httpClient{token} } +func (t *httpClient) getRules() (*http.Response, error) { + res, err := t.newHttpRequest(&requestOpts{ + Method: "GET", + Url: endpoints["rules"], + Body: "", + }) + + return res, err +} + +func (t *httpClient) addRules(queryParams string, body string) (*http.Response, error) { + url, err := t.generateUrl("rules", queryParams) + + if err != nil { + return nil, err + } + + res, err := t.newHttpRequest(&requestOpts{ + Method: "POST", + Url: url, + Body: body, + }) + + if err != nil { + return nil, err + } + + return res, nil +} + +func (t *httpClient) getSearchStream(queryParams string) (*http.Response, error) { + // Make an HTTP GET request to GET /2/tweets/search/stream + url, err := t.generateUrl("stream", queryParams) + + if err != nil { + return nil, err + } + + res, err := t.newHttpRequest(&requestOpts{ + Method: "GET", + Url: url, + }) + + if err != nil { + return nil, err + } + + return res, nil +} + +func (t *httpClient) generateUrl(name string, queryParams string) (string, error) { + var url string + if len(queryParams) > 0 { + url = endpoints[name] + queryParams + } else { + url = endpoints[name] + } + + if len(url) == 0 || !strings.HasPrefix(url, "https://api.twitter.com") { + return url, errors.New("Could not find endpoint with name " + name) + } else { + return url, nil + } +} + func (t *httpClient) newHttpRequest(opts *requestOpts) (*http.Response, error) { client := &http.Client{} diff --git a/http_client_mock.go b/http_client_mock.go index 1b4439c..0c8dab5 100644 --- a/http_client_mock.go +++ b/http_client_mock.go @@ -5,12 +5,32 @@ import "net/http" type mockHttpClient struct { token string MockNewHttpRequest func(opts *requestOpts) (*http.Response, error) + MockGetSearchStream func(queryParams string) (*http.Response, error) + MockGetRules func() (*http.Response, error) + MockAddRules func(queryParams string, body string) (*http.Response, error) + MockGenerateUrl func (name string, queryParams string) (string, error) } func newHttpClientMock(token string) *mockHttpClient { return &mockHttpClient{token: token} } +func (t *mockHttpClient) generateUrl(name string, queryParams string) (string, error) { + return t.MockGenerateUrl(name, queryParams) +} + +func (t *mockHttpClient) getRules() (*http.Response, error) { + return t.MockGetRules() +} + +func (t *mockHttpClient) addRules(queryParams string, body string) (*http.Response, error) { + return t.MockAddRules(queryParams, body) +} + +func (t *mockHttpClient) getSearchStream(queryParams string) (*http.Response, error) { + return t.MockGetSearchStream(queryParams) +} + func (t *mockHttpClient) newHttpRequest(opts *requestOpts) (*http.Response, error) { return t.MockNewHttpRequest(opts) } diff --git a/rules.go b/rules.go index 6cea85f..0844d64 100644 --- a/rules.go +++ b/rules.go @@ -18,6 +18,7 @@ type ( rulesResponse struct { Data []rulesResponseValue Meta rulesResponseMeta + Errors []rulesResponseError } rulesResponseValue struct { @@ -29,6 +30,13 @@ type ( Sent string `json:"sent"` Summary addRulesResponseMetaSummary `json:"summary"` } + rulesResponseError struct { + Value string `json:"value"` + Id string `json:"id"` + Title string `json:"title"` + Type string `json:"type"` + } + addRulesResponseMetaSummary struct { Created uint `json:"created"` NotCreated uint `json:"not_created"` @@ -41,20 +49,15 @@ func newRules(httpClient IHttpClient) *rules { // AddRules adds or deletes rules to the stream using twitter's POST /2/tweets/search/stream/rules endpoint. // The body is a stringified object. +// Learn about the possible error messages returned here https://developer.twitter.com/en/support/twitter-api/error-troubleshooting. func (t *rules) AddRules(body string, dryRun bool) (*rulesResponse, error) { - - var url string - if dryRun { - url = endpoints["rules"] + "?dry_run=true" - } else { - url = endpoints["rules"] - } - - res, err := t.httpClient.newHttpRequest(&requestOpts{ - Method: "POST", - Url: url, - Body: body, - }) + res, err := t.httpClient.addRules(func() string { + if dryRun { + return "?dry_run=true" + } else { + return "" + } + }(), body) if err != nil { return nil, err @@ -62,18 +65,17 @@ func (t *rules) AddRules(body string, dryRun bool) (*rulesResponse, error) { defer res.Body.Close() data := new(rulesResponse) - json.NewDecoder(res.Body).Decode(data) + err = json.NewDecoder(res.Body).Decode(data) + if err != nil { + return nil, err + } return data, nil } // GetRules gets rules for a stream using twitter's GET GET /2/tweets/search/stream/rules endpoint. func (t *rules) GetRules() (*rulesResponse, error) { - res, err := t.httpClient.newHttpRequest(&requestOpts{ - Method: "GET", - Url: endpoints["rules"], - Body: "", - }) + res, err := t.httpClient.getRules() if err != nil { return nil, err diff --git a/rules_test.go b/rules_test.go index e69c177..ff47fc0 100644 --- a/rules_test.go +++ b/rules_test.go @@ -12,7 +12,7 @@ func TestAddRules(t *testing.T) { var tests = []struct { body string - mockRequest func(opts *requestOpts) (*http.Response, error) + mockRequest func(queryParams string, body string) (*http.Response, error) result *rulesResponse }{ { @@ -21,7 +21,7 @@ func TestAddRules(t *testing.T) { {"value": "cat has:images", "tag": "cat tweets with images"} ] }`, - func(opts *requestOpts) (*http.Response, error) { + func(queryParams string, bodyRequest string) (*http.Response, error) { json := `{ "data": [{ "value": "cat has:images", @@ -34,7 +34,8 @@ func TestAddRules(t *testing.T) { "created": 1, "not_created": 0 } - } + }, + "errors": [] }` body := ioutil.NopCloser(bytes.NewReader([]byte(json))) @@ -59,6 +60,7 @@ func TestAddRules(t *testing.T) { NotCreated: 0, }, }, + Errors: nil, }, }, } @@ -68,7 +70,7 @@ func TestAddRules(t *testing.T) { t.Run(testName, func(t *testing.T) { mockClient := newHttpClientMock("sometoken") - mockClient.MockNewHttpRequest = tt.mockRequest + mockClient.MockAddRules = tt.mockRequest instance := newRules(mockClient) result, err := instance.AddRules(tt.body, false) @@ -104,14 +106,13 @@ func TestAddRules(t *testing.T) { } } - func TestGetRules(t *testing.T) { var tests = []struct { - mockRequest func(opts *requestOpts) (*http.Response, error) + mockRequest func() (*http.Response, error) result *rulesResponse }{ { - func(opts *requestOpts) (*http.Response, error) { + func() (*http.Response, error) { json := `{ "data": [{ "value": "cat has:images", @@ -158,7 +159,7 @@ func TestGetRules(t *testing.T) { t.Run(testName, func(t *testing.T) { mockClient := newHttpClientMock("sometoken") - mockClient.MockNewHttpRequest = tt.mockRequest + mockClient.MockGetRules = tt.mockRequest instance := newRules(mockClient) result, err := instance.GetRules() @@ -193,4 +194,4 @@ func TestGetRules(t *testing.T) { }) } -} \ No newline at end of file +} diff --git a/stream.go b/stream.go index 141ba29..4eb52d8 100644 --- a/stream.go +++ b/stream.go @@ -8,13 +8,20 @@ import ( type ( // IStream is the interface that the stream struct implements. IStream interface { - StartStream() error + StartStream(queryParams string) error StopStream() - GetMessages() *chan interface{} + GetMessages() <-chan StreamMessage } + // StreamMessage is the message that is sent from the messages channel. + StreamMessage struct { + Data []byte + Err error + } + + stream struct { - messages chan interface{} + messages chan StreamMessage httpClient IHttpClient done chan struct{} group *sync.WaitGroup @@ -24,7 +31,7 @@ type ( func newStream(httpClient IHttpClient, reader IStreamResponseBodyReader) *stream { return &stream{ - messages: make(chan interface{}), + messages: make(chan StreamMessage), done: make(chan struct{}), group: new(sync.WaitGroup), reader: reader, @@ -32,9 +39,9 @@ func newStream(httpClient IHttpClient, reader IStreamResponseBodyReader) *stream } } -// GetMessages returns the messages channel. -func (s *stream) GetMessages() *chan interface{} { - return &s.messages +// GetMessages returns the read-only messages channel +func (s *stream) GetMessages() <-chan StreamMessage { + return s.messages } // StopStream sends a close signal to stop the stream of tweets. @@ -42,22 +49,22 @@ func (s *stream) StopStream() { close(s.done) } -// StartStream makes an HTTP request to twitter and starts streaming tweets to the Messages channel. -func (s *stream) StartStream() error { - - res, err := s.httpClient.newHttpRequest(&requestOpts{ - Method: "GET", - Url: endpoints["stream"], - }) +// StartStream makes an HTTP GET request to twitter and starts streaming tweets to the Messages channel using Server Sent Events. +// Accepts query params described in GET /2/tweets/search/stream to expand the payload that is returned. Query params string must begin with a ?. +// See available query params here https://developer.twitter.com/en/docs/twitter-api/tweets/filtered-stream/api-reference/get-tweets-search-stream. +// See an example here: https://developer.twitter.com/en/docs/twitter-api/expansions. +func (s *stream) StartStream(optionalQueryParams string) error { + res, err := s.httpClient.getSearchStream(optionalQueryParams) if err != nil { return err } s.reader.setStreamResponseBody(res.Body) - s.group.Add(1) + go s.streamMessages(res) + return nil } @@ -68,7 +75,10 @@ func (s *stream) streamMessages(res *http.Response) { for !stopped(s.done) { data, err := s.reader.readNext() if err != nil { - s.messages <- err + s.messages <- StreamMessage{ + Data: nil, + Err: err, + } s.StopStream() break } @@ -77,7 +87,9 @@ func (s *stream) streamMessages(res *http.Response) { continue } - m := string(data) - s.messages <- m + s.messages <- StreamMessage{ + Data: data, + Err: nil, + } } } diff --git a/stream_test.go b/stream_test.go index 8679699..d475747 100644 --- a/stream_test.go +++ b/stream_test.go @@ -27,7 +27,7 @@ func TestStopStream(t *testing.T) { instance := newStream(client, reader) instance.StopStream() - result := <- instance.done + result := <-instance.done if result != struct{}{} { t.Errorf("expected empty struct, got %v", result) @@ -37,16 +37,16 @@ func TestStopStream(t *testing.T) { func TestStartStream(t *testing.T) { var tests = []struct { givenMockHttpRequestToStreamReturns func() IHttpClient - givenMockStreamResponseBodyReader func() IStreamResponseBodyReader - result string - } { + givenMockStreamResponseBodyReader func() IStreamResponseBodyReader + result StreamMessage + }{ { func() IHttpClient { mockClient := newHttpClientMock("foobar") - mockClient.MockNewHttpRequest = func(opts *requestOpts) (*http.Response, error) { + mockClient.MockGetSearchStream = func(queryParams string) (*http.Response, error) { return &http.Response{ StatusCode: http.StatusOK, - Body: ioutil.NopCloser(bytes.NewReader([]byte("hello"))), + Body: ioutil.NopCloser(bytes.NewReader([]byte("hello"))), }, nil } return mockClient @@ -59,7 +59,10 @@ func TestStartStream(t *testing.T) { } return r }, - "hello", + StreamMessage{ + Data: []byte("hello"), + Err: nil, + }, }, } @@ -72,19 +75,22 @@ func TestStartStream(t *testing.T) { tt.givenMockStreamResponseBodyReader(), ) - instance.StartStream() - result := make(chan interface{}) + err := instance.StartStream("") + if err != nil { + t.Errorf("got err when starting stream %v", err) + } + + result := make(chan StreamMessage) go func() { - for message := range *instance.GetMessages() { + for message := range instance.GetMessages() { result <- message } }() - - if tt.result != <-result { + r := <-result + if string(tt.result.Data) != string(r.Data) { t.Errorf("got %v, want %s", result, tt.result) } }) - } -} \ No newline at end of file +} diff --git a/stream_utils_mock.go b/stream_utils_mock.go index bb89445..281d9b0 100644 --- a/stream_utils_mock.go +++ b/stream_utils_mock.go @@ -5,7 +5,7 @@ import ( ) type mockStreamResponseBodyReader struct { - MockReadNext func() ([]byte, error) + MockReadNext func() ([]byte, error) MockSetStreamResponseBody func(body io.Reader) } diff --git a/twitterstream.go b/twitterstream.go index 312210f..2c47a44 100644 --- a/twitterstream.go +++ b/twitterstream.go @@ -20,4 +20,4 @@ func NewTwitterStream(token string) *twitterApi { rules := newRules(client) stream := newStream(client, newStreamResponseBodyReader()) return &twitterApi{Rules: rules, Stream: stream} -} \ No newline at end of file +}