Skip to content

Commit

Permalink
Merge pull request #3 from mumoshu/localstaco-s3-metrics-testing
Browse files Browse the repository at this point in the history
Use localstack for s3 metrics testing
  • Loading branch information
cw-atkhry authored Aug 20, 2024
2 parents d7162cb + 9688d99 commit 4dbc62e
Show file tree
Hide file tree
Showing 3 changed files with 174 additions and 48 deletions.
27 changes: 27 additions & 0 deletions localstack/s3.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package localstack

import (
"context"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/s3"

smithyendpoints "github.com/aws/smithy-go/endpoints"
)

func S3EndpointResolver() s3.EndpointResolverV2 {
return &s3EndpointResolver{
baseResolver: s3.NewDefaultEndpointResolverV2(),
}
}

type s3EndpointResolver struct {
baseResolver s3.EndpointResolverV2
}

func (r *s3EndpointResolver) ResolveEndpoint(ctx context.Context, params s3.EndpointParameters) (smithyendpoints.Endpoint, error) {
params.ForcePathStyle = aws.Bool(true)
params.Endpoint = aws.String("http://localhost:4566")

return r.baseResolver.ResolveEndpoint(ctx, params)
}
137 changes: 90 additions & 47 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"syscall"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/s3"
Expand Down Expand Up @@ -44,7 +45,7 @@ func main() {
}
}

