Skip to content

Commit

Permalink
avoiding signed urls requires auth header support
Browse files Browse the repository at this point in the history
  • Loading branch information
ddowker committed Jun 5, 2024
1 parent 66a6347 commit 44b2ed5
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 31 deletions.
55 changes: 38 additions & 17 deletions broker/client/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ import (
"io"
"io/ioutil"
"net/http"
"net/url"

"github.com/prometheus/client_golang/prometheus"
"go.gazette.dev/core/broker/codecs"
"go.gazette.dev/core/broker/fragment"
pb "go.gazette.dev/core/broker/protocol"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand All @@ -24,9 +26,9 @@ import (
// seek to the requested offset, and read its content.
//
// Reader returns EOF if:
// * The broker closes the RPC, eg because its assignment has change or it's shutting down.
// * The requested EndOffset has been read through.
// * A Fragment being read by the Reader reaches EOF.
// - The broker closes the RPC, e.g. because its assignment has changed or it's shutting down.
// - The requested EndOffset has been read through.
// - A Fragment being read by the Reader reaches EOF.
//
// If Block is true, Read may block indefinitely. Otherwise, ErrOffsetNotYetAvailable
// is returned upon reaching the journal write head.
Expand Down Expand Up @@ -58,6 +60,10 @@ func NewReader(ctx context.Context, client pb.RoutedJournalClient, req pb.ReadRe
return r
}

// Arize logic to avoid use of signed URLs on GCS.
var (
	// TransformSignedURLs, when enabled, directs Reader to bypass
	// broker-signed fragment URLs and open "gs://" fragments directly
	// through the GCS backend.
	TransformSignedURLs = false
	// gcs is the shared GCS backend used for those direct fragment reads.
	gcs = fragment.GcsBackend{}
)

// Read from the journal. If this is the first Read of the Reader, a Read RPC is started.
func (r *Reader) Read(p []byte) (n int, err error) {
// If we have an open direct reader of a persisted fragment, delegate to it.
Expand Down Expand Up @@ -164,9 +170,24 @@ func (r *Reader) Read(p []byte) (n int, err error) {

// If the frame preceding EOF provided a fragment URL, open it directly.
if !r.Request.MetadataOnly && r.Response.Status == pb.Status_OK && r.Response.FragmentUrl != "" {
if r.direct, err = OpenFragmentURL(r.ctx, *r.Response.Fragment,
r.Request.Offset, r.Response.FragmentUrl); err == nil {
n, err = r.Read(p) // Recurse to attempt read against opened |r.direct|.
if TransformSignedURLs {
var url *url.URL
if url, err = url.Parse(r.Response.FragmentUrl); err != nil {
return 0, err

}
if url.Scheme != "gs" {
return 0, fmt.Error("TransformSignedURLs is only supported for GCS")
}
if r.direct, err = gcs.Open(r.ctx, r.Response.FragmentUrl,
*r.Response.Fragment, r.Request.Offset); err == nil {
n, err = r.Read(p) // Recurse to attempt read against opened |r.direct|.
}
} else {
if r.direct, err = OpenFragmentURL(r.ctx, *r.Response.Fragment,
r.Request.Offset, r.Response.FragmentUrl); err == nil {
n, err = r.Read(p) // Recurse to attempt read against opened |r.direct|.
}
}
return
}
Expand Down Expand Up @@ -197,9 +218,10 @@ func (r *Reader) AdjustedOffset(br *bufio.Reader) int64 {
}

// Seek provides a limited form of seeking support. Specifically, if:
// * A Fragment URL is being directly read, and
// * The Seek offset is ahead of the current Reader offset, and
// * The Fragment also covers the desired Seek offset
// - A Fragment URL is being directly read, and
// - The Seek offset is ahead of the current Reader offset, and
// - The Fragment also covers the desired Seek offset
//
// Then a seek is performed by reading and discarding to the seeked offset.
// Seek will otherwise return ErrSeekRequiresNewReader.
func (r *Reader) Seek(offset int64, whence int) (int64, error) {
Expand Down Expand Up @@ -349,15 +371,14 @@ func (fr *FragmentReader) Close() error {
// fragments are persisted, and to which this client also has access. The
// returned cleanup function removes the handler and restores the prior http.Client.
//
// const root = "/mnt/shared-nas-array/path/to/fragment-root"
// defer client.InstallFileTransport(root)()
//
// var rr = NewRetryReader(ctx, client, protocol.ReadRequest{
// Journal: "a/journal/with/nas/fragment/store",
// DoNotProxy: true,
// })
// // rr.Read will read Fragments directly from NAS.
// const root = "/mnt/shared-nas-array/path/to/fragment-root"
// defer client.InstallFileTransport(root)()
//
// var rr = NewRetryReader(ctx, client, protocol.ReadRequest{
// Journal: "a/journal/with/nas/fragment/store",
// DoNotProxy: true,
// })
// // rr.Read will read Fragments directly from NAS.
func InstallFileTransport(root string) (remove func()) {
var transport = http.DefaultTransport.(*http.Transport).Clone()
transport.RegisterProtocol("file", http.NewFileTransport(http.Dir(root)))
Expand Down
27 changes: 18 additions & 9 deletions broker/fragment/store_gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,17 @@ type GSStoreConfig struct {
RewriterConfig
}

type gcsBackend struct {
// GcsBackend implements a fragment store backend for Google Cloud Storage.
// It is exported (formerly the unexported gcsBackend) so that client code
// may read GCS fragments directly, without broker-signed URLs.
type GcsBackend struct {
	client           *storage.Client          // shared GCS client; NOTE(review): presumably built lazily by gcsClient — confirm
	signedURLOptions storage.SignedURLOptions // options for signing URLs; presumably consumed by SignGet — verify
	clientMu         sync.Mutex               // guards initialization of client
}

func (s *gcsBackend) Provider() string {
// Provider returns the name of the storage provider backing this
// implementation, which is always "gcs".
func (s *GcsBackend) Provider() string {
	const providerName = "gcs"
	return providerName
}

func (s *gcsBackend) SignGet(ep *url.URL, fragment pb.Fragment, d time.Duration) (string, error) {
func (s *GcsBackend) SignGet(ep *url.URL, fragment pb.Fragment, d time.Duration) (string, error) {
cfg, client, opts, err := s.gcsClient(ep)
if err != nil {
return "", err
Expand All @@ -59,7 +59,7 @@ func (s *gcsBackend) SignGet(ep *url.URL, fragment pb.Fragment, d time.Duration)
}
}

func (s *gcsBackend) Exists(ctx context.Context, ep *url.URL, fragment pb.Fragment) (exists bool, err error) {
func (s *GcsBackend) Exists(ctx context.Context, ep *url.URL, fragment pb.Fragment) (exists bool, err error) {
cfg, client, _, err := s.gcsClient(ep)
if err != nil {
return false, err
Expand All @@ -73,15 +73,24 @@ func (s *gcsBackend) Exists(ctx context.Context, ep *url.URL, fragment pb.Fragme
return exists, err
}

func (s *gcsBackend) Open(ctx context.Context, ep *url.URL, fragment pb.Fragment) (io.ReadCloser, error) {
// Open returns a reader over the full content of the fragment's object in
// the GCS bucket identified by |ep|.
func (s *GcsBackend) Open(ctx context.Context, ep *url.URL, fragment pb.Fragment) (io.ReadCloser, error) {
	cfg, client, _, err := s.gcsClient(ep)
	if err != nil {
		return nil, err
	}
	// Map the fragment's content path through the configured path rewrite,
	// and open the resulting object for reading from its start.
	return client.Bucket(cfg.bucket).Object(cfg.rewritePath(cfg.prefix, fragment.ContentPath())).NewReader(ctx)
}

func (s *gcsBackend) Persist(ctx context.Context, ep *url.URL, spool Spool) error {
// OpenRange is the Arize Open variant taking a starting offset, for use by
// consumers reading fragments directly in lieu of signed URLs. It returns a
// reader over the fragment's object beginning at |offset| bytes and
// extending through the end of the object.
//
// NOTE(review): this was introduced as a second method named Open, which
// cannot compile — Go does not permit two methods of the same name on one
// receiver type, and GcsBackend already declares Open without an offset.
// It is renamed OpenRange here; callers passing an offset must be updated.
func (s *GcsBackend) OpenRange(ctx context.Context, ep *url.URL, fragment pb.Fragment, offset int64) (io.ReadCloser, error) {
	cfg, client, _, err := s.gcsClient(ep)
	if err != nil {
		return nil, err
	}
	// A length of -1 reads from |offset| through the end of the object.
	return client.Bucket(cfg.bucket).Object(cfg.rewritePath(cfg.prefix, fragment.ContentPath())).NewRangeReader(ctx, offset, -1)
}

func (s *GcsBackend) Persist(ctx context.Context, ep *url.URL, spool Spool) error {
cfg, client, _, err := s.gcsClient(ep)
if err != nil {
return err
Expand All @@ -105,7 +114,7 @@ func (s *gcsBackend) Persist(ctx context.Context, ep *url.URL, spool Spool) erro
return err
}

func (s *gcsBackend) List(ctx context.Context, store pb.FragmentStore, ep *url.URL, journal pb.Journal, callback func(pb.Fragment)) error {
func (s *GcsBackend) List(ctx context.Context, store pb.FragmentStore, ep *url.URL, journal pb.Journal, callback func(pb.Fragment)) error {
var cfg, client, _, err = s.gcsClient(ep)
if err != nil {
return err
Expand Down Expand Up @@ -136,15 +145,15 @@ func (s *gcsBackend) List(ctx context.Context, store pb.FragmentStore, ep *url.U
return err
}

func (s *gcsBackend) Remove(ctx context.Context, fragment pb.Fragment) error {
// Remove deletes the fragment's object from its backing GCS store.
func (s *GcsBackend) Remove(ctx context.Context, fragment pb.Fragment) error {
	cfg, client, _, err := s.gcsClient(fragment.BackingStore.URL())
	if err != nil {
		return err
	}
	var path = cfg.rewritePath(cfg.prefix, fragment.ContentPath())
	return client.Bucket(cfg.bucket).Object(path).Delete(ctx)
}

func (s *gcsBackend) gcsClient(ep *url.URL) (cfg GSStoreConfig, client *storage.Client, opts storage.SignedURLOptions, err error) {
func (s *GcsBackend) gcsClient(ep *url.URL) (cfg GSStoreConfig, client *storage.Client, opts storage.SignedURLOptions, err error) {
var conf *jwt.Config

if err = parseStoreArgs(ep, &cfg); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions broker/fragment/stores.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@ type backend interface {

var sharedStores = struct {
s3 *s3Backend
gcs *gcsBackend
gcs *GcsBackend
azure *azureBackend
fs *fsBackend
}{
s3: newS3Backend(),
gcs: &gcsBackend{},
gcs: &GcsBackend{},
azure: &azureBackend{
pipelines: make(map[string]pipeline.Pipeline),
clients: make(map[string]*service.Client),
Expand Down
10 changes: 7 additions & 3 deletions mainboilerplate/runconsumer/run_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,10 @@ type Config interface {
type BaseConfig struct {
Consumer struct {
mbp.ServiceConfig
Limit uint32 `long:"limit" env:"LIMIT" default:"32" description:"Maximum number of Shards this consumer process will allocate"`
MaxHotStandbys uint32 `long:"max-hot-standbys" env:"MAX_HOT_STANDBYS" default:"3" description:"Maximum effective hot standbys of any one shard, which upper-bounds its stated hot-standbys."`
WatchDelay time.Duration `long:"watch-delay" env:"WATCH_DELAY" default:"30ms" description:"Delay applied to the application of watched Etcd events. Larger values amortize the processing of fast-changing Etcd keys."`
Limit uint32 `long:"limit" env:"LIMIT" default:"32" description:"Maximum number of Shards this consumer process will allocate"`
MaxHotStandbys uint32 `long:"max-hot-standbys" env:"MAX_HOT_STANDBYS" default:"3" description:"Maximum effective hot standbys of any one shard, which upper-bounds its stated hot-standbys."`
WatchDelay time.Duration `long:"watch-delay" env:"WATCH_DELAY" default:"30ms" description:"Delay applied to the application of watched Etcd events. Larger values amortize the processing of fast-changing Etcd keys."`
TransformSignedURLs bool `long:"transform-signed-urls" env:"TRANSFORM_SIGNED_URLS" description:"When a signed URL is received, transform it into an unsigned URL. This is useful when clients do not require the signing."`
} `group:"Consumer" namespace:"consumer" env-namespace:"CONSUMER"`

Broker struct {
Expand Down Expand Up @@ -124,6 +125,9 @@ func (sc Cmd) Execute(args []string) error {
var srv, err = server.New("", bc.Consumer.Port)
mbp.Must(err, "building Server instance")

// Arize avoidance of using signed URLs.
client.TransformSignedURLs = bc.Consumer.TransformSignedURLs

if bc.Broker.Cache.Size <= 0 {
log.Warn("--broker.cache.size is disabled; consider setting > 0")
}
Expand Down

0 comments on commit 44b2ed5

Please sign in to comment.