From f467ccfb2f8f473f592e2dc15431b0779225283c Mon Sep 17 00:00:00 2001 From: proost Date: Sat, 13 May 2023 17:44:49 +0900 Subject: [PATCH 1/2] fix: duplicate subscribe to shard --- kinesumer.go | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/kinesumer.go b/kinesumer.go index 4f948dd..622c155 100644 --- a/kinesumer.go +++ b/kinesumer.go @@ -539,12 +539,17 @@ func (k *Kinesumer) consumePipe(stream string, shard *Shard) { func (k *Kinesumer) subscribeToShard(streamEvents chan kinesis.SubscribeToShardEventStreamEvent, stream string, shard *Shard) { defer close(streamEvents) + var ( + subscribeStart time.Time + consumerARN string + ) for { ctx := context.Background() ctx, cancel := context.WithCancel(ctx) + newConsumerARN := k.efoMeta[stream].consumerARN input := &kinesis.SubscribeToShardInput{ - ConsumerARN: aws.String(k.efoMeta[stream].consumerARN), + ConsumerARN: aws.String(newConsumerARN), ShardId: aws.String(shard.ID), StartingPosition: &kinesis.StartingPosition{ Type: aws.String(kinesis.ShardIteratorTypeLatest), @@ -556,12 +561,22 @@ func (k *Kinesumer) subscribeToShard(streamEvents chan kinesis.SubscribeToShardE input.StartingPosition.SetSequenceNumber(seq.(string)) } + // NOTE(proost): https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShard.html + switch { + case subscribeStart.IsZero(): // first time. + case consumerARN != newConsumerARN: // consumer changed. + default: + time.Sleep(subscribeStart.Add(5 * time.Second).Sub(time.Now())) + } + output, err := k.client.SubscribeToShardWithContext(ctx, input) if err != nil { k.sendOrDiscardError(errors.WithStack(err)) cancel() continue } + subscribeStart = time.Now() + consumerARN = newConsumerARN open := true for open { @@ -576,6 +591,10 @@ func (k *Kinesumer) subscribeToShard(streamEvents chan kinesis.SubscribeToShardE return case e, ok := <-output.GetEventStream().Events(): if !ok { + err := output.GetStream().Close() + if err != nil { + k.sendOrDiscardError(errors.WithStack(err)) + } cancel() open = false } From 7d32695ef1a66b06c249463dbe256ad1cedfebda Mon Sep 17 00:00:00 2001 From: proost Date: Sat, 13 May 2023 23:05:30 +0900 Subject: [PATCH 2/2] refactor: change if already subscribed --- kinesumer.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/kinesumer.go b/kinesumer.go index 622c155..04a5afc 100644 --- a/kinesumer.go +++ b/kinesumer.go @@ -571,6 +571,14 @@ func (k *Kinesumer) subscribeToShard(streamEvents chan kinesis.SubscribeToShardE output, err := k.client.SubscribeToShardWithContext(ctx, input) if err != nil { + var awsErr awserr.Error + if errors.As(err, &awsErr) { + if awsErr.Code() == kinesis.ErrCodeResourceInUseException { + subscribeStart = time.Now() + consumerARN = newConsumerARN + } + } + k.sendOrDiscardError(errors.WithStack(err)) cancel() continue