Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

S3 Backend MVP #75

Open
wants to merge 17 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ sudo: required
services:
- docker

go_import_path: dinowernli.me/almanac
go_import_path: github.com/dinowernli/almanac

env:
global:
Expand Down
4 changes: 2 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
FROM golang:1.9 as build
WORKDIR /go/src/dinowernli.me/almanac
WORKDIR /go/src/github.com/dinowernli/almanac
COPY . .
RUN CGO_ENABLED=0 GOOS=linux go build -o almanac-linux-static -a -ldflags '-extldflags "-static"' cmd/almanac/almanac.go

FROM scratch
COPY --from=build /go/src/dinowernli.me/almanac/almanac-linux-static .
COPY --from=build /go/src/github.com/dinowernli/almanac/almanac-linux-static .
ENTRYPOINT ["./almanac-linux-static"]

10 changes: 6 additions & 4 deletions cmd/almanac/almanac.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ import (
"net/http"
"os"

"dinowernli.me/almanac/pkg/cluster"
"dinowernli.me/almanac/pkg/storage"
pb_almanac "dinowernli.me/almanac/proto"
"github.com/dinowernli/almanac/pkg/cluster"
"github.com/dinowernli/almanac/pkg/storage"
pb_almanac "github.com/dinowernli/almanac/proto"

"github.com/alecthomas/kingpin"
"github.com/prometheus/client_golang/prometheus"
Expand All @@ -21,8 +21,9 @@ const (
)

var (
flagStorageType = kingpin.Flag("storage", "Which kind of storage to use").Default(storage.StorageTypeMemory).Enum(storage.StorageTypeMemory, storage.StorageTypeDisk, storage.StorageTypeGcs)
flagStorageType = kingpin.Flag("storage", "Which kind of storage to use").Default(storage.StorageTypeMemory).Enum(storage.StorageTypeMemory, storage.StorageTypeDisk, storage.StorageTypeGcs, storage.StorageTypeS3)
flagGcsBucket = kingpin.Flag("storage.gcs.bucket", "Which gcs bucket to use for storage").Default("almanac-dev").String()
flagS3Bucket = kingpin.Flag("storage.s3.bucket", "Which s3 bucket to use for storage").Default("almanac-dev").String()
flagDiskPath = kingpin.Flag("storage.disk.path", "An existing empty directory to use as root for storage").Default("/tmp/almanac-dev").String()

flagAppenderPorts = kingpin.Flag("appender_ports", "Which ports to run appenders on").Default("5001", "5002", "5003", "5004", "5005").Ints()
Expand Down Expand Up @@ -55,6 +56,7 @@ func main() {

StorageType: *flagStorageType,
GcsBucket: *flagGcsBucket,
S3Bucket: *flagS3Bucket,
DiskPath: *flagDiskPath,
}

Expand Down
20 changes: 13 additions & 7 deletions pkg/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ import (
"net"
"time"

"dinowernli.me/almanac/pkg/service/appender"
dc "dinowernli.me/almanac/pkg/service/discovery"
in "dinowernli.me/almanac/pkg/service/ingester"
"dinowernli.me/almanac/pkg/service/janitor"
mx "dinowernli.me/almanac/pkg/service/mixer"
st "dinowernli.me/almanac/pkg/storage"
pb_almanac "dinowernli.me/almanac/proto"
"github.com/dinowernli/almanac/pkg/service/appender"
dc "github.com/dinowernli/almanac/pkg/service/discovery"
in "github.com/dinowernli/almanac/pkg/service/ingester"
"github.com/dinowernli/almanac/pkg/service/janitor"
mx "github.com/dinowernli/almanac/pkg/service/mixer"
st "github.com/dinowernli/almanac/pkg/storage"
pb_almanac "github.com/dinowernli/almanac/proto"

"github.com/sirupsen/logrus"
"golang.org/x/net/context"
Expand All @@ -30,6 +30,7 @@ type Config struct {

StorageType string
GcsBucket string
S3Bucket string
DiskPath string
}

Expand Down Expand Up @@ -60,6 +61,11 @@ func CreateCluster(ctx context.Context, logger *logrus.Logger, config *Config, a
if err != nil {
return nil, fmt.Errorf("unable to create gcs storage: %v", err)
}
} else if config.StorageType == st.StorageTypeS3 {
storage, err = st.NewS3Storage(config.S3Bucket)
if err != nil {
return nil, fmt.Errorf("unable to create s3 storage: %v", err)
}
} else if config.StorageType == st.StorageTypeDisk {
storage, err = st.NewDiskStorage(config.DiskPath)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (
"io"
"strconv"

"dinowernli.me/almanac/pkg/http/templates"
pb_almanac "dinowernli.me/almanac/proto"
"github.com/dinowernli/almanac/pkg/http/templates"
pb_almanac "github.com/dinowernli/almanac/proto"
)

var (
Expand Down
2 changes: 1 addition & 1 deletion pkg/http/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package http
import (
"testing"

pb_almanac "dinowernli.me/almanac/proto"
pb_almanac "github.com/dinowernli/almanac/proto"

"github.com/stretchr/testify/assert"
)
Expand Down
2 changes: 1 addition & 1 deletion pkg/index/serialization.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"path/filepath"
"strings"

pb_almanac "dinowernli.me/almanac/proto"
pb_almanac "github.com/dinowernli/almanac/proto"
)

// Serialize returns a proto with the contents of the supplied index.
Expand Down
4 changes: 2 additions & 2 deletions pkg/service/appender/appender.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import (
"sync"
"time"

"dinowernli.me/almanac/pkg/storage"
pb_almanac "dinowernli.me/almanac/proto"
"github.com/dinowernli/almanac/pkg/storage"
pb_almanac "github.com/dinowernli/almanac/proto"

"github.com/sirupsen/logrus"
"golang.org/x/net/context"
Expand Down
6 changes: 3 additions & 3 deletions pkg/service/appender/open_chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import (
"sync"
"time"

"dinowernli.me/almanac/pkg/index"
"dinowernli.me/almanac/pkg/storage"
pb_almanac "dinowernli.me/almanac/proto"
"github.com/dinowernli/almanac/pkg/index"
"github.com/dinowernli/almanac/pkg/storage"
pb_almanac "github.com/dinowernli/almanac/proto"

"golang.org/x/net/context"
)
Expand Down
4 changes: 2 additions & 2 deletions pkg/service/appender/open_chunk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import (
"testing"
"time"

"dinowernli.me/almanac/pkg/storage"
pb_almanac "dinowernli.me/almanac/proto"
"github.com/dinowernli/almanac/pkg/storage"
pb_almanac "github.com/dinowernli/almanac/proto"

"github.com/stretchr/testify/assert"
"golang.org/x/net/context"
Expand Down
2 changes: 1 addition & 1 deletion pkg/service/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package discovery
import (
"fmt"

pb_almanac "dinowernli.me/almanac/proto"
pb_almanac "github.com/dinowernli/almanac/proto"

"google.golang.org/grpc"
)
Expand Down
8 changes: 4 additions & 4 deletions pkg/service/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ import (
"net/http"
"time"

almHttp "dinowernli.me/almanac/pkg/http"
dc "dinowernli.me/almanac/pkg/service/discovery"
"dinowernli.me/almanac/pkg/util"
pb_almanac "dinowernli.me/almanac/proto"
almHttp "github.com/dinowernli/almanac/pkg/http"
dc "github.com/dinowernli/almanac/pkg/service/discovery"
"github.com/dinowernli/almanac/pkg/util"
pb_almanac "github.com/dinowernli/almanac/proto"

"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
Expand Down
6 changes: 3 additions & 3 deletions pkg/service/janitor/janitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ import (
"fmt"
"time"

st "dinowernli.me/almanac/pkg/storage"
"dinowernli.me/almanac/pkg/util"
pb_almanac "dinowernli.me/almanac/proto"
st "github.com/dinowernli/almanac/pkg/storage"
"github.com/dinowernli/almanac/pkg/util"
pb_almanac "github.com/dinowernli/almanac/proto"

"github.com/sirupsen/logrus"
"golang.org/x/net/context"
Expand Down
4 changes: 2 additions & 2 deletions pkg/service/janitor/janitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import (
"testing"
"time"

st "dinowernli.me/almanac/pkg/storage"
pb_almanac "dinowernli.me/almanac/proto"
st "github.com/dinowernli/almanac/pkg/storage"
pb_almanac "github.com/dinowernli/almanac/proto"

"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
Expand Down
4 changes: 2 additions & 2 deletions pkg/service/mixer/heap.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package mixer
import (
"fmt"

st "dinowernli.me/almanac/pkg/storage"
pb_almanac "dinowernli.me/almanac/proto"
st "github.com/dinowernli/almanac/pkg/storage"
pb_almanac "github.com/dinowernli/almanac/proto"

"golang.org/x/net/context"
)
Expand Down
8 changes: 4 additions & 4 deletions pkg/service/mixer/mixer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ import (
"net/http"
"time"

almHttp "dinowernli.me/almanac/pkg/http"
"dinowernli.me/almanac/pkg/service/discovery"
"dinowernli.me/almanac/pkg/storage"
pb_almanac "dinowernli.me/almanac/proto"
almHttp "github.com/dinowernli/almanac/pkg/http"
"github.com/dinowernli/almanac/pkg/service/discovery"
"github.com/dinowernli/almanac/pkg/storage"
pb_almanac "github.com/dinowernli/almanac/proto"

"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
Expand Down
6 changes: 3 additions & 3 deletions pkg/service/mixer/mixer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import (
"strings"
"testing"

"dinowernli.me/almanac/pkg/service/discovery"
st "dinowernli.me/almanac/pkg/storage"
pb_almanac "dinowernli.me/almanac/proto"
"github.com/dinowernli/almanac/pkg/service/discovery"
st "github.com/dinowernli/almanac/pkg/storage"
pb_almanac "github.com/dinowernli/almanac/proto"

"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
Expand Down
6 changes: 3 additions & 3 deletions pkg/storage/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import (
"sort"
"strings"

"dinowernli.me/almanac/pkg/index"
"dinowernli.me/almanac/pkg/util"
pb_almanac "dinowernli.me/almanac/proto"
"github.com/dinowernli/almanac/pkg/index"
"github.com/dinowernli/almanac/pkg/util"
pb_almanac "github.com/dinowernli/almanac/proto"

"golang.org/x/net/context"
)
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/chunk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package storage
import (
"testing"

pb_almanac "dinowernli.me/almanac/proto"
pb_almanac "github.com/dinowernli/almanac/proto"

"github.com/stretchr/testify/assert"
)
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/entry.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package storage

import (
pb_almanac "dinowernli.me/almanac/proto"
pb_almanac "github.com/dinowernli/almanac/proto"
)

// OldestEntryFirst is an ordering over log entries by ascending timestamp.
Expand Down
79 changes: 79 additions & 0 deletions pkg/storage/s3.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package storage

import (
"bytes"
"io/ioutil"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"golang.org/x/net/context"
)

const ()

// newS3Backend returns a new backend implementation backed by the supplied
// S3 bucket name. Note that with the current interface, the AWS_REGION environment
// variable must be specified to use this backend.
func newS3Backend(bucketName string) (*s3Backend, error) {
sess := session.Must(session.NewSession())
s3Client := s3.New(sess)

return &s3Backend{
bucketName: aws.String(bucketName),
s3Client: s3Client,
}, nil
}

type s3Backend struct {
bucketName *string
s3Client *s3.S3
}

func (b *s3Backend) read(ctx context.Context, id string) ([]byte, error) {
getOutput, err := b.s3Client.GetObjectWithContext(ctx, &s3.GetObjectInput{
Bucket: b.bucketName,
Key: aws.String(id),
})
if err != nil {
return []byte{}, err
}

return ioutil.ReadAll(getOutput.Body)
}

func (b *s3Backend) write(ctx context.Context, id string, contents []byte) error {
_, err := b.s3Client.PutObjectWithContext(ctx, &s3.PutObjectInput{
Bucket: b.bucketName,
Key: aws.String(id),
Body: bytes.NewReader(contents),
})

return err
}

func (b *s3Backend) list(ctx context.Context, prefix string) ([]string, error) {
listOutput, err := b.s3Client.ListObjectsWithContext(ctx, &s3.ListObjectsInput{
Bucket: b.bucketName,
Prefix: aws.String(prefix),
})
if err != nil {
return []string{}, err
}

var keys []string
for _, obj := range listOutput.Contents {
keys = append(keys, *obj.Key)
}

return keys, nil
}

func (b *s3Backend) delete(ctx context.Context, id string) error {
_, err := b.s3Client.DeleteObjectWithContext(ctx, &s3.DeleteObjectInput{
Bucket: b.bucketName,
Key: aws.String(id),
})

return err
}
14 changes: 12 additions & 2 deletions pkg/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (
"os"
"strings"

"dinowernli.me/almanac/pkg/util"
pb_almanac "dinowernli.me/almanac/proto"
"github.com/dinowernli/almanac/pkg/util"
pb_almanac "github.com/dinowernli/almanac/proto"

"github.com/golang/protobuf/proto"
"github.com/prometheus/client_golang/prometheus"
Expand All @@ -18,6 +18,7 @@ const (
StorageTypeMemory = "memory"
StorageTypeDisk = "disk"
StorageTypeGcs = "gcs"
StorageTypeS3 = "s3"

chunkPrefix = "chunk-"
chunkTypeLabel = "chunk_type"
Expand Down Expand Up @@ -181,6 +182,15 @@ func NewGcsStorage(bucketName string) (*Storage, error) {
return newStorage(backend)
}

// NewS3Storage returns a storage backed by the supplied S£ bucket.
func NewS3Storage(bucketName string) (*Storage, error) {
backend, err := newS3Backend(bucketName)
if err != nil {
return nil, fmt.Errorf("unable to create S3 backend: %v", err)
}
return newStorage(backend)
}

func newStorage(b backend) (*Storage, error) {
m, err := newStorageMetrics()
if err != nil {
Expand Down
Loading