Skip to content

Commit

Permalink
feat(scheduler): gracefully shutdown
Browse files Browse the repository at this point in the history
wait for workers to finish before exiting

should fix tests reporting they couldn't remove rootDir because it's being
written by tasks

Signed-off-by: Petu Eusebiu <[email protected]>
  • Loading branch information
eusebiu-constantin-petu-dbk committed Nov 17, 2023
1 parent 8dd06c6 commit 5ad1c4f
Show file tree
Hide file tree
Showing 46 changed files with 571 additions and 350 deletions.
28 changes: 15 additions & 13 deletions pkg/api/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type Controller struct {
SyncOnDemand SyncOnDemand
RelyingParties map[string]rp.RelyingParty
CookieStore *CookieStore
taskScheduler *scheduler.Scheduler
// runtime params
chosenPort int // kernel-chosen port
}
Expand Down Expand Up @@ -366,13 +367,14 @@ func (c *Controller) LoadNewConfig(reloadCtx context.Context, newConfig *config.
}

func (c *Controller) Shutdown() {
c.taskScheduler.Shutdown()
ctx := context.Background()
_ = c.Server.Shutdown(ctx)
}

func (c *Controller) StartBackgroundTasks(reloadCtx context.Context) {
taskScheduler := scheduler.NewScheduler(c.Config, c.Log)
taskScheduler.RunScheduler(reloadCtx)
c.taskScheduler = scheduler.NewScheduler(c.Config, c.Log)
c.taskScheduler.RunScheduler(reloadCtx)

// Enable running garbage-collect periodically for DefaultStore
if c.Config.Storage.GC {
Expand All @@ -381,20 +383,20 @@ func (c *Controller) StartBackgroundTasks(reloadCtx context.Context) {
ImageRetention: c.Config.Storage.Retention,
}, c.Audit, c.Log)

gc.CleanImageStorePeriodically(c.Config.Storage.GCInterval, taskScheduler)
gc.CleanImageStorePeriodically(c.Config.Storage.GCInterval, c.taskScheduler)
}

// Enable running dedupe blobs both ways (dedupe or restore deduped blobs)
c.StoreController.DefaultStore.RunDedupeBlobs(time.Duration(0), taskScheduler)
c.StoreController.DefaultStore.RunDedupeBlobs(time.Duration(0), c.taskScheduler)

// Enable extensions if extension config is provided for DefaultStore
if c.Config != nil && c.Config.Extensions != nil {
ext.EnableMetricsExtension(c.Config, c.Log, c.Config.Storage.RootDirectory)
ext.EnableSearchExtension(c.Config, c.StoreController, c.MetaDB, taskScheduler, c.CveScanner, c.Log)
ext.EnableSearchExtension(c.Config, c.StoreController, c.MetaDB, c.taskScheduler, c.CveScanner, c.Log)
}
// runs once if metrics are enabled & imagestore is local
if c.Config.IsMetricsEnabled() && c.Config.Storage.StorageDriver == nil {
c.StoreController.DefaultStore.PopulateStorageMetrics(time.Duration(0), taskScheduler)
c.StoreController.DefaultStore.PopulateStorageMetrics(time.Duration(0), c.taskScheduler)
}

if c.Config.Storage.SubPaths != nil {
Expand All @@ -407,7 +409,7 @@ func (c *Controller) StartBackgroundTasks(reloadCtx context.Context) {
ImageRetention: storageConfig.Retention,
}, c.Audit, c.Log)

gc.CleanImageStorePeriodically(storageConfig.GCInterval, taskScheduler)
gc.CleanImageStorePeriodically(storageConfig.GCInterval, c.taskScheduler)
}

// Enable extensions if extension config is provided for subImageStore
Expand All @@ -418,19 +420,19 @@ func (c *Controller) StartBackgroundTasks(reloadCtx context.Context) {
// Enable running dedupe blobs both ways (dedupe or restore deduped blobs) for subpaths
substore := c.StoreController.SubStore[route]
if substore != nil {
substore.RunDedupeBlobs(time.Duration(0), taskScheduler)
substore.RunDedupeBlobs(time.Duration(0), c.taskScheduler)

if c.Config.IsMetricsEnabled() && c.Config.Storage.StorageDriver == nil {
substore.PopulateStorageMetrics(time.Duration(0), taskScheduler)
substore.PopulateStorageMetrics(time.Duration(0), c.taskScheduler)
}
}
}
}

