Skip to content

Commit

Permalink
Fix AWS sns/sqs panic (#3497)
Browse files Browse the repository at this point in the history
Signed-off-by: yaron2 <[email protected]>
Signed-off-by: Artur Souza <[email protected]>
Co-authored-by: yaron2 <[email protected]>
  • Loading branch information
artursouza and yaron2 authored Aug 1, 2024
1 parent c4a4525 commit bffaeeb
Show file tree
Hide file tree
Showing 6 changed files with 18 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ set -e

export INFLUX_TOKEN=$(openssl rand -base64 32)
echo "INFLUX_TOKEN=$INFLUX_TOKEN" >> $GITHUB_ENV
docker-compose -f .github/infrastructure/docker-compose-influxdb.yml -p influxdb up -d
docker compose -f .github/infrastructure/docker-compose-influxdb.yml -p influxdb up -d
2 changes: 1 addition & 1 deletion .github/scripts/components-scripts/docker-compose.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@ set -e
FILE="$1"
PROJECT="${2:-$FILE}"

docker-compose -f .github/infrastructure/docker-compose-${FILE}.yml -p ${PROJECT} up -d
docker compose -f .github/infrastructure/docker-compose-${FILE}.yml -p ${PROJECT} up -d
29 changes: 10 additions & 19 deletions pubsub/aws/snssqs/snssqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ import (
)

type snsSqs struct {
topicsLocker TopicsLocker
topicLock sync.RWMutex
// key is the sanitized topic name
topicArns map[string]string
// key is the topic name, value holds the ARN of the queue and its url.
Expand Down Expand Up @@ -169,9 +169,7 @@ func (s *snsSqs) Init(ctx context.Context, metadata pubsub.Metadata) error {
}
// subscription manager responsible for managing the lifecycle of subscriptions.
s.subscriptionManager = NewSubscriptionMgmt(s.logger)
s.topicsLocker = NewLockManager()

s.topicArns = make(map[string]string)
s.queues = make(map[string]*sqsQueueInfo)
s.subscriptions = make(map[string]string)

Expand Down Expand Up @@ -241,17 +239,14 @@ func (s *snsSqs) getTopicArn(parentCtx context.Context, topic string) (string, e
func (s *snsSqs) getOrCreateTopic(ctx context.Context, topic string) (topicArn string, sanitizedTopic string, err error) {
sanitizedTopic = nameToAWSSanitizedName(topic, s.metadata.Fifo)

var loadOK bool
if topicArn, loadOK = s.topicArns[sanitizedTopic]; loadOK {
if len(topicArn) > 0 {
s.logger.Debugf("Found existing topic ARN for topic %s: %s", topic, topicArn)
var exists bool
s.topicLock.RLock()
topicArn, exists = s.topicArns[sanitizedTopic]
s.topicLock.RUnlock()

return topicArn, sanitizedTopic, err
} else {
err = fmt.Errorf("the ARN for (sanitized) topic: %s was empty", sanitizedTopic)

return topicArn, sanitizedTopic, err
}
if exists {
s.logger.Debugf("Found existing topic ARN for topic %s: %s", topic, topicArn)
return topicArn, sanitizedTopic, err
}

// creating queues is idempotent, the names serve as unique keys among a given region.
Expand All @@ -274,7 +269,9 @@ func (s *snsSqs) getOrCreateTopic(ctx context.Context, topic string) (topicArn s
}

// record topic ARN.
s.topicLock.Lock()
s.topicArns[sanitizedTopic] = topicArn
s.topicLock.Unlock()

return topicArn, sanitizedTopic, err
}
Expand Down Expand Up @@ -760,9 +757,6 @@ func (s *snsSqs) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, han
return errors.New("component is closed")
}

s.topicsLocker.Lock(req.Topic)
defer s.topicsLocker.Unlock(req.Topic)

// subscribers declare a topic ARN and declare a SQS queue to use
// these should be idempotent - queues should not be created if they exist.
topicArn, sanitizedName, err := s.getOrCreateTopic(ctx, req.Topic)
Expand Down Expand Up @@ -842,9 +836,6 @@ func (s *snsSqs) Publish(ctx context.Context, req *pubsub.PublishRequest) error
return errors.New("component is closed")
}

s.topicsLocker.Lock(req.Topic)
defer s.topicsLocker.Unlock(req.Topic)

topicArn, _, err := s.getOrCreateTopic(ctx, req.Topic)
if err != nil {
s.logger.Errorf("error getting topic ARN for %s: %v", req.Topic, err)
Expand Down
44 changes: 0 additions & 44 deletions pubsub/aws/snssqs/topics_locker.go

This file was deleted.

8 changes: 4 additions & 4 deletions tests/certification/secretstores/hashicorp/vault/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ This secret store [supports the following features][features]:
### Tests for `vaultToken` and `vaultTokenMountPath`

1. Verify `vaultToken` is used (happy case)
* The baseline fo this test is all the previous test are using a known-to-work value that matches what our docker-compose environment sets up.
* The baseline fo this test is all the previous test are using a known-to-work value that matches what our docker compose environment sets up.
1. Verify failure when we use a `vaultToken` value that does not match what our environment sets up
1. Verify `vaultTokenMountPath` is used (happy case)
1. Verify failure when `vaultTokenMountPath` points to a broken path
Expand All @@ -65,7 +65,7 @@ This secret store [supports the following features][features]:
### Tests for vaultAddr

1. Verify `vaultAddr` is used (happy case)
* The baseline fo this test is all the previous test are using this flag with a known-to-work value that matches what our docker-compose environment sets up and is **not the default**.
* The baseline fo this test is all the previous test are using this flag with a known-to-work value that matches what our docker compose environment sets up and is **not the default**.
1. Verify initialization and operation success when `vaultAddr` is missing `skipVerify` is `true`
* Start a vault instance using a self-signed HTTPS certificate.
* Component configuration lacks `vaultAddr` and defaults to address `https://127.0.0.1:8200`
Expand Down Expand Up @@ -147,12 +147,12 @@ GOLANG_PROTOBUF_REGISTRATION_CONFLICT=warn go test -run TestVersioning -v .

### Docker-compose

You might need to verify if docker-compose is doing what you think it is doing: seeding the right secrets or even booting up properly.
You might need to verify if docker compose is doing what you think it is doing: seeding the right secrets or even booting up properly.

Head to the directory hosting the `docker-compose-hashicorp-vault.yml` file and run:

```shell
docker-compose -f docker-compose-hashicorp-vault.yml up --remove-orphans
docker compose -f docker-compose-hashicorp-vault.yml up --remove-orphans
```

# References:
Expand Down
4 changes: 2 additions & 2 deletions tests/conformance/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,10 @@
1. Test setup is independent of the test run.
2. Run the service that needs to conformance tested locally or in your own cloud account.

- For cloud-agnostic components such as Kafka, MQTT etc., there are `docker-compose` definitions under the [/.github/infrastructure](../../.github/infrastructure/) folder you can use to quickly create an instance of the service. For example, to setup Kafka for conformance tests:
- For cloud-agnostic components such as Kafka, MQTT etc., there are `docker compose` definitions under the [/.github/infrastructure](../../.github/infrastructure/) folder you can use to quickly create an instance of the service. For example, to setup Kafka for conformance tests:

```bash
docker-compose -f ./.github/infrastructure/docker-compose-kafka.yml -p kafka up -d
docker compose -f ./.github/infrastructure/docker-compose-kafka.yml -p kafka up -d
```

- For Azure components such as Blob Storage, Key Vault etc., there is an automation script that can help you create the resources under your subscription, and extract the environment variables needed to run the conformance tests. See [/.github/infrastructure/conformance/azure/README.md](../../.github/infrastructure/conformance/azure/README.md) for more details.
Expand Down

0 comments on commit bffaeeb

Please sign in to comment.