Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Arize does not need gcs signed URLs so make it optional #8

Merged
merged 15 commits into from
May 23, 2024
18 changes: 15 additions & 3 deletions broker/fragment/store_gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package fragment

import (
"context"
"fmt"
"io"
"net/url"
"strings"
Expand Down Expand Up @@ -41,10 +42,21 @@ func (s *gcsBackend) SignGet(ep *url.URL, fragment pb.Fragment, d time.Duration)
if err != nil {
return "", err
}
opts.Method = "GET"
opts.Expires = time.Now().Add(d)

return client.Bucket(cfg.bucket).SignedURL(cfg.rewritePath(cfg.prefix, fragment.ContentPath()), &opts)
if DisableSignedUrls {
u := &url.URL{
Path: fmt.Sprintf("/%s/%s", cfg.bucket, cfg.rewritePath(cfg.prefix, fragment.ContentPath())),
}
u.Scheme = "https"
u.Host = "storage.googleapis.com"

return u.String(), nil
} else {
opts.Method = "GET"
opts.Expires = time.Now().Add(d)

return client.Bucket(cfg.bucket).SignedURL(cfg.rewritePath(cfg.prefix, fragment.ContentPath()), &opts)
}
}

func (s *gcsBackend) Exists(ctx context.Context, ep *url.URL, fragment pb.Fragment) (exists bool, err error) {
Expand Down
3 changes: 3 additions & 0 deletions broker/fragment/stores.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ import (
// If true, fragments are not persisted, and stores are not listed for existing fragments.
var DisableStores bool = false

// Whether to return an unsigned URL when a signed URL is requested. Useful when clients do not require the signing.
var DisableSignedUrls bool = false

type backend interface {
Provider() string
SignGet(ep *url.URL, fragment pb.Fragment, d time.Duration) (string, error)
Expand Down
16 changes: 9 additions & 7 deletions cmd/gazette/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,14 @@ const iniFilename = "gazette.ini"
var Config = new(struct {
Broker struct {
mbp.ServiceConfig
Limit uint32 `long:"limit" env:"LIMIT" default:"1024" description:"Maximum number of Journals the broker will allocate"`
FileRoot string `long:"file-root" env:"FILE_ROOT" description:"Local path which roots file:// fragment stores (optional)"`
MaxAppendRate uint32 `long:"max-append-rate" env:"MAX_APPEND_RATE" default:"0" description:"Max rate (in bytes-per-sec) that any one journal may be appended to. If zero, there is no max rate"`
MaxReplication uint32 `long:"max-replication" env:"MAX_REPLICATION" default:"9" description:"Maximum effective replication of any one journal, which upper-bounds its stated replication."`
MinAppendRate uint32 `long:"min-append-rate" env:"MIN_APPEND_RATE" default:"65536" description:"Min rate (in bytes-per-sec) at which a client may stream Append RPC content. RPCs unable to sustain this rate are aborted"`
DisableStores bool `long:"disable-stores" env:"DISABLE_STORES" description:"Disable use of any configured journal fragment stores. The broker will neither list or persist remote fragments, and all data is discarded on broker exit."`
WatchDelay time.Duration `long:"watch-delay" env:"WATCH_DELAY" default:"30ms" description:"Delay applied to the application of watched Etcd events. Larger values amortize the processing of fast-changing Etcd keys."`
Limit uint32 `long:"limit" env:"LIMIT" default:"1024" description:"Maximum number of Journals the broker will allocate"`
FileRoot string `long:"file-root" env:"FILE_ROOT" description:"Local path which roots file:// fragment stores (optional)"`
MaxAppendRate uint32 `long:"max-append-rate" env:"MAX_APPEND_RATE" default:"0" description:"Max rate (in bytes-per-sec) that any one journal may be appended to. If zero, there is no max rate"`
MaxReplication uint32 `long:"max-replication" env:"MAX_REPLICATION" default:"9" description:"Maximum effective replication of any one journal, which upper-bounds its stated replication."`
MinAppendRate uint32 `long:"min-append-rate" env:"MIN_APPEND_RATE" default:"65536" description:"Min rate (in bytes-per-sec) at which a client may stream Append RPC content. RPCs unable to sustain this rate are aborted"`
DisableStores bool `long:"disable-stores" env:"DISABLE_STORES" description:"Disable use of any configured journal fragment stores. The broker will neither list or persist remote fragments, and all data is discarded on broker exit."`
WatchDelay time.Duration `long:"watch-delay" env:"WATCH_DELAY" default:"30ms" description:"Delay applied to the application of watched Etcd events. Larger values amortize the processing of fast-changing Etcd keys."`
DisableSignedUrls bool `long:"disable-signed-urls" env:"DISABLE_SIGNED_URLS" description:"When a signed URL is requested, return an unsigned URL instead. This is useful when clients do not require the signing."`
} `group:"Broker" namespace:"broker" env-namespace:"BROKER"`

Etcd struct {
Expand Down Expand Up @@ -71,6 +72,7 @@ func (cmdServe) Execute(args []string) error {
broker.MaxAppendRate = int64(Config.Broker.MaxAppendRate)
pb.MaxReplication = int32(Config.Broker.MaxReplication)
fragment.DisableStores = Config.Broker.DisableStores
fragment.DisableSignedUrls = Config.Broker.DisableSignedUrls

var (
lo = pb.NewJournalClient(srv.GRPCLoopback)
Expand Down
Loading