Use timestamp to invalidate kubelet response cache (spiffe#5620)

Changes the K8s workload attestor to invalidate the cached response
based on a timestamp instead of kicking off a goroutine.

While this means that the plugin will always have a cached response in
memory, it simplifies the code for both the plugin and tests. In
practice, only an idle agent would waste memory holding onto the
response.

Signed-off-by: Andrew Harding <[email protected]>
azdagron authored Oct 31, 2024
1 parent e692085 commit 2287067
Showing 2 changed files with 62 additions and 60 deletions.
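The heart of the change is a common pattern: instead of launching a goroutine that clears the cached pod list when a timer fires (which requires a shutdown context, a WaitGroup, and a Close method), the writer records a validity deadline and readers ignore the entry once the clock passes it. The sketch below shows that pattern in isolation; the type and field names are illustrative only, and the plugin itself uses its injected clock and plain struct fields rather than a generic helper.

package cache

import (
	"sync"
	"time"
)

// timedCache is a minimal sketch of timestamp-based invalidation: the last
// value stays in memory, but readers treat it as absent once validUntil has
// passed. No background goroutine is needed to clear it.
type timedCache[T any] struct {
	mu         sync.RWMutex
	value      T
	hasValue   bool
	validUntil time.Time
}

// Set stores a value and records how long it may be served.
func (c *timedCache[T]) Set(value T, ttl time.Duration) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.value = value
	c.hasValue = true
	c.validUntil = time.Now().Add(ttl)
}

// Get returns the stored value only while the deadline has not passed.
func (c *timedCache[T]) Get() (T, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	if !c.hasValue || !time.Now().Before(c.validUntil) {
		var zero T
		return zero, false
	}
	return c.value, true
}

The trade-off is the one the commit message notes: the last value stays in memory until it is overwritten, but there is no background work to coordinate or shut down.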
51 changes: 14 additions & 37 deletions pkg/agent/plugin/workloadattestor/k8s/k8s.go
@@ -274,32 +274,18 @@ type Plugin struct {
containerHelper ContainerHelper
sigstoreVerifier sigstore.Verifier

cachedPodList map[string]*fastjson.Value
singleflight singleflight.Group

shutdownCtx context.Context
shutdownCtxCancel context.CancelFunc
shutdownWG sync.WaitGroup
cachedPodList map[string]*fastjson.Value
cachedPodListValidUntil time.Time
singleflight singleflight.Group
}

func New() *Plugin {
ctx, cancel := context.WithCancel(context.Background())

return &Plugin{
clock: clock.New(),
getenv: os.Getenv,
shutdownCtx: ctx,
shutdownCtxCancel: cancel,
clock: clock.New(),
getenv: os.Getenv,
}
}

func (p *Plugin) Close() error {
p.shutdownCtxCancel()
p.shutdownWG.Wait()

return nil
}

func (p *Plugin) SetLogger(log hclog.Logger) {
p.log = log
}
@@ -332,7 +318,7 @@ func (p *Plugin) Attest(ctx context.Context, req *workloadattestorv1.AttestReque
for attempt := 1; ; attempt++ {
log = log.With(telemetry.Attempt, attempt)

podList, err := p.getPodList(ctx, config.Client)
podList, err := p.getPodList(ctx, config.Client, config.PollRetryInterval/2)
if err != nil {
return nil, err
}
@@ -465,31 +451,22 @@ func (p *Plugin) getConfig() (*k8sConfig, ContainerHelper, sigstore.Verifier, er
return p.config, p.containerHelper, p.sigstoreVerifier, nil
}

func (p *Plugin) setPodListCache(podList map[string]*fastjson.Value, expires time.Duration) {
func (p *Plugin) setPodListCache(podList map[string]*fastjson.Value, cacheFor time.Duration) {
p.mu.Lock()
defer p.mu.Unlock()

p.cachedPodList = podList

p.shutdownWG.Add(1)
go func() {
defer p.shutdownWG.Done()

select {
case <-p.clock.After(expires):
case <-p.shutdownCtx.Done():
}

p.mu.Lock()
defer p.mu.Unlock()
p.cachedPodList = nil
}()
p.cachedPodListValidUntil = p.clock.Now().Add(cacheFor)
}

func (p *Plugin) getPodListCache() map[string]*fastjson.Value {
p.mu.RLock()
defer p.mu.RUnlock()

if p.clock.Now().Sub(p.cachedPodListValidUntil) >= 0 {
return nil
}

return p.cachedPodList
}
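A note on the comparison above: p.clock.Now().Sub(p.cachedPodListValidUntil) >= 0 is true at and after the deadline, and it is also true for the zero-value time.Time, so a plugin that has never called setPodListCache reports no cached pod list. A tiny restatement of that check, using an illustrative helper name that does not exist in the plugin:

package cache

import "time"

// cacheExpired mirrors the check in getPodListCache: the cached pod list is
// treated as stale once now has reached or passed validUntil. For the zero
// time.Time (nothing cached yet) this is always true.
func cacheExpired(now, validUntil time.Time) bool {
	return now.Sub(validUntil) >= 0 // i.e. !now.Before(validUntil)
}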

@@ -668,7 +645,7 @@ func (p *Plugin) getNodeName(name string, env string) string {
}
}

func (p *Plugin) getPodList(ctx context.Context, client *kubeletClient) (map[string]*fastjson.Value, error) {
func (p *Plugin) getPodList(ctx context.Context, client *kubeletClient, cacheFor time.Duration) (map[string]*fastjson.Value, error) {
result := p.getPodListCache()
if result != nil {
return result, nil
@@ -705,7 +682,7 @@ func (p *Plugin) getPodList(ctx context.Context, client *kubeletClient) (map[str
result[uid] = podValue
}

p.setPodListCache(result, p.config.PollRetryInterval/2)
p.setPodListCache(result, cacheFor)

return result, nil
})
71 changes: 48 additions & 23 deletions pkg/agent/plugin/workloadattestor/k8s/k8s_test.go
@@ -36,6 +36,8 @@ import (
const (
pid = 123

testPollRetryInterval = time.Second

podListFilePath = "testdata/pod_list.json"
podListNotRunningFilePath = "testdata/pod_list_not_running.json"

@@ -102,8 +104,10 @@ type Suite struct {
dir string
clock *clock.Mock

podList [][]byte
env map[string]string
podListMu sync.RWMutex
podList [][]byte

env map[string]string

// kubelet stuff
server *httptest.Server
@@ -148,11 +152,10 @@ func (s *Suite) TestAttestWithPidInPodAfterRetry() {

resultCh := s.goAttest(p)

s.clock.WaitForAfter(time.Minute, "waiting for cache expiry timer")
s.clock.WaitForAfter(time.Minute, "waiting for retry timer")
s.clock.Add(time.Second)
s.clock.Add(testPollRetryInterval)
s.clock.WaitForAfter(time.Minute, "waiting for retry timer")
s.clock.Add(time.Second)
s.clock.Add(testPollRetryInterval)

select {
case result := <-resultCh:
@@ -180,19 +183,25 @@ func (s *Suite) TestAttestWithPidNotInPodCancelsEarly() {
func (s *Suite) TestAttestPodListCache() {
s.startInsecureKubelet()
p := s.loadInsecurePlugin()
s.addGetContainerResponsePidInPod()

// Add two pod listings.
s.addPodListResponse(podListFilePath)
s.addPodListResponse(podListFilePath)
s.Require().Equal(2, s.podListResponseCount())

s.requireAttestSuccessWithPod(p)
s.clock.WaitForAfter(time.Minute, "waiting for cache expiry timer")
// Attest and assert one pod listing was consumed (one remaining)
s.requireAttestSuccess(p, testPodAndContainerSelectors)
s.Require().Equal(1, s.podListResponseCount())

// The pod list is cached so we don't expect a request to kubelet
s.requireAttestSuccessWithPod(p)
// Attest again and assert no pod listing was consumed (still at one)
s.requireAttestSuccess(p, testPodAndContainerSelectors)
s.Require().Equal(1, s.podListResponseCount())

// The cache expires after the clock advances by at least half the retry interval
s.clock.Add(time.Minute)
s.addPodListResponse(podListFilePath)
s.requireAttestSuccessWithPod(p)
// Now expire the cache, attest, and observe the last listing was consumed.
s.clock.Add(testPollRetryInterval / 2)
s.requireAttestSuccess(p, testPodAndContainerSelectors)
s.Require().Equal(0, s.podListResponseCount())
}

func (s *Suite) TestAttestWithPidNotInPodAfterRetry() {
@@ -207,15 +216,14 @@ func (s *Suite) TestAttestWithPidNotInPodAfterRetry() {

resultCh := s.goAttest(p)

s.clock.WaitForAfter(time.Minute, "waiting for cache expiry timer")
s.clock.WaitForAfter(time.Minute, "waiting for retry timer")
s.clock.Add(time.Second)
s.clock.Add(testPollRetryInterval)
s.clock.WaitForAfter(time.Minute, "waiting for retry timer")
s.clock.Add(time.Second)
s.clock.Add(testPollRetryInterval)
s.clock.WaitForAfter(time.Minute, "waiting for retry timer")
s.clock.Add(time.Second)
s.clock.Add(testPollRetryInterval)
s.clock.WaitForAfter(time.Minute, "waiting for retry timer")
s.clock.Add(time.Second)
s.clock.Add(testPollRetryInterval)

select {
case result := <-resultCh:
@@ -723,13 +731,11 @@ func (s *Suite) writeFile(path, data string) {
}

func (s *Suite) serveHTTP(w http.ResponseWriter, _ *http.Request) {
// TODO:
if len(s.podList) == 0 {
http.Error(w, "not configured to return a pod list", http.StatusOK)
podList := s.consumePodListResponse()
if podList == nil {
http.Error(w, "not configured to return a pod list", http.StatusInternalServerError)
return
}
podList := s.podList[0]
s.podList = s.podList[1:]
_, _ = w.Write(podList)
}

@@ -970,9 +976,28 @@ func (s *Suite) addPodListResponse(fixturePath string) {
podList, err := os.ReadFile(fixturePath)
s.Require().NoError(err)

s.podListMu.Lock()
defer s.podListMu.Unlock()
s.podList = append(s.podList, podList)
}

func (s *Suite) consumePodListResponse() []byte {
s.podListMu.Lock()
defer s.podListMu.Unlock()
if len(s.podList) > 0 {
podList := s.podList[0]
s.podList = s.podList[1:]
return podList
}
return nil
}

func (s *Suite) podListResponseCount() int {
s.podListMu.RLock()
defer s.podListMu.RUnlock()
return len(s.podList)
}

type fakeSigstoreVerifier struct {
mu sync.Mutex
