Skip to content

Commit

Permalink
Parent task bug fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
demdxx committed Sep 22, 2024
1 parent 4908f0b commit a30f789
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 19 deletions.
39 changes: 28 additions & 11 deletions mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,35 +82,39 @@ func (srv *TaskMux) Handle(taskName string, handler any) Promise {
return srv.handleExt(taskName, handler, false)
}

func (srv *TaskMux) handleExt(taskName string, handler any, anonymous bool) Promise {
func (srv *TaskMux) handleExt(name string, handler any, anonymous bool) Promise {
var (
parentPromis Promise
parentTaskName = ""
splitName = strings.SplitN(taskName, ">", 2)
parentPromis Promise
parentTaskName, taskName = prepareTaskName(name)
)
if srv.tasks == nil {
srv.tasks = map[string]Promise{}
}
if len(splitName) > 1 {
parentTaskName = splitName[0]
taskName = splitName[1]
}
if _, ok := srv.tasks[taskName]; ok {
panic(errors.Wrap(ErrChanelTaken, taskName))
}
if parentTaskName != "" {
// If there is no parent promis in the scope of local tasks
// then the parent is external task
parentPromis = srv.tasks[taskName]
if parentPromis = srv.tasks[parentTaskName]; parentPromis != nil {
parentPromis = parentPromis.LastPromise()
}
}

taskItemValue := newPoromise(srv, parentPromis, taskName, TaskFrom(handler), anonymous)
srv.tasks[taskName] = taskItemValue
if parentTaskName != "" {

if parentTaskName != "" && parentPromis == nil {
// Links global event name and the target external one
taskItemValue.parent = newPromisVirtual(parentTaskName, taskName)
parentTaskName = "@" + parentTaskName
srv.hiddenTaskMapping[parentTaskName] = append(srv.hiddenTaskMapping[parentTaskName], taskName)
}

// If parent task is not virtual then add event to the parent task
if parentPromis != nil && !parentPromis.IsVirtual() {
parentPromis.ThenEvent(taskName)
}
return taskItemValue
}

Expand All @@ -123,7 +127,7 @@ func (srv *TaskMux) Failver(task any) error {
// Receive definds the processing function
func (srv *TaskMux) Receive(msg Message) error {
event, err := srv.eventAllocator.Decode(msg)
if err != nil {
if event != nil {
defer func() {
_ = srv.eventAllocator.Release(event)
if srv.panicHandler != nil {
Expand Down Expand Up @@ -295,12 +299,25 @@ func (srv *TaskMux) TaskMap() map[string][]string {
mp[eventName] = mergeStrArr(mp[eventName], promiseObject.TargetEventName())
}
}

if srv.hiddenTaskMapping != nil {
for eventName, targetEvent := range srv.hiddenTaskMapping {
mp[eventName] = mergeStrArr(mp[eventName], targetEvent)
}
}

return mp
}

func prepareTaskName(name string) (parent, target string) {
splitName := strings.SplitN(name, ">", 2)
if len(splitName) > 1 {
parent = splitName[0]
target = splitName[1]
} else {
target = splitName[0]
}
return parent, target
}

var _ = (notificationcenter.Receiver)((*TaskMux)(nil))
12 changes: 7 additions & 5 deletions mux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ func TestMuxErrorPanic(t *testing.T) {
WithStreamResponseMap(&testPublisher{name: "test"}),
)
)
_ = mux.Handle(`error`, FuncTask(func(_ context.Context, e Event, _ ResponseWriter) error { lastEvent = e; return fmt.Errorf(`test`) }))
_ = mux.Handle(`error`, FuncTask(func(_ context.Context, e Event, _ ResponseWriter) error {
lastEvent = e.(*event).Copy()
return fmt.Errorf(`test`)
}))
_ = mux.Handle(`panic`, FuncTask(func(context.Context, Event, ResponseWriter) error { panic("test") }))
_ = mux.Handle(`panic>noop`, FuncTask(func(context.Context, Event, ResponseWriter) error { panic("noop") }))
_ = mux.Failver(FuncTask(func(context.Context, Event, ResponseWriter) error { isFailover = true; return nil }))
Expand All @@ -33,10 +36,9 @@ func TestMuxErrorPanic(t *testing.T) {
assert.True(t, isFailover, `failover`)
assert.NoError(t, mux.Close())
assert.Equal(t, map[string][]string{
`error`: {},
`panic`: {},
`@panic`: {"noop"},
`noop`: {},
`error`: {},
`panic`: {"noop"},
`noop`: {},
}, mux.TaskMap())
totalTasks, completeTasks := mux.CompleteTasks(lastEvent)
assert.ElementsMatch(t, []string{`error`}, totalTasks)
Expand Down
25 changes: 23 additions & 2 deletions promise.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,15 @@ type Promise interface {
// ThenEvent which need to execute
ThenEvent(name string)

// IsAnonymous promise type
IsAnonymous() bool

// Parent promise item
Parent() Promise

// LastPromise returns the last promise in the chain
LastPromise() Promise

// Task executor interface
Task() Task

Expand Down Expand Up @@ -99,8 +105,7 @@ func (prom *promise) TargetEvent(name string) Promise {
}

func (prom *promise) Then(handler any) Promise {
p := prom.mux.handleExt(prom.genTargetEvent(), handler, true)
p.(*promise).parent = prom
p := prom.mux.handleExt(prom.EventName()+">"+prom.genTargetEvent(), handler, true)
return p
}

Expand All @@ -112,6 +117,19 @@ func (prom *promise) Parent() Promise {
return prom.parent
}

func (prom *promise) LastPromise() Promise {
for _, name := range prom.targetEventName {
if task := prom.mux.tasks[name]; task != nil {
return task.LastPromise()
}
}
return prom
}

func (prom *promise) IsAnonymous() bool {
return prom.anonymous
}

func (prom *promise) Origin(novirtual ...bool) (Promise, int) {
depth := 0
p := prom.Parent()
Expand All @@ -121,6 +139,9 @@ func (prom *promise) Origin(novirtual ...bool) (Promise, int) {
}
for {
depth++
if !p.IsAnonymous() {
break
}
pr := p.Parent()
if pr == nil || pr.IsVirtual() {
break
Expand Down
2 changes: 1 addition & 1 deletion promise_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ var testTask = FuncTask(func(ctx context.Context, event Event, responseWriter Re

func TestPromise(t *testing.T) {
mux := NewTaskMux()
pr1 := newPoromise(mux, nil, "test", testTask, false)
pr1 := mux.Handle("test", testTask)
pr2 := pr1.Then(testTask)
pr3 := pr2.Then(testTask)

Expand Down
6 changes: 6 additions & 0 deletions promise_virtual.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ func (v *promiseVirtual) Then(handler any) Promise {
// ThenEvent which need to execute
func (v *promiseVirtual) ThenEvent(name string) { v.targetEventName = []string{name} }

// IsAnonymous promise type
func (v *promiseVirtual) IsAnonymous() bool { return false }

// LastPromise returns the last promise in the chain
func (v *promiseVirtual) LastPromise() Promise { return nil }

// Parent promise item
func (v *promiseVirtual) Parent() Promise { return nil }

Expand Down

0 comments on commit a30f789

Please sign in to comment.