Skip to content

Commit

Permalink
feat(scheduler): gracefully shutdown
Browse files Browse the repository at this point in the history
wait for workers to finish before exiting

should fix tests reporting they couldn't remove rootDir because it's being
written by tasks

Signed-off-by: Petu Eusebiu <[email protected]>
  • Loading branch information
eusebiu-constantin-petu-dbk committed Nov 23, 2023
1 parent 83f287d commit 2537af6
Show file tree
Hide file tree
Showing 49 changed files with 711 additions and 379 deletions.
28 changes: 15 additions & 13 deletions pkg/api/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type Controller struct {
SyncOnDemand SyncOnDemand
RelyingParties map[string]rp.RelyingParty
CookieStore *CookieStore
taskScheduler *scheduler.Scheduler
// runtime params
chosenPort int // kernel-chosen port
}
Expand Down Expand Up @@ -366,13 +367,14 @@ func (c *Controller) LoadNewConfig(reloadCtx context.Context, newConfig *config.
}

func (c *Controller) Shutdown() {
c.taskScheduler.Shutdown()
ctx := context.Background()
_ = c.Server.Shutdown(ctx)
}

func (c *Controller) StartBackgroundTasks(reloadCtx context.Context) {
taskScheduler := scheduler.NewScheduler(c.Config, c.Log)
taskScheduler.RunScheduler(reloadCtx)
c.taskScheduler = scheduler.NewScheduler(c.Config, c.Log)
c.taskScheduler.RunScheduler(reloadCtx)

// Enable running garbage-collect periodically for DefaultStore
if c.Config.Storage.GC {
Expand All @@ -381,20 +383,20 @@ func (c *Controller) StartBackgroundTasks(reloadCtx context.Context) {
ImageRetention: c.Config.Storage.Retention,
}, c.Audit, c.Log)

gc.CleanImageStorePeriodically(c.Config.Storage.GCInterval, taskScheduler)
gc.CleanImageStorePeriodically(c.Config.Storage.GCInterval, c.taskScheduler)
}

// Enable running dedupe blobs both ways (dedupe or restore deduped blobs)
c.StoreController.DefaultStore.RunDedupeBlobs(time.Duration(0), taskScheduler)
c.StoreController.DefaultStore.RunDedupeBlobs(time.Duration(0), c.taskScheduler)

// Enable extensions if extension config is provided for DefaultStore
if c.Config != nil && c.Config.Extensions != nil {
ext.EnableMetricsExtension(c.Config, c.Log, c.Config.Storage.RootDirectory)
ext.EnableSearchExtension(c.Config, c.StoreController, c.MetaDB, taskScheduler, c.CveScanner, c.Log)
ext.EnableSearchExtension(c.Config, c.StoreController, c.MetaDB, c.taskScheduler, c.CveScanner, c.Log)
}
// runs once if metrics are enabled & imagestore is local
if c.Config.IsMetricsEnabled() && c.Config.Storage.StorageDriver == nil {
c.StoreController.DefaultStore.PopulateStorageMetrics(time.Duration(0), taskScheduler)
c.StoreController.DefaultStore.PopulateStorageMetrics(time.Duration(0), c.taskScheduler)
}

if c.Config.Storage.SubPaths != nil {
Expand All @@ -407,7 +409,7 @@ func (c *Controller) StartBackgroundTasks(reloadCtx context.Context) {
ImageRetention: storageConfig.Retention,
}, c.Audit, c.Log)

gc.CleanImageStorePeriodically(storageConfig.GCInterval, taskScheduler)
gc.CleanImageStorePeriodically(storageConfig.GCInterval, c.taskScheduler)
}

// Enable extensions if extension config is provided for subImageStore
Expand All @@ -418,19 +420,19 @@ func (c *Controller) StartBackgroundTasks(reloadCtx context.Context) {
// Enable running dedupe blobs both ways (dedupe or restore deduped blobs) for subpaths
substore := c.StoreController.SubStore[route]
if substore != nil {
substore.RunDedupeBlobs(time.Duration(0), taskScheduler)
substore.RunDedupeBlobs(time.Duration(0), c.taskScheduler)

if c.Config.IsMetricsEnabled() && c.Config.Storage.StorageDriver == nil {
substore.PopulateStorageMetrics(time.Duration(0), taskScheduler)
substore.PopulateStorageMetrics(time.Duration(0), c.taskScheduler)
}
}
}
}

