Skip to content

Commit

Permalink
adjust to compressed returned data
Browse files Browse the repository at this point in the history
  • Loading branch information
ddowker committed Jun 6, 2024
1 parent c9e98f0 commit 98e2f8f
Showing 1 changed file with 20 additions and 5 deletions.
25 changes: 20 additions & 5 deletions broker/client/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,8 @@ func (r *Reader) Read(p []byte) (n int, err error) {
if fragURL.Scheme != "gs" {
return 0, fmt.Errorf("TransformSignedURL unsupported scheme: %s", fragURL.Scheme)
}
if r.direct, err = gcs.openWithOffset(r.ctx, fragURL,
*r.Response.Fragment, r.Request.Offset-r.Response.Fragment.Begin); err == nil {
if r.direct, err = OpenUnsignedFragmentURL(r.ctx, *r.Response.Fragment,
r.Request.Offset, fragURL); err == nil {
n, err = r.Read(p) // Recurse to attempt read against opened |r.direct|.
}
} else {
Expand Down Expand Up @@ -301,6 +301,19 @@ func OpenFragmentURL(ctx context.Context, fragment pb.Fragment, offset int64, ur
return NewFragmentReader(resp.Body, fragment, offset)
}

func OpenUnsignedFragmentURL(ctx context.Context, fragment pb.Fragment, offset int64, url *url.URL) (*FragmentReader, error) {
if rdr, err = gcs.open(r.ctx, fragURL, *r.Response.Fragment); err != nil {
return nil, err
}

// Record metrics related to opening the fragment.
var labels = fragmentLabels(fragment)
fragmentOpen.With(labels).Inc()
fragmentOpenBytes.With(labels).Add(float64(fragment.End - fragment.Begin))

return NewFragmentReader(rdr, fragment, offset)
}

// NewFragmentReader wraps a io.ReadCloser of raw Fragment bytes with a
// returned *FragmentReader which has been pre-seeked to the given offset.
func NewFragmentReader(rc io.ReadCloser, fragment pb.Fragment, offset int64) (*FragmentReader, error) {
Expand Down Expand Up @@ -443,6 +456,8 @@ var (
httpClient = http.DefaultClient
)

// ARIZE specific code to end of file.
//
// stores_test.go, which is in broker/fragment, imports broker/client so we cannot import broker/fragment here
// to avoid a cycle. Instead we will repeat a subset of store_gcs.go.

Expand All @@ -454,13 +469,13 @@ type gcsBackend struct {
clientMu sync.Mutex
}

// Arize Open routine with offset for use with consumers and signed URLs.
func (s *gcsBackend) openWithOffset(ctx context.Context, ep *url.URL, fragment pb.Fragment, offset int64) (io.ReadCloser, error) {
// Arize Open routine for use with consumers and signed URLs.
func (s *gcsBackend) open(ctx context.Context, ep *url.URL, fragment pb.Fragment) (io.ReadCloser, error) {
cfg, gClient, err := s.gcsClient(ep)
if err != nil {
return nil, err
}
return gClient.Bucket(cfg.bucket).Object(cfg.rewritePath(cfg.prefix, fragment.ContentPath())).NewRangeReader(ctx, offset, -1)
return gClient.Bucket(cfg.bucket).Object(cfg.rewritePath(cfg.prefix, fragment.ContentPath())).NewReader(ctx)
}

func (s *gcsBackend) gcsClient(ep *url.URL) (cfg GSStoreConfig, client *storage.Client, err error) {
Expand Down

0 comments on commit 98e2f8f

Please sign in to comment.