if c.Config.Extensions != nil {
ext.EnableScrubExtension(c.Config, c.Log, c.StoreController, taskScheduler)
ext.EnableScrubExtension(c.Config, c.Log, c.StoreController, c.taskScheduler)
//nolint: contextcheck
syncOnDemand, err := ext.EnableSyncExtension(c.Config, c.MetaDB, c.StoreController, taskScheduler, c.Log)
syncOnDemand, err := ext.EnableSyncExtension(c.Config, c.MetaDB, c.StoreController, c.taskScheduler, c.Log)
if err != nil {
c.Log.Error().Err(err).Msg("unable to start sync extension")
}
Expand All @@ -439,11 +441,11 @@ func (c *Controller) StartBackgroundTasks(reloadCtx context.Context) {
}

if c.CookieStore != nil {
c.CookieStore.RunSessionCleaner(taskScheduler)
c.CookieStore.RunSessionCleaner(c.taskScheduler)
}

// we can later move enabling the other scheduled tasks inside the call below
ext.EnableScheduledTasks(c.Config, taskScheduler, c.MetaDB, c.Log) //nolint: contextcheck
ext.EnableScheduledTasks(c.Config, c.taskScheduler, c.MetaDB, c.Log) //nolint: contextcheck
}

type SyncOnDemand interface {
Expand Down
12 changes: 6 additions & 6 deletions pkg/api/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8378,7 +8378,7 @@ func TestGCSignaturesAndUntaggedManifestsWithMetaDB(t *testing.T) {
So(len(index.Manifests), ShouldEqual, 1)

// shouldn't do anything
err = gc.CleanRepo(repoName) //nolint: contextcheck
err = gc.CleanRepo(ctx, repoName) //nolint: contextcheck
So(err, ShouldBeNil)

// make sure both signatures are stored in repodb
Expand All @@ -8404,7 +8404,7 @@ func TestGCSignaturesAndUntaggedManifestsWithMetaDB(t *testing.T) {
err = UploadImage(img, baseURL, repoName, img.DigestStr())
So(err, ShouldBeNil)

err = gc.CleanRepo(repoName)
err = gc.CleanRepo(ctx, repoName)
So(err, ShouldNotBeNil)

err = os.Chmod(path.Join(dir, repoName, "blobs", "sha256", refs.Manifests[0].Digest.Encoded()), 0o755)
Expand All @@ -8418,7 +8418,7 @@ func TestGCSignaturesAndUntaggedManifestsWithMetaDB(t *testing.T) {
err = UploadImage(img, baseURL, repoName, tag)
So(err, ShouldBeNil)

err = gc.CleanRepo(repoName)
err = gc.CleanRepo(ctx, repoName)
So(err, ShouldNotBeNil)

err = os.WriteFile(path.Join(dir, repoName, "blobs", "sha256", refs.Manifests[0].Digest.Encoded()), content, 0o600)
Expand Down Expand Up @@ -8469,7 +8469,7 @@ func TestGCSignaturesAndUntaggedManifestsWithMetaDB(t *testing.T) {
So(err, ShouldBeNil)
newManifestDigest := godigest.FromBytes(manifestBuf)

err = gc.CleanRepo(repoName) //nolint: contextcheck
err = gc.CleanRepo(ctx, repoName) //nolint: contextcheck
So(err, ShouldBeNil)

// make sure both signatures are removed from metaDB and repo reference for untagged is removed
Expand Down Expand Up @@ -8548,7 +8548,7 @@ func TestGCSignaturesAndUntaggedManifestsWithMetaDB(t *testing.T) {
So(err, ShouldBeNil)

cm := test.NewControllerManager(ctlr)
cm.StartAndWait(port)
cm.StartAndWait(port) //nolint: contextcheck
defer cm.StopServer()

gc := gc.NewGarbageCollect(ctlr.StoreController.DefaultStore, ctlr.MetaDB,
Expand Down Expand Up @@ -8606,7 +8606,7 @@ func TestGCSignaturesAndUntaggedManifestsWithMetaDB(t *testing.T) {
So(err, ShouldBeNil)
So(resp.StatusCode(), ShouldEqual, http.StatusCreated)

err = gc.CleanRepo(repoName)
err = gc.CleanRepo(ctx, repoName)
So(err, ShouldBeNil)

resp, err = resty.R().SetHeader("Content-Type", ispec.MediaTypeImageIndex).
Expand Down
4 changes: 2 additions & 2 deletions pkg/cli/client/cve_cmd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -583,7 +583,7 @@ func TestCVESort(t *testing.T) {
}

ctlr.CveScanner = mocks.CveScannerMock{
ScanImageFn: func(image string) (map[string]cvemodel.CVE, error) {
ScanImageFn: func(ctx context.Context, image string) (map[string]cvemodel.CVE, error) {
return map[string]cvemodel.CVE{
"CVE-2023-1255": {
ID: "CVE-2023-1255",
Expand Down Expand Up @@ -687,7 +687,7 @@ func getMockCveScanner(metaDB mTypes.MetaDB) cveinfo.Scanner {
// MetaDB loaded with initial data now mock the scanner
// Setup test CVE data in mock scanner
scanner := mocks.CveScannerMock{
ScanImageFn: func(image string) (map[string]cvemodel.CVE, error) {
ScanImageFn: func(ctx context.Context, image string) (map[string]cvemodel.CVE, error) {
if strings.Contains(image, "zot-cve-test@sha256:db573b01") ||
image == "zot-cve-test:0.0.1" {
return map[string]cvemodel.CVE{
Expand Down
10 changes: 10 additions & 0 deletions pkg/common/common.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package common

import (
"context"
"encoding/json"
"errors"
"fmt"
Expand Down Expand Up @@ -135,3 +136,12 @@ func IsReferrersTag(tag string) bool {

return referrersTagRule.MatchString(tag)
}

func IsContextDone(ctx context.Context) bool {
select {
case <-ctx.Done():
return true
default:
return false
}
}
6 changes: 5 additions & 1 deletion pkg/extensions/imagetrust/image_trust.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,8 +256,12 @@ func (validityT *validityTask) DoWork(ctx context.Context) error {
validityT.log.Info().Msg("update signatures validity")

for signedManifest, sigs := range validityT.repo.Signatures {
if zcommon.IsContextDone(ctx) {
return ctx.Err()
}

if len(sigs[zcommon.CosignSignature]) != 0 || len(sigs[zcommon.NotationSignature]) != 0 {
err := validityT.metaDB.UpdateSignaturesValidity(validityT.repo.Name, godigest.Digest(signedManifest))
err := validityT.metaDB.UpdateSignaturesValidity(ctx, validityT.repo.Name, godigest.Digest(signedManifest))
if err != nil {
validityT.log.Info().Msg("error while verifying signatures")

Expand Down
16 changes: 8 additions & 8 deletions pkg/extensions/search/convert/convert_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func TestCVEConvert(t *testing.T) {
Vulnerabilities: false,
},
mocks.CveInfoMock{
GetCVESummaryForImageMediaFn: func(repo string, digest, mediaType string,
GetCVESummaryForImageMediaFn: func(ctx context.Context, repo string, digest, mediaType string,
) (cvemodel.ImageCVESummary, error) {
return cvemodel.ImageCVESummary{}, ErrTestError
},
Expand Down Expand Up @@ -99,7 +99,7 @@ func TestCVEConvert(t *testing.T) {
Vulnerabilities: false,
},
mocks.CveInfoMock{
GetCVESummaryForImageMediaFn: func(repo string, digest, mediaType string,
GetCVESummaryForImageMediaFn: func(ctx context.Context, repo string, digest, mediaType string,
) (cvemodel.ImageCVESummary, error) {
return cvemodel.ImageCVESummary{
Count: 1,
Expand All @@ -126,7 +126,7 @@ func TestCVEConvert(t *testing.T) {
Vulnerabilities: false,
},
mocks.CveInfoMock{
GetCVESummaryForImageMediaFn: func(repo string, digest, mediaType string,
GetCVESummaryForImageMediaFn: func(ctx context.Context, repo string, digest, mediaType string,
) (cvemodel.ImageCVESummary, error) {
return cvemodel.ImageCVESummary{}, ErrTestError
},
Expand All @@ -149,7 +149,7 @@ func TestCVEConvert(t *testing.T) {
Vulnerabilities: false,
},
mocks.CveInfoMock{
GetCVESummaryForImageMediaFn: func(repo string, digest, mediaType string,
GetCVESummaryForImageMediaFn: func(ctx context.Context, repo string, digest, mediaType string,
) (cvemodel.ImageCVESummary, error) {
return cvemodel.ImageCVESummary{
Count: 1,
Expand Down Expand Up @@ -179,7 +179,7 @@ func TestCVEConvert(t *testing.T) {
Vulnerabilities: false,
},
mocks.CveInfoMock{
GetCVESummaryForImageMediaFn: func(repo string, digest, mediaType string,
GetCVESummaryForImageMediaFn: func(ctx context.Context, repo string, digest, mediaType string,
) (cvemodel.ImageCVESummary, error) {
return cvemodel.ImageCVESummary{
Count: 1,
Expand Down Expand Up @@ -207,7 +207,7 @@ func TestCVEConvert(t *testing.T) {
Vulnerabilities: false,
},
mocks.CveInfoMock{
GetCVESummaryForImageMediaFn: func(repo string, digest, mediaType string,
GetCVESummaryForImageMediaFn: func(ctx context.Context, repo string, digest, mediaType string,
) (cvemodel.ImageCVESummary, error) {
return cvemodel.ImageCVESummary{
Count: 1,
Expand Down Expand Up @@ -248,7 +248,7 @@ func TestCVEConvert(t *testing.T) {
Vulnerabilities: false,
},
mocks.CveInfoMock{
GetCVESummaryForImageMediaFn: func(repo string, digest, mediaType string,
GetCVESummaryForImageMediaFn: func(ctx context.Context, repo string, digest, mediaType string,
) (cvemodel.ImageCVESummary, error) {
return cvemodel.ImageCVESummary{
Count: 1,
Expand All @@ -271,7 +271,7 @@ func TestCVEConvert(t *testing.T) {
Vulnerabilities: false,
},
mocks.CveInfoMock{
GetCVESummaryForImageMediaFn: func(repo string, digest, mediaType string,
GetCVESummaryForImageMediaFn: func(ctx context.Context, repo string, digest, mediaType string,
) (cvemodel.ImageCVESummary, error) {
return cvemodel.ImageCVESummary{}, ErrTestError
},
Expand Down
4 changes: 2 additions & 2 deletions pkg/extensions/search/convert/cve.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func updateImageSummaryVulnerabilities(
return
}

imageCveSummary, err := cveInfo.GetCVESummaryForImageMedia(*imageSummary.RepoName, *imageSummary.Digest,
imageCveSummary, err := cveInfo.GetCVESummaryForImageMedia(ctx, *imageSummary.RepoName, *imageSummary.Digest,
*imageSummary.MediaType)
if err != nil {
// Log the error, but we should still include the image in results
Expand Down Expand Up @@ -91,7 +91,7 @@ func updateManifestSummaryVulnerabilities(
return
}

imageCveSummary, err := cveInfo.GetCVESummaryForImageMedia(repoName, *manifestSummary.Digest,
imageCveSummary, err := cveInfo.GetCVESummaryForImageMedia(ctx, repoName, *manifestSummary.Digest,
ispec.MediaTypeImageManifest)
if err != nil {
// Log the error, but we should still include the manifest in results
Expand Down
Loading

0 comments on commit 5ad1c4f

Please sign in to comment.