Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

S3 Backend MVP #75

Open
wants to merge 17 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion cmd/almanac/almanac.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ const (
)

var (
flagStorageType = kingpin.Flag("storage", "Which kind of storage to use").Default(storage.StorageTypeMemory).Enum(storage.StorageTypeMemory, storage.StorageTypeDisk, storage.StorageTypeGcs)
flagStorageType = kingpin.Flag("storage", "Which kind of storage to use").Default(storage.StorageTypeMemory).Enum(storage.StorageTypeMemory, storage.StorageTypeDisk, storage.StorageTypeGcs, storage.StorageTypeS3)
flagGcsBucket = kingpin.Flag("storage.gcs.bucket", "Which gcs bucket to use for storage").Default("almanac-dev").String()
flagS3Bucket = kingpin.Flag("storage.s3.bucket", "Which s3 bucket to use for storage").Default("almanac-dev").String()
flagDiskPath = kingpin.Flag("storage.disk.path", "An existing empty directory to use as root for storage").Default("/tmp/almanac-dev").String()

flagAppenderPorts = kingpin.Flag("appender_ports", "Which ports to run appenders on").Default("5001", "5002", "5003", "5004", "5005").Ints()
Expand Down Expand Up @@ -55,6 +56,7 @@ func main() {

StorageType: *flagStorageType,
GcsBucket: *flagGcsBucket,
S3Bucket: *flagS3Bucket,
DiskPath: *flagDiskPath,
}

Expand Down
6 changes: 6 additions & 0 deletions pkg/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type Config struct {

StorageType string
GcsBucket string
S3Bucket string
DiskPath string
}

Expand Down Expand Up @@ -60,6 +61,11 @@ func CreateCluster(ctx context.Context, logger *logrus.Logger, config *Config, a
if err != nil {
return nil, fmt.Errorf("unable to create gcs storage: %v", err)
}
} else if config.StorageType == st.StorageTypeS3 {
storage, err = st.NewS3Storage(config.S3Bucket)
if err != nil {
return nil, fmt.Errorf("unable to create s3 storage: %v", err)
}
} else if config.StorageType == st.StorageTypeDisk {
storage, err = st.NewDiskStorage(config.DiskPath)
if err != nil {
Expand Down
84 changes: 84 additions & 0 deletions pkg/storage/s3.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package storage

import (
"bytes"
"io/ioutil"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"golang.org/x/net/context"
)

const ()

// newS3Backend builds an s3Backend bound to the supplied S3 bucket name.
// The returned error is always nil. Note that with the current interface,
// the AWS_REGION environment variable must be specified to use this backend.
func newS3Backend(bucketName string) (*s3Backend, error) {
	s3b := new(s3Backend)
	s3b.bucketName = aws.String(bucketName)
	return s3b, nil
}

// s3Backend is the storage backend that reads, writes, lists and deletes
// objects in a single S3 bucket.
type s3Backend struct {
// bucketName is the target bucket. It is kept as a *string because that is
// the form in which it is passed as the Bucket field of S3 request inputs.
bucketName *string
}

func (b *s3Backend) read(_ context.Context, id string) ([]byte, error) {
sess := session.Must(session.NewSession())
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From the docs [1] it seems that these sessions (or perhaps even the client?) are designed to be cached/reused. Should we perhaps store one in the backend object rather than creating it on every call? Here and below.

[1] https://docs.aws.amazon.com/sdk-for-go/api/aws/session/

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like storing the client makes sense for now. That may change, but it's a detail that doesn't impact the interface.


s3Client := s3.New(sess)
getOutput, err := s3Client.GetObject(&s3.GetObjectInput{
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like there's an easily accessible GetObjectWithContext method [1]. Given that we already have a context passed in, could we call that instead? Here and below.

[1] https://github.com/aws/aws-sdk-go/blob/master/service/s3/api.go#L2969

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's pretty great that they support this now 🎉

Bucket: b.bucketName,
Key: aws.String(id),
})
if err != nil {
return []byte{}, err
}

return ioutil.ReadAll(getOutput.Body)
}

// write stores contents in the backend's bucket under the key id.
//
// ctx is forwarded to the S3 request so that the caller's cancellation and
// deadline apply to the upload; it must be non-nil. Any failure to set up the
// AWS session or to perform the upload is returned as an error.
func (b *s3Backend) write(ctx context.Context, id string, contents []byte) error {
	// Surface session setup problems (e.g. bad shared config) as an error
	// instead of panicking via session.Must.
	sess, err := session.NewSession()
	if err != nil {
		return err
	}

	_, err = s3.New(sess).PutObjectWithContext(ctx, &s3.PutObjectInput{
		Bucket: b.bucketName,
		Key:    aws.String(id),
		Body:   bytes.NewReader(contents),
	})
	return err
}

// list returns the keys of all objects in the backend's bucket whose key
// starts with prefix.
//
// The listing is paginated: every page of results is visited, so the result
// is not cut off at the size of a single ListObjects response. ctx is
// forwarded to each request and must be non-nil. On failure an empty slice
// and the error are returned.
func (b *s3Backend) list(ctx context.Context, prefix string) ([]string, error) {
	// Surface session setup problems as an error instead of panicking via
	// session.Must.
	sess, err := session.NewSession()
	if err != nil {
		return []string{}, err
	}

	var keys []string
	collect := func(page *s3.ListObjectsOutput, _ bool) bool {
		for _, obj := range page.Contents {
			// Key is a *string in the SDK model; skip entries without one
			// rather than dereferencing nil.
			if obj == nil || obj.Key == nil {
				continue
			}
			keys = append(keys, *obj.Key)
		}
		// Always continue on to the next page.
		return true
	}

	err = s3.New(sess).ListObjectsPagesWithContext(ctx, &s3.ListObjectsInput{
		Bucket: b.bucketName,
		Prefix: aws.String(prefix),
	}, collect)
	if err != nil {
		return []string{}, err
	}

	return keys, nil
}

// delete removes the object stored under the key id from the backend's
// bucket.
//
// ctx is forwarded to the S3 request so that the caller's cancellation and
// deadline apply; it must be non-nil. Any failure to set up the AWS session
// or to perform the request is returned as an error.
func (b *s3Backend) delete(ctx context.Context, id string) error {
	// Surface session setup problems as an error instead of panicking via
	// session.Must.
	sess, err := session.NewSession()
	if err != nil {
		return err
	}

	_, err = s3.New(sess).DeleteObjectWithContext(ctx, &s3.DeleteObjectInput{
		Bucket: b.bucketName,
		Key:    aws.String(id),
	})
	return err
}
10 changes: 10 additions & 0 deletions pkg/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ const (
StorageTypeMemory = "memory"
StorageTypeDisk = "disk"
StorageTypeGcs = "gcs"
StorageTypeS3 = "s3"

chunkPrefix = "chunk-"
chunkTypeLabel = "chunk_type"
Expand Down Expand Up @@ -181,6 +182,15 @@ func NewGcsStorage(bucketName string) (*Storage, error) {
return newStorage(backend)
}

// NewS3Storage returns a Storage backed by the supplied S3 bucket. It creates
// the S3 backend for bucketName and wraps it with newStorage; a backend
// creation failure is reported as an error.
func NewS3Storage(bucketName string) (*Storage, error) {
	s3b, err := newS3Backend(bucketName)
	if err == nil {
		return newStorage(s3b)
	}
	return nil, fmt.Errorf("unable to create S3 backend: %v", err)
}

func newStorage(b backend) (*Storage, error) {
m, err := newStorageMetrics()
if err != nil {
Expand Down