diff --git a/daisy/workflow/common.go b/daisy/workflow/common.go index dee9bfac1..807268e8f 100644 --- a/daisy/workflow/common.go +++ b/daisy/workflow/common.go @@ -70,6 +70,15 @@ func getGCSAPIPath(p string) (string, error) { return fmt.Sprintf("%s/%s", gcsAPIBase, path.Join(b, o)), nil } +func minInt(x int, ys ...int) int { + for _, y := range ys { + if y < x { + x = y + } + } + return x +} + func randString(n int) string { gen := rand.New(rand.NewSource(time.Now().UnixNano())) letters := "bdghjlmnpqrstvwxyz0123456789" diff --git a/daisy/workflow/common_test.go b/daisy/workflow/common_test.go index 73dee6e25..1a7c77aa3 100644 --- a/daisy/workflow/common_test.go +++ b/daisy/workflow/common_test.go @@ -62,6 +62,26 @@ func TestGetGCSAPIPath(t *testing.T) { } } +func TestMinInt(t *testing.T) { + tests := []struct { + desc string + x int + ys []int + want int + }{ + {"single int case", 1, nil, 1}, + {"first int case", 2, []int{1}, 1}, + {"same ints case", 2, []int{2}, 2}, + {"third int case", 4, []int{3, 2}, 2}, + } + + for _, tt := range tests { + if got := minInt(tt.x, tt.ys...); got != tt.want { + t.Errorf("%s: %d != %d", tt.desc, got, tt.want) + } + } +} + func TestRandString(t *testing.T) { for i := 0; i < 10; i++ { l := len(randString(i)) diff --git a/daisy/workflow/step.go b/daisy/workflow/step.go index fc78aa038..e2e4e32ee 100644 --- a/daisy/workflow/step.go +++ b/daisy/workflow/step.go @@ -106,11 +106,15 @@ func (s *Step) stepImpl() (stepImpl, error) { } func (s *Step) depends(other *Step) bool { + if s == nil || other == nil || s.w == nil || s.w != other.w { + return false + } deps := s.w.Dependencies steps := s.w.Steps q := deps[s.name] seen := map[string]bool{} + // Do a BFS search on s's dependencies, looking for the target dependency. Don't revisit visited dependencies. for i := 0; i < len(q); i++ { name := q[i] if seen[name] { @@ -128,6 +132,54 @@ func (s *Step) depends(other *Step) bool { return false } +// nestedDepends determines if s depends on other, taking into account the recursive, nested nature of +// workflows, i.e. workflows in IncludeWorkflow and SubWorkflow. +// Example: if s depends on an IncludeWorkflow whose workflow contains other, then s depends on other. +func (s *Step) nestedDepends(other *Step) bool { + sChain := s.getChain() + oChain := other.getChain() + // If sChain and oChain don't share the same root workflow, then there is no dependency relationship. + if len(sChain) == 0 || len(oChain) == 0 || sChain[0].w != oChain[0].w { + return false + } + + // Find where the step chains diverge. + // A divergence in the chains indicates sibling steps, where we can check dependency. + // We want to see if s's branch depends on other's branch. + var sStep, oStep *Step + for i := 0; i < minInt(len(sChain), len(oChain)); i++ { + sStep = sChain[i] + oStep = oChain[i] + if sStep != oStep { + break + } + } + return sStep.depends(oStep) +} + +// getChain returns the step chain getting to a step. A link in the chain represents an IncludeWorkflow step, a +// SubWorkflow step, or the step itself. +// For example, workflow A has a step s1 which includes workflow B. B has a step s2 which subworkflows C. Finally, +// C has a step s3. s3.getChain() will return []*Step{s1, s2, s3} +func (s *Step) getChain() []*Step { + if s == nil || s.w == nil { + return nil + } + if s.w.parent == nil { + return []*Step{s} + } + for _, st := range s.w.parent.Steps { + if st.IncludeWorkflow != nil && st.IncludeWorkflow.w == s.w { + return append(st.getChain(), s) + } + if st.SubWorkflow != nil && st.SubWorkflow.w == s.w { + return append(st.getChain(), s) + } + } + // We shouldn't get here. + return nil +} + func (s *Step) run(ctx context.Context) error { impl, err := s.stepImpl() if err != nil { diff --git a/daisy/workflow/step_includeworkflow.go b/daisy/workflow/step_includeworkflow.go index a047563a6..5f1f7d960 100644 --- a/daisy/workflow/step_includeworkflow.go +++ b/daisy/workflow/step_includeworkflow.go @@ -33,6 +33,7 @@ type IncludeWorkflow struct { } func (i *IncludeWorkflow) populate(ctx context.Context, s *Step) error { + i.w.parent = s.w i.w.GCSPath = s.w.GCSPath i.w.Name = s.name i.w.Project = s.w.Project diff --git a/daisy/workflow/step_sub_workflow.go b/daisy/workflow/step_sub_workflow.go index f314ec338..b2252e879 100644 --- a/daisy/workflow/step_sub_workflow.go +++ b/daisy/workflow/step_sub_workflow.go @@ -27,6 +27,7 @@ type SubWorkflow struct { } func (s *SubWorkflow) populate(ctx context.Context, st *Step) error { + s.w.parent = st.w s.w.GCSPath = fmt.Sprintf("gs://%s/%s", s.w.parent.bucket, s.w.parent.scratchPath) s.w.Name = st.name s.w.Project = s.w.parent.Project diff --git a/daisy/workflow/step_test.go b/daisy/workflow/step_test.go index c08bf0658..5dce0d7e2 100644 --- a/daisy/workflow/step_test.go +++ b/daisy/workflow/step_test.go @@ -66,6 +66,96 @@ func TestDepends(t *testing.T) { } } +func TestGetChain(t *testing.T) { + a := &Workflow{} + b := &Workflow{parent: a} + c := &Workflow{parent: b} + a1 := &Step{w: a} + a2 := &Step{w: a, IncludeWorkflow: &IncludeWorkflow{w: b}} + b1 := &Step{w: b} + b2 := &Step{w: b, SubWorkflow: &SubWorkflow{w: c}} + c1 := &Step{w: c} + orphan := &Step{} + a.Steps = map[string]*Step{"a1": a1, "a2": a2} + b.Steps = map[string]*Step{"b1": b1, "b2": b2} + c.Steps = map[string]*Step{"c1": c1} + + tests := []struct { + desc string + s *Step + wantChain []*Step + }{ + {"leaf case", a1, []*Step{a1}}, + {"step from include case", b1, []*Step{a2, b1}}, + {"step from sub case", c1, []*Step{a2, b2, c1}}, + {"orphan step case", orphan, nil}, + } + + for _, tt := range tests { + if chain := tt.s.getChain(); !reflect.DeepEqual(chain, tt.wantChain) { + t.Errorf("%s: %v != %v", tt.desc, chain, tt.wantChain) + } + } +} + +func TestNestedDepends(t *testing.T) { + // root -- a1 (some step) + // | + // -- a2 (IncludeWorkflow) -- b1 (SubWorkflow) -- c1 (some step) + // | + // -- b2 (SubWorkflow) -- d1 (some step) + // | + // -- b3 (some step) + // different root -- e1 (some step) + a := &Workflow{} + b := &Workflow{parent: a} + c := &Workflow{parent: b} + d := &Workflow{parent: b} + e := &Workflow{} + a1 := &Step{name: "a1", w: a} + a2 := &Step{name: "a2", w: a, IncludeWorkflow: &IncludeWorkflow{w: b}} + b1 := &Step{name: "b1", w: b, SubWorkflow: &SubWorkflow{w: c}} + b2 := &Step{name: "b2", w: b, SubWorkflow: &SubWorkflow{w: d}} + b3 := &Step{name: "b3", w: b} + c1 := &Step{name: "c1", w: c} + d1 := &Step{name: "d1", w: d} + e1 := &Step{name: "e1", w: e} + orphan := &Step{} + a.Steps = map[string]*Step{"a1": a1, "a2": a2} + b.Steps = map[string]*Step{"b1": b1, "b2": b2, "b3": b3} + c.Steps = map[string]*Step{"c1": c1} + d.Steps = map[string]*Step{"d1": d1} + e.Steps = map[string]*Step{"e1": e1} + a.Dependencies = map[string][]string{"a1": {"a2"}} + b.Dependencies = map[string][]string{"b1": {"b2"}} + + tests := []struct { + desc string + s1, s2 *Step + want bool + }{ + {"depends on niece/nephew case", a1, b3, true}, + {"doesn't depend on niece/nephew case", b3, c1, false}, + {"depends on great niece/nephew case", a1, c1, true}, + {"doesn't depend on son/daughter case", b1, c1, false}, + {"doesn't depend on mother/father case", c1, b1, false}, + {"depends on aunt/uncle case", c1, b2, true}, + {"depends on cousin case", c1, d1, true}, + {"doesn't depend on brother from another mother case", a1, e1, false}, + {"orphan step case", a1, orphan, false}, + {"orphan step case 2", orphan, a1, false}, + {"nil step case", a1, nil, false}, + {"nil step case 2", nil, a1, false}, + } + + for _, tt := range tests { + got := tt.s1.nestedDepends(tt.s2) + if got != tt.want { + t.Errorf("%s: got %t, want %t", tt.desc, got, tt.want) + } + } +} + func TestStepImpl(t *testing.T) { // Good. Try normal, working case. tests := []struct {