Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support google cloud storage as das #2643

Merged
merged 19 commits into from
Oct 7, 2024
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions cmd/genericconf/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,27 @@ var DefaultS3Config = S3Config{
SecretKey: "",
}

type GoogleCloudStorageConfig struct {
AccessToken string `koanf:"access-token"`
renlulu marked this conversation as resolved.
Show resolved Hide resolved
Bucket string `koanf:"bucket"`
ObjectPrefix string `koanf:"object-prefix"`
MaxRetention time.Duration `koanf:"max-retention"`
}

var DefaultGoogleCloudStorageConfig = GoogleCloudStorageConfig{
AccessToken: "",
Bucket: "",
ObjectPrefix: "",
MaxRetention: time.Hour * 24,
renlulu marked this conversation as resolved.
Show resolved Hide resolved
}

func GoogleCloudConfigAddOptions(prefix string, f *flag.FlagSet) {
f.String(prefix+".access-token", DefaultGoogleCloudStorageConfig.AccessToken, "Google Cloud Storage access token")
f.String(prefix+".bucket", DefaultGoogleCloudStorageConfig.Bucket, "Google Cloud Storage bucket")
f.String(prefix+".object-prefix", DefaultGoogleCloudStorageConfig.ObjectPrefix, "prefix to add to Google Cloud Storage objects")
f.Duration(prefix+".max-retention", DefaultGoogleCloudStorageConfig.MaxRetention, "store requests with expiry times farther in the future than max-retention will be rejected")
}

