Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Taint workload x509 svid v1 #524

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func (a *Agent) Run(ctx context.Context) error {
}
}

svidStoreCache := a.newSVIDStoreCache()
svidStoreCache := a.newSVIDStoreCache(metrics)

manager, err := a.newManager(ctx, a.sto, cat, metrics, as, svidStoreCache, nodeAttestor)
if err != nil {
Expand Down Expand Up @@ -328,10 +328,11 @@ func (a *Agent) newManager(ctx context.Context, sto storage.Storage, cat catalog
}
}

func (a *Agent) newSVIDStoreCache() *storecache.Cache {
func (a *Agent) newSVIDStoreCache(metrics telemetry.Metrics) *storecache.Cache {
config := &storecache.Config{
Log: a.c.Log.WithField(telemetry.SubsystemName, "svid_store_cache"),
TrustDomain: a.c.TrustDomain,
Metrics: metrics,
}

return storecache.New(config)
Expand Down
10 changes: 10 additions & 0 deletions pkg/agent/manager/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ type UpdateEntries struct {
// Bundles is a set of ALL trust bundles available to the agent, keyed by trust domain
Bundles map[spiffeid.TrustDomain]*spiffebundle.Bundle

// TaintedX509Authorities is a set of all tainted X.509 authorities notified by the server.
TaintedX509Authorities []string

// TaintedJWTAuthorities is a set of all tainted JWT authorities notified by the server.
TaintedJWTAuthorities []string

// RegistrationEntries is a set of ALL registration entries available to the
// agent, keyed by registration entry id.
RegistrationEntries map[string]*common.RegistrationEntry
Expand Down Expand Up @@ -413,6 +419,10 @@ func (c *Cache) UpdateSVIDs(update *UpdateSVIDs) {
}
}

func (c *Cache) TaintX509SVIDs(context.Context, []*x509.Certificate) {
// This cache is going to be removed in 1.11...
}

// GetStaleEntries obtains a list of stale entries
func (c *Cache) GetStaleEntries() []*StaleEntry {
c.mu.Lock()
Expand Down
94 changes: 94 additions & 0 deletions pkg/agent/manager/cache/lru_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import (
"context"
"crypto/x509"
"fmt"
"sort"
"sync"
Expand All @@ -14,6 +15,7 @@
"github.com/spiffe/spire/pkg/agent/common/backoff"
"github.com/spiffe/spire/pkg/common/telemetry"
agentmetrics "github.com/spiffe/spire/pkg/common/telemetry/agent"
"github.com/spiffe/spire/pkg/common/x509util"
"github.com/spiffe/spire/proto/spire/common"
)

Expand All @@ -22,6 +24,13 @@
DefaultSVIDCacheMaxSize = 1000
// SVIDSyncInterval is the interval at which SVIDs are synced with subscribers
SVIDSyncInterval = 500 * time.Millisecond
// Default batch size for processing tainted SVIDs
defaultProcessingBatchSize = 100
)

var (
// Time interval between SVID batch processing
processingTaintedX509SVIDInterval = 5 * time.Second
)

// Cache caches each registration entry, bundles, and JWT SVIDs for the agent.
Expand Down Expand Up @@ -483,6 +492,34 @@
}
}

// TaintX509SVIDs initiates the processing of all cached SVIDs, checking if they are tainted by the provided authorities.
// It schedules the processing to run asynchronously in batches.
func (c *LRUCache) TaintX509SVIDs(ctx context.Context, taintedX509Authorities []*x509.Certificate) {
c.mu.RLock()
defer c.mu.RUnlock()

var entriesToProcess []string
for key, svid := range c.svids {
if svid != nil {
entriesToProcess = append(entriesToProcess, key)
}
}

// Check if there are any entries to process before scheduling
if len(entriesToProcess) == 0 {
c.log.Debug("No SVID entries to process for tainted X.509 authorities")
return
}

// Schedule the rotation process in a separate goroutine
go func() {
c.scheduleRotation(ctx, entriesToProcess, taintedX509Authorities)
}()

c.log.WithField(telemetry.Count, len(entriesToProcess)).
Debug("Scheduled rotation for SVID entries due to tainted X.509 authorities")
}

// GetStaleEntries obtains a list of stale entries
func (c *LRUCache) GetStaleEntries() []*StaleEntry {
c.mu.Lock()
Expand Down Expand Up @@ -521,6 +558,63 @@
c.syncSVIDsWithSubscribers()
}

// scheduleRotation processes SVID entries in batches, removing those tainted by X.509 authorities.
// The process continues at regular intervals until all entries have been processed or the context is cancelled.
func (c *LRUCache) scheduleRotation(ctx context.Context, entryIDs []string, taintedX509Authorities []*x509.Certificate) {
ticker := c.clk.Ticker(processingTaintedX509SVIDInterval)
defer ticker.Stop()

for len(entryIDs) > 0 {
// Processing SVIDs in batches
batchSize := min(defaultProcessingBatchSize, len(entryIDs))
processingEntries := entryIDs[:batchSize]

start := time.Now()
c.processTaintedSVIDs(processingEntries, taintedX509Authorities)

c.log.Debugf("******************************************************")
c.log.Debugf("Processed %d SVIDs in %v", len(processingEntries), time.Since(start))
c.log.Debugf("******************************************************")

// Update the list to remove processed entries
entryIDs = entryIDs[batchSize:]
c.log.WithField(telemetry.Count, batchSize).Debug("entries left to process")

select {
case <-ticker.C:
case <-ctx.Done():
c.log.Debug("Context cancelled, exiting rotation schedule")
return
}
}
}

