diff --git a/Makefile b/Makefile index a40f7f4a6..603485d03 100644 --- a/Makefile +++ b/Makefile @@ -134,11 +134,13 @@ bins: thriftc $(ALL_SRC) $(BUILD)/copyright lint $(BUILD)/dummy unit_test: $(BUILD)/dummy @mkdir -p $(COVER_ROOT) @echo "mode: atomic" > $(UT_COVER_FILE) - @for dir in $(UT_DIRS); do \ + @failed=0; \ + for dir in $(UT_DIRS); do \ mkdir -p $(COVER_ROOT)/"$$dir"; \ - go test "$$dir" $(TEST_ARG) -coverprofile=$(COVER_ROOT)/"$$dir"/cover.out || exit 1; \ + go test "$$dir" $(TEST_ARG) -coverprofile=$(COVER_ROOT)/"$$dir"/cover.out || failed=1; \ cat $(COVER_ROOT)/"$$dir"/cover.out | grep -v "mode: atomic" >> $(UT_COVER_FILE); \ - done; + done; \ + exit $$failed integ_test_sticky_off: $(BUILD)/dummy @mkdir -p $(COVER_ROOT) diff --git a/internal/internal_task_handlers.go b/internal/internal_task_handlers.go index 87223a824..184fb4f4b 100644 --- a/internal/internal_task_handlers.go +++ b/internal/internal_task_handlers.go @@ -476,16 +476,24 @@ func (w *workflowExecutionContextImpl) Lock() { } func (w *workflowExecutionContextImpl) Unlock(err error) { + cleared := false + cached := getWorkflowCache().Exist(w.workflowInfo.WorkflowExecution.RunID) if err != nil || w.err != nil || w.isWorkflowCompleted || (w.wth.disableStickyExecution && !w.hasPendingLocalActivityWork()) { // TODO: in case of closed, it assumes the close decision always succeed. need server side change to return - // error to indicate the close failure case. This should be rear case. For now, always remove the cache, and + // error to indicate the close failure case. This should be a rare case. For now, always remove the cache, and // if the close decision failed, the next decision will have to rebuild the state. - if getWorkflowCache().Exist(w.workflowInfo.WorkflowExecution.RunID) { + if cached { + // also clears state asynchronously via cache eviction removeWorkflowContext(w.workflowInfo.WorkflowExecution.RunID) } else { - // sticky is disabled, manually clear the workflow state. 
w.clearState() } + cleared = true + } + // there are a variety of reasons a workflow may not have been put into the cache. + // all of them mean we need to clear the state at this point, or any running goroutines will be orphaned. + if !cleared && !cached { + w.clearState() } w.mutex.Unlock() diff --git a/internal/internal_task_handlers_test.go b/internal/internal_task_handlers_test.go index 9f86f8d9b..f9200b658 100644 --- a/internal/internal_task_handlers_test.go +++ b/internal/internal_task_handlers_test.go @@ -28,18 +28,19 @@ import ( "fmt" "reflect" "strings" + "sync" "testing" "time" + "github.com/golang/mock/gomock" "github.com/opentracing/opentracing-go" "github.com/pborman/uuid" - "github.com/stretchr/testify/suite" - - "github.com/golang/mock/gomock" "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" "go.uber.org/cadence/.gen/go/cadence/workflowservicetest" s "go.uber.org/cadence/.gen/go/shared" "go.uber.org/cadence/internal/common" + "go.uber.org/goleak" "go.uber.org/zap" ) @@ -416,7 +417,7 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_ActivityTaskScheduled() { t.NotNil(response.Decisions[0].CompleteWorkflowExecutionDecisionAttributes) } -func (t *TaskHandlersTestSuite) TestWorkflowTask_QueryWorkflow_Sticky() { +func (t *TaskHandlersTestSuite) TestWorkflowTask_QueryWorkflow() { // Schedule an activity and see if we complete workflow. taskList := "sticky-tl" execution := &s.WorkflowExecution{ @@ -463,8 +464,12 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_QueryWorkflow_Sticky() { t.verifyQueryResult(queryResp, "waiting-activity-result") } -func (t *TaskHandlersTestSuite) TestWorkflowTask_QueryWorkflow_NonSticky() { +func (t *TaskHandlersTestSuite) TestWorkflowTask_QueryWorkflow_2() { // Schedule an activity and see if we complete workflow. + + // This test appears to be just a finer-grained version of TestWorkflowTask_QueryWorkflow, though the older names + // for them implied entirely different purposes. 
Likely it can be combined with TestWorkflowTask_QueryWorkflow + // without losing anything useful. taskList := "tl1" testEvents := []*s.HistoryEvent{ createTestEventWorkflowExecutionStarted(1, &s.WorkflowExecutionStartedEventAttributes{TaskList: &s.TaskList{Name: &taskList}}), @@ -478,6 +483,8 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_QueryWorkflow_NonSticky() { }), createTestEventActivityTaskStarted(6, &s.ActivityTaskStartedEventAttributes{}), createTestEventActivityTaskCompleted(7, &s.ActivityTaskCompletedEventAttributes{ScheduledEventId: common.Int64Ptr(5)}), + // TODO: below seems irrational. there's a start without a schedule, and this workflow does not respond to signals. + // aside from this, the list of tasks is the same as TestWorkflowTask_QueryWorkflow createTestEventDecisionTaskStarted(8), createTestEventWorkflowExecutionSignaled(9, "test-signal"), } @@ -1449,6 +1456,104 @@ func (t *TaskHandlersTestSuite) TestActivityExecutionWorkerStop() { t.NotNil(r) } +// a regrettably-hacky func to use goleak to count leaking goroutines. +// ideally there will be a structured way to do this in the future, rather than string parsing +func countLeaks(leaks error) int { + if leaks == nil { + return 0 + } + // leak messages look something like: + // Goroutine 23 in state chan receive, with go.uber.org/cadence/internal.(*coroutineState).initialYield on top of the stack: + // ... stacktrace ... + // + // Goroutine 28 ... on top of the stack: + // ... stacktrace ... 
+ return strings.Count(leaks.Error(), "on top of the stack") +} + +func (t *TaskHandlersTestSuite) TestRegression_QueriesDoNotLeakGoroutines() { + // this test must not be run in parallel with most other tests, as it mutates global vars + var ridsToCleanUp []string + originalLeaks := goleak.Find() + defer func(size int) { + // empty the cache to clear out any newly-introduced leaks + current := getWorkflowCache() + for _, rid := range ridsToCleanUp { + current.Delete(rid) + } + // check the cleanup + currentLeaks := goleak.Find() + if countLeaks(currentLeaks) != countLeaks(originalLeaks) { + t.T().Errorf("failed to clean up goroutines.\nOriginal state:\n%v\n\nCurrent state:\n%v", originalLeaks, currentLeaks) + } + + // reset everything to make it "normal". + // this does NOT restore the original workflow cache - that cannot be done correctly, since initCacheOnce (a sync.Once) is not safe to copy, and thus cannot be saved and restored. + stickyCacheSize = size + workflowCache = nil + initCacheOnce = sync.Once{} + }(stickyCacheSize) + workflowCache = nil + initCacheOnce = sync.Once{} + // cache is intentionally not *disabled*, as that would go down no-cache code paths. + // also, there is an LRU-cache bug where the size allows N to enter, but then removes until N-1 remain, + // so a size of 2 actually means a size of 1. + SetStickyWorkflowCacheSize(2) + + taskList := "tl1" + params := workerExecutionParameters{ + TaskList: taskList, + Identity: "test-id-1", + Logger: t.logger, + DisableStickyExecution: false, + } + taskHandler := newWorkflowTaskHandler(testDomain, params, nil, t.registry) + + // process a throw-away workflow to fill the cache. this is copied from TestWorkflowTask_QueryWorkflow since it's + // relatively simple, but any workflow should work fine, as long as it can be queried. 
+ testEvents := []*s.HistoryEvent{ + createTestEventWorkflowExecutionStarted(1, &s.WorkflowExecutionStartedEventAttributes{TaskList: &s.TaskList{Name: &taskList}}), + createTestEventDecisionTaskScheduled(2, &s.DecisionTaskScheduledEventAttributes{TaskList: &s.TaskList{Name: &taskList}}), + createTestEventDecisionTaskStarted(3), + createTestEventDecisionTaskCompleted(4, &s.DecisionTaskCompletedEventAttributes{ScheduledEventId: common.Int64Ptr(2)}), + createTestEventActivityTaskScheduled(5, &s.ActivityTaskScheduledEventAttributes{ + ActivityId: common.StringPtr("0"), + ActivityType: &s.ActivityType{Name: common.StringPtr("Greeter_Activity")}, + TaskList: &s.TaskList{Name: &taskList}, + }), + } + cachedTask := createWorkflowTask(testEvents[0:1], 1, "HelloWorld_Workflow") + cachedTask.WorkflowExecution.WorkflowId = common.StringPtr("cache-filling workflow id") + ridsToCleanUp = append(ridsToCleanUp, *cachedTask.WorkflowExecution.RunId) + _, err := taskHandler.ProcessWorkflowTask(&workflowTask{task: cachedTask}, nil) + + // sanity check that the cache was indeed filled, and that it has created a goroutine + require.NoError(t.T(), err, "cache-filling must succeed") + require.Equal(t.T(), 1, getWorkflowCache().Size(), "workflow should be cached, but was not") + oneCachedLeak := goleak.Find() + require.Error(t.T(), oneCachedLeak, "expected at least one leaking goroutine") + require.Equal(t.T(), countLeaks(originalLeaks)+1, countLeaks(oneCachedLeak), // ideally == 1, but currently there are other leaks + "expected the cached workflow to leak one goroutine. 
original leaks:\n%v\n\nleaks after one workflow:\n%v", originalLeaks, oneCachedLeak) + + // now query a different workflow ID / run ID + uncachedTask := createQueryTask(testEvents, 5, "HelloWorld_Workflow", queryType) + uncachedTask.WorkflowExecution.WorkflowId = common.StringPtr("should not leak this workflow id") + ridsToCleanUp = append(ridsToCleanUp, *uncachedTask.WorkflowExecution.RunId) // only necessary if the test fails + result, err := taskHandler.ProcessWorkflowTask(&workflowTask{task: uncachedTask}, nil) + require.NoError(t.T(), err) + t.verifyQueryResult(result, "waiting-activity-result") // largely a sanity check + + // and finally the purpose of this test: + // verify that the cache has not been modified, and that there is no new leak + t.Equal(1, getWorkflowCache().Size(), "workflow cache should be the same size") + t.True(getWorkflowCache().Exist(cachedTask.WorkflowExecution.GetRunId()), "originally-cached workflow should still be cached") + t.False(getWorkflowCache().Exist(uncachedTask.WorkflowExecution.GetRunId()), "queried workflow should not be cached") + newLeaks := goleak.Find() + t.Error(newLeaks, "expected at least one leaking goroutine") + t.Equal(countLeaks(oneCachedLeak), countLeaks(newLeaks), + "expected the query to leak no new goroutines. before query:\n%v\n\nafter query:\n%v", oneCachedLeak, newLeaks) +} + func Test_NonDeterministicCheck(t *testing.T) { decisionTypes := s.DecisionType_Values() require.Equal(t, 13, len(decisionTypes), "If you see this error, you are adding new decision type. "+