Skip to content

Commit

Permalink
Step.getChain and Step.nestedDepends. (#103)
Browse files Browse the repository at this point in the history
  • Loading branch information
crunk1 authored Jun 30, 2017
1 parent 77920ac commit c496b8e
Show file tree
Hide file tree
Showing 6 changed files with 173 additions and 0 deletions.
9 changes: 9 additions & 0 deletions daisy/workflow/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,15 @@ func getGCSAPIPath(p string) (string, error) {
return fmt.Sprintf("%s/%s", gcsAPIBase, path.Join(b, o)), nil
}

func minInt(x int, ys ...int) int {
for _, y := range ys {
if y < x {
x = y
}
}
return x
}

func randString(n int) string {
gen := rand.New(rand.NewSource(time.Now().UnixNano()))
letters := "bdghjlmnpqrstvwxyz0123456789"
Expand Down
20 changes: 20 additions & 0 deletions daisy/workflow/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,26 @@ func TestGetGCSAPIPath(t *testing.T) {
}
}

func TestMinInt(t *testing.T) {
tests := []struct {
desc string
x int
ys []int
want int
}{
{"single int case", 1, nil, 1},
{"first int case", 2, []int{1}, 1},
{"same ints case", 2, []int{2}, 2},
{"third int case", 4, []int{3, 2}, 2},
}

for _, tt := range tests {
if got := minInt(tt.x, tt.ys...); got != tt.want {
t.Errorf("%s: %d != %d", tt.desc, got, tt.want)
}
}
}

func TestRandString(t *testing.T) {
for i := 0; i < 10; i++ {
l := len(randString(i))
Expand Down
52 changes: 52 additions & 0 deletions daisy/workflow/step.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,15 @@ func (s *Step) stepImpl() (stepImpl, error) {
}

func (s *Step) depends(other *Step) bool {
if s == nil || other == nil || s.w == nil || s.w != other.w {
return false
}
deps := s.w.Dependencies
steps := s.w.Steps
q := deps[s.name]
seen := map[string]bool{}

// Do a BFS search on s's dependencies, looking for the target dependency. Don't revisit visited dependencies.
for i := 0; i < len(q); i++ {
name := q[i]
if seen[name] {
Expand All @@ -128,6 +132,54 @@ func (s *Step) depends(other *Step) bool {
return false
}

// nestedDepends determines if s depends on other, taking into account the recursive, nested nature of
// workflows, i.e. workflows in IncludeWorkflow and SubWorkflow.
// Example: if s depends on an IncludeWorkflow whose workflow contains other, then s depends on other.
func (s *Step) nestedDepends(other *Step) bool {
sChain := s.getChain()
oChain := other.getChain()
// If sChain and oChain don't share the same root workflow, then there is no dependency relationship.
if len(sChain) == 0 || len(oChain) == 0 || sChain[0].w != oChain[0].w {
return false
}

// Find where the step chains diverge.
// A divergence in the chains indicates sibling steps, where we can check dependency.
// We want to see if s's branch depends on other's branch.
var sStep, oStep *Step
for i := 0; i < minInt(len(sChain), len(oChain)); i++ {
sStep = sChain[i]
oStep = oChain[i]
if sStep != oStep {
break
}
}
return sStep.depends(oStep)
}

// getChain returns the step chain getting to a step. A link in the chain represents an IncludeWorkflow step, a
// SubWorkflow step, or the step itself.
// For example, workflow A has a step s1 which includes workflow B. B has a step s2 which subworkflows C. Finally,
// C has a step s3. s3.getChain() will return []*Step{s1, s2, s3}
func (s *Step) getChain() []*Step {
if s == nil || s.w == nil {
return nil
}
if s.w.parent == nil {
return []*Step{s}
}
for _, st := range s.w.parent.Steps {
if st.IncludeWorkflow != nil && st.IncludeWorkflow.w == s.w {
return append(st.getChain(), s)
}
if st.SubWorkflow != nil && st.SubWorkflow.w == s.w {
return append(st.getChain(), s)
}
}
// We shouldn't get here.
return nil
}

func (s *Step) run(ctx context.Context) error {
impl, err := s.stepImpl()
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions daisy/workflow/step_includeworkflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type IncludeWorkflow struct {
}

func (i *IncludeWorkflow) populate(ctx context.Context, s *Step) error {
i.w.parent = s.w
i.w.GCSPath = s.w.GCSPath
i.w.Name = s.name
i.w.Project = s.w.Project
Expand Down
1 change: 1 addition & 0 deletions daisy/workflow/step_sub_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type SubWorkflow struct {
}

func (s *SubWorkflow) populate(ctx context.Context, st *Step) error {
s.w.parent = st.w
s.w.GCSPath = fmt.Sprintf("gs://%s/%s", s.w.parent.bucket, s.w.parent.scratchPath)
s.w.Name = st.name
s.w.Project = s.w.parent.Project
Expand Down
90 changes: 90 additions & 0 deletions daisy/workflow/step_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,96 @@ func TestDepends(t *testing.T) {
}
}

func TestGetChain(t *testing.T) {
a := &Workflow{}
b := &Workflow{parent: a}
c := &Workflow{parent: b}
a1 := &Step{w: a}
a2 := &Step{w: a, IncludeWorkflow: &IncludeWorkflow{w: b}}
b1 := &Step{w: b}
b2 := &Step{w: b, SubWorkflow: &SubWorkflow{w: c}}
c1 := &Step{w: c}
orphan := &Step{}
a.Steps = map[string]*Step{"a1": a1, "a2": a2}
b.Steps = map[string]*Step{"b1": b1, "b2": b2}
c.Steps = map[string]*Step{"c1": c1}

tests := []struct {
desc string
s *Step
wantChain []*Step
}{
{"leaf case", a1, []*Step{a1}},
{"step from include case", b1, []*Step{a2, b1}},
{"step from sub case", c1, []*Step{a2, b2, c1}},
{"orphan step case", orphan, nil},
}

for _, tt := range tests {
if chain := tt.s.getChain(); !reflect.DeepEqual(chain, tt.wantChain) {
t.Errorf("%s: %v != %v", tt.desc, chain, tt.wantChain)
}
}
}

func TestNestedDepends(t *testing.T) {
// root -- a1 (some step)
// |
// -- a2 (IncludeWorkflow) -- b1 (SubWorkflow) -- c1 (some step)
// |
// -- b2 (SubWorkflow) -- d1 (some step)
// |
// -- b3 (some step)
// different root -- e1 (some step)
a := &Workflow{}
b := &Workflow{parent: a}
c := &Workflow{parent: b}
d := &Workflow{parent: b}
e := &Workflow{}
a1 := &Step{name: "a1", w: a}
a2 := &Step{name: "a2", w: a, IncludeWorkflow: &IncludeWorkflow{w: b}}
b1 := &Step{name: "b1", w: b, SubWorkflow: &SubWorkflow{w: c}}
b2 := &Step{name: "b2", w: b, SubWorkflow: &SubWorkflow{w: d}}
b3 := &Step{name: "b3", w: b}
c1 := &Step{name: "c1", w: c}
d1 := &Step{name: "d1", w: d}
e1 := &Step{name: "e1", w: e}
orphan := &Step{}
a.Steps = map[string]*Step{"a1": a1, "a2": a2}
b.Steps = map[string]*Step{"b1": b1, "b2": b2, "b3": b3}
c.Steps = map[string]*Step{"c1": c1}
d.Steps = map[string]*Step{"d1": d1}
e.Steps = map[string]*Step{"e1": e1}
a.Dependencies = map[string][]string{"a1": {"a2"}}
b.Dependencies = map[string][]string{"b1": {"b2"}}

tests := []struct {
desc string
s1, s2 *Step
want bool
}{
{"depends on niece/nephew case", a1, b3, true},
{"doesn't depend on niece/nephew case", b3, c1, false},
{"depends on great niece/nephew case", a1, c1, true},
{"doesn't depend on son/daughter case", b1, c1, false},
{"doesn't depend on mother/father case", c1, b1, false},
{"depends on aunt/uncle case", c1, b2, true},
{"depends on cousin case", c1, d1, true},
{"doesn't depend on brother from another mother case", a1, e1, false},
{"orphan step case", a1, orphan, false},
{"orphan step case 2", orphan, a1, false},
{"nil step case", a1, nil, false},
{"nil step case 2", nil, a1, false},
}

for _, tt := range tests {
got := tt.s1.nestedDepends(tt.s2)
if got != tt.want {
t.Errorf("%s: got %t, want %t", tt.desc, got, tt.want)
}
}
}

func TestStepImpl(t *testing.T) {
// Good. Try normal, working case.
tests := []struct {
Expand Down

0 comments on commit c496b8e

Please sign in to comment.