func Run(sigs chan os.Signal) error {
func Run(sigs chan os.Signal, opts ...Option) error {
var (
httpServerGracefulShutdownTimeout = 5 * time.Second

Expand All @@ -62,14 +63,17 @@ func Run(sigs chan os.Signal) error {
return fmt.Errorf("unable to load SDK config, %v", err)
}

s3Client := s3.NewFromConfig(cfg)
dynamoClient := dynamodb.NewFromConfig(cfg)
sqsCleint := sqs.NewFromConfig(cfg)
checks := make(chan struct{}, 1)
chkr := newChecker(cfg, opts...)

s3Bucket := os.Getenv("S3_BUCKET")
s3Key := os.Getenv("S3_KEY")
dynamodbTable := os.Getenv("DYNAMODB_TABLE")
sqsQueueURL := os.Getenv("SQS_QUEUE_URL")
go func() {
for {
select {
case <-checks:
chkr.doCheck()
}
}
}()

for {
select {
Expand All @@ -89,47 +93,86 @@ func Run(sigs chan os.Signal) error {
log.Printf("HTTP server shut down successfully")

return nil
default:
time.Sleep(1 * time.Second)
// S3 GetObject
getStart := time.Now()
_, err = s3Client.GetObject(context.Background(), &s3.GetObjectInput{
Bucket: &s3Bucket,
Key: &s3Key,
})
getDuration := time.Since(getStart).Seconds()
if err != nil {
log.Printf("failed to get object, %v", err)
requestDuration.WithLabelValues("S3", "GetObject", "Failure").Observe(getDuration)
} else {
requestDuration.WithLabelValues("S3", "GetObject", "Success").Observe(getDuration)
case <-time.After(1 * time.Second):
select {
case checks <- struct{}{}:
default:
}
}
}
}

// DynamoDB Scan
dynamoStart := time.Now()
_, err = dynamoClient.Scan(context.Background(), &dynamodb.ScanInput{
TableName: &dynamodbTable,
})
dynamoDuration := time.Since(dynamoStart).Seconds()
if err != nil {
log.Printf("failed to get item, %v", err)
requestDuration.WithLabelValues("DynamoDB", "Scan", "Failure").Observe(dynamoDuration)
} else {
requestDuration.WithLabelValues("DynamoDB", "Scan", "Success").Observe(dynamoDuration)
}
type checker struct {
s3Client *s3.Client
dynamoClient *dynamodb.Client
sqsClient *sqs.Client

// SQS ReceiveMessage
sqsStart := time.Now()
_, err = sqsCleint.ReceiveMessage(context.Background(), &sqs.ReceiveMessageInput{
QueueUrl: &sqsQueueURL,
})
sqsDuration := time.Since(sqsStart).Seconds()
if err != nil {
log.Printf("failed to receive message, %v", err)
requestDuration.WithLabelValues("SQS", "ReceiveMessage", "Failure").Observe(sqsDuration)
} else {
requestDuration.WithLabelValues("SQS", "ReceiveMessage", "Success").Observe(sqsDuration)
}
}
s3Bucket string
s3Key string
dynamodbTable string
sqsQueueURL string

s3Opts []func(*s3.Options)
}

type Option func(*checker)

func newChecker(cfg aws.Config, opts ...Option) *checker {
c := &checker{}
for _, opt := range opts {
opt(c)
}

c.s3Client = s3.NewFromConfig(cfg, c.s3Opts...)
c.dynamoClient = dynamodb.NewFromConfig(cfg)
c.sqsClient = sqs.NewFromConfig(cfg)

c.s3Bucket = os.Getenv("S3_BUCKET")
c.s3Key = os.Getenv("S3_KEY")
c.dynamodbTable = os.Getenv("DYNAMODB_TABLE")
c.sqsQueueURL = os.Getenv("SQS_QUEUE_URL")

return c
}

func (c *checker) doCheck() {
// S3 GetObject
getStart := time.Now()
_, err := c.s3Client.GetObject(context.Background(), &s3.GetObjectInput{
Bucket: &c.s3Bucket,
Key: &c.s3Key,
})
getDuration := time.Since(getStart).Seconds()
if err != nil {
log.Printf("failed to get object, %v", err)
requestDuration.WithLabelValues("S3", "GetObject", "Failure").Observe(getDuration)
} else {
requestDuration.WithLabelValues("S3", "GetObject", "Success").Observe(getDuration)
}

// DynamoDB Scan
dynamoStart := time.Now()
_, err = c.dynamoClient.Scan(context.Background(), &dynamodb.ScanInput{
TableName: &c.dynamodbTable,
})
dynamoDuration := time.Since(dynamoStart).Seconds()
if err != nil {
log.Printf("failed to get item, %v", err)
requestDuration.WithLabelValues("DynamoDB", "Scan", "Failure").Observe(dynamoDuration)
} else {
requestDuration.WithLabelValues("DynamoDB", "Scan", "Success").Observe(dynamoDuration)
}

// SQS ReceiveMessage
sqsStart := time.Now()
_, err = c.sqsClient.ReceiveMessage(context.Background(), &sqs.ReceiveMessageInput{
QueueUrl: &c.sqsQueueURL,
})
sqsDuration := time.Since(sqsStart).Seconds()
if err != nil {
log.Printf("failed to receive message, %v", err)
requestDuration.WithLabelValues("SQS", "ReceiveMessage", "Failure").Observe(sqsDuration)
} else {
requestDuration.WithLabelValues("SQS", "ReceiveMessage", "Success").Observe(sqsDuration)
}
}
58 changes: 57 additions & 1 deletion main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,16 @@ package main

import (
"context"
"io"
"net/http"
"os"
"strings"
"syscall"
"testing"
"time"

"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/cw-sakamoto/sample/localstack"
"github.com/stretchr/testify/require"
)

Expand All @@ -21,17 +26,68 @@ func TestSigint(t *testing.T) {
defer cancel()

go func() {
runErr <- Run(sigs)
runErr <- Run(sigs, func(c *checker) {
// Use localstack for S3
c.s3Opts = append(c.s3Opts, s3.WithEndpointResolverV2(localstack.S3EndpointResolver()))
})

cancel()
}()

//
// Wait for the server to start exposing metrics
//

metrics := make(chan string, 1)
go func() {
for {
time.Sleep(100 * time.Millisecond)
m := httpGetStr(t, "http://localhost:8080/metrics")
if strings.Contains(m, "aws_request_duration_seconds") {
metrics <- m
break
}
}
}()

select {
case m := <-metrics:
// m is the metrics in the Prometheus exposition format,
// expectedly containing the aws_request_duration_seconds metric.
// We can check the presence of any metric here, in any detail.
require.Contains(t, m, "promhttp_metric_handler_requests_total")
case <-time.After(2 * time.Second):
// We assume that the server is expected to start and expose metrics within 2 seconds.
// Otherwise, we consider it as a failure, and you may need to fix the server implementation,
// or you may need to increase the timeout if the runtime environment is soooo slow.
t.Fatal("timed out waiting for metrics")
}

sigs <- syscall.SIGINT

select {
case <-ctx.Done():
require.NoError(t, <-runErr)
case <-time.After(1 * time.Second):
// We assume the server can gracefully shut down within 5 seconds.
// Otherwise, we consider it as a failure, and you may need to fix the server implementation.
t.Fatal("timeout")
}
}

func httpGetStr(t *testing.T, url string) string {
resp, err := http.Get(url)
if err != nil {
t.Logf("Error: %v", err)
return ""
}
defer resp.Body.Close()

body, err := io.ReadAll(resp.Body)
if err != nil {
t.Logf("Error: %v", err)
return ""
}

return string(body)
}

0 comments on commit 4dbc62e

Please sign in to comment.