// processTaintedSVIDs identifies and removes tainted SVIDs from the cache that have been signed by the given tainted authorities.
func (c *LRUCache) processTaintedSVIDs(entryIDs []string, taintedX509Authorities []*x509.Certificate) {
taintedSVIDs := 0

c.mu.Lock()
defer c.mu.Unlock()

for _, entryID := range entryIDs {
svid, exists := c.svids[entryID]
if !exists || svid == nil {
// Skip if the SVID is not in cache or is nil
continue
}

// Check if the SVID is signed by any tainted authority
if tainted := x509util.IsSignedByRoot(svid.Chain, taintedX509Authorities); tainted {
taintedSVIDs++
delete(c.svids, entryID)
}

}

Check failure on line 612 in pkg/agent/manager/cache/lru_cache.go

View workflow job for this annotation

GitHub Actions / lint (linux)

unnecessary trailing newline (whitespace)

Check failure on line 612 in pkg/agent/manager/cache/lru_cache.go

View workflow job for this annotation

GitHub Actions / lint (windows)

unnecessary trailing newline (whitespace)

agentmetrics.AddCacheManagerTaintedSVIDsSample(c.metrics, "", float32(taintedSVIDs))
c.log.WithField(telemetry.TaintedSVIDs, taintedSVIDs).Debug("Tainted X.509 SVIDs")
}

// Notify subscriber of selector set only if all SVIDs for corresponding selector set are cached
// It returns whether all SVIDs are cached or not.
// This method should be retried with backoff to avoid lock contention.
Expand Down
3 changes: 3 additions & 0 deletions pkg/agent/manager/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ func newManager(c *Config) *manager {
client: client,
clk: c.Clk,
svidStoreCache: c.SVIDStoreCache,

processedTaintedX509Authorities: make(map[string]struct{}),
processedTaintedJWTAuthorities: make(map[string]struct{}),
}

return m
Expand Down
8 changes: 8 additions & 0 deletions pkg/agent/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,14 @@ type manager struct {
// cache.
syncedEntries map[string]*common.RegistrationEntry
syncedBundles map[string]*common.Bundle

// processedTaintedX509Authorities holds all the already processed tainted X.509 Authorities
// to prevent processing them again.
processedTaintedX509Authorities map[string]struct{}

// processedTaintedJWTAuthorities holds all the already processed tainted JWT Authorities
// to prevent processing them again.
processedTaintedJWTAuthorities map[string]struct{}
}

func (m *manager) Initialize(ctx context.Context) error {
Expand Down
34 changes: 34 additions & 0 deletions pkg/agent/manager/storecache/cache.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package storecache

import (
"context"
"crypto/x509"
"sort"
"sync"
"time"
Expand All @@ -10,6 +12,8 @@
"github.com/spiffe/go-spiffe/v2/spiffeid"
"github.com/spiffe/spire/pkg/agent/manager/cache"
"github.com/spiffe/spire/pkg/common/telemetry"
telemetry_agent "github.com/spiffe/spire/pkg/common/telemetry/agent"
"github.com/spiffe/spire/pkg/common/x509util"
"github.com/spiffe/spire/proto/spire/common"
)

Expand Down Expand Up @@ -46,6 +50,7 @@
type Config struct {
Log logrus.FieldLogger
TrustDomain spiffeid.TrustDomain
Metrics telemetry.Metrics
}

type Cache struct {
Expand Down Expand Up @@ -219,6 +224,35 @@
}
}

func (c *Cache) TaintX509SVIDs(ctx context.Context, taintedX509Authorities []*x509.Certificate) {
// TOOD: add elapsed time metrics

Check failure on line 228 in pkg/agent/manager/storecache/cache.go

View workflow job for this annotation

GitHub Actions / lint (linux)

`TOOD` is a misspelling of `TODO` (misspell)

Check failure on line 228 in pkg/agent/manager/storecache/cache.go

View workflow job for this annotation

GitHub Actions / lint (windows)

`TOOD` is a misspelling of `TODO` (misspell)
c.mtx.Lock()
defer c.mtx.Unlock()

start := time.Now()

taintedSVIDs := 0
for _, record := range c.records {
// no process already tainted or empty SVIDs
if record.svid == nil {
continue
}

if tainted := x509util.IsSignedByRoot(record.svid.Chain, taintedX509Authorities); tainted {
taintedSVIDs += 1
record.svid = nil
}
}

telemetry_agent.AddCacheManagerExpiredSVIDsSample(c.c.Metrics, "svid_store", float32(taintedSVIDs))
c.c.Log.WithField(telemetry.TaintedSVIDs, taintedSVIDs).Debug("Tainted X.509 SVIDs")

// TODO: remove....
c.c.Log.Debugf("******************************************************")
c.c.Log.Debugf("Duration to process %d svids: %v", taintedSVIDs, time.Since(start))
c.c.Log.Debugf("******************************************************")
}

// GetStaleEntries obtains a list of stale entries, that needs new SVIDs
func (c *Cache) GetStaleEntries() []*cache.StaleEntry {
c.mtx.Lock()
Expand Down
Loading
Loading