Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scheduler shutdown gracefully #1951

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 15 additions & 13 deletions pkg/api/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type Controller struct {
SyncOnDemand SyncOnDemand
RelyingParties map[string]rp.RelyingParty
CookieStore *CookieStore
taskScheduler *scheduler.Scheduler
// runtime params
chosenPort int // kernel-chosen port
}
Expand Down Expand Up @@ -366,13 +367,14 @@ func (c *Controller) LoadNewConfig(reloadCtx context.Context, newConfig *config.
}

func (c *Controller) Shutdown() {
c.taskScheduler.Shutdown()
ctx := context.Background()
_ = c.Server.Shutdown(ctx)
}

func (c *Controller) StartBackgroundTasks(reloadCtx context.Context) {
taskScheduler := scheduler.NewScheduler(c.Config, c.Log)
taskScheduler.RunScheduler(reloadCtx)
c.taskScheduler = scheduler.NewScheduler(c.Config, c.Log)
c.taskScheduler.RunScheduler(reloadCtx)

// Enable running garbage-collect periodically for DefaultStore
if c.Config.Storage.GC {
Expand All @@ -381,20 +383,20 @@ func (c *Controller) StartBackgroundTasks(reloadCtx context.Context) {
ImageRetention: c.Config.Storage.Retention,
}, c.Audit, c.Log)

gc.CleanImageStorePeriodically(c.Config.Storage.GCInterval, taskScheduler)
gc.CleanImageStorePeriodically(c.Config.Storage.GCInterval, c.taskScheduler)
}

// Enable running dedupe blobs both ways (dedupe or restore deduped blobs)
c.StoreController.DefaultStore.RunDedupeBlobs(time.Duration(0), taskScheduler)
c.StoreController.DefaultStore.RunDedupeBlobs(time.Duration(0), c.taskScheduler)

// Enable extensions if extension config is provided for DefaultStore
if c.Config != nil && c.Config.Extensions != nil {
ext.EnableMetricsExtension(c.Config, c.Log, c.Config.Storage.RootDirectory)
ext.EnableSearchExtension(c.Config, c.StoreController, c.MetaDB, taskScheduler, c.CveScanner, c.Log)
ext.EnableSearchExtension(c.Config, c.StoreController, c.MetaDB, c.taskScheduler, c.CveScanner, c.Log)
}
// runs once if metrics are enabled & imagestore is local
if c.Config.IsMetricsEnabled() && c.Config.Storage.StorageDriver == nil {
c.StoreController.DefaultStore.PopulateStorageMetrics(time.Duration(0), taskScheduler)
c.StoreController.DefaultStore.PopulateStorageMetrics(time.Duration(0), c.taskScheduler)
}

if c.Config.Storage.SubPaths != nil {
Expand All @@ -407,7 +409,7 @@ func (c *Controller) StartBackgroundTasks(reloadCtx context.Context) {
ImageRetention: storageConfig.Retention,
}, c.Audit, c.Log)

gc.CleanImageStorePeriodically(storageConfig.GCInterval, taskScheduler)
gc.CleanImageStorePeriodically(storageConfig.GCInterval, c.taskScheduler)
}

// Enable extensions if extension config is provided for subImageStore
Expand All @@ -418,19 +420,19 @@ func (c *Controller) StartBackgroundTasks(reloadCtx context.Context) {
// Enable running dedupe blobs both ways (dedupe or restore deduped blobs) for subpaths
substore := c.StoreController.SubStore[route]
if substore != nil {
substore.RunDedupeBlobs(time.Duration(0), taskScheduler)
substore.RunDedupeBlobs(time.Duration(0), c.taskScheduler)

if c.Config.IsMetricsEnabled() && c.Config.Storage.StorageDriver == nil {
substore.PopulateStorageMetrics(time.Duration(0), taskScheduler)
substore.PopulateStorageMetrics(time.Duration(0), c.taskScheduler)
}
}
}
}

