
Added CompactSplitter #60

Open
innonagle wants to merge 2 commits into master
Conversation

innonagle

This is a solution for #57.

Java is not my forte, so I am unsure how to run the test suite. If instructions can be provided on how to do so, I can update the PR to include test coverage.

@chaochenq
Contributor

chaochenq commented Dec 7, 2017

Hi, sorry for the delay. We are releasing an AggregationSplitter that does something similar to this PR. I will be pushing out the changes soon. Please let me know if that fits your requirement; if not, we can discuss how to improve it.

The changes we made are pretty similar to what's in here, except that we have added tests as well.

@innonagle
Author

@chaochenq Can I get a commit/branch/fork reference?

@chaochenq
Contributor

@lennynyktyk I haven't pushed out the changes yet. But I will let you know once it's done. Thanks!

@chaochenq
Contributor

@lennynyktyk I have released a newer version with AggregationSplitter, which does a similar thing to this one:

https://github.com/awslabs/amazon-kinesis-agent/blob/master/src/com/amazon/kinesis/streaming/agent/tailing/AggregationSplitter.java

Please have a look and let me know of your thoughts. Thanks!

@innonagle
Author

Hello @chaochenq ,

I have gotten around to checking this out and can say it is equivalent.

In my testing, the only issue I found was that the aggregatedRecordSizeBytes value in agent.json does not seem to get parsed unless it is a JSON string; a JSON int does not work for me. I am only able to test with version 1.1.3 on Java 1.7, as that is the version my Elastic Beanstalk environment can find in yum.
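
For reference, this is roughly the workaround described above. The filePattern and deliveryStream values are illustrative placeholders; the point is that quoting the value as a string ("102400") parses, while a bare int (102400) did not in my testing:

```json
{
  "flows": [
    {
      "filePattern": "/var/log/app/*.log",
      "deliveryStream": "my-delivery-stream",
      "aggregatedRecordSizeBytes": "102400"
    }
  ]
}
```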

@innonagle innonagle closed this Dec 6, 2018
@innonagle
Author

Hello @chaochenq ,

After digging into this more I am unable to replicate the results I would expect.

In the attached graph you can see that after 17:00 there are only dots and not lines. Prior to 17:00 is the stock AWSKinesisAgent JAR v1.1.3, which I tried to configure with aggregatedRecordSizeBytes. After 17:00 is the modified JAR with the CompactSplitter in this PR. Also, after 17:00 the IncomingRecords series has a consistent value of 1, whereas prior to that it has the same shape as the IncomingBytes series.

I understand there may need to be a time component, e.g. flush at aggregatedRecordSizeBytes or after X milliseconds, whichever comes first, but the AggregationSplitter does not seem to be reducing the number of IncomingRecords at all.
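
To illustrate the size-or-time idea, here is a minimal sketch; SizeOrTimeFlushPolicy, maxBytes, and maxLatencyMillis are hypothetical names, not part of the agent's code base:

```java
// Hypothetical sketch of a size-or-time flush policy.
public class SizeOrTimeFlushPolicy {
    private final int maxBytes;          // e.g. the aggregatedRecordSizeBytes setting
    private final long maxLatencyMillis; // the "X milliseconds" component

    public SizeOrTimeFlushPolicy(int maxBytes, long maxLatencyMillis) {
        this.maxBytes = maxBytes;
        this.maxLatencyMillis = maxLatencyMillis;
    }

    // Flush when the buffer is full OR the oldest buffered line has
    // waited longer than the latency budget, whichever comes first.
    public boolean shouldFlush(int bufferedBytes, long oldestEntryTimestampMillis) {
        long age = System.currentTimeMillis() - oldestEntryTimestampMillis;
        return bufferedBytes >= maxBytes || age >= maxLatencyMillis;
    }
}
```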

[Screenshot: CloudWatch graph, 2018-12-06 3:43 PM]

@innonagle innonagle reopened this Dec 6, 2018
@nitzanav

nitzanav commented Jan 3, 2019

@chaochenq @lennynyktyk
I wanted to ask if there is a solution to aggregate and compress lines of files into one Kinesis stream message.

Will this solution work for Kinesis Streams?
If not, is anyone willing to get paid to develop such a solution? Or I can contribute... Please email me about this, [email protected]

@innonagle
Author

innonagle commented Jan 4, 2019

@nitzanav While I have only tested the aggregation for Kinesis Firehose, I see no reason why the way I implemented this solution would not also aggregate for Kinesis Streams. Please understand this PR only aggregates the data; it does not compress it. I have not attempted to compress the data prior to sending it. The solution provided in release 1.1.3 only supports Kinesis Firehose.

I believe it is possible to write compressed data to the file under observation as long as each "blob" of compressed data written to the file is terminated by a newline "\n".
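
A minimal sketch of that idea (Java 8+; the class and helper names here are hypothetical). Base64-encoding each gzip blob keeps the raw compressed bytes, which may themselves contain '\n', from splitting the record:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Base64;
import java.util.zip.GZIPOutputStream;

public class CompressedLineWriter {
    // Gzip a record and append it to the tailed file as a single
    // newline-terminated line, so the agent's splitter sees one record.
    static void appendCompressedRecord(Path file, String record) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(buf)) {
            gz.write(record.getBytes(StandardCharsets.UTF_8));
        }
        String line = Base64.getEncoder().encodeToString(buf.toByteArray()) + "\n";
        Files.write(file, line.getBytes(StandardCharsets.UTF_8),
                StandardOpenOption.CREATE, StandardOpenOption.APPEND);
    }
}
```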

@nitzanav

nitzanav commented Jan 4, 2019

@lennynyktyk It seems that you are suggesting compressing each row separately. I guess you want to avoid recompressing the entire batch/chunk every time a row is added in order to verify that it doesn't exceed aggregatedRecordSizeBytes.

In any case, I need technical assistance for a few hours of work, so if any of you can help me code the things I need here, it would be much appreciated. [email protected]
