Automatically clear dc overrides after tests (#5064)
**What changed?**

I cleaned up how we manage dynamicconfig overrides in our functional
tests.

**Why?**

Forgetting to clean up overrides causes bugs (see #5046). By taking a
`testing.T` as a parameter when overriding a value, we can automatically
restore the previous value when the test ends.
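
The mechanism (see the `tests/dynamicconfig.go` diff below) is Go's `t.Cleanup`, which runs registered functions when the test finishes. A minimal, self-contained sketch of the pattern — `fakeClient` here is a hypothetical stand-in for the real `dcClient`:

```go
package tests

import (
	"sync"
	"testing"
)

// fakeClient is a stand-in for the real dcClient: a mutex-guarded map of overrides.
type fakeClient struct {
	sync.Mutex
	overrides map[string]any
}

func newFakeClient() *fakeClient {
	return &fakeClient{overrides: map[string]any{}}
}

// OverrideValue sets an override and registers a t.Cleanup hook that restores
// the prior value (or deletes the key if none existed) when the test ends.
func (d *fakeClient) OverrideValue(t *testing.T, name string, value any) {
	d.Lock()
	defer d.Unlock()
	priorValue, existed := d.overrides[name]
	d.overrides[name] = value

	t.Cleanup(func() {
		d.Lock()
		defer d.Unlock()
		if existed {
			d.overrides[name] = priorValue
		} else {
			delete(d.overrides, name)
		}
	})
}
```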

**How did you test it?**
Functional tests

**Potential risks**


**Is hotfix candidate?**
No
tdeebswihart authored Nov 2, 2023
1 parent 76f4b3a commit ce211cb
Showing 12 changed files with 109 additions and 128 deletions.
24 changes: 8 additions & 16 deletions tests/advanced_visibility_test.go
@@ -2037,10 +2037,8 @@ func (s *advancedVisibilitySuite) Test_BuildIdIndexedOnCompletion_UnversionedWor
 func (s *advancedVisibilitySuite) Test_BuildIdIndexedOnCompletion_VersionedWorker() {
 	// Use only one partition to avoid having to wait for user data propagation later
 	dc := s.testCluster.host.dcClient
-	dc.OverrideValue(dynamicconfig.MatchingNumTaskqueueReadPartitions, 1)
-	defer dc.RemoveOverride(dynamicconfig.MatchingNumTaskqueueReadPartitions)
-	dc.OverrideValue(dynamicconfig.MatchingNumTaskqueueWritePartitions, 1)
-	defer dc.RemoveOverride(dynamicconfig.MatchingNumTaskqueueWritePartitions)
+	dc.OverrideValue(s.T(), dynamicconfig.MatchingNumTaskqueueReadPartitions, 1)
+	dc.OverrideValue(s.T(), dynamicconfig.MatchingNumTaskqueueWritePartitions, 1)
 
 	ctx := NewContext()
 	id := s.randomizeStr(s.T().Name())
@@ -2204,10 +2202,8 @@ func (s *advancedVisibilitySuite) Test_BuildIdIndexedOnCompletion_VersionedWorke
 func (s *advancedVisibilitySuite) Test_BuildIdIndexedOnReset() {
 	// Use only one partition to avoid having to wait for user data propagation later
 	dc := s.testCluster.host.dcClient
-	dc.OverrideValue(dynamicconfig.MatchingNumTaskqueueReadPartitions, 1)
-	defer dc.RemoveOverride(dynamicconfig.MatchingNumTaskqueueReadPartitions)
-	dc.OverrideValue(dynamicconfig.MatchingNumTaskqueueWritePartitions, 1)
-	defer dc.RemoveOverride(dynamicconfig.MatchingNumTaskqueueWritePartitions)
+	dc.OverrideValue(s.T(), dynamicconfig.MatchingNumTaskqueueReadPartitions, 1)
+	dc.OverrideValue(s.T(), dynamicconfig.MatchingNumTaskqueueWritePartitions, 1)
 
 	ctx := NewContext()
 	id := s.randomizeStr(s.T().Name())
@@ -2291,10 +2287,8 @@ func (s *advancedVisibilitySuite) Test_BuildIdIndexedOnReset() {
 func (s *advancedVisibilitySuite) Test_BuildIdIndexedOnRetry() {
 	// Use only one partition to avoid having to wait for user data propagation later
 	dc := s.testCluster.host.dcClient
-	dc.OverrideValue(dynamicconfig.MatchingNumTaskqueueReadPartitions, 1)
-	defer dc.RemoveOverride(dynamicconfig.MatchingNumTaskqueueReadPartitions)
-	dc.OverrideValue(dynamicconfig.MatchingNumTaskqueueWritePartitions, 1)
-	defer dc.RemoveOverride(dynamicconfig.MatchingNumTaskqueueWritePartitions)
+	dc.OverrideValue(s.T(), dynamicconfig.MatchingNumTaskqueueReadPartitions, 1)
+	dc.OverrideValue(s.T(), dynamicconfig.MatchingNumTaskqueueWritePartitions, 1)
 
 	ctx := NewContext()
 	id := s.randomizeStr(s.T().Name())
@@ -2475,8 +2469,7 @@ func (s *advancedVisibilitySuite) TestWorkerTaskReachability_ByBuildId() {
 	s.checkReachability(ctx, tq1, v01, enumspb.TASK_REACHABILITY_NEW_WORKFLOWS, enumspb.TASK_REACHABILITY_EXISTING_WORKFLOWS)
 	s.checkReachability(ctx, tq1, v01, enumspb.TASK_REACHABILITY_NEW_WORKFLOWS, enumspb.TASK_REACHABILITY_CLOSED_WORKFLOWS)
 
-	defer dc.RemoveOverride(dynamicconfig.ReachabilityQuerySetDurationSinceDefault)
-	dc.OverrideValue(dynamicconfig.ReachabilityQuerySetDurationSinceDefault, time.Microsecond)
+	dc.OverrideValue(s.T(), dynamicconfig.ReachabilityQuerySetDurationSinceDefault, time.Microsecond)
 	// Verify new workflows aren't reachable
 	s.checkReachability(ctx, tq1, v01, enumspb.TASK_REACHABILITY_EXISTING_WORKFLOWS)
 	s.checkReachability(ctx, tq1, v01, enumspb.TASK_REACHABILITY_CLOSED_WORKFLOWS)
@@ -2615,8 +2608,7 @@ func (s *advancedVisibilitySuite) TestWorkerTaskReachability_Unversioned_InTaskQ
 	s.checkReachability(ctx, tq, "", enumspb.TASK_REACHABILITY_NEW_WORKFLOWS, enumspb.TASK_REACHABILITY_EXISTING_WORKFLOWS)
 	s.checkReachability(ctx, tq, "", enumspb.TASK_REACHABILITY_NEW_WORKFLOWS, enumspb.TASK_REACHABILITY_CLOSED_WORKFLOWS)
 
-	defer dc.RemoveOverride(dynamicconfig.ReachabilityQuerySetDurationSinceDefault)
-	dc.OverrideValue(dynamicconfig.ReachabilityQuerySetDurationSinceDefault, time.Microsecond)
+	dc.OverrideValue(s.T(), dynamicconfig.ReachabilityQuerySetDurationSinceDefault, time.Microsecond)
 
 	s.checkReachability(ctx, tq, "", enumspb.TASK_REACHABILITY_EXISTING_WORKFLOWS)
 	s.checkReachability(ctx, tq, "", enumspb.TASK_REACHABILITY_CLOSED_WORKFLOWS)
17 changes: 16 additions & 1 deletion tests/dynamicconfig.go
@@ -26,6 +26,7 @@ package tests
 
 import (
 	"sync"
+	"testing"
 	"time"
 
 	"golang.org/x/exp/maps"
@@ -75,10 +76,24 @@ func (d *dcClient) GetValue(name dynamicconfig.Key) []dynamicconfig.ConstrainedV
 	return d.fallback.GetValue(name)
 }
 
-func (d *dcClient) OverrideValue(name dynamicconfig.Key, value any) {
+// OverrideValue overrides a value for the duration of a test. Once the test completes
+// the previous value (if any) will be restored
+func (d *dcClient) OverrideValue(t *testing.T, name dynamicconfig.Key, value any) {
 	d.Lock()
 	defer d.Unlock()
+	priorValue, existed := d.overrides[name]
 	d.overrides[name] = value
+
+	t.Cleanup(func() {
+		d.Lock()
+		defer d.Unlock()
+
+		if existed {
+			d.overrides[name] = priorValue
+		} else {
+			delete(d.overrides, name)
+		}
+	})
 }
 
 func (d *dcClient) RemoveOverride(name dynamicconfig.Key) {
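
With this in place, call sites drop their paired `defer dc.RemoveOverride(...)` lines; a typical call in the test diffs above and below now looks like this:

```go
// Override for the duration of this test only; the previous value
// is restored automatically by the t.Cleanup hook registered above.
dc := s.testCluster.host.dcClient
dc.OverrideValue(s.T(), dynamicconfig.MatchingNumTaskqueueReadPartitions, 1)
```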
2 changes: 1 addition & 1 deletion tests/functional_test_base.go
@@ -136,7 +136,7 @@ func (s *FunctionalTestBase) setupSuite(defaultClusterConfigFile string, options
 		s.httpAPIAddress = TestFlags.FrontendHTTPAddr
 	} else {
 		s.Logger.Info("Running functional test against test cluster")
-		cluster, err := NewCluster(clusterConfig, s.Logger)
+		cluster, err := NewCluster(s.T(), clusterConfig, s.Logger)
 		s.Require().NoError(err)
 		s.testCluster = cluster
 		s.engine = s.testCluster.GetFrontendClient()
2 changes: 1 addition & 1 deletion tests/namespace_delete_test.go
@@ -99,7 +99,7 @@ func (s *namespaceTestSuite) SetupSuite() {
 
 	s.clusterConfig.DynamicConfigOverrides = dynamicConfig()
 
-	cluster, err := NewCluster(s.clusterConfig, s.logger)
+	cluster, err := NewCluster(s.T(), s.clusterConfig, s.logger)
 	s.Require().NoError(err)
 	s.cluster = cluster
 	s.frontendClient = s.cluster.GetFrontendClient()
2 changes: 1 addition & 1 deletion tests/ndc/ndc_test.go
@@ -155,7 +155,7 @@ func (s *nDCFunctionalTestSuite) SetupSuite() {
 	}
 	clusterConfigs[0].MockAdminClient = s.mockAdminClient
 
-	cluster, err := tests.NewCluster(clusterConfigs[0], log.With(s.logger, tag.ClusterName(clusterName[0])))
+	cluster, err := tests.NewCluster(s.T(), clusterConfigs[0], log.With(s.logger, tag.ClusterName(clusterName[0])))
 	s.Require().NoError(err)
 	s.cluster = cluster
 
35 changes: 18 additions & 17 deletions tests/onebox.go
@@ -32,6 +32,7 @@ import (
 	"net"
 	"strconv"
 	"sync"
+	"testing"
 	"time"
 
 	"go.uber.org/fx"
@@ -184,10 +185,10 @@ type (
 )
 
 // newTemporal returns an instance that hosts full temporal in one process
-func newTemporal(params *TemporalParams) *temporalImpl {
+func newTemporal(t *testing.T, params *TemporalParams) *temporalImpl {
 	testDCClient := newTestDCClient(dynamicconfig.NewNoopClient())
 	for k, v := range params.DynamicConfigOverrides {
-		testDCClient.OverrideValue(k, v)
+		testDCClient.OverrideValue(t, k, v)
 	}
 	impl := &temporalImpl{
 		logger: params.Logger,
@@ -216,7 +217,7 @@ func newTemporal(params *TemporalParams) *temporalImpl {
 		serviceFxOptions: params.ServiceFxOptions,
 		taskCategoryRegistry: params.TaskCategoryRegistry,
 	}
-	impl.overrideHistoryDynamicConfig(testDCClient)
+	impl.overrideHistoryDynamicConfig(t, testDCClient)
 	return impl
 }
 
@@ -809,42 +810,42 @@ func (c *temporalImpl) frontendConfigProvider() *config.Config {
 	}
 }
 
-func (c *temporalImpl) overrideHistoryDynamicConfig(client *dcClient) {
-	client.OverrideValue(dynamicconfig.ReplicationTaskProcessorStartWait, time.Nanosecond)
+func (c *temporalImpl) overrideHistoryDynamicConfig(t *testing.T, client *dcClient) {
+	client.OverrideValue(t, dynamicconfig.ReplicationTaskProcessorStartWait, time.Nanosecond)
 
 	if c.esConfig != nil {
-		client.OverrideValue(dynamicconfig.AdvancedVisibilityWritingMode, visibility.SecondaryVisibilityWritingModeDual)
+		client.OverrideValue(t, dynamicconfig.AdvancedVisibilityWritingMode, visibility.SecondaryVisibilityWritingModeDual)
 	}
 	if c.historyConfig.HistoryCountLimitWarn != 0 {
-		client.OverrideValue(dynamicconfig.HistoryCountLimitWarn, c.historyConfig.HistoryCountLimitWarn)
+		client.OverrideValue(t, dynamicconfig.HistoryCountLimitWarn, c.historyConfig.HistoryCountLimitWarn)
 	}
 	if c.historyConfig.HistoryCountLimitError != 0 {
-		client.OverrideValue(dynamicconfig.HistoryCountLimitError, c.historyConfig.HistoryCountLimitError)
+		client.OverrideValue(t, dynamicconfig.HistoryCountLimitError, c.historyConfig.HistoryCountLimitError)
 	}
 	if c.historyConfig.HistorySizeLimitWarn != 0 {
-		client.OverrideValue(dynamicconfig.HistorySizeLimitWarn, c.historyConfig.HistorySizeLimitWarn)
+		client.OverrideValue(t, dynamicconfig.HistorySizeLimitWarn, c.historyConfig.HistorySizeLimitWarn)
 	}
 	if c.historyConfig.HistorySizeLimitError != 0 {
-		client.OverrideValue(dynamicconfig.HistorySizeLimitError, c.historyConfig.HistorySizeLimitError)
+		client.OverrideValue(t, dynamicconfig.HistorySizeLimitError, c.historyConfig.HistorySizeLimitError)
 	}
 	if c.historyConfig.BlobSizeLimitError != 0 {
-		client.OverrideValue(dynamicconfig.BlobSizeLimitError, c.historyConfig.BlobSizeLimitError)
+		client.OverrideValue(t, dynamicconfig.BlobSizeLimitError, c.historyConfig.BlobSizeLimitError)
 	}
 	if c.historyConfig.BlobSizeLimitWarn != 0 {
-		client.OverrideValue(dynamicconfig.BlobSizeLimitWarn, c.historyConfig.BlobSizeLimitWarn)
+		client.OverrideValue(t, dynamicconfig.BlobSizeLimitWarn, c.historyConfig.BlobSizeLimitWarn)
 	}
 	if c.historyConfig.MutableStateSizeLimitError != 0 {
-		client.OverrideValue(dynamicconfig.MutableStateSizeLimitError, c.historyConfig.MutableStateSizeLimitError)
+		client.OverrideValue(t, dynamicconfig.MutableStateSizeLimitError, c.historyConfig.MutableStateSizeLimitError)
 	}
 	if c.historyConfig.MutableStateSizeLimitWarn != 0 {
-		client.OverrideValue(dynamicconfig.MutableStateSizeLimitWarn, c.historyConfig.MutableStateSizeLimitWarn)
+		client.OverrideValue(t, dynamicconfig.MutableStateSizeLimitWarn, c.historyConfig.MutableStateSizeLimitWarn)
 	}
 
 	// For DeleteWorkflowExecution tests
-	client.OverrideValue(dynamicconfig.TransferProcessorUpdateAckInterval, 1*time.Second)
-	client.OverrideValue(dynamicconfig.VisibilityProcessorUpdateAckInterval, 1*time.Second)
+	client.OverrideValue(t, dynamicconfig.TransferProcessorUpdateAckInterval, 1*time.Second)
+	client.OverrideValue(t, dynamicconfig.VisibilityProcessorUpdateAckInterval, 1*time.Second)
 
-	client.OverrideValue(dynamicconfig.EnableAPIGetCurrentRunIDLock, true)
+	client.OverrideValue(t, dynamicconfig.EnableAPIGetCurrentRunIDLock, true)
 }
 
 func (c *temporalImpl) newRPCFactory(
8 changes: 4 additions & 4 deletions tests/schedule_test.go
@@ -667,7 +667,7 @@ func (s *scheduleFunctionalSuite) TestListBeforeRun() {
 
 	// disable per-ns worker so that the schedule workflow never runs
 	dc := s.testCluster.host.dcClient
-	dc.OverrideValue(dynamicconfig.WorkerPerNamespaceWorkerCount, 0)
+	dc.OverrideValue(s.T(), dynamicconfig.WorkerPerNamespaceWorkerCount, 0)
 	s.testCluster.host.workerService.RefreshPerNSWorkerManager()
 	time.Sleep(2 * time.Second)
 
@@ -742,8 +742,8 @@ func (s *scheduleFunctionalSuite) TestRateLimit() {
 	// waiting one minute) we have to cause the whole worker to be stopped and started. The
 	// sleeps are needed because the refresh is asynchronous, and there's no way to get access
 	// to the actual rate limiter object to refresh it directly.
-	s.testCluster.host.dcClient.OverrideValue(dynamicconfig.SchedulerNamespaceStartWorkflowRPS, 1.0)
-	s.testCluster.host.dcClient.OverrideValue(dynamicconfig.WorkerPerNamespaceWorkerCount, 0)
+	s.testCluster.host.dcClient.OverrideValue(s.T(), dynamicconfig.SchedulerNamespaceStartWorkflowRPS, 1.0)
+	s.testCluster.host.dcClient.OverrideValue(s.T(), dynamicconfig.WorkerPerNamespaceWorkerCount, 0)
 	s.testCluster.host.workerService.RefreshPerNSWorkerManager()
 	time.Sleep(2 * time.Second)
 	s.testCluster.host.dcClient.RemoveOverride(dynamicconfig.WorkerPerNamespaceWorkerCount)
@@ -805,7 +805,7 @@ func (s *scheduleFunctionalSuite) TestRateLimit() {
 	}
 
 	s.testCluster.host.dcClient.RemoveOverride(dynamicconfig.SchedulerNamespaceStartWorkflowRPS)
-	s.testCluster.host.dcClient.OverrideValue(dynamicconfig.WorkerPerNamespaceWorkerCount, 0)
+	s.testCluster.host.dcClient.OverrideValue(s.T(), dynamicconfig.WorkerPerNamespaceWorkerCount, 0)
 	s.testCluster.host.workerService.RefreshPerNSWorkerManager()
 	time.Sleep(2 * time.Second)
 	s.testCluster.host.dcClient.RemoveOverride(dynamicconfig.WorkerPerNamespaceWorkerCount)
5 changes: 3 additions & 2 deletions tests/test_cluster.go
@@ -32,6 +32,7 @@ import (
 	"fmt"
 	"os"
 	"path"
+	"testing"
 
 	"github.com/pborman/uuid"
 	"go.temporal.io/api/operatorservice/v1"
@@ -119,7 +120,7 @@ const (
 )
 
 // NewCluster creates and sets up the test cluster
-func NewCluster(options *TestClusterConfig, logger log.Logger) (*TestCluster, error) {
+func NewCluster(t *testing.T, options *TestClusterConfig, logger log.Logger) (*TestCluster, error) {
 	clusterMetadataConfig := cluster.NewTestClusterMetadataConfig(
 		options.ClusterMetadata.EnableGlobalNamespace,
 		options.IsMasterCluster,
@@ -285,7 +286,7 @@ func NewCluster(options *TestClusterConfig, logger log.Logger) (*TestCluster, er
 		logger.Fatal("Failed to start pprof", tag.Error(err))
 	}
 
-	cluster := newTemporal(temporalParams)
+	cluster := newTemporal(t, temporalParams)
 	if err := cluster.Start(); err != nil {
 		return nil, err
 	}
4 changes: 2 additions & 2 deletions tests/transient_task_test.go
@@ -162,7 +162,7 @@ func (s *functionalSuite) TestTransientWorkflowTaskHistorySize() {
 	}
 
 	// start with 2mb limit
-	s.testCluster.host.dcClient.OverrideValue(dynamicconfig.HistorySizeSuggestContinueAsNew, 2*1024*1024)
+	s.testCluster.host.dcClient.OverrideValue(s.T(), dynamicconfig.HistorySizeSuggestContinueAsNew, 2*1024*1024)
 
 	// workflow logic
 	stage := 0
@@ -282,7 +282,7 @@ func (s *functionalSuite) TestTransientWorkflowTaskHistorySize() {
 
 	// change the dynamic config so that SuggestContinueAsNew should now be false. the current
 	// workflow task should still see true, but the next one will see false.
-	s.testCluster.host.dcClient.OverrideValue(dynamicconfig.HistorySizeSuggestContinueAsNew, 8*1024*1024)
+	s.testCluster.host.dcClient.OverrideValue(s.T(), dynamicconfig.HistorySizeSuggestContinueAsNew, 8*1024*1024)
 
 	// stage 4
 	_, err = poller.PollAndProcessWorkflowTask(WithNoDumpCommands)