if c.Config.Extensions != nil {
ext.EnableScrubExtension(c.Config, c.Log, c.StoreController, taskScheduler)
ext.EnableScrubExtension(c.Config, c.Log, c.StoreController, c.taskScheduler)
//nolint: contextcheck
syncOnDemand, err := ext.EnableSyncExtension(c.Config, c.MetaDB, c.StoreController, taskScheduler, c.Log)
syncOnDemand, err := ext.EnableSyncExtension(c.Config, c.MetaDB, c.StoreController, c.taskScheduler, c.Log)
if err != nil {
c.Log.Error().Err(err).Msg("unable to start sync extension")
}
Expand All @@ -439,11 +441,11 @@ func (c *Controller) StartBackgroundTasks(reloadCtx context.Context) {
}

if c.CookieStore != nil {
c.CookieStore.RunSessionCleaner(taskScheduler)
c.CookieStore.RunSessionCleaner(c.taskScheduler)
}

// we can later move enabling the other scheduled tasks inside the call below
ext.EnableScheduledTasks(c.Config, taskScheduler, c.MetaDB, c.Log) //nolint: contextcheck
ext.EnableScheduledTasks(c.Config, c.taskScheduler, c.MetaDB, c.Log) //nolint: contextcheck
}

type SyncOnDemand interface {
Expand Down
12 changes: 6 additions & 6 deletions pkg/api/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8378,7 +8378,7 @@ func TestGCSignaturesAndUntaggedManifestsWithMetaDB(t *testing.T) {
So(len(index.Manifests), ShouldEqual, 1)

// shouldn't do anything
err = gc.CleanRepo(repoName) //nolint: contextcheck
err = gc.CleanRepo(ctx, repoName) //nolint: contextcheck
So(err, ShouldBeNil)

// make sure both signatures are stored in repodb
Expand All @@ -8404,7 +8404,7 @@ func TestGCSignaturesAndUntaggedManifestsWithMetaDB(t *testing.T) {
err = UploadImage(img, baseURL, repoName, img.DigestStr())
So(err, ShouldBeNil)

err = gc.CleanRepo(repoName)
err = gc.CleanRepo(ctx, repoName)
So(err, ShouldNotBeNil)

err = os.Chmod(path.Join(dir, repoName, "blobs", "sha256", refs.Manifests[0].Digest.Encoded()), 0o755)
Expand All @@ -8418,7 +8418,7 @@ func TestGCSignaturesAndUntaggedManifestsWithMetaDB(t *testing.T) {
err = UploadImage(img, baseURL, repoName, tag)
So(err, ShouldBeNil)

err = gc.CleanRepo(repoName)
err = gc.CleanRepo(ctx, repoName)
So(err, ShouldNotBeNil)

err = os.WriteFile(path.Join(dir, repoName, "blobs", "sha256", refs.Manifests[0].Digest.Encoded()), content, 0o600)
Expand Down Expand Up @@ -8469,7 +8469,7 @@ func TestGCSignaturesAndUntaggedManifestsWithMetaDB(t *testing.T) {
So(err, ShouldBeNil)
newManifestDigest := godigest.FromBytes(manifestBuf)

err = gc.CleanRepo(repoName) //nolint: contextcheck
err = gc.CleanRepo(ctx, repoName) //nolint: contextcheck
So(err, ShouldBeNil)

// make sure both signatures are removed from metaDB and repo reference for untagged is removed
Expand Down Expand Up @@ -8548,7 +8548,7 @@ func TestGCSignaturesAndUntaggedManifestsWithMetaDB(t *testing.T) {
So(err, ShouldBeNil)

cm := test.NewControllerManager(ctlr)
cm.StartAndWait(port)
cm.StartAndWait(port) //nolint: contextcheck
defer cm.StopServer()

gc := gc.NewGarbageCollect(ctlr.StoreController.DefaultStore, ctlr.MetaDB,
Expand Down Expand Up @@ -8606,7 +8606,7 @@ func TestGCSignaturesAndUntaggedManifestsWithMetaDB(t *testing.T) {
So(err, ShouldBeNil)
So(resp.StatusCode(), ShouldEqual, http.StatusCreated)

err = gc.CleanRepo(repoName)
err = gc.CleanRepo(ctx, repoName)
So(err, ShouldBeNil)

resp, err = resty.R().SetHeader("Content-Type", ispec.MediaTypeImageIndex).
Expand Down
22 changes: 11 additions & 11 deletions pkg/cli/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@
header, err := makeHEADRequest(ctx, job.url, job.username, job.password, job.config.VerifyTLS,
job.config.Debug)
if err != nil {
if isContextDone(ctx) {
if common.IsContextDone(ctx) {
return
}
p.outputCh <- stringResult{"", err}
Expand All @@ -231,7 +231,7 @@
case ispec.MediaTypeImageManifest:
image, err := fetchImageManifestStruct(ctx, job)
if err != nil {
if isContextDone(ctx) {
if common.IsContextDone(ctx) {
return
}
p.outputCh <- stringResult{"", err}
Expand All @@ -242,23 +242,23 @@

str, err := image.string(job.config.OutputFormat, len(job.imageName), len(job.tagName), len(platformStr), verbose)
if err != nil {
if isContextDone(ctx) {
if common.IsContextDone(ctx) {

Check warning on line 245 in pkg/cli/client/client.go

View check run for this annotation

Codecov / codecov/patch

pkg/cli/client/client.go#L245

Added line #L245 was not covered by tests
return
}
p.outputCh <- stringResult{"", err}

return
}

if isContextDone(ctx) {
if common.IsContextDone(ctx) {
return
}

p.outputCh <- stringResult{str, nil}
case ispec.MediaTypeImageIndex:
image, err := fetchImageIndexStruct(ctx, job)
if err != nil {
if isContextDone(ctx) {
if common.IsContextDone(ctx) {
return
}
p.outputCh <- stringResult{"", err}
Expand All @@ -270,15 +270,15 @@

str, err := image.string(job.config.OutputFormat, len(job.imageName), len(job.tagName), len(platformStr), verbose)
if err != nil {
if isContextDone(ctx) {
if common.IsContextDone(ctx) {
return
}
p.outputCh <- stringResult{"", err}

return
}

if isContextDone(ctx) {
if common.IsContextDone(ctx) {
return
}

Expand All @@ -294,7 +294,7 @@
header, err := makeGETRequest(ctx, job.url, job.username, job.password,
job.config.VerifyTLS, job.config.Debug, &indexContent, job.config.ResultWriter)
if err != nil {
if isContextDone(ctx) {
if common.IsContextDone(ctx) {
return nil, context.Canceled
}

Expand Down Expand Up @@ -385,7 +385,7 @@
header, err := makeGETRequest(ctx, URL, username, password,
searchConf.VerifyTLS, searchConf.Debug, &manifestResp, searchConf.ResultWriter)
if err != nil {
if isContextDone(ctx) {
if common.IsContextDone(ctx) {
return common.ManifestSummary{}, context.Canceled
}

Expand All @@ -397,7 +397,7 @@

configContent, err := fetchConfig(ctx, repo, configDigest, searchConf, username, password)
if err != nil {
if isContextDone(ctx) {
if common.IsContextDone(ctx) {
return common.ManifestSummary{}, context.Canceled
}

Expand Down Expand Up @@ -474,7 +474,7 @@
_, err := makeGETRequest(ctx, URL, username, password,
searchConf.VerifyTLS, searchConf.Debug, &configContent, searchConf.ResultWriter)
if err != nil {
if isContextDone(ctx) {
if common.IsContextDone(ctx) {
return ispec.Image{}, context.Canceled
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/cli/client/cve_cmd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -583,7 +583,7 @@ func TestCVESort(t *testing.T) {
}

ctlr.CveScanner = mocks.CveScannerMock{
ScanImageFn: func(image string) (map[string]cvemodel.CVE, error) {
ScanImageFn: func(ctx context.Context, image string) (map[string]cvemodel.CVE, error) {
return map[string]cvemodel.CVE{
"CVE-2023-1255": {
ID: "CVE-2023-1255",
Expand Down Expand Up @@ -687,7 +687,7 @@ func getMockCveScanner(metaDB mTypes.MetaDB) cveinfo.Scanner {
// MetaDB loaded with initial data now mock the scanner
// Setup test CVE data in mock scanner
scanner := mocks.CveScannerMock{
ScanImageFn: func(image string) (map[string]cvemodel.CVE, error) {
ScanImageFn: func(ctx context.Context, image string) (map[string]cvemodel.CVE, error) {
if strings.Contains(image, "zot-cve-test@sha256:db573b01") ||
image == "zot-cve-test:0.0.1" {
return map[string]cvemodel.CVE{
Expand Down
Loading
Loading