stream wrapper functionality #27
Conversation
Also, blocking queues were added, and parsing of the raw responses into objects now happens in parallel.
I've also mentioned this PR in this Apache Beam issue and I am waiting for their feedback on the approach. I'll keep you posted.
Changed to draft since Jackson was not configured properly for parsing every field. This will reduce the performance numbers mentioned in the PR description. I'll check the parsing with all the required Jackson configuration, compare the performance against Gson, and post the results.
Switched back to Gson for the deserialization thread because a lot of configuration is required for Jackson to work as expected for all the fields; nevertheless, the performance improvement from the addition of the mid/deserialization layer is significant.
Hi @kisaga, thank you for your PR.
Again, thank you for the effort and for sharing your ideas.
@kisaga I would appreciate it, as I was also working on a multi-threaded version of the queuer/consumer scenario including automatic reconnect, etc. I also encountered the "empty stream" thing after a certain amount of time, which (atm?) requires a reconnect to solve. I think that other devs would benefit from a more exhaustive example repo on dealing with the SDK.
Problem
Recently I've started getting familiar with open source software and how to contribute. I found the Apache Beam project, and while exploring it I came across an example that was using twitter4j as a client. I tried running this example and realized that the v1 endpoint of the Twitter stream API has recently been deprecated. I tried to find a way to contribute to the twitter4j project, but I saw that it has not been updated for the last four years. This project seemed the right place for any enhancement required to integrate with that Apache Beam example. I needed a mechanism/API that wraps the stream methods (I implemented it for `sampleStream`, but the same can be done for `searchStream`) and provides an interface to retrieve the results of the stream.
Solution
Some components of the solution were already in place, but only in the examples folder. I've moved the executor and the listener to a new package named `stream`, and I've added one more thread to deserialize the lines.
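For illustration, here is a minimal sketch of the resulting pipeline, assuming a Gson-based deserialization thread; class and field names such as `StreamPipelineSketch` and `StreamedTweet` are placeholders, not the actual classes in the PR. One thread reads raw lines from the response, a second deserializes them, and a third consumes the resulting objects.

```java
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.google.gson.Gson;

// Sketch of the three-thread pipeline: reader -> deserializer -> listener.
public class StreamPipelineSketch {

    // Stand-in for the SDK's streamed tweet model.
    static class StreamedTweet {
        String id;
        String text;
    }

    private final BlockingQueue<String> rawLines = new ArrayBlockingQueue<>(10_000);
    private final BlockingQueue<StreamedTweet> tweets = new ArrayBlockingQueue<>(10_000);
    private final Gson gson = new Gson();
    private final ExecutorService executor = Executors.newFixedThreadPool(3);

    public void start(InputStream responseBody) {
        // Thread 1: read raw JSON lines from the HTTP response and enqueue them,
        // doing no parsing at all on this thread.
        executor.submit(() -> {
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(responseBody, StandardCharsets.UTF_8))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    if (!line.isEmpty()) {
                        rawLines.put(line);
                    }
                }
            } catch (Exception e) {
                Thread.currentThread().interrupt();
            }
        });

        // Thread 2: the deserialization layer - turn raw lines into model objects.
        executor.submit(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    tweets.put(gson.fromJson(rawLines.take(), StreamedTweet.class));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // Thread 3: the listener side - consume fully deserialized objects.
        executor.submit(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    StreamedTweet tweet = tweets.take();
                    System.out.println("Received tweet " + tweet.id);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }
}
```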
Performance improvements
I've done several comparisons of the performance of the stream processing.
You can find the results here: https://docs.google.com/spreadsheets/d/1yt6uofcbBNOj_o34A_LhhRhv7Lg7-6o0B9ts4u1m87o/edit?usp=sharing
The results contain the durations (in seconds) of parsing 250,000 tweets served from WireMock running locally (I can share the WireMock config file if you want).
The initial solution (with 2 threads, without a deserialization layer) corresponds to the "No Deserial Layer (Gson)" line/column. Changing the deserialization provider from Gson to Jackson ("No Deserial Layer (ObjectMapper)") improved the performance noticeably.
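To make the provider comparison concrete, this is roughly the difference between the two when parsing a single line; the model class here is hypothetical, and the actual Jackson configuration needed for all fields is more involved than shown.

```java
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;

public class DeserializationProviders {

    // Hypothetical model class for a streamed tweet line.
    static class StreamedTweet {
        public String id;
        public String text;
    }

    public static void main(String[] args) throws Exception {
        String line = "{\"id\":\"1\",\"text\":\"hello\"}";

        // Gson: works out of the box on plain fields, no extra configuration.
        StreamedTweet viaGson = new Gson().fromJson(line, StreamedTweet.class);

        // Jackson: faster in the benchmarks above, but needs configuration
        // (unknown-property handling, naming strategy, etc.) to cover every field.
        ObjectMapper mapper = new ObjectMapper()
                .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        StreamedTweet viaJackson = mapper.readValue(line, StreamedTweet.class);

        System.out.println(viaGson.text + " / " + viaJackson.text);
    }
}
```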
After that, I added the third (deserialization) layer to decouple "getting the next line" from "transforming the String into the model object". This also had a positive impact on the performance.
Adding more than one thread in the third layer did not improve the performance; it actually made things worse.
Finally, I changed the type of the response from `InputStream` to `okio.BufferedSource`, and it made the application perform faster.
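For context, this is roughly what the change amounts to when reading lines from the response body; a sketch only, since the PR wires this into the actual OkHttp response handling.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

import okio.BufferedSource;
import okio.Okio;

public class StreamReading {

    // Before: wrap the raw InputStream in a BufferedReader and read line by line.
    static void readWithInputStream(InputStream in) throws IOException {
        BufferedReader reader =
                new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
        String line;
        while ((line = reader.readLine()) != null) {
            // hand the line to the raw-lines queue
        }
    }

    // After: let okio do the buffering and UTF-8 decoding.
    // Note that OkHttp's ResponseBody already exposes a BufferedSource directly
    // via response.body().source(), so no extra wrapping is needed there.
    static void readWithBufferedSource(InputStream in) throws IOException {
        BufferedSource source = Okio.buffer(Okio.source(in));
        String line;
        while ((line = source.readUtf8Line()) != null) {
            // hand the line to the raw-lines queue
        }
    }
}
```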
Structural changes
I've introduced some enums for the parameters of the `sampleStream` and `searchStream` requests. As far as I've seen, they can be used in many other places as well. I've also introduced a parameter object along with a builder. The StreamQueryParameter object was introduced to decouple the signatures of all the low-level calls inside TweetsApi from all these arguments. The builder was created as a way of avoiding setting all the values when they are not required and of providing some default values. I've also overridden the method for setting the fields with varargs, to be able to set params comma-separated as in the API. While rebasing I saw that something similar had already been introduced, but it was not easy to create a wrapper of `sampleStream` with parametrized input without adding every parameter to the signature of the wrapper method.
To avoid the above, I've modified the builder of the request object, and the request object itself, to contain the StreamQueryParameter as a field, and I've delegated the creation of the HTTP call parameters to the QueryParameters object, leaving the sampleSearch with only the high-level composition of actions.
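As a rough, self-contained illustration of the pattern described above; the class, field, and parameter names here are assumptions for the sketch, not the exact API introduced in the PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch of the parameter object + builder pattern; the real StreamQueryParameter
// class in the PR may have different names, fields, and enum-typed setters.
public class StreamQueryParametersSketch {

    private final List<String> tweetFields;
    private final int backfillMinutes;

    private StreamQueryParametersSketch(Builder builder) {
        this.tweetFields = builder.tweetFields;
        this.backfillMinutes = builder.backfillMinutes;
    }

    public static Builder builder() {
        return new Builder();
    }

    // The request object delegates query-parameter creation to the parameter
    // object, so the high-level sampleStream call never deals with raw pairs.
    public String toQueryString() {
        StringBuilder query = new StringBuilder("backfill_minutes=" + backfillMinutes);
        if (!tweetFields.isEmpty()) {
            query.append("&tweet.fields=").append(String.join(",", tweetFields));
        }
        return query.toString();
    }

    public static class Builder {
        // Defaults, so callers only set what they need.
        private List<String> tweetFields = new ArrayList<>();
        private int backfillMinutes = 0;

        // Varargs setter: values are joined with commas, as the API expects.
        public Builder tweetFields(String... fields) {
            this.tweetFields = Arrays.asList(fields);
            return this;
        }

        public Builder backfillMinutes(int minutes) {
            this.backfillMinutes = minutes;
            return this;
        }

        public StreamQueryParametersSketch build() {
            return new StreamQueryParametersSketch(this);
        }
    }

    public static void main(String[] args) {
        StreamQueryParametersSketch params = builder()
                .tweetFields("id", "text", "created_at")
                .build();
        System.out.println(params.toQueryString());
    }
}
```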
Other changes
I've added some logging using slf4j in a few places, and in the pom of the examples module I've added the slf4j-simple implementation to be able to see the logs. It can be removed if you think it should not be there.
Also, a timeout mechanism has been implemented, as mentioned here, that triggers after 20 seconds of empty stream results. Re-connection is also mentioned on that page, but it can be added in a future contribution.
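A minimal sketch of that kind of "empty stream" watchdog, assuming the reader thread records the time of the last received line; the names here are illustrative rather than the ones used in the PR.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Sketch of the 20-second empty-stream timeout check.
public class StreamTimeoutWatchdog {

    private static final long TIMEOUT_MILLIS = 20_000;

    private final AtomicLong lastLineReceivedAt = new AtomicLong(System.currentTimeMillis());
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    // Called by the reader thread every time a non-empty line arrives.
    public void onLineReceived() {
        lastLineReceivedAt.set(System.currentTimeMillis());
    }

    // Periodically check whether the stream has gone silent for too long.
    public void start(Runnable onTimeout) {
        scheduler.scheduleAtFixedRate(() -> {
            long silentFor = System.currentTimeMillis() - lastLineReceivedAt.get();
            if (silentFor >= TIMEOUT_MILLIS) {
                // Close the connection; re-connection could be layered on top later.
                onTimeout.run();
                scheduler.shutdown();
            }
        }, 1, 1, TimeUnit.SECONDS);
    }
}
```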
Result
An API has been implemented that handles the stream internally and provides the results to the users of this library through listeners.
And a new contributor to this repo 🙂. I also have some other improvements/recommendations for the overall structure of this repo towards increased maintainability.