I have the following scenario:

- A previous run of kinesis-consumer left a checkpointed sequence number.
- kinesis-consumer was then stopped for more than 24 hours, so the stored sequence number now refers to expired data.
Now I find that upon restarting the consumer, it continues to read 0 records, with MillisBehindLatest stuck at 86400000 (24 hours). The shard iterator changes after each GetRecords call, but every request yields 0 records and reports 86400000 ms behind.
I let this run for about 15 minutes, continuously polling GetRecords, without finding any actual records. I can't be sure it wouldn't eventually find data and finally advance the sequence number, but after ~15 minutes there was no real sign of progress.
So I began investigating and implemented the following logic in the consumer, which quickly (within about 5 seconds) gets things moving again:
1. In Consumer.Scan(), call c.client.DescribeStreamSummaryWithContext() to discover the retention period for the stream.
2. In Consumer.ScanShard(), if GetRecords returns 0 records and MillisBehindLatest is >= the retention period from step 1, we can infer that we have hit this condition, so we fetch a new shard iterator of type ShardIteratorTypeTrimHorizon.
3. Step 2 is only ever done once, when the condition described exists, and is never done if records were ever observed from the shard, to better ensure we are targeting this edge case.
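A minimal sketch of the detection rule in steps 1 to 3, assuming the retention period arrives as RetentionPeriodHours from the DescribeStreamSummary response. shardState and shouldResetToTrimHorizon are hypothetical names for illustration, not part of kinesis-consumer, and the AWS calls are left out so the logic runs on its own.

```go
package main

import (
	"fmt"
	"time"
)

// retentionToMillis converts a retention period in hours (as reported by
// DescribeStreamSummary) into milliseconds, the unit of MillisBehindLatest.
func retentionToMillis(retentionHours int64) int64 {
	return (time.Duration(retentionHours) * time.Hour).Milliseconds()
}

// shardState is hypothetical per-shard bookkeeping for this sketch.
type shardState struct {
	sawRecords bool // set once any record has been observed on the shard
	didReset   bool // set once we have already jumped to TRIM_HORIZON
}

// shouldResetToTrimHorizon is called after each GetRecords response. It
// reports true when an empty batch lags by at least the retention period,
// which suggests the checkpoint points at expired data. It returns true at
// most once per shard, and never after records have been seen on that shard.
func (s *shardState) shouldResetToTrimHorizon(numRecords int, millisBehindLatest, retentionMillis int64) bool {
	if numRecords > 0 {
		s.sawRecords = true
		return false
	}
	if retentionMillis <= 0 { // retention unknown: never guess
		return false
	}
	if s.sawRecords || s.didReset {
		return false
	}
	if millisBehindLatest >= retentionMillis {
		s.didReset = true
		return true
	}
	return false
}

func main() {
	retention := retentionToMillis(24) // Kinesis' default 24-hour retention
	fmt.Println(retention)             // 86400000

	s := &shardState{}
	// Empty batch, 24h behind: looks like an expired checkpoint.
	fmt.Println(s.shouldResetToTrimHorizon(0, 86400000, retention)) // true
	// Same response again: the reset is only attempted once.
	fmt.Println(s.shouldResetToTrimHorizon(0, 86400000, retention)) // false
}
```

Keeping the "once per shard, never after records" guard inside the state struct means a shard that is merely quiet, but healthy, can never be rewound to TRIM_HORIZON and replay data.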
Like the immediate rescan from #122, fetching a ShardIteratorTypeTrimHorizon iterator also takes the fast path, skipping the scan ticker. I've also implemented the "fast path" via a new ticker fixed at 200ms, based on the published data plane API limits (5 GetRecords calls per second per shard), which also addresses the throttling concern mentioned in #122.
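A sketch of the pacing idea, assuming one polling loop per shard. It derives the 200ms figure from the five-calls-per-second-per-shard GetRecords limit and uses a single timer whose next delay depends on whether the last round asked for the fast path. pollLoop, nextPollDelay and demoFastPath are hypothetical helpers, not the library's scan loop, and this uses one resettable timer rather than two tickers.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// getRecordsPerSecondPerShard is the documented Kinesis Data Streams limit
// on GetRecords calls per shard.
const getRecordsPerSecondPerShard = 5

// fastPathInterval is the tightest polling spacing that stays within that
// limit: 1s / 5 = 200ms.
const fastPathInterval = time.Second / getRecordsPerSecondPerShard

// nextPollDelay chooses the wait before the next GetRecords call: the fixed
// fast-path interval when the previous round asked for an immediate rescan,
// otherwise the regular configured scan interval.
func nextPollDelay(fastPath bool, scanInterval time.Duration) time.Duration {
	if fastPath {
		return fastPathInterval
	}
	return scanInterval
}

// pollLoop is a hypothetical driver. poll performs one GetRecords round and
// returns true when it wants the fast path (e.g. right after switching to a
// TRIM_HORIZON iterator). The loop exits when ctx is cancelled.
func pollLoop(ctx context.Context, scanInterval time.Duration, poll func() bool) {
	timer := time.NewTimer(0) // first poll happens immediately
	defer timer.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-timer.C:
		}
		if ctx.Err() != nil { // cancelled while the timer also fired
			return
		}
		// The timer has fired and been drained, so Reset is safe here.
		timer.Reset(nextPollDelay(poll(), scanInterval))
	}
}

// demoFastPath runs pollLoop with a poll that always requests the fast path
// and cancels after n polls, returning the poll count and elapsed time.
func demoFastPath(n int) (int, time.Duration) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	calls := 0
	start := time.Now()
	pollLoop(ctx, 5*time.Second, func() bool {
		calls++
		if calls == n {
			cancel()
		}
		return true
	})
	return calls, time.Since(start)
}

func main() {
	fmt.Println(fastPathInterval)                    // 200ms
	fmt.Println(nextPollDelay(false, 5*time.Second)) // 5s
	calls, elapsed := demoFastPath(3)
	fmt.Println(calls, elapsed >= 2*fastPathInterval) // 3 true
}
```

Three fast-path polls take at least 400ms here even though the configured scan interval is 5s, which is the intended behavior: catch-up rounds are quick but never exceed the per-shard call rate.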
I can clean up the code and submit a PR, but wanted to run the idea by you first. Obviously all this work I've been doing lately is really complicating your beautifully simple shard-scanning loop, but I am finding that dealing with Kinesis edge cases in practice is a bit of a subtle science and exact art. :)
Another noteworthy point: this would require an additional permission on the stream, kinesis:DescribeStreamSummary (or kinesis:DescribeStream*), so given that, it may need to be opt-in.