Clear workflow state when not cached and not complete (#1111)
This resolves a bug a user encountered: with a full sticky cache, a query for a non-cached workflow
leaked a goroutine, because the associated event handler was never shut down.
Since this leak retains all in-workflow data, it can eventually lead to an out-of-memory crash, though
it does not cause any logic errors (the abandoned goroutines stay idle forever).
There are probably other scenarios where this is possible; hopefully all of them are caught by this addition.
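
To illustrate the failure mode, here is a minimal, self-contained sketch of a goroutine that stays parked because its owner is never shut down. The `handler` type, `newHandler`, and `leakedAfter` are hypothetical stand-ins and are not the client's real event handler; the sketch only shows why an unclosed handler keeps both a goroutine and its data alive.

```go
package main

import (
	"fmt"
	"runtime"
	"time"
)

// handler is a hypothetical stand-in for a workflow event handler: it owns a
// goroutine that blocks until Close is called.
type handler struct {
	done  chan struct{}
	state []byte // stands in for in-workflow data retained by the goroutine
}

func newHandler() *handler {
	h := &handler{done: make(chan struct{}), state: make([]byte, 1<<20)}
	go func() {
		<-h.done // parked here until Close; h (and h.state) stay reachable
		_ = len(h.state)
	}()
	return h
}

// Close releases the goroutine started by newHandler.
func (h *handler) Close() { close(h.done) }

// leakedAfter starts n handlers, optionally closes them, and reports how many
// goroutines remain beyond the count observed at entry.
func leakedAfter(n int, closeThem bool) int {
	before := runtime.NumGoroutine()
	handlers := make([]*handler, 0, n)
	for i := 0; i < n; i++ {
		handlers = append(handlers, newHandler())
	}
	if closeThem {
		for _, h := range handlers {
			h.Close()
		}
		// Give the released goroutines a moment to exit.
		for i := 0; i < 1000 && runtime.NumGoroutine() > before; i++ {
			time.Sleep(time.Millisecond)
		}
	}
	return runtime.NumGoroutine() - before
}

func main() {
	fmt.Println("leaked without Close:", leakedAfter(3, false))
	fmt.Println("leaked with Close:", leakedAfter(3, true))
}
```

The leaked goroutines do no work, which matches the description above: nothing misbehaves, but memory is never reclaimed.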

---

Separately: this function seems to be far too complex, and is almost certainly duplicating checks made
elsewhere, which should not be duplicated like this.  There have been multiple issues with
state-clearing that have led to adding conditions to this func, which is a clear code smell.

State / cache decisions like this should be made in exactly one place ever, and built up as obviously
as possible, to ensure gaps like this never occur.  In this case we'll likely need to invert the
dependency flow somehow, so callers control when cache is cleared based on whether or not it is cached,
rather than double-checking internally like this.
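
As a rough sketch of what "decided in exactly one place" could look like, the decision can be written as a small pure function that a caller evaluates once. Everything here is hypothetical: `releaseAction`, `decideRelease`, and the `mustDiscard` flag (standing in for the error / completed / sticky-disabled condition in `Unlock`) are illustrative names, not part of the client.

```go
package main

import "fmt"

// releaseAction is a hypothetical enum naming what to do with workflow state
// when a workflow context is unlocked.
type releaseAction int

const (
	keepCached     releaseAction = iota // leave the state in the sticky cache
	evictFromCache                      // remove from cache; eviction clears state
	clearDirectly                       // not cached, so clear state immediately
)

// decideRelease makes the whole decision in one place.
// cached: whether the run is currently in the workflow cache.
// mustDiscard: hypothetical flag for "errored, completed, or sticky disabled".
func decideRelease(cached, mustDiscard bool) releaseAction {
	switch {
	case cached && mustDiscard:
		return evictFromCache
	case cached:
		return keepCached
	default:
		// Nothing else owns the state, so leaving it alone would orphan
		// any goroutines it holds.
		return clearDirectly
	}
}

func main() {
	for _, cached := range []bool{true, false} {
		for _, discard := range []bool{true, false} {
			fmt.Printf("cached=%v mustDiscard=%v -> action %d\n",
				cached, discard, decideRelease(cached, discard))
		}
	}
}
```

Listing all four input combinations in one switch makes a gap such as "not cached and not complete" visible at a glance, which is the property argued for above.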

We should also probably add something like go.uber.org/goleak to our tests, to help ensure we do not
have goroutine leaks.  goleak might not have caught this particular leak, as the steps leading to it are a
bit odd and rely on singleton config (sticky cache size), but it may find or prevent others.  With this PR
we now have at least one test using it, but it'll probably take some time to roll out everywhere, and to add
missing test-state cleanup funcs.
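
The regression test in this commit compares leak counts before and after a query by counting a phrase in the leak error's text. Below is a stdlib-only sketch of that comparison. `sampleLeaks` is a hypothetical helper that builds a hand-written error in roughly the format quoted in the test's comments; it is an assumption for illustration, not real goleak output.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// countLeaks mirrors the helper added in this commit: it counts goroutines in
// a leak report by counting a phrase that appears once per goroutine.
func countLeaks(leaks error) int {
	if leaks == nil {
		return 0
	}
	return strings.Count(leaks.Error(), "on top of the stack")
}

// sampleLeaks builds a fake report describing n goroutines (hypothetical
// format, modelled on the comment in the test; nil when n is 0).
func sampleLeaks(n int) error {
	if n == 0 {
		return nil
	}
	var b strings.Builder
	for i := 0; i < n; i++ {
		fmt.Fprintf(&b, "Goroutine %d in state chan receive, with example.worker on top of the stack:\n", 20+i)
		b.WriteString("... stacktrace ...\n\n")
	}
	return errors.New(b.String())
}

func main() {
	before := sampleLeaks(1) // e.g. one goroutine held by a cached workflow
	after := sampleLeaks(2)  // e.g. the same plus one newly leaked goroutine
	fmt.Println("new leaks:", countLeaks(after)-countLeaks(before))
}
```

Comparing counts rather than requiring zero leaks is what lets the test tolerate leaks that already exist elsewhere in the suite.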
Groxx authored and vytautas-karpavicius committed Aug 23, 2021
1 parent eb5bff7 commit ed86b8a
Showing 3 changed files with 126 additions and 11 deletions.
8 changes: 5 additions & 3 deletions Makefile
@@ -134,11 +134,13 @@ bins: thriftc $(ALL_SRC) $(BUILD)/copyright lint $(BUILD)/dummy
unit_test: $(BUILD)/dummy
@mkdir -p $(COVER_ROOT)
@echo "mode: atomic" > $(UT_COVER_FILE)
@for dir in $(UT_DIRS); do \
@failed=0; \
for dir in $(UT_DIRS); do \
mkdir -p $(COVER_ROOT)/"$$dir"; \
go test "$$dir" $(TEST_ARG) -coverprofile=$(COVER_ROOT)/"$$dir"/cover.out || exit 1; \
go test "$$dir" $(TEST_ARG) -coverprofile=$(COVER_ROOT)/"$$dir"/cover.out || failed=1; \
cat $(COVER_ROOT)/"$$dir"/cover.out | grep -v "mode: atomic" >> $(UT_COVER_FILE); \
done;
done; \
exit $$failed

integ_test_sticky_off: $(BUILD)/dummy
@mkdir -p $(COVER_ROOT)
14 changes: 11 additions & 3 deletions internal/internal_task_handlers.go
@@ -476,16 +476,24 @@ func (w *workflowExecutionContextImpl) Lock() {
}

func (w *workflowExecutionContextImpl) Unlock(err error) {
cleared := false
cached := getWorkflowCache().Exist(w.workflowInfo.WorkflowExecution.RunID)
if err != nil || w.err != nil || w.isWorkflowCompleted || (w.wth.disableStickyExecution && !w.hasPendingLocalActivityWork()) {
// TODO: in case of closed, it assumes the close decision always succeed. need server side change to return
// error to indicate the close failure case. This should be rear case. For now, always remove the cache, and
// error to indicate the close failure case. This should be rare case. For now, always remove the cache, and
// if the close decision failed, the next decision will have to rebuild the state.
if getWorkflowCache().Exist(w.workflowInfo.WorkflowExecution.RunID) {
if cached {
// also clears state asynchronously via cache eviction
removeWorkflowContext(w.workflowInfo.WorkflowExecution.RunID)
} else {
// sticky is disabled, manually clear the workflow state.
w.clearState()
}
cleared = true
}
// there are a variety of reasons a workflow may not have been put into the cache.
// all of them mean we need to clear the state at this point, or any running goroutines will be orphaned.
if !cleared && !cached {
w.clearState()
}

w.mutex.Unlock()
115 changes: 110 additions & 5 deletions internal/internal_task_handlers_test.go
@@ -28,18 +28,19 @@ import (
"fmt"
"reflect"
"strings"
"sync"
"testing"
"time"

"github.com/golang/mock/gomock"
"github.com/opentracing/opentracing-go"
"github.com/pborman/uuid"
"github.com/stretchr/testify/suite"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"go.uber.org/cadence/.gen/go/cadence/workflowservicetest"
s "go.uber.org/cadence/.gen/go/shared"
"go.uber.org/cadence/internal/common"
"go.uber.org/goleak"
"go.uber.org/zap"
)

@@ -416,7 +417,7 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_ActivityTaskScheduled() {
t.NotNil(response.Decisions[0].CompleteWorkflowExecutionDecisionAttributes)
}

func (t *TaskHandlersTestSuite) TestWorkflowTask_QueryWorkflow_Sticky() {
func (t *TaskHandlersTestSuite) TestWorkflowTask_QueryWorkflow() {
// Schedule an activity and see if we complete workflow.
taskList := "sticky-tl"
execution := &s.WorkflowExecution{
@@ -463,8 +464,12 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_QueryWorkflow_Sticky() {
t.verifyQueryResult(queryResp, "waiting-activity-result")
}

func (t *TaskHandlersTestSuite) TestWorkflowTask_QueryWorkflow_NonSticky() {
func (t *TaskHandlersTestSuite) TestWorkflowTask_QueryWorkflow_2() {
// Schedule an activity and see if we complete workflow.

// This test appears to be just a finer-grained version of TestWorkflowTask_QueryWorkflow, though the older names
// for them implied entirely different purposes. Likely it can be combined with TestWorkflowTask_QueryWorkflow
// without losing anything useful.
taskList := "tl1"
testEvents := []*s.HistoryEvent{
createTestEventWorkflowExecutionStarted(1, &s.WorkflowExecutionStartedEventAttributes{TaskList: &s.TaskList{Name: &taskList}}),
@@ -478,6 +483,8 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_QueryWorkflow_NonSticky() {
}),
createTestEventActivityTaskStarted(6, &s.ActivityTaskStartedEventAttributes{}),
createTestEventActivityTaskCompleted(7, &s.ActivityTaskCompletedEventAttributes{ScheduledEventId: common.Int64Ptr(5)}),
// TODO: below seems irrational. there's a start without a schedule, and this workflow does not respond to signals.
// aside from this, the list of tasks is the same as TestWorkflowTask_QueryWorkflow
createTestEventDecisionTaskStarted(8),
createTestEventWorkflowExecutionSignaled(9, "test-signal"),
}
@@ -1449,6 +1456,104 @@ func (t *TaskHandlersTestSuite) TestActivityExecutionWorkerStop() {
t.NotNil(r)
}

// a regrettably-hacky func to use goleak to count leaking goroutines.
// ideally there will be a structured way to do this in the future, rather than string parsing
func countLeaks(leaks error) int {
if leaks == nil {
return 0
}
// leak messages look something like:
// Goroutine 23 in state chan receive, with go.uber.org/cadence/internal.(*coroutineState).initialYield on top of the stack:
// ... stacktrace ...
//
// Goroutine 28 ... on top of the stack:
// ... stacktrace ...
return strings.Count(leaks.Error(), "on top of the stack")
}

func (t *TaskHandlersTestSuite) TestRegression_QueriesDoNotLeakGoroutines() {
// this test must not be run in parallel with most other tests, as it mutates global vars
var ridsToCleanUp []string
originalLeaks := goleak.Find()
defer func(size int) {
// empty the cache to clear out any newly-introduced leaks
current := getWorkflowCache()
for _, rid := range ridsToCleanUp {
current.Delete(rid)
}
// check the cleanup
currentLeaks := goleak.Find()
if countLeaks(currentLeaks) != countLeaks(originalLeaks) {
t.T().Errorf("failed to clean up goroutines.\nOriginal state:\n%v\n\nCurrent state:\n%v", originalLeaks, currentLeaks)
}

// reset everything to make it "normal".
// this does NOT restore the original workflow cache - that cannot be done correctly, initCacheOnce is not safe to copy (thus restore).
stickyCacheSize = size
workflowCache = nil
initCacheOnce = sync.Once{}
}(stickyCacheSize)
workflowCache = nil
initCacheOnce = sync.Once{}
// cache is intentionally not *disabled*, as that would go down no-cache code paths.
// also, there is an LRU-cache bug where the size allows N to enter, but then removes until N-1 remain,
// so a size of 2 actually means a size of 1.
SetStickyWorkflowCacheSize(2)

taskList := "tl1"
params := workerExecutionParameters{
TaskList: taskList,
Identity: "test-id-1",
Logger: t.logger,
DisableStickyExecution: false,
}
taskHandler := newWorkflowTaskHandler(testDomain, params, nil, t.registry)

// process a throw-away workflow to fill the cache. this is copied from TestWorkflowTask_QueryWorkflow since it's
// relatively simple, but any should work fine, as long as it can be queried.
testEvents := []*s.HistoryEvent{
createTestEventWorkflowExecutionStarted(1, &s.WorkflowExecutionStartedEventAttributes{TaskList: &s.TaskList{Name: &taskList}}),
createTestEventDecisionTaskScheduled(2, &s.DecisionTaskScheduledEventAttributes{TaskList: &s.TaskList{Name: &taskList}}),
createTestEventDecisionTaskStarted(3),
createTestEventDecisionTaskCompleted(4, &s.DecisionTaskCompletedEventAttributes{ScheduledEventId: common.Int64Ptr(2)}),
createTestEventActivityTaskScheduled(5, &s.ActivityTaskScheduledEventAttributes{
ActivityId: common.StringPtr("0"),
ActivityType: &s.ActivityType{Name: common.StringPtr("Greeter_Activity")},
TaskList: &s.TaskList{Name: &taskList},
}),
}
cachedTask := createWorkflowTask(testEvents[0:1], 1, "HelloWorld_Workflow")
cachedTask.WorkflowExecution.WorkflowId = common.StringPtr("cache-filling workflow id")
ridsToCleanUp = append(ridsToCleanUp, *cachedTask.WorkflowExecution.RunId)
_, err := taskHandler.ProcessWorkflowTask(&workflowTask{task: cachedTask}, nil)

// sanity check that the cache was indeed filled, and that it has created a goroutine
require.NoError(t.T(), err, "cache-filling must succeed")
require.Equal(t.T(), 1, getWorkflowCache().Size(), "workflow should be cached, but was not")
oneCachedLeak := goleak.Find()
require.Error(t.T(), oneCachedLeak, "expected at least one leaking goroutine")
require.Equal(t.T(), countLeaks(originalLeaks)+1, countLeaks(oneCachedLeak), // ideally == 1, but currently there are other leaks
"expected the cached workflow to leak one goroutine. original leaks:\n%v\n\nleaks after one workflow:\n%v", originalLeaks, oneCachedLeak)

// now query a different workflow ID / run ID
uncachedTask := createQueryTask(testEvents, 5, "HelloWorld_Workflow", queryType)
uncachedTask.WorkflowExecution.WorkflowId = common.StringPtr("should not leak this workflow id")
ridsToCleanUp = append(ridsToCleanUp, *uncachedTask.WorkflowExecution.RunId) // only necessary if the test fails
result, err := taskHandler.ProcessWorkflowTask(&workflowTask{task: uncachedTask}, nil)
require.NoError(t.T(), err)
t.verifyQueryResult(result, "waiting-activity-result") // largely a sanity check

// and finally the purpose of this test:
// verify that the cache has not been modified, and that there is no new leak
t.Equal(1, getWorkflowCache().Size(), "workflow cache should be the same size")
t.True(getWorkflowCache().Exist(cachedTask.WorkflowExecution.GetRunId()), "originally-cached workflow should still be cached")
t.False(getWorkflowCache().Exist(uncachedTask.WorkflowExecution.GetRunId()), "queried workflow should not be cached")
newLeaks := goleak.Find()
t.Error(newLeaks, "expected at least one leaking goroutine")
t.Equal(countLeaks(oneCachedLeak), countLeaks(newLeaks),
"expected the query to leak no new goroutines. before query:\n%v\n\nafter query:\n%v", oneCachedLeak, newLeaks)
}

func Test_NonDeterministicCheck(t *testing.T) {
decisionTypes := s.DecisionType_Values()
require.Equal(t, 13, len(decisionTypes), "If you see this error, you are adding new decision type. "+