Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Creates Blob Retrieval Endpoint #96

Merged
merged 6 commits into from
Jan 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 11 additions & 9 deletions azure/azblob/azblob.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,47 +103,49 @@ func (c *Client) DeleteBlob(ctx context.Context, uri string) error {

// SignedUploadURL returns a URL that is allowed to upload to the given URI.
// See https://pkg.go.dev/github.com/Azure/azure-sdk-for-go/sdk/storage/[email protected]/sas#example-package-UserDelegationSAS
func (c *Client) SignedUploadURL(ctx context.Context, uri string) (string, error) {
func (c *Client) SignedUploadURL(ctx context.Context, uri string) (string, time.Time, error) {
return c.signBlob(ctx, uri, &sas.BlobPermissions{Create: true, Write: true})
}

// SignedDownloadURL returns a URL that is allowed to download the file at the given URI.
// See https://pkg.go.dev/github.com/Azure/azure-sdk-for-go/sdk/storage/[email protected]/sas#example-package-UserDelegationSAS
func (c *Client) SignedDownloadURL(ctx context.Context, uri string) (string, error) {
func (c *Client) SignedDownloadURL(ctx context.Context, uri string) (string, time.Time, error) {
return c.signBlob(ctx, uri, &sas.BlobPermissions{Read: true})
}

func (c *Client) signBlob(ctx context.Context, uri string, perms *sas.BlobPermissions) (string, error) {
func (c *Client) signBlob(ctx context.Context, uri string, perms *sas.BlobPermissions) (string, time.Time, error) {
ctr, blb, ok := blob.SplitURI(Scheme, uri)
if !ok {
return "", fmt.Errorf("malformed URI %q is not for Azure", uri)
return "", time.Time{}, fmt.Errorf("malformed URI %q is not for Azure", uri)
}

// The blob component is important, otherwise the signed URL is applicable to the whole container.
if blb == "" {
return "", fmt.Errorf("uri %q did not contain a blob component", uri)
return "", time.Time{}, fmt.Errorf("uri %q did not contain a blob component", uri)
}

now := c.now().UTC().Add(-10 * time.Second)
udc, err := c.getUserDelegationCredential(ctx, now)
if err != nil {
return "", fmt.Errorf("failed to get udc: %w", err)
return "", time.Time{}, fmt.Errorf("failed to get udc: %w", err)
}

expiry := now.Add(15 * time.Minute)

// Create Blob Signature Values with desired permissions and sign with user delegation credential
sasQueryParams, err := sas.BlobSignatureValues{
Protocol: sas.ProtocolHTTPS,
StartTime: now,
ExpiryTime: now.Add(15 * time.Minute),
ExpiryTime: expiry,
Permissions: perms.String(),
ContainerName: ctr,
BlobName: blb,
}.SignWithUserDelegation(udc)
if err != nil {
return "", fmt.Errorf("failed to sign blob: %w", err)
return "", time.Time{}, fmt.Errorf("failed to sign blob: %w", err)
}

return fmt.Sprintf("https://%s.blob.core.windows.net/%s/%s?%s", c.storageAccount, ctr, blb, sasQueryParams.Encode()), nil
return fmt.Sprintf("https://%s.blob.core.windows.net/%s/%s?%s", c.storageAccount, ctr, blb, sasQueryParams.Encode()), expiry, nil
}

func (c *Client) ListBlobs(ctx context.Context, uriPrefix string) ([]string, error) {
Expand Down
1 change: 1 addition & 0 deletions cmd/server/pactasrv/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go_library(
name = "pactasrv",
srcs = [
"audit_logs.go",
"blobs.go",
"incomplete_upload.go",
"initiative.go",
"initiative_invitation.go",
Expand Down
103 changes: 103 additions & 0 deletions cmd/server/pactasrv/blobs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package pactasrv

import (
"context"
"fmt"

"github.com/RMI/pacta/db"
"github.com/RMI/pacta/oapierr"
api "github.com/RMI/pacta/openapi/pacta"
"github.com/RMI/pacta/pacta"
"go.uber.org/zap"
)

func (s *Server) AccessBlobContent(ctx context.Context, request api.AccessBlobContentRequestObject) (api.AccessBlobContentResponseObject, error) {
actorInfo, err := s.getActorInfoOrFail(ctx)
if err != nil {
return nil, err
}

blobIDs := []pacta.BlobID{}
for _, item := range request.Body.Items {
blobIDs = append(blobIDs, pacta.BlobID(item.BlobId))
}
err404 := oapierr.NotFound("blob not found", zap.Strings("blob_ids", asStrs(blobIDs)))
bos, err := s.DB.BlobContexts(s.DB.NoTxn(ctx), blobIDs)
if err != nil {
if db.IsNotFound(err) {
return nil, err404
}
return nil, oapierr.Internal("error getting blob owners", zap.Error(err), zap.Strings("blob_ids", asStrs(blobIDs)))
}
asMap := map[pacta.BlobID]*pacta.BlobContext{}
for _, boi := range bos {
asMap[boi.BlobID] = boi
}
auditLogs := []*pacta.AuditLog{}
for _, blobID := range blobIDs {
boi := asMap[blobID]
accessAsOwner := boi.PrimaryTargetOwnerID == actorInfo.OwnerID
accessAsAdmin := boi.AdminDebugEnabled && actorInfo.IsAdmin
accessAsSuperAdmin := boi.AdminDebugEnabled && actorInfo.IsSuperAdmin
var actorType pacta.AuditLogActorType
if accessAsOwner {
actorType = pacta.AuditLogActorType_Owner
} else if accessAsAdmin {
actorType = pacta.AuditLogActorType_Admin
} else if accessAsSuperAdmin {
actorType = pacta.AuditLogActorType_SuperAdmin
} else {
// DENY CASE
return nil, err404
}
auditLogs = append(auditLogs, &pacta.AuditLog{
Action: pacta.AuditLogAction_Download,
ActorID: string(actorInfo.UserID),
ActorOwner: &pacta.Owner{ID: actorInfo.OwnerID},
ActorType: actorType,
PrimaryTargetType: boi.PrimaryTargetType,
PrimaryTargetID: boi.PrimaryTargetID,
PrimaryTargetOwner: &pacta.Owner{ID: boi.PrimaryTargetOwnerID},
})
}

blobs, err := s.DB.Blobs(s.DB.NoTxn(ctx), blobIDs)
if err != nil {
if db.IsNotFound(err) {
return nil, err404
}
return nil, oapierr.Internal("error getting blobs", zap.Error(err), zap.Strings("blob_ids", asStrs(blobIDs)))
}

err = s.DB.Transactional(ctx, func(tx db.Tx) error {
for i, al := range auditLogs {
_, err := s.DB.CreateAuditLog(tx, al)
gbdubs marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return fmt.Errorf("creating audit log %d/%d: %w", i+1, len(auditLogs), err)
}
}
return nil
})
if err != nil {
return nil, oapierr.Internal("error creating audit logs - no download URLs generated", zap.Error(err), zap.Strings("blob_ids", asStrs(blobIDs)))
}

// Note, we're not parallelizing this because it is probably not nescessary.
// The majority use case of this endpoint will be the user clicking a download
// button, which will spin as it gets the URL, then turn into a dial as the
// download starts. That allows us to only generate audit logs for true accesses,
// and will typically happen on a single-file basis.
response := api.AccessBlobContentResp{}
for _, blob := range blobs {
url, expiryTime, err := s.Blob.SignedDownloadURL(ctx, string(blob.BlobURI))
if err != nil {
return nil, oapierr.Internal("error getting signed download url", zap.Error(err), zap.String("blob_uri", string(blob.BlobURI)))
}
response.Items = append(response.Items, api.AccessBlobContentRespItem{
BlobId: string(blob.ID),
DownloadUrl: url,
ExpirationTime: expiryTime,
})
}
return api.AccessBlobContent200JSONResponse(response), nil
}
50 changes: 46 additions & 4 deletions cmd/server/pactasrv/pactasrv.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type DB interface {
CreateBlob(tx db.Tx, b *pacta.Blob) (pacta.BlobID, error)
UpdateBlob(tx db.Tx, id pacta.BlobID, mutations ...db.UpdateBlobFn) error
DeleteBlob(tx db.Tx, id pacta.BlobID) (pacta.BlobURI, error)
BlobContexts(tx db.Tx, ids []pacta.BlobID) ([]*pacta.BlobContext, error)

InitiativeInvitation(tx db.Tx, id pacta.InitiativeInvitationID) (*pacta.InitiativeInvitation, error)
InitiativeInvitationsByInitiative(tx db.Tx, iid pacta.InitiativeID) ([]*pacta.InitiativeInvitation, error)
Expand Down Expand Up @@ -93,20 +94,21 @@ type DB interface {
CreatePortfolioGroupMembership(tx db.Tx, pgID pacta.PortfolioGroupID, pID pacta.PortfolioID) error
DeletePortfolioGroupMembership(tx db.Tx, pgID pacta.PortfolioGroupID, pID pacta.PortfolioID) error

AuditLogs(tx db.Tx, q *db.AuditLogQuery) ([]*pacta.AuditLog, *db.PageInfo, error)

GetOrCreateUserByAuthn(tx db.Tx, mech pacta.AuthnMechanism, authnID, email, canonicalEmail string) (*pacta.User, error)
User(tx db.Tx, id pacta.UserID) (*pacta.User, error)
Users(tx db.Tx, ids []pacta.UserID) (map[pacta.UserID]*pacta.User, error)
UpdateUser(tx db.Tx, id pacta.UserID, mutations ...db.UpdateUserFn) error
DeleteUser(tx db.Tx, id pacta.UserID) error

CreateAuditLog(tx db.Tx, a *pacta.AuditLog) (pacta.AuditLogID, error)
AuditLogs(tx db.Tx, q *db.AuditLogQuery) ([]*pacta.AuditLog, *db.PageInfo, error)
}

type Blob interface {
Scheme() blob.Scheme

SignedUploadURL(ctx context.Context, uri string) (string, error)
SignedDownloadURL(ctx context.Context, uri string) (string, error)
SignedUploadURL(ctx context.Context, uri string) (string, time.Time, error)
SignedDownloadURL(ctx context.Context, uri string) (string, time.Time, error)
DeleteBlob(ctx context.Context, uri string) error
}

Expand Down Expand Up @@ -166,10 +168,50 @@ func (s *Server) getUserOwnerID(ctx context.Context) (pacta.OwnerID, error) {
return ownerID, nil
}

func (s *Server) isAdminOrSuperAdmin(ctx context.Context) (bool, bool, error) {
userID, err := getUserID(ctx)
if err != nil {
return false, false, err
}
user, err := s.DB.User(s.DB.NoTxn(ctx), userID)
if err != nil {
return false, false, oapierr.Internal("failed to find user", zap.Error(err))
}
return user.Admin, user.SuperAdmin, nil
}

func asStrs[T ~string](ts []T) []string {
result := make([]string, len(ts))
for i, t := range ts {
result[i] = string(t)
}
return result
}

type actorInfo struct {
UserID pacta.UserID
OwnerID pacta.OwnerID
IsAdmin bool
IsSuperAdmin bool
}

func (s *Server) getActorInfoOrFail(ctx context.Context) (*actorInfo, error) {
actorUserID, err := getUserID(ctx)
if err != nil {
return nil, err
}
actorOwnerID, err := s.getUserOwnerID(ctx)
if err != nil {
return nil, err
}
actorIsAdmin, actorIsSuperAdmin, err := s.isAdminOrSuperAdmin(ctx)
if err != nil {
return nil, err
}
return &actorInfo{
UserID: actorUserID,
OwnerID: actorOwnerID,
IsAdmin: actorIsAdmin,
IsSuperAdmin: actorIsSuperAdmin,
}, nil
}
2 changes: 1 addition & 1 deletion cmd/server/pactasrv/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (s *Server) StartPortfolioUpload(ctx context.Context, request api.StartPort
for i := range request.Body.Items {
id := uuid.NewString()
uri := blob.Join(s.Blob.Scheme(), s.PorfolioUploadURI, id)
signed, err := s.Blob.SignedUploadURL(ctx, uri)
signed, _, err := s.Blob.SignedUploadURL(ctx, uri)
if err != nil {
return nil, oapierr.Internal("failed to sign blob URI", zap.String("uri", uri), zap.Error(err))
}
Expand Down
36 changes: 36 additions & 0 deletions db/sqldb/analysis_artifact_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,45 @@ func TestAnalysisArtifacts(t *testing.T) {
t.Errorf("unexpected diff (+got -want): %v", diff)
}

blobContexts, err := tdb.BlobContexts(tx, []pacta.BlobID{b1.ID, b2.ID, b3.ID})
if err != nil {
t.Fatalf("reading blob owners: %v", err)
}
expectedBCs := []*pacta.BlobContext{{
BlobID: b1.ID,
PrimaryTargetOwnerID: o.ID,
PrimaryTargetType: "ANALYSIS",
PrimaryTargetID: string(aid),
AdminDebugEnabled: false,
}, {
BlobID: b2.ID,
PrimaryTargetOwnerID: o.ID,
PrimaryTargetType: "ANALYSIS",
PrimaryTargetID: string(aid),
AdminDebugEnabled: true,
}, {
BlobID: b3.ID,
PrimaryTargetOwnerID: o.ID,
PrimaryTargetType: "ANALYSIS",
PrimaryTargetID: string(aid),
AdminDebugEnabled: false,
}}
if diff := cmp.Diff(expectedBCs, blobContexts, cmpOpts); diff != "" {
t.Errorf("unexpected diff (+got -want): %v", diff)
}

buris, err := tdb.DeleteAnalysis(tx, aid)
if err != nil {
t.Fatalf("deleting analysis: %v", err)
}
if diff := cmp.Diff([]pacta.BlobURI{b1.BlobURI, b2.BlobURI, b3.BlobURI}, buris, cmpOpts); diff != "" {
t.Errorf("unexpected diff (+got -want): %v", diff)
}

_, err = tdb.BlobContexts(tx, []pacta.BlobID{b1.ID, b2.ID, b3.ID})
if err == nil {
t.Fatalf("reading blob owners should have failed but was fine", err)
}
}

func analysisArtifactCmpOpts() cmp.Option {
Expand All @@ -115,10 +147,14 @@ func analysisArtifactCmpOpts() cmp.Option {
aaLessFn := func(a, b *pacta.AnalysisArtifact) bool {
return a.ID < b.ID
}
boLessFn := func(a, b *pacta.BlobContext) bool {
return a.BlobID < b.BlobID
}
return cmp.Options{
cmpopts.EquateEmpty(),
cmpopts.EquateApproxTime(time.Second),
cmpopts.SortSlices(blobURILessFn),
cmpopts.SortSlices(aaLessFn),
cmpopts.SortSlices(boLessFn),
}
}
Loading