Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Signed-off-by: Benjamin Leggett <[email protected]>
  • Loading branch information
bleggett committed Jul 2, 2024
1 parent a62344a commit ce8e94f
Show file tree
Hide file tree
Showing 3 changed files with 542 additions and 9 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ module github.com/spiffe/spire

go 1.22.3

replace github.com/spiffe/spire-api-sdk => ../spire-api-sdk

require (
cloud.google.com/go/iam v1.1.8
cloud.google.com/go/kms v1.18.0
Expand Down
139 changes: 131 additions & 8 deletions pkg/agent/api/delegatedidentity/v1/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,20 +54,22 @@ func New(config Config) *Service {
}

return &Service{
manager: config.Manager,
attestor: endpoints.PeerTrackerAttestor{Attestor: config.Attestor},
metrics: config.Metrics,
authorizedDelegates: AuthorizedDelegates,
manager: config.Manager,
peerAttestor: endpoints.PeerTrackerAttestor{Attestor: config.Attestor},
delegateWorkloadAttestor: config.Attestor,
metrics: config.Metrics,
authorizedDelegates: AuthorizedDelegates,
}
}

// Service implements the delegated identity server
type Service struct {
delegatedidentityv1.UnsafeDelegatedIdentityServer

manager manager.Manager
attestor attestor
metrics telemetry.Metrics
manager manager.Manager
peerAttestor attestor
delegateWorkloadAttestor workloadattestor.Attestor
metrics telemetry.Metrics

// SPIFFE IDs of delegates that are authorized to use this API
authorizedDelegates map[string]bool
Expand All @@ -79,7 +81,7 @@ func (s *Service) isCallerAuthorized(ctx context.Context, log logrus.FieldLogger
callerSelectors := cachedSelectors

if callerSelectors == nil {
callerSelectors, err = s.attestor.Attest(ctx)
callerSelectors, err = s.peerAttestor.Attest(ctx)
if err != nil {
log.WithError(err).Error("Workload attestation failed")
return nil, status.Error(codes.Internal, "workload attestation failed")
Expand Down Expand Up @@ -111,6 +113,63 @@ func (s *Service) isCallerAuthorized(ctx context.Context, log logrus.FieldLogger
return nil, status.Error(codes.PermissionDenied, "caller not configured as an authorized delegate")
}

// Attempt to attest and authorize the delegate, and then take the PID the delegate gave us
// and attempt to attest that into a set of selectors + a subscription to changes of those selectors.
//
// Note that the trusted delegate is responsible for ensuring the PID is valid and not recycled,
// from initiation of this call until the termination of the response stream, and if it is,
// must discard any stream contents provided by this call as invalid.
func (s *Service) SubscribeToX509SVIDsByPID(req *delegatedidentityv1.SubscribeToX509SVIDsByPIDRequest, stream delegatedidentityv1.DelegatedIdentity_SubscribeToX509SVIDsByPIDServer) error {
latency := adminapi.StartFirstX509SVIDUpdateLatency(s.metrics)
ctx := stream.Context()
log := rpccontext.Logger(ctx)
var receivedFirstUpdate bool

cachedSelectors, err := s.isCallerAuthorized(ctx, log, nil)
if err != nil {
return err
}

// Delegate authorized, use PID the delegate gave us to try and attest OBO
selectors, err := s.delegateWorkloadAttestor.Attest(ctx, int(req.Pid))
if err != nil {
return err
}

log.WithFields(logrus.Fields{
"delegate_selectors": cachedSelectors,
"pid_selectors": selectors,
}).Info("Subscribing to cache changes")

subscriber, err := s.manager.SubscribeToCacheChanges(ctx, selectors)
if err != nil {
log.WithError(err).Error("Subscribe to cache changes failed")
return err
}
defer subscriber.Finish()

for {
select {
case update := <-subscriber.Updates():
if len(update.Identities) > 0 && !receivedFirstUpdate {
// emit latency metric for first update containing an SVID.
latency.Measure()
receivedFirstUpdate = true
}

if _, err := s.isCallerAuthorized(ctx, log, cachedSelectors); err != nil {
return err
}

if err := sendX509SVIDResponse(update, stream, log); err != nil {
return err
}
case <-ctx.Done():
return nil
}
}
}

func (s *Service) SubscribeToX509SVIDs(req *delegatedidentityv1.SubscribeToX509SVIDsRequest, stream delegatedidentityv1.DelegatedIdentity_SubscribeToX509SVIDsServer) error {
latency := adminapi.StartFirstX509SVIDUpdateLatency(s.metrics)
ctx := stream.Context()
Expand Down Expand Up @@ -290,6 +349,70 @@ func (s *Service) SubscribeToX509Bundles(_ *delegatedidentityv1.SubscribeToX509B
}
}

// Attempt to attest and authorize the delegate, and then take the PID the delegate gave us
// and attempt to attest that into a set of selectors.
//
// Note that the trusted delegate is responsible for ensuring the PID is valid and not recycled,
// from initiation of this call until the return of this call, and if it is must discard any response
// provided by this call as invalid.
func (s *Service) FetchJWTSVIDsByPID(ctx context.Context, req *delegatedidentityv1.FetchJWTSVIDsByPIDRequest) (resp *delegatedidentityv1.FetchJWTSVIDsResponse, err error) {
log := rpccontext.Logger(ctx)
if len(req.Audience) == 0 {
log.Error("Missing required audience parameter")
return nil, status.Error(codes.InvalidArgument, "audience must be specified")
}

if _, err = s.isCallerAuthorized(ctx, log, nil); err != nil {
return nil, err
}

// Delegate authorized, use PID the delegate gave us to try and attest OBO
selectors, err := s.delegateWorkloadAttestor.Attest(ctx, int(req.Pid))
if err != nil {
return nil, err
}

resp = new(delegatedidentityv1.FetchJWTSVIDsResponse)

entries := s.manager.MatchingRegistrationEntries(selectors)
for _, entry := range entries {
spiffeID, err := spiffeid.FromString(entry.SpiffeId)
if err != nil {
log.WithField(telemetry.SPIFFEID, entry.SpiffeId).WithError(err).Error("Invalid requested SPIFFE ID")
return nil, status.Errorf(codes.InvalidArgument, "invalid requested SPIFFE ID: %v", err)
}

loopLog := log.WithField(telemetry.SPIFFEID, spiffeID.String())

var svid *client.JWTSVID
svid, err = s.manager.FetchJWTSVID(ctx, entry, req.Audience)
if err != nil {
loopLog.WithError(err).Error("Could not fetch JWT-SVID")
return nil, status.Errorf(codes.Unavailable, "could not fetch JWT-SVID: %v", err)
}
resp.Svids = append(resp.Svids, &types.JWTSVID{
Token: svid.Token,
Id: &types.SPIFFEID{
TrustDomain: spiffeID.TrustDomain().Name(),
Path: spiffeID.Path(),
},
ExpiresAt: svid.ExpiresAt.Unix(),
IssuedAt: svid.IssuedAt.Unix(),
Hint: entry.Hint,
})

ttl := time.Until(svid.ExpiresAt)
loopLog.WithField(telemetry.TTL, ttl.Seconds()).Debug("Fetched JWT SVID")
}

if len(resp.Svids) == 0 {
log.Error("No identity issued")
return nil, status.Error(codes.PermissionDenied, "no identity issued")
}

return resp, nil
}

func (s *Service) FetchJWTSVIDs(ctx context.Context, req *delegatedidentityv1.FetchJWTSVIDsRequest) (resp *delegatedidentityv1.FetchJWTSVIDsResponse, err error) {
log := rpccontext.Logger(ctx)
if len(req.Audience) == 0 {
Expand Down
Loading

0 comments on commit ce8e94f

Please sign in to comment.