if c.Config.Extensions != nil {
ext.EnableScrubExtension(c.Config, c.Log, c.StoreController, taskScheduler)
ext.EnableScrubExtension(c.Config, c.Log, c.StoreController, c.taskScheduler)
//nolint: contextcheck
syncOnDemand, err := ext.EnableSyncExtension(c.Config, c.MetaDB, c.StoreController, taskScheduler, c.Log)
syncOnDemand, err := ext.EnableSyncExtension(c.Config, c.MetaDB, c.StoreController, c.taskScheduler, c.Log)
if err != nil {
c.Log.Error().Err(err).Msg("unable to start sync extension")
}
Expand All @@ -439,11 +441,11 @@ func (c *Controller) StartBackgroundTasks(reloadCtx context.Context) {
}

if c.CookieStore != nil {
c.CookieStore.RunSessionCleaner(taskScheduler)
c.CookieStore.RunSessionCleaner(c.taskScheduler)
}

// we can later move enabling the other scheduled tasks inside the call below
ext.EnableScheduledTasks(c.Config, taskScheduler, c.MetaDB, c.Log) //nolint: contextcheck
ext.EnableScheduledTasks(c.Config, c.taskScheduler, c.MetaDB, c.Log) //nolint: contextcheck
}

type SyncOnDemand interface {
Expand Down
12 changes: 6 additions & 6 deletions pkg/api/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8378,7 +8378,7 @@ func TestGCSignaturesAndUntaggedManifestsWithMetaDB(t *testing.T) {
So(len(index.Manifests), ShouldEqual, 1)

// shouldn't do anything
err = gc.CleanRepo(repoName) //nolint: contextcheck
err = gc.CleanRepo(ctx, repoName) //nolint: contextcheck
So(err, ShouldBeNil)

// make sure both signatures are stored in repodb
Expand All @@ -8404,7 +8404,7 @@ func TestGCSignaturesAndUntaggedManifestsWithMetaDB(t *testing.T) {
err = UploadImage(img, baseURL, repoName, img.DigestStr())
So(err, ShouldBeNil)

err = gc.CleanRepo(repoName)
err = gc.CleanRepo(ctx, repoName)
So(err, ShouldNotBeNil)

err = os.Chmod(path.Join(dir, repoName, "blobs", "sha256", refs.Manifests[0].Digest.Encoded()), 0o755)
Expand All @@ -8418,7 +8418,7 @@ func TestGCSignaturesAndUntaggedManifestsWithMetaDB(t *testing.T) {
err = UploadImage(img, baseURL, repoName, tag)
So(err, ShouldBeNil)

err = gc.CleanRepo(repoName)
err = gc.CleanRepo(ctx, repoName)
So(err, ShouldNotBeNil)

err = os.WriteFile(path.Join(dir, repoName, "blobs", "sha256", refs.Manifests[0].Digest.Encoded()), content, 0o600)
Expand Down Expand Up @@ -8469,7 +8469,7 @@ func TestGCSignaturesAndUntaggedManifestsWithMetaDB(t *testing.T) {
So(err, ShouldBeNil)
newManifestDigest := godigest.FromBytes(manifestBuf)

err = gc.CleanRepo(repoName) //nolint: contextcheck
err = gc.CleanRepo(ctx, repoName) //nolint: contextcheck
So(err, ShouldBeNil)

// make sure both signatures are removed from metaDB and repo reference for untagged is removed
Expand Down Expand Up @@ -8548,7 +8548,7 @@ func TestGCSignaturesAndUntaggedManifestsWithMetaDB(t *testing.T) {
So(err, ShouldBeNil)

cm := test.NewControllerManager(ctlr)
cm.StartAndWait(port)
cm.StartAndWait(port) //nolint: contextcheck
defer cm.StopServer()

gc := gc.NewGarbageCollect(ctlr.StoreController.DefaultStore, ctlr.MetaDB,
Expand Down Expand Up @@ -8606,7 +8606,7 @@ func TestGCSignaturesAndUntaggedManifestsWithMetaDB(t *testing.T) {
So(err, ShouldBeNil)
So(resp.StatusCode(), ShouldEqual, http.StatusCreated)

err = gc.CleanRepo(repoName)
err = gc.CleanRepo(ctx, repoName)
So(err, ShouldBeNil)

resp, err = resty.R().SetHeader("Content-Type", ispec.MediaTypeImageIndex).
Expand Down
22 changes: 11 additions & 11 deletions pkg/cli/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func (p *requestsPool) doJob(ctx context.Context, job *httpJob) {
header, err := makeHEADRequest(ctx, job.url, job.username, job.password, job.config.VerifyTLS,
job.config.Debug)
if err != nil {
if isContextDone(ctx) {
if common.IsContextDone(ctx) {
return
}
p.outputCh <- stringResult{"", err}
Expand All @@ -231,7 +231,7 @@ func (p *requestsPool) doJob(ctx context.Context, job *httpJob) {
case ispec.MediaTypeImageManifest:
image, err := fetchImageManifestStruct(ctx, job)
if err != nil {
if isContextDone(ctx) {
if common.IsContextDone(ctx) {
return
}
p.outputCh <- stringResult{"", err}
Expand All @@ -242,23 +242,23 @@ func (p *requestsPool) doJob(ctx context.Context, job *httpJob) {

str, err := image.string(job.config.OutputFormat, len(job.imageName), len(job.tagName), len(platformStr), verbose)
if err != nil {
if isContextDone(ctx) {
if common.IsContextDone(ctx) {
return
}
p.outputCh <- stringResult{"", err}

return
}

if isContextDone(ctx) {
if common.IsContextDone(ctx) {
return
}

p.outputCh <- stringResult{str, nil}
case ispec.MediaTypeImageIndex:
image, err := fetchImageIndexStruct(ctx, job)
if err != nil {
if isContextDone(ctx) {
if common.IsContextDone(ctx) {
return
}
p.outputCh <- stringResult{"", err}
Expand All @@ -270,15 +270,15 @@ func (p *requestsPool) doJob(ctx context.Context, job *httpJob) {

str, err := image.string(job.config.OutputFormat, len(job.imageName), len(job.tagName), len(platformStr), verbose)
if err != nil {
if isContextDone(ctx) {
if common.IsContextDone(ctx) {
return
}
p.outputCh <- stringResult{"", err}

return
}

if isContextDone(ctx) {
if common.IsContextDone(ctx) {
return
}

Expand All @@ -294,7 +294,7 @@ func fetchImageIndexStruct(ctx context.Context, job *httpJob) (*imageStruct, err
header, err := makeGETRequest(ctx, job.url, job.username, job.password,
job.config.VerifyTLS, job.config.Debug, &indexContent, job.config.ResultWriter)
if err != nil {
if isContextDone(ctx) {
if common.IsContextDone(ctx) {
return nil, context.Canceled
}

Expand Down Expand Up @@ -385,7 +385,7 @@ func fetchManifestStruct(ctx context.Context, repo, manifestReference string, se
header, err := makeGETRequest(ctx, URL, username, password,
searchConf.VerifyTLS, searchConf.Debug, &manifestResp, searchConf.ResultWriter)
if err != nil {
if isContextDone(ctx) {
if common.IsContextDone(ctx) {
return common.ManifestSummary{}, context.Canceled
}

Expand All @@ -397,7 +397,7 @@ func fetchManifestStruct(ctx context.Context, repo, manifestReference string, se

configContent, err := fetchConfig(ctx, repo, configDigest, searchConf, username, password)
if err != nil {
if isContextDone(ctx) {
if common.IsContextDone(ctx) {
return common.ManifestSummary{}, context.Canceled
}

Expand Down Expand Up @@ -474,7 +474,7 @@ func fetchConfig(ctx context.Context, repo, configDigest string, searchConf Sear
_, err := makeGETRequest(ctx, URL, username, password,
searchConf.VerifyTLS, searchConf.Debug, &configContent, searchConf.ResultWriter)
if err != nil {
if isContextDone(ctx) {
if common.IsContextDone(ctx) {
return ispec.Image{}, context.Canceled
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/cli/client/cve_cmd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -583,7 +583,7 @@ func TestCVESort(t *testing.T) {
}

ctlr.CveScanner = mocks.CveScannerMock{
ScanImageFn: func(image string) (map[string]cvemodel.CVE, error) {
ScanImageFn: func(ctx context.Context, image string) (map[string]cvemodel.CVE, error) {
return map[string]cvemodel.CVE{
"CVE-2023-1255": {
ID: "CVE-2023-1255",
Expand Down Expand Up @@ -687,7 +687,7 @@ func getMockCveScanner(metaDB mTypes.MetaDB) cveinfo.Scanner {
// MetaDB loaded with initial data now mock the scanner
// Setup test CVE data in mock scanner
scanner := mocks.CveScannerMock{
ScanImageFn: func(image string) (map[string]cvemodel.CVE, error) {
ScanImageFn: func(ctx context.Context, image string) (map[string]cvemodel.CVE, error) {
if strings.Contains(image, "zot-cve-test@sha256:db573b01") ||
image == "zot-cve-test:0.0.1" {
return map[string]cvemodel.CVE{
Expand Down
Loading

0 comments on commit 2537af6

Please sign in to comment.