From 58c250c626b805580a431df20822f6bcf43aff85 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jes=C3=BAs=20Garc=C3=ADa=20Crespo?=
Date: Wed, 8 May 2024 18:13:24 +0000
Subject: [PATCH] Draft outputClientScriptJob

---
 hack/ccp/.golangci.yml                   |  4 --
 hack/ccp/internal/controller/iterator.go | 29 +++++++--
 hack/ccp/internal/controller/jobs.go     | 76 ++++++++++++++++++++----
 hack/ccp/internal/controller/package.go  |  2 +-
 hack/ccp/internal/controller/task.go     |  5 +-
 5 files changed, 92 insertions(+), 24 deletions(-)

diff --git a/hack/ccp/.golangci.yml b/hack/ccp/.golangci.yml
index 3163d1a64..f312822a6 100644
--- a/hack/ccp/.golangci.yml
+++ b/hack/ccp/.golangci.yml
@@ -8,10 +8,6 @@ linters:
     - gci
     - unparam
     - gosec
-  disable:
-    # There is too much WIP code at the moment.
-    # TODO: enable this ASAP.
-    - unused
 
 issues:
   exclude-dirs:
diff --git a/hack/ccp/internal/controller/iterator.go b/hack/ccp/internal/controller/iterator.go
index 318732d5c..1eae8489d 100644
--- a/hack/ccp/internal/controller/iterator.go
+++ b/hack/ccp/internal/controller/iterator.go
@@ -15,7 +15,7 @@ import (
 
 var errWait = errors.New("wait")
 
-// A chain is used to carry local state.
+// A chain is used for passing information between jobs.
 //
 // In Archivematica the workflow is structured around chains and links.
 // A chain is a sequence of links used to accomplish a broader task or set of
@@ -32,11 +32,20 @@ var errWait = errors.New("wait")
 // without introducing backward-incompatible changes given the reliance on it
 // in some edge cases like reingest, etc.
 type chain struct {
-	wc      *workflow.Chain // The chain link in the workflow.
-	pCtx    *packageContext // Local state.
-	choices any             // TODO: see `generated_choices` in `chain.py`.
+	// The properties of the chain as described by the workflow document.
+	wc *workflow.Chain
+
+	// A map of replacement variables for tasks.
+	// TODO: why are we not using replacementMappings instead?
+	pCtx *packageContext
+
+	// choices is a list of choices available from script output, e.g. available
+	// storage service locations. Choices are generated by outputClientScriptJob
+	// and presented as decision points via outputDecisionJob.
+	choices map[string]outputClientScriptChoice
 }
 
+// update sets every key-value pair of kvs in the context of the chain.
 func (c *chain) update(kvs map[string]string) {
 	for k, v := range kvs {
 		c.pCtx.Set(k, string(v))
@@ -73,7 +82,17 @@ func NewIterator(logger logr.Logger, gearman *gearmin.Server, wf *workflow.Docum
 	return iter
 }
 
-func (i *iterator) Process(ctx context.Context) error {
+func (i *iterator) Process(ctx context.Context) (err error) {
+	if err := i.p.markAsProcessing(ctx); err != nil {
+		return err
+	}
+	defer func() {
+		// TODO: can we be more specific? E.g. failed or completed.
+		if markErr := i.p.markAsDone(ctx); markErr != nil {
+			err = errors.Join(err, markErr) // Report the failure alongside any processing error.
+		}
+	}()
+
 	next := i.startAt
 
 	for {
diff --git a/hack/ccp/internal/controller/jobs.go b/hack/ccp/internal/controller/jobs.go
index d33d5840d..b548dd6fd 100644
--- a/hack/ccp/internal/controller/jobs.go
+++ b/hack/ccp/internal/controller/jobs.go
@@ -3,6 +3,7 @@ package controller
 import (
 	"context"
 	"database/sql"
+	"encoding/json"
 	"errors"
 	"fmt"
 	"io"
@@ -316,8 +317,6 @@ var _ jobRunner = (*updateContextDecisionJob)(nil)
 // 'bd899573-694e-4d33-8c9b-df0af802437d', should result in that decision taking
 // effect for all of the others as well. This allows that.
 // TODO: this should be defined in the workflow, not hardcoded here.
-//
-// nolint: unused
 var updateContextDecisionJobChoiceMapping = map[uuid.UUID]uuid.UUID{
 	// Decision point "Assign UUIDs to directories?".
uuid.MustParse("8882bad4-561c-4126-89c9-f7f0c083d5d7"): uuid.MustParse("bd899573-694e-4d33-8c9b-df0af802437d"), @@ -723,24 +722,77 @@ func newOutputClientScriptJob(j *job) (*outputClientScriptJob, error) { }, nil } +// The list of choices are represented using a dictionary as follows: +// +// { +// "default": {"description": "asdf", "uri": "asdf"}, +// "5c732a52-6cdb-4b50-ac2e-ae10361b019a": {"description": "asdf", "uri": "asdf"}, +// } type outputClientScriptChoice struct { Description string `json:"description"` URI string `json:"uri"` } func (l *outputClientScriptJob) exec(ctx context.Context) (uuid.UUID, error) { - // We always need output for this type of job. - // Submission of one task only, like in directoryClientScriptJob. - // Unmarshal task.stdout: - // { - // "default": {"description": "asdf", "uri": "asdf"}, - // "5c732a52-6cdb-4b50-ac2e-ae10361b019a": {"description": "asdf", "uri": "asdf"}, - // } - // Then update generated_choices: self.job_chain.generated_choices = choices. + if err := l.j.pkg.reload(ctx); err != nil { + return uuid.Nil, fmt.Errorf("reload: %v", err) + } + if err := l.j.save(ctx); err != nil { + return uuid.Nil, fmt.Errorf("save: %v", err) + } - panic("not implemented") + taskResult, err := l.submitTasks(ctx) + if err != nil { + return uuid.Nil, fmt.Errorf("submit task: %v", err) + } - return uuid.Nil, nil // nolint: govet + choices := map[string]outputClientScriptChoice{} + if err := json.Unmarshal([]byte(taskResult.Stdout), &choices); err != nil { + l.j.logger.Error(err, "Unable to parse output: %s", taskResult.Stdout) + } else { + l.j.chain.choices = choices + } + + if err := l.j.updateStatusFromExitCode(ctx, taskResult.ExitCode); err != nil { + return uuid.Nil, err + } + + if ec, ok := l.j.wl.ExitCodes[taskResult.ExitCode]; ok { + if ec.LinkID == nil { + return uuid.Nil, io.EOF // End of chain. + } + return *ec.LinkID, nil + } + + if l.j.wl.FallbackLinkID == uuid.Nil { + return uuid.Nil, io.EOF // End of chain. 
+ } + + return uuid.Nil, nil +} + +func (l *outputClientScriptJob) submitTasks(ctx context.Context) (*taskResult, error) { + rm := l.j.pkg.unit.replacements(l.config.FilterSubdir).update(l.j.chain.pCtx) + args := rm.replaceValues(l.config.Arguments) + stdout := rm.replaceValues(l.config.StdoutFile) + stderr := rm.replaceValues(l.config.StderrFile) + + taskBackend := newTaskBackend(l.j.logger, l.j, l.j.pkg.store, l.j.gearman, l.config) + if err := taskBackend.submit(ctx, rm, args, true, stdout, stderr); err != nil { + return nil, err + } + + results, err := taskBackend.wait(ctx) + if err != nil { + return nil, fmt.Errorf("wait: %v", err) + } + + ret := results.First() + if ret == nil { + return nil, errors.New("submit task: no results") + } + + return ret, nil } // setUnitVarLinkJob is a local job that sets the unit variable configured in diff --git a/hack/ccp/internal/controller/package.go b/hack/ccp/internal/controller/package.go index 09ed658e3..93f58c0fa 100644 --- a/hack/ccp/internal/controller/package.go +++ b/hack/ccp/internal/controller/package.go @@ -680,7 +680,7 @@ func loadContext(ctx context.Context, p *Package) (*packageContext, error) { return pCtx, nil } -func (ctx *packageContext) copy() *orderedmap.OrderedMap[string, string] { +func (ctx *packageContext) copy() *orderedmap.OrderedMap[string, string] { //nolint: unused return ctx.Copy() } diff --git a/hack/ccp/internal/controller/task.go b/hack/ccp/internal/controller/task.go index 056d66931..864434285 100644 --- a/hack/ccp/internal/controller/task.go +++ b/hack/ccp/internal/controller/task.go @@ -311,8 +311,9 @@ type task struct { rm replacementMapping stdoutFilePath string stderrFilePath string - exitCode *int - completedAt time.Time + + // exitCode *int + // completedAt time.Time } func (t task) MarshalJSON() ([]byte, error) {