Skip to content

Commit

Permalink
* Force rotation of X.509 workload SVIDs in lru cache
Browse files Browse the repository at this point in the history
* Force rotation of X.509 workload SVIDs in store SVID cache
* Force rotation of Agent SVID

Signed-off-by: Marcos Yacob <[email protected]>
  • Loading branch information
MarcosDY committed Aug 31, 2024
1 parent e91897b commit 4bfad98
Show file tree
Hide file tree
Showing 12 changed files with 287 additions and 11 deletions.
5 changes: 3 additions & 2 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func (a *Agent) Run(ctx context.Context) error {
}
}

svidStoreCache := a.newSVIDStoreCache()
svidStoreCache := a.newSVIDStoreCache(metrics)

manager, err := a.newManager(ctx, a.sto, cat, metrics, as, svidStoreCache, nodeAttestor)
if err != nil {
Expand Down Expand Up @@ -328,10 +328,11 @@ func (a *Agent) newManager(ctx context.Context, sto storage.Storage, cat catalog
}
}

func (a *Agent) newSVIDStoreCache() *storecache.Cache {
func (a *Agent) newSVIDStoreCache(metrics telemetry.Metrics) *storecache.Cache {
config := &storecache.Config{
Log: a.c.Log.WithField(telemetry.SubsystemName, "svid_store_cache"),
TrustDomain: a.c.TrustDomain,
Metrics: metrics,
}

return storecache.New(config)
Expand Down
10 changes: 10 additions & 0 deletions pkg/agent/manager/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ type UpdateEntries struct {
// Bundles is a set of ALL trust bundles available to the agent, keyed by trust domain
Bundles map[spiffeid.TrustDomain]*spiffebundle.Bundle

// TaintedX509Authorities is a set of all tainted X.509 authorities notified by the server.
TaintedX509Authorities []string

// TaintedJWTAuthorities is a set of all tainted JWT authorities notified by the server.
TaintedJWTAuthorities []string

// RegistrationEntries is a set of ALL registration entries available to the
// agent, keyed by registration entry id.
RegistrationEntries map[string]*common.RegistrationEntry
Expand Down Expand Up @@ -413,6 +419,10 @@ func (c *Cache) UpdateSVIDs(update *UpdateSVIDs) {
}
}

func (c *Cache) TaintX509SVIDs(taintedX509Authorities []*x509.Certificate) {
// This cache is going to be removed in 1.11...
}

// GetStaleEntries obtains a list of stale entries
func (c *Cache) GetStaleEntries() []*StaleEntry {
c.mu.Lock()
Expand Down
31 changes: 31 additions & 0 deletions pkg/agent/manager/cache/lru_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cache

import (
"context"
"crypto/x509"
"fmt"
"sort"
"sync"
Expand All @@ -14,6 +15,7 @@ import (
"github.com/spiffe/spire/pkg/agent/common/backoff"
"github.com/spiffe/spire/pkg/common/telemetry"
agentmetrics "github.com/spiffe/spire/pkg/common/telemetry/agent"
"github.com/spiffe/spire/pkg/common/x509util"
"github.com/spiffe/spire/proto/spire/common"
)

Expand Down Expand Up @@ -483,6 +485,35 @@ func (c *LRUCache) UpdateSVIDs(update *UpdateSVIDs) {
}
}

func (c *LRUCache) TaintX509SVIDs(taintedX509Authorities []*x509.Certificate) {
// TOOD: add elapsed time metrics

Check failure on line 489 in pkg/agent/manager/cache/lru_cache.go

View workflow job for this annotation

GitHub Actions / lint (linux)

`TOOD` is a misspelling of `TODO` (misspell)

Check failure on line 489 in pkg/agent/manager/cache/lru_cache.go

View workflow job for this annotation

GitHub Actions / lint (windows)

`TOOD` is a misspelling of `TODO` (misspell)
c.mu.Lock()
defer c.mu.Unlock()

start := time.Now()

taintedSVIDs := 0
for key, svid := range c.svids {
// no process already tainted or empty SVIDs
if svid == nil {
continue
}

if tainted := x509util.IsSignedByRoot(svid.Chain, taintedX509Authorities); tainted {
taintedSVIDs += 1
delete(c.svids, key)
}
}

// TODO: remove....
c.log.Debugf("******************************************************")
c.log.Debugf("Duration to process %d svids: %v", taintedSVIDs, time.Since(start))
c.log.Debugf("******************************************************")

agentmetrics.AddCacheManagerExpiredSVIDsSample(c.metrics, "", float32(taintedSVIDs))
c.log.WithField(telemetry.TaintedSVIDs, taintedSVIDs).Debug("Tainted X.509 SVIDs")
}

// GetStaleEntries obtains a list of stale entries
func (c *LRUCache) GetStaleEntries() []*StaleEntry {
c.mu.Lock()
Expand Down
3 changes: 3 additions & 0 deletions pkg/agent/manager/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ func newManager(c *Config) *manager {
client: client,
clk: c.Clk,
svidStoreCache: c.SVIDStoreCache,

processedTaintedX509Authorities: make(map[string]struct{}),
processedTaintedJWTAuthorities: make(map[string]struct{}),
}

return m
Expand Down
8 changes: 8 additions & 0 deletions pkg/agent/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,14 @@ type manager struct {
// cache.
syncedEntries map[string]*common.RegistrationEntry
syncedBundles map[string]*common.Bundle

// processedTaintedX509Authorities holds all the already processed tainted X.509 Authorities
// to prevent processing them again.
processedTaintedX509Authorities map[string]struct{}

// processedTaintedJWTAuthorities holds all the already processed tainted JWT Authorities
// to prevent processing them again.
processedTaintedJWTAuthorities map[string]struct{}
}

func (m *manager) Initialize(ctx context.Context) error {
Expand Down
33 changes: 33 additions & 0 deletions pkg/agent/manager/storecache/cache.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package storecache

import (
"crypto/x509"
"sort"
"sync"
"time"
Expand All @@ -10,6 +11,8 @@ import (
"github.com/spiffe/go-spiffe/v2/spiffeid"
"github.com/spiffe/spire/pkg/agent/manager/cache"
"github.com/spiffe/spire/pkg/common/telemetry"
telemetry_agent "github.com/spiffe/spire/pkg/common/telemetry/agent"
"github.com/spiffe/spire/pkg/common/x509util"
"github.com/spiffe/spire/proto/spire/common"
)

Expand Down Expand Up @@ -46,6 +49,7 @@ type cachedRecord struct {
type Config struct {
Log logrus.FieldLogger
TrustDomain spiffeid.TrustDomain
Metrics telemetry.Metrics
}

type Cache struct {
Expand Down Expand Up @@ -219,6 +223,35 @@ func (c *Cache) UpdateSVIDs(update *cache.UpdateSVIDs) {
}
}

func (c *Cache) TaintX509SVIDs(taintedX509Authorities []*x509.Certificate) {
// TOOD: add elapsed time metrics

Check failure on line 227 in pkg/agent/manager/storecache/cache.go

View workflow job for this annotation

GitHub Actions / lint (linux)

`TOOD` is a misspelling of `TODO` (misspell)

Check failure on line 227 in pkg/agent/manager/storecache/cache.go

View workflow job for this annotation

GitHub Actions / lint (windows)

`TOOD` is a misspelling of `TODO` (misspell)
c.mtx.Lock()
defer c.mtx.Unlock()

start := time.Now()

taintedSVIDs := 0
for _, record := range c.records {
// no process already tainted or empty SVIDs
if record.svid == nil {
continue
}

if tainted := x509util.IsSignedByRoot(record.svid.Chain, taintedX509Authorities); tainted {
taintedSVIDs += 1
record.svid = nil
}
}

telemetry_agent.AddCacheManagerExpiredSVIDsSample(c.c.Metrics, "svid_store", float32(taintedSVIDs))
c.c.Log.WithField(telemetry.TaintedSVIDs, taintedSVIDs).Debug("Tainted X.509 SVIDs")

// TODO: remove....
c.c.Log.Debugf("******************************************************")
c.c.Log.Debugf("Duration to process %d svids: %v", taintedSVIDs, time.Since(start))
c.c.Log.Debugf("******************************************************")
}

// GetStaleEntries obtains a list of stale entries, that needs new SVIDs
func (c *Cache) GetStaleEntries() []*cache.StaleEntry {
c.mtx.Lock()
Expand Down
94 changes: 90 additions & 4 deletions pkg/agent/manager/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"crypto"
"crypto/x509"
"fmt"
"strings"
"time"

"github.com/sirupsen/logrus"
Expand All @@ -15,6 +17,7 @@ import (
"github.com/spiffe/spire/pkg/common/telemetry"
telemetry_agent "github.com/spiffe/spire/pkg/common/telemetry/agent"
"github.com/spiffe/spire/pkg/common/util"
"github.com/spiffe/spire/pkg/common/x509util"
"github.com/spiffe/spire/proto/spire/common"
)

Expand All @@ -33,6 +36,10 @@ type SVIDCache interface {

// GetStaleEntries gets a list of records that need update SVIDs
GetStaleEntries() []*cache.StaleEntry

// TaintX509SVIDs marks all SVIDs signed by a tainted X.509 authority as tainted
// to force their rotation.
TaintX509SVIDs(taintedX509Authorities []*x509.Certificate)
}

func (m *manager) syncSVIDs(ctx context.Context) (err error) {
Expand All @@ -44,6 +51,44 @@ func (m *manager) syncSVIDs(ctx context.Context) (err error) {
return nil
}

// processTaintedAuthorities verifies if a new authority is tainted and forces rotation in all caches if required.
func (m *manager) processTaintedAuthorities(x509Authorities []string, jwtAuthorities []string) error {
newTaintedX509Authorities := getNewItems(m.processedTaintedX509Authorities, x509Authorities)
if len(newTaintedX509Authorities) > 0 {
m.c.Log.WithField(telemetry.SubjectKeyIDs, strings.Join(newTaintedX509Authorities, ",")).
Debug("New tainted X.509 authorities found")

taintedX509Authorities, err := bundleutil.FindX509Authorities(m.c.Bundle, newTaintedX509Authorities)
if err != nil {
return fmt.Errorf("failed to search X.509 authorities: %w", err)
}

// Taint all regular X.509 SVIDs
m.cache.TaintX509SVIDs(taintedX509Authorities)

// Taint all SVIDStore SVIDs
m.svidStoreCache.TaintX509SVIDs(taintedX509Authorities)

// Notify rotator about new tainted authorities
if err := m.svid.NotifyTaintedAuthorities(taintedX509Authorities); err != nil {
return err
}

for _, subjectKeyID := range newTaintedX509Authorities {
m.processedTaintedX509Authorities[subjectKeyID] = struct{}{}
}
}

newTaintedJWTAuthorities := getNewItems(m.processedTaintedJWTAuthorities, jwtAuthorities)
if len(newTaintedJWTAuthorities) > 0 {
m.c.Log.WithField(telemetry.SubjectKeyIDs, strings.Join(newTaintedJWTAuthorities, ",")).
Debug("New tainted JWT authorities found")
// TODO: IMPLEMENT!!!
}

return nil
}

// synchronize fetches the authorized entries from the server, updates the
// cache, and fetches missing/expiring SVIDs.
func (m *manager) synchronize(ctx context.Context) (err error) {
Expand All @@ -52,6 +97,11 @@ func (m *manager) synchronize(ctx context.Context) (err error) {
return err
}

// Process all tainted authorities. The bundle is shared between both caches using regular cache data.
if err := m.processTaintedAuthorities(cacheUpdate.TaintedX509Authorities, cacheUpdate.TaintedJWTAuthorities); err != nil {
return err
}

if err := m.updateCache(ctx, cacheUpdate, m.c.Log.WithField(telemetry.CacheType, "workload"), "", m.cache); err != nil {
return err
}
Expand Down Expand Up @@ -258,6 +308,27 @@ func (m *manager) fetchEntries(ctx context.Context) (_ *cache.UpdateEntries, _ *
return nil, nil, err
}

// Get all Subject Key IDs and KeyIDs of tainted authorities
var taintedX509Authorities []string
var taintedJWTAuthorities []string
if b, ok := update.Bundles[m.c.TrustDomain.IDString()]; ok {
for _, rootCA := range b.RootCas {
if rootCA.TaintedKey {
cert, err := x509.ParseCertificate(rootCA.DerBytes)
if err != nil {
return nil, nil, fmt.Errorf("failed to parse tainted x509 authority: %w", err)
}
subjectKeyID := x509util.SubjectKeyIDToString(cert.SubjectKeyId)
taintedX509Authorities = append(taintedX509Authorities, subjectKeyID)
}
}
for _, jwtKey := range b.JwtSigningKeys {
if jwtKey.TaintedKey {
taintedJWTAuthorities = append(taintedJWTAuthorities, jwtKey.Kid)
}
}
}

cacheEntries := make(map[string]*common.RegistrationEntry)
storeEntries := make(map[string]*common.RegistrationEntry)

Expand All @@ -271,11 +342,15 @@ func (m *manager) fetchEntries(ctx context.Context) (_ *cache.UpdateEntries, _ *
}

return &cache.UpdateEntries{
Bundles: bundles,
RegistrationEntries: cacheEntries,
Bundles: bundles,
RegistrationEntries: cacheEntries,
TaintedJWTAuthorities: taintedJWTAuthorities,
TaintedX509Authorities: taintedX509Authorities,
}, &cache.UpdateEntries{
Bundles: bundles,
RegistrationEntries: storeEntries,
Bundles: bundles,
RegistrationEntries: storeEntries,
TaintedJWTAuthorities: taintedJWTAuthorities,
TaintedX509Authorities: taintedX509Authorities,
}, nil
}

Expand Down Expand Up @@ -307,3 +382,14 @@ func parseBundles(bundles map[string]*common.Bundle) (map[spiffeid.TrustDomain]*
}
return out, nil
}

func getNewItems(current map[string]struct{}, items []string) []string {
var newItems []string
for _, subjectKeyID := range items {
if _, ok := current[subjectKeyID]; !ok {
newItems = append(newItems, subjectKeyID)
}
}

return newItems
}
Loading

0 comments on commit 4bfad98

Please sign in to comment.