func HandlerFromLogType(logType string, output io.Writer) (slog.Handler, error) {
if logType == "plaintext" {
return log.NewTerminalHandler(output, false), nil
Expand Down
8 changes: 5 additions & 3 deletions das/das.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,10 @@ type DataAvailabilityConfig struct {
LocalCache CacheConfig `koanf:"local-cache"`
RedisCache RedisConfig `koanf:"redis-cache"`

LocalDBStorage LocalDBStorageConfig `koanf:"local-db-storage"`
LocalFileStorage LocalFileStorageConfig `koanf:"local-file-storage"`
S3Storage S3StorageServiceConfig `koanf:"s3-storage"`
LocalDBStorage LocalDBStorageConfig `koanf:"local-db-storage"`
LocalFileStorage LocalFileStorageConfig `koanf:"local-file-storage"`
S3Storage S3StorageServiceConfig `koanf:"s3-storage"`
GoogleCloudStorage GoogleCloudStorageServiceConfig `koanf:"google-cloud-storage"`

MigrateLocalDBToFileStorage bool `koanf:"migrate-local-db-to-file-storage"`

Expand Down Expand Up @@ -114,6 +115,7 @@ func dataAvailabilityConfigAddOptions(prefix string, f *flag.FlagSet, r role) {
LocalDBStorageConfigAddOptions(prefix+".local-db-storage", f)
LocalFileStorageConfigAddOptions(prefix+".local-file-storage", f)
S3ConfigAddOptions(prefix+".s3-storage", f)
GoogleCloudConfigAddOptions(prefix+".google-cloud-storage", f)
f.Bool(prefix+".migrate-local-db-to-file-storage", DefaultDataAvailabilityConfig.MigrateLocalDBToFileStorage, "daserver will migrate all data on startup from local-db-storage to local-file-storage, then mark local-db-storage as unusable")

// Key config for storage
Expand Down
9 changes: 9 additions & 0 deletions das/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,15 @@ func CreatePersistentStorageService(
storageServices = append(storageServices, s)
}

if config.GoogleCloudStorage.Enable {
s, err := NewGoogleCloudStorageService(config.GoogleCloudStorage)
if err != nil {
return nil, nil, err
}
lifecycleManager.Register(s)
storageServices = append(storageServices, s)
}

if len(storageServices) > 1 {
s, err := NewRedundantStorageService(ctx, storageServices)
if err != nil {
Expand Down
202 changes: 202 additions & 0 deletions das/google_cloud_storage_service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
package das

import (
googlestorage "cloud.google.com/go/storage"
"context"
"fmt"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/google/go-cmp/cmp"
"github.com/offchainlabs/nitro/arbstate/daprovider"
"github.com/offchainlabs/nitro/das/dastree"
"github.com/offchainlabs/nitro/util/pretty"
flag "github.com/spf13/pflag"
"google.golang.org/api/option"
"io"
"math"
"sort"
"time"
)

type GoogleCloudStorageOperator interface {
Bucket(name string) *googlestorage.BucketHandle
Upload(ctx context.Context, bucket, objectPrefix string, value []byte) error
Download(ctx context.Context, bucket, objectPrefix string, key common.Hash) ([]byte, error)
Close(ctx context.Context) error
}

type GoogleCloudStorageClient struct {
client *googlestorage.Client
}

func (g *GoogleCloudStorageClient) Bucket(name string) *googlestorage.BucketHandle {
return g.client.Bucket(name)
}

func (g *GoogleCloudStorageClient) Upload(ctx context.Context, bucket, objectPrefix string, value []byte) error {
obj := g.client.Bucket(bucket).Object(objectPrefix + EncodeStorageServiceKey(dastree.Hash(value)))
w := obj.NewWriter(ctx)

if _, err := fmt.Fprintln(w, value); err != nil {
return err
}
return w.Close()

}

func (g *GoogleCloudStorageClient) Download(ctx context.Context, bucket, objectPrefix string, key common.Hash) ([]byte, error) {
obj := g.client.Bucket(bucket).Object(objectPrefix + EncodeStorageServiceKey(key))
reader, err := obj.NewReader(ctx)
if err != nil {
return nil, err
}
return io.ReadAll(reader)
}

func (g *GoogleCloudStorageClient) Close(ctx context.Context) error {
return g.client.Close()
}

type GoogleCloudStorageServiceConfig struct {
Enable bool `koanf:"enable"`
AccessToken string `koanf:"access-token"`
Bucket string `koanf:"bucket"`
ObjectPrefix string `koanf:"object-prefix"`
EnableExpiry bool `koanf:"enable-expiry"`
MaxRetention time.Duration `koanf:"max-retention"`
}

var DefaultGoogleCloudStorageServiceConfig = GoogleCloudStorageServiceConfig{}

func GoogleCloudConfigAddOptions(prefix string, f *flag.FlagSet) {
f.Bool(prefix+".enable", DefaultGoogleCloudStorageServiceConfig.Enable, "enable storage/retrieval of sequencer batch data from an Google Cloud Storage bucket")
f.String(prefix+".access-token", DefaultGoogleCloudStorageServiceConfig.AccessToken, "Google Cloud Storage access token")
f.String(prefix+".bucket", DefaultGoogleCloudStorageServiceConfig.Bucket, "Google Cloud Storage bucket")
f.String(prefix+".object-prefix", DefaultGoogleCloudStorageServiceConfig.ObjectPrefix, "prefix to add to Google Cloud Storage objects")
f.Bool(prefix+".enable-expiry", DefaultLocalFileStorageConfig.EnableExpiry, "enable expiry of batches")
f.Duration(prefix+".max-retention", DefaultLocalFileStorageConfig.MaxRetention, "store requests with expiry times farther in the future than max-retention will be rejected")

}

type GoogleCloudStorageService struct {
operator GoogleCloudStorageOperator
bucket string
objectPrefix string
enableExpiry bool
maxRetention time.Duration
}

func NewGoogleCloudStorageService(config GoogleCloudStorageServiceConfig) (StorageService, error) {
var client *googlestorage.Client
var err error
// Note that if the credentials are not specified, the client library will find credentials using ADC(Application Default Credentials)
// https://cloud.google.com/docs/authentication/provide-credentials-adc.
if config.AccessToken == "" {
client, err = googlestorage.NewClient(context.Background())
} else {
client, err = googlestorage.NewClient(context.Background(), option.WithCredentialsJSON([]byte(config.AccessToken)))
}
if err != nil {
return nil, fmt.Errorf("error creating Google Cloud Storage client: %w", err)
}
service := &GoogleCloudStorageService{
operator: &GoogleCloudStorageClient{client: client},
bucket: config.Bucket,
objectPrefix: config.ObjectPrefix,
enableExpiry: config.EnableExpiry,
maxRetention: config.MaxRetention,
}
if config.EnableExpiry {
lifecycleRule := googlestorage.LifecycleRule{
Action: googlestorage.LifecycleAction{Type: "Delete"},
Condition: googlestorage.LifecycleCondition{AgeInDays: int64(config.MaxRetention.Hours() / 24)}, // Objects older than 30 days
}
ctx := context.Background()
bucket := service.operator.Bucket(service.bucket)
// check if bucket exists (and others), and update expiration policy if enabled
attrs, err := bucket.Attrs(ctx)
if err != nil {
return nil, fmt.Errorf("error getting bucket attributes: %w", err)
}
attrs.Lifecycle.Rules = append(attrs.Lifecycle.Rules, lifecycleRule)

bucketAttrsToUpdate := googlestorage.BucketAttrsToUpdate{
Lifecycle: &attrs.Lifecycle,
}
if _, err := bucket.Update(ctx, bucketAttrsToUpdate); err != nil {
return nil, fmt.Errorf("failed to update bucket lifecycle: %w", err)
}
}
return service, nil
}

func (gcs *GoogleCloudStorageService) Put(ctx context.Context, data []byte, expiry uint64) error {
logPut("das.GoogleCloudStorageService.Store", data, expiry, gcs)
if expiry > math.MaxInt64 {
return fmt.Errorf("request expiry time (%v) exceeds max int64", expiry)
}
// #nosec G115
expiryTime := time.Unix(int64(expiry), 0)
currentTimePlusRetention := time.Now().Add(gcs.maxRetention)
if expiryTime.After(currentTimePlusRetention) {
return fmt.Errorf("requested expiry time (%v) exceeds current time plus maximum allowed retention period(%v)", expiryTime, currentTimePlusRetention)
}
if err := gcs.operator.Upload(ctx, gcs.bucket, gcs.objectPrefix, data); err != nil {
log.Error("das.GoogleCloudStorageService.Store", "err", err)
return err
}
return nil
}

func (gcs *GoogleCloudStorageService) GetByHash(ctx context.Context, key common.Hash) ([]byte, error) {
log.Trace("das.GoogleCloudStorageService.GetByHash", "key", pretty.PrettyHash(key), "this", gcs)
buf, err := gcs.operator.Download(ctx, gcs.bucket, gcs.objectPrefix, key)
if err != nil {
log.Error("das.GoogleCloudStorageService.GetByHash", "err", err)
return nil, err
}
return buf, nil
}

func (gcs *GoogleCloudStorageService) ExpirationPolicy(ctx context.Context) (daprovider.ExpirationPolicy, error) {
if gcs.enableExpiry {
return daprovider.KeepForever, nil
}
return daprovider.DiscardAfterDataTimeout, nil
}

func (gcs *GoogleCloudStorageService) Sync(ctx context.Context) error {
return nil
}

func (gcs *GoogleCloudStorageService) Close(ctx context.Context) error {
return gcs.operator.Close(ctx)
}

func (gcs *GoogleCloudStorageService) String() string {
return fmt.Sprintf("GoogleCloudStorageService(:%s)", gcs.bucket)
}

func (gcs *GoogleCloudStorageService) HealthCheck(ctx context.Context) error {
bucket := gcs.operator.Bucket(gcs.bucket)
// check if we have bucket permissions
permissions := []string{
"storage.buckets.get",
"storage.buckets.list",
"storage.objects.create",
"storage.objects.delete",
"storage.objects.list",
"storage.objects.get",
}
perms, err := bucket.IAM().TestPermissions(ctx, permissions)
if err != nil {
return fmt.Errorf("could not check permissions: %w", err)
}
sort.Strings(permissions)
sort.Strings(perms)
if !cmp.Equal(perms, permissions) {
return fmt.Errorf("permissions mismatch (-want +got):\n%s", cmp.Diff(permissions, perms))
}

return nil
}
81 changes: 81 additions & 0 deletions das/google_cloud_storage_service_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package das

import (
"bytes"
googlestorage "cloud.google.com/go/storage"
"context"
"errors"
"github.com/ethereum/go-ethereum/common"
"github.com/offchainlabs/nitro/cmd/genericconf"
"github.com/offchainlabs/nitro/das/dastree"
"testing"
"time"
)

type mockGCSClient struct {
storage map[string][]byte
}

func (c *mockGCSClient) Bucket(name string) *googlestorage.BucketHandle {
return nil
}

func (c *mockGCSClient) Download(ctx context.Context, bucket, objectPrefix string, key common.Hash) ([]byte, error) {
value, ok := c.storage[objectPrefix+EncodeStorageServiceKey(key)]
if !ok {
return nil, ErrNotFound
}
return value, nil
}

func (c *mockGCSClient) Close(ctx context.Context) error {
return nil
}

func (c *mockGCSClient) Upload(ctx context.Context, bucket, objectPrefix string, value []byte) error {
key := objectPrefix + EncodeStorageServiceKey(dastree.Hash(value))
c.storage[key] = value
return nil
}

func NewTestGoogleCloudStorageService(ctx context.Context, googleCloudStorageConfig genericconf.GoogleCloudStorageConfig) (StorageService, error) {
return &GoogleCloudStorageService{
bucket: googleCloudStorageConfig.Bucket,
objectPrefix: googleCloudStorageConfig.ObjectPrefix,
operator: &mockGCSClient{
storage: make(map[string][]byte),
},
maxRetention: googleCloudStorageConfig.MaxRetention,
}, nil
}

func TestNewGoogleCloudStorageService(t *testing.T) {
ctx := context.Background()
expiry := uint64(time.Now().Add(time.Hour).Unix())
googleCloudService, err := NewTestGoogleCloudStorageService(ctx, genericconf.DefaultGoogleCloudStorageConfig)
Require(t, err)

val1 := []byte("The first value")
val1CorrectKey := dastree.Hash(val1)
val2IncorrectKey := dastree.Hash(append(val1, 0))

_, err = googleCloudService.GetByHash(ctx, val1CorrectKey)
if !errors.Is(err, ErrNotFound) {
t.Fatal(err)
}

err = googleCloudService.Put(ctx, val1, expiry)
Require(t, err)

_, err = googleCloudService.GetByHash(ctx, val2IncorrectKey)
if !errors.Is(err, ErrNotFound) {
t.Fatal(err)
}

val, err := googleCloudService.GetByHash(ctx, val1CorrectKey)
Require(t, err)
if !bytes.Equal(val, val1) {
t.Fatal(val, val1)
}

}
Loading