Skip to content

Commit

Permalink
Draft outputClientScriptJob
Browse files Browse the repository at this point in the history
  • Loading branch information
sevein committed May 8, 2024
1 parent fafc315 commit 58c250c
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 24 deletions.
4 changes: 0 additions & 4 deletions hack/ccp/.golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,6 @@ linters:
- gci
- unparam
- gosec
disable:
# There is too much WIP code at the moment.
# TODO: enable this ASAP.
- unused

issues:
exclude-dirs:
Expand Down
29 changes: 24 additions & 5 deletions hack/ccp/internal/controller/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (

var errWait = errors.New("wait")

// A chain is used to carry local state.
// A chain is used for passing information between jobs.
//
// In Archivematica the workflow is structured around chains and links.
// A chain is a sequence of links used to accomplish a broader task or set of
Expand All @@ -32,11 +32,20 @@ var errWait = errors.New("wait")
// without introducing backward-incompatible changes given the reliance on it
// in some edge cases like reingest, etc.
type chain struct {
wc *workflow.Chain // The chain link in the workflow.
pCtx *packageContext // Local state.
choices any // TODO: see `generated_choices` in `chain.py`.
// The properties of the chain as described by the workflow document.
wc *workflow.Chain

// A map of replacement variables for tasks.
// TODO: why are we not using replacementMappings instead?
pCtx *packageContext

// choices is a list of choices available from script output, e.g. available
// storage service locations. Choices are generated by outputClientScriptJob
// and presented as decision points using via outputDecisionJob.
choices map[string]outputClientScriptChoice
}

// update the context of the chain with a new map.
func (c *chain) update(kvs map[string]string) {
for k, v := range kvs {
c.pCtx.Set(k, string(v))
Expand Down Expand Up @@ -73,7 +82,17 @@ func NewIterator(logger logr.Logger, gearman *gearmin.Server, wf *workflow.Docum
return iter
}

func (i *iterator) Process(ctx context.Context) error {
func (i *iterator) Process(ctx context.Context) (err error) {
if err := i.p.markAsProcessing(ctx); err != nil {
return err
}
defer func() {
// TODO: can we be more specific? E.g. failed or completed.
if markErr := i.p.markAsDone(ctx); err != nil {
err = errors.Join(err, markErr)
}
}()

next := i.startAt

for {
Expand Down
76 changes: 64 additions & 12 deletions hack/ccp/internal/controller/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package controller
import (
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
"io"
Expand Down Expand Up @@ -316,8 +317,6 @@ var _ jobRunner = (*updateContextDecisionJob)(nil)
// 'bd899573-694e-4d33-8c9b-df0af802437d', should result in that decision taking
// effect for all of the others as well. This allows that.
// TODO: this should be defined in the workflow, not hardcoded here.
//
// nolint: unused
var updateContextDecisionJobChoiceMapping = map[uuid.UUID]uuid.UUID{
// Decision point "Assign UUIDs to directories?".
uuid.MustParse("8882bad4-561c-4126-89c9-f7f0c083d5d7"): uuid.MustParse("bd899573-694e-4d33-8c9b-df0af802437d"),
Expand Down Expand Up @@ -723,24 +722,77 @@ func newOutputClientScriptJob(j *job) (*outputClientScriptJob, error) {
}, nil
}

// The list of choices are represented using a dictionary as follows:
//
// {
// "default": {"description": "asdf", "uri": "asdf"},
// "5c732a52-6cdb-4b50-ac2e-ae10361b019a": {"description": "asdf", "uri": "asdf"},
// }
type outputClientScriptChoice struct {
Description string `json:"description"`
URI string `json:"uri"`
}

func (l *outputClientScriptJob) exec(ctx context.Context) (uuid.UUID, error) {
// We always need output for this type of job.
// Submission of one task only, like in directoryClientScriptJob.
// Unmarshal task.stdout:
// {
// "default": {"description": "asdf", "uri": "asdf"},
// "5c732a52-6cdb-4b50-ac2e-ae10361b019a": {"description": "asdf", "uri": "asdf"},
// }
// Then update generated_choices: self.job_chain.generated_choices = choices.
if err := l.j.pkg.reload(ctx); err != nil {
return uuid.Nil, fmt.Errorf("reload: %v", err)
}
if err := l.j.save(ctx); err != nil {
return uuid.Nil, fmt.Errorf("save: %v", err)
}

panic("not implemented")
taskResult, err := l.submitTasks(ctx)
if err != nil {
return uuid.Nil, fmt.Errorf("submit task: %v", err)
}

return uuid.Nil, nil // nolint: govet
choices := map[string]outputClientScriptChoice{}
if err := json.Unmarshal([]byte(taskResult.Stdout), &choices); err != nil {
l.j.logger.Error(err, "Unable to parse output: %s", taskResult.Stdout)
} else {
l.j.chain.choices = choices
}

if err := l.j.updateStatusFromExitCode(ctx, taskResult.ExitCode); err != nil {
return uuid.Nil, err
}

if ec, ok := l.j.wl.ExitCodes[taskResult.ExitCode]; ok {
if ec.LinkID == nil {
return uuid.Nil, io.EOF // End of chain.
}
return *ec.LinkID, nil
}

if l.j.wl.FallbackLinkID == uuid.Nil {
return uuid.Nil, io.EOF // End of chain.
}

return uuid.Nil, nil
}

func (l *outputClientScriptJob) submitTasks(ctx context.Context) (*taskResult, error) {
rm := l.j.pkg.unit.replacements(l.config.FilterSubdir).update(l.j.chain.pCtx)
args := rm.replaceValues(l.config.Arguments)
stdout := rm.replaceValues(l.config.StdoutFile)
stderr := rm.replaceValues(l.config.StderrFile)

taskBackend := newTaskBackend(l.j.logger, l.j, l.j.pkg.store, l.j.gearman, l.config)
if err := taskBackend.submit(ctx, rm, args, true, stdout, stderr); err != nil {
return nil, err
}

results, err := taskBackend.wait(ctx)
if err != nil {
return nil, fmt.Errorf("wait: %v", err)
}

ret := results.First()
if ret == nil {
return nil, errors.New("submit task: no results")
}

return ret, nil
}

// setUnitVarLinkJob is a local job that sets the unit variable configured in
Expand Down
2 changes: 1 addition & 1 deletion hack/ccp/internal/controller/package.go
Original file line number Diff line number Diff line change
Expand Up @@ -680,7 +680,7 @@ func loadContext(ctx context.Context, p *Package) (*packageContext, error) {
return pCtx, nil
}

func (ctx *packageContext) copy() *orderedmap.OrderedMap[string, string] {
func (ctx *packageContext) copy() *orderedmap.OrderedMap[string, string] { //nolint: unused
return ctx.Copy()
}

Expand Down
5 changes: 3 additions & 2 deletions hack/ccp/internal/controller/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,8 +311,9 @@ type task struct {
rm replacementMapping
stdoutFilePath string
stderrFilePath string
exitCode *int
completedAt time.Time

// exitCode *int
// completedAt time.Time
}

func (t task) MarshalJSON() ([]byte, error) {
Expand Down

0 comments on commit 58c250c

Please sign in to comment.