From ed86b8a06e3be9ebfee763ee61180d4047e327cf Mon Sep 17 00:00:00 2001 From: Steven L Date: Tue, 13 Jul 2021 12:40:57 -0700 Subject: [PATCH] Clear workflow state when not cached and not complete (#1111) This resolves a bug a user encountered, where a full sticky cache + a query for a non-cached workflow would result in a goroutine leak due to the associated event handler never being shut down. Since this leak retains all in-workflow data, it can eventually lead to an out-of-memory crash, though it does not cause any logic errors (these abandoned goroutines are forever idle once abandoned). There are probably also other scenarios where this is possible, hopefully all caught by this addition. --- Separately: this function seems to be far too complex, and is almost certainly duplicating checks made elsewhere, which should not be duplicated like this. There have been multiple issues with state-clearing that have lead to adding conditions to this func, which is a clear sign of a code smell. State / cache decisions like this should be made in exactly one place ever, and built up as obviously as possible, to ensure gaps like this never occur. In this case we'll likely need to invert the dependency flow somehow, so callers control when cache is cleared based on whether or not it is cached, rather than double-checking internally like this. We should also probably add something like go.uber.org/goleak to our tests, to help ensure we do not have goroutine leaks. This may not have been caught by that, as the steps leading to it are a bit odd and rely on singleton config (sticky cache size), but it may find or prevent others. With this PR we now have at least one test using it, but it'll probably take some time to roll out all over, and to add missing test-state cleanup funcs. --- Makefile | 8 +- internal/internal_task_handlers.go | 14 ++- internal/internal_task_handlers_test.go | 115 ++++++++++++++++++++++-- 3 files changed, 126 insertions(+), 11 deletions(-) diff --git a/Makefile b/Makefile index a40f7f4a6..603485d03 100644 --- a/Makefile +++ b/Makefile @@ -134,11 +134,13 @@ bins: thriftc $(ALL_SRC) $(BUILD)/copyright lint $(BUILD)/dummy unit_test: $(BUILD)/dummy @mkdir -p $(COVER_ROOT) @echo "mode: atomic" > $(UT_COVER_FILE) - @for dir in $(UT_DIRS); do \ + @failed=0; \ + for dir in $(UT_DIRS); do \ mkdir -p $(COVER_ROOT)/"$$dir"; \ - go test "$$dir" $(TEST_ARG) -coverprofile=$(COVER_ROOT)/"$$dir"/cover.out || exit 1; \ + go test "$$dir" $(TEST_ARG) -coverprofile=$(COVER_ROOT)/"$$dir"/cover.out || failed=1; \ cat $(COVER_ROOT)/"$$dir"/cover.out | grep -v "mode: atomic" >> $(UT_COVER_FILE); \ - done; + done; \ + exit $$failed integ_test_sticky_off: $(BUILD)/dummy @mkdir -p $(COVER_ROOT) diff --git a/internal/internal_task_handlers.go b/internal/internal_task_handlers.go index 87223a824..184fb4f4b 100644 --- a/internal/internal_task_handlers.go +++ b/internal/internal_task_handlers.go @@ -476,16 +476,24 @@ func (w *workflowExecutionContextImpl) Lock() { } func (w *workflowExecutionContextImpl) Unlock(err error) { + cleared := false + cached := getWorkflowCache().Exist(w.workflowInfo.WorkflowExecution.RunID) if err != nil || w.err != nil || w.isWorkflowCompleted || (w.wth.disableStickyExecution && !w.hasPendingLocalActivityWork()) { // TODO: in case of closed, it assumes the close decision always succeed. need server side change to return - // error to indicate the close failure case. This should be rear case. For now, always remove the cache, and + // error to indicate the close failure case. This should be rare case. For now, always remove the cache, and // if the close decision failed, the next decision will have to rebuild the state. - if getWorkflowCache().Exist(w.workflowInfo.WorkflowExecution.RunID) { + if cached { + // also clears state asynchronously via cache eviction removeWorkflowContext(w.workflowInfo.WorkflowExecution.RunID) } else { - // sticky is disabled, manually clear the workflow state. w.clearState() } + cleared = true + } + // there are a variety of reasons a workflow may not have been put into the cache. + // all of them mean we need to clear the state at this point, or any running goroutines will be orphaned. + if !cleared && !cached { + w.clearState() } w.mutex.Unlock() diff --git a/internal/internal_task_handlers_test.go b/internal/internal_task_handlers_test.go index 9f86f8d9b..f9200b658 100644 --- a/internal/internal_task_handlers_test.go +++ b/internal/internal_task_handlers_test.go @@ -28,18 +28,19 @@ import ( "fmt" "reflect" "strings" + "sync" "testing" "time" + "github.com/golang/mock/gomock" "github.com/opentracing/opentracing-go" "github.com/pborman/uuid" - "github.com/stretchr/testify/suite" - - "github.com/golang/mock/gomock" "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" "go.uber.org/cadence/.gen/go/cadence/workflowservicetest" s "go.uber.org/cadence/.gen/go/shared" "go.uber.org/cadence/internal/common" + "go.uber.org/goleak" "go.uber.org/zap" ) @@ -416,7 +417,7 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_ActivityTaskScheduled() { t.NotNil(response.Decisions[0].CompleteWorkflowExecutionDecisionAttributes) } -func (t *TaskHandlersTestSuite) TestWorkflowTask_QueryWorkflow_Sticky() { +func (t *TaskHandlersTestSuite) TestWorkflowTask_QueryWorkflow() { // Schedule an activity and see if we complete workflow. taskList := "sticky-tl" execution := &s.WorkflowExecution{ @@ -463,8 +464,12 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_QueryWorkflow_Sticky() { t.verifyQueryResult(queryResp, "waiting-activity-result") } -func (t *TaskHandlersTestSuite) TestWorkflowTask_QueryWorkflow_NonSticky() { +func (t *TaskHandlersTestSuite) TestWorkflowTask_QueryWorkflow_2() { // Schedule an activity and see if we complete workflow. + + // This test appears to be just a finer-grained version of TestWorkflowTask_QueryWorkflow, though the older names + // for them implied entirely different purposes. Likely it can be combined with TestWorkflowTask_QueryWorkflow + // without losing anything useful. taskList := "tl1" testEvents := []*s.HistoryEvent{ createTestEventWorkflowExecutionStarted(1, &s.WorkflowExecutionStartedEventAttributes{TaskList: &s.TaskList{Name: &taskList}}), @@ -478,6 +483,8 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_QueryWorkflow_NonSticky() { }), createTestEventActivityTaskStarted(6, &s.ActivityTaskStartedEventAttributes{}), createTestEventActivityTaskCompleted(7, &s.ActivityTaskCompletedEventAttributes{ScheduledEventId: common.Int64Ptr(5)}), + // TODO: below seems irrational. there's a start without a schedule, and this workflow does not respond to signals. + // aside from this, the list of tasks is the same as TestWorkflowTask_QueryWorkflow createTestEventDecisionTaskStarted(8), createTestEventWorkflowExecutionSignaled(9, "test-signal"), } @@ -1449,6 +1456,104 @@ func (t *TaskHandlersTestSuite) TestActivityExecutionWorkerStop() { t.NotNil(r) } +// a regrettably-hacky func to use goleak to count leaking goroutines. +// ideally there will be a structured way to do this in the future, rather than string parsing +func countLeaks(leaks error) int { + if leaks == nil { + return 0 + } + // leak messages look something like: + // Goroutine 23 in state chan receive, with go.uber.org/cadence/internal.(*coroutineState).initialYield on top of the stack: + // ... stacktrace ... + // + // Goroutine 28 ... on top of the stack: + // ... stacktrace ... + return strings.Count(leaks.Error(), "on top of the stack") +} + +func (t *TaskHandlersTestSuite) TestRegression_QueriesDoNotLeakGoroutines() { + // this test must not be run in parallel with most other tests, as it mutates global vars + var ridsToCleanUp []string + originalLeaks := goleak.Find() + defer func(size int) { + // empty the cache to clear out any newly-introduced leaks + current := getWorkflowCache() + for _, rid := range ridsToCleanUp { + current.Delete(rid) + } + // check the cleanup + currentLeaks := goleak.Find() + if countLeaks(currentLeaks) != countLeaks(originalLeaks) { + t.T().Errorf("failed to clean up goroutines.\nOriginal state:\n%v\n\nCurrent state:\n%v", originalLeaks, currentLeaks) + } + + // reset everything to make it "normal". + // this does NOT restore the original workflow cache - that cannot be done correctly, initCacheOnce is not safe to copy (thus restore). + stickyCacheSize = size + workflowCache = nil + initCacheOnce = sync.Once{} + }(stickyCacheSize) + workflowCache = nil + initCacheOnce = sync.Once{} + // cache is intentionally not *disabled*, as that would go down no-cache code paths. + // also, there is an LRU-cache bug where the size allows N to enter, but then removes until N-1 remain, + // so a size of 2 actually means a size of 1. + SetStickyWorkflowCacheSize(2) + + taskList := "tl1" + params := workerExecutionParameters{ + TaskList: taskList, + Identity: "test-id-1", + Logger: t.logger, + DisableStickyExecution: false, + } + taskHandler := newWorkflowTaskHandler(testDomain, params, nil, t.registry) + + // process a throw-away workflow to fill the cache. this is copied from TestWorkflowTask_QueryWorkflow since it's + // relatively simple, but any should work fine, as long as it can be queried. + testEvents := []*s.HistoryEvent{ + createTestEventWorkflowExecutionStarted(1, &s.WorkflowExecutionStartedEventAttributes{TaskList: &s.TaskList{Name: &taskList}}), + createTestEventDecisionTaskScheduled(2, &s.DecisionTaskScheduledEventAttributes{TaskList: &s.TaskList{Name: &taskList}}), + createTestEventDecisionTaskStarted(3), + createTestEventDecisionTaskCompleted(4, &s.DecisionTaskCompletedEventAttributes{ScheduledEventId: common.Int64Ptr(2)}), + createTestEventActivityTaskScheduled(5, &s.ActivityTaskScheduledEventAttributes{ + ActivityId: common.StringPtr("0"), + ActivityType: &s.ActivityType{Name: common.StringPtr("Greeter_Activity")}, + TaskList: &s.TaskList{Name: &taskList}, + }), + } + cachedTask := createWorkflowTask(testEvents[0:1], 1, "HelloWorld_Workflow") + cachedTask.WorkflowExecution.WorkflowId = common.StringPtr("cache-filling workflow id") + ridsToCleanUp = append(ridsToCleanUp, *cachedTask.WorkflowExecution.RunId) + _, err := taskHandler.ProcessWorkflowTask(&workflowTask{task: cachedTask}, nil) + + // sanity check that the cache was indeed filled, and that it has created a goroutine + require.NoError(t.T(), err, "cache-filling must succeed") + require.Equal(t.T(), 1, getWorkflowCache().Size(), "workflow should be cached, but was not") + oneCachedLeak := goleak.Find() + require.Error(t.T(), oneCachedLeak, "expected at least one leaking goroutine") + require.Equal(t.T(), countLeaks(originalLeaks)+1, countLeaks(oneCachedLeak), // ideally == 1, but currently there are other leaks + "expected the cached workflow to leak one goroutine. original leaks:\n%v\n\nleaks after one workflow:\n%v", originalLeaks, oneCachedLeak) + + // now query a different workflow ID / run ID + uncachedTask := createQueryTask(testEvents, 5, "HelloWorld_Workflow", queryType) + uncachedTask.WorkflowExecution.WorkflowId = common.StringPtr("should not leak this workflow id") + ridsToCleanUp = append(ridsToCleanUp, *uncachedTask.WorkflowExecution.RunId) // only necessary if the test fails + result, err := taskHandler.ProcessWorkflowTask(&workflowTask{task: uncachedTask}, nil) + require.NoError(t.T(), err) + t.verifyQueryResult(result, "waiting-activity-result") // largely a sanity check + + // and finally the purpose of this test: + // verify that the cache has not been modified, and that there is no new leak + t.Equal(1, getWorkflowCache().Size(), "workflow cache should be the same size") + t.True(getWorkflowCache().Exist(cachedTask.WorkflowExecution.GetRunId()), "originally-cached workflow should still be cached") + t.False(getWorkflowCache().Exist(uncachedTask.WorkflowExecution.GetRunId()), "queried workflow should not be cached") + newLeaks := goleak.Find() + t.Error(newLeaks, "expected at least one leaking goroutine") + t.Equal(countLeaks(oneCachedLeak), countLeaks(newLeaks), + "expected the query to leak no new goroutines. before query:\n%v\n\nafter query:\n%v", oneCachedLeak, newLeaks) +} + func Test_NonDeterministicCheck(t *testing.T) { decisionTypes := s.DecisionType_Values() require.Equal(t, 13, len(decisionTypes), "If you see this error, you are adding new decision type. "+