diff --git a/pkg/cmd/roachtest/roachtestutil/task/tasker.go b/pkg/cmd/roachtest/roachtestutil/task/tasker.go index e5cf6c67311e..bf0b8eb9222b 100644 --- a/pkg/cmd/roachtest/roachtestutil/task/tasker.go +++ b/pkg/cmd/roachtest/roachtestutil/task/tasker.go @@ -16,9 +16,12 @@ type Func func(context.Context, *logger.Logger) error // Tasker is an interface for executing tasks (goroutines). It is intended for // use in tests, enabling the test framework to manage panics and errors. type Tasker interface { - // Go runs the given function in a goroutine. + // Go runs the given function in a goroutine. If an error is returned, it will + // fail the test. Panics are recovered and treated as errors. Go(fn Func, opts ...Option) // GoWithCancel runs the given function in a goroutine and returns a - // CancelFunc that can be used to cancel the function. + // CancelFunc that can be used to cancel the function. If an error is + // returned, it will fail the test. Panics are recovered and treated as + // errors. GoWithCancel(fn Func, opts ...Option) context.CancelFunc } diff --git a/pkg/cmd/roachtest/tests/BUILD.bazel b/pkg/cmd/roachtest/tests/BUILD.bazel index e5dc0fc2f812..736abf0c41e5 100644 --- a/pkg/cmd/roachtest/tests/BUILD.bazel +++ b/pkg/cmd/roachtest/tests/BUILD.bazel @@ -348,6 +348,7 @@ go_test( deps = [ "//pkg/cmd/roachtest/option", "//pkg/cmd/roachtest/registry", + "//pkg/cmd/roachtest/roachtestutil/task", "//pkg/cmd/roachtest/spec", "//pkg/roachprod/logger", "//pkg/roachprod/prometheus", diff --git a/pkg/cmd/roachtest/tests/allocator.go b/pkg/cmd/roachtest/tests/allocator.go index c80ed16e3c10..bd19df21e6b9 100644 --- a/pkg/cmd/roachtest/tests/allocator.go +++ b/pkg/cmd/roachtest/tests/allocator.go @@ -17,6 +17,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil" + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/task" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test" "github.com/cockroachdb/cockroach/pkg/roachprod/install" @@ -81,18 +82,10 @@ func registerAllocator(r registry.Registry) { c.Start(ctx, t.L(), startOpts, install.MakeClusterSettings(), c.Range(start+1, nodes)) c.Run(ctx, option.WithNodes(c.Node(1)), "./cockroach workload init kv --drop {pgurl:1}") for node := 1; node <= nodes; node++ { - node := node - // TODO(dan): Ideally, the test would fail if this queryload failed, - // but we can't put it in monitor as-is because the test deadlocks. - go func() { + t.Go(func(taskCtx context.Context, _ *logger.Logger) error { cmd := fmt.Sprintf("./cockroach workload run kv --tolerate-errors --min-block-bytes=8 --max-block-bytes=127 {pgurl%s}", c.Node(node)) - l, err := t.L().ChildLogger(fmt.Sprintf(`kv-%d`, node)) - if err != nil { - t.Fatal(err) - } - defer l.Close() - _ = c.RunE(ctx, option.WithNodes(c.Node(node)), cmd) - }() + return c.RunE(taskCtx, option.WithNodes(c.Node(node)), cmd) + }, task.Name(fmt.Sprintf(`kv-%d`, node))) } // Wait for 3x replication, we record the time taken to achieve this. diff --git a/pkg/cmd/roachtest/tests/cancel.go b/pkg/cmd/roachtest/tests/cancel.go index 79cd39327148..62907b904aaf 100644 --- a/pkg/cmd/roachtest/tests/cancel.go +++ b/pkg/cmd/roachtest/tests/cancel.go @@ -14,9 +14,11 @@ import ( "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry" + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/task" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test" "github.com/cockroachdb/cockroach/pkg/roachprod/install" + "github.com/cockroachdb/cockroach/pkg/roachprod/logger" "github.com/cockroachdb/cockroach/pkg/util/cancelchecker" "github.com/cockroachdb/cockroach/pkg/util/randutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" @@ -66,30 +68,32 @@ func registerCancel(r registry.Registry) { // (either query execution error or an error indicating the // absence of expected cancellation error). errCh := make(chan error, 1) - go func(queryNum int) { - runnerConn := c.Conn(ctx, t.L(), 1) + t.Go(func(taskCtx context.Context, l *logger.Logger) error { + runnerConn := c.Conn(taskCtx, l, 1) defer runnerConn.Close() setupQueries := []string{"USE tpch;"} if !useDistsql { setupQueries = append(setupQueries, "SET distsql = off;") } for _, setupQuery := range setupQueries { - t.L().Printf("executing setup query %q", setupQuery) + l.Printf("executing setup query %q", setupQuery) if _, err := runnerConn.Exec(setupQuery); err != nil { errCh <- err close(sem) - return + // Errors are handled in the main goroutine. + return nil //nolint:returnerrcheck } } query := tpch.QueriesByNumber[queryNum] - t.L().Printf("executing q%d\n", queryNum) + l.Printf("executing q%d\n", queryNum) close(sem) _, err := runnerConn.Exec(query) if err == nil { err = errors.New("query completed before it could be canceled") } errCh <- err - }(queryNum) + return nil + }, task.Name("query-runner")) // Wait for the query-runner goroutine to start as well as // to execute setup queries. diff --git a/pkg/cmd/roachtest/tests/cluster_init.go b/pkg/cmd/roachtest/tests/cluster_init.go index 1fc9635267e8..3627474fcbb5 100644 --- a/pkg/cmd/roachtest/tests/cluster_init.go +++ b/pkg/cmd/roachtest/tests/cluster_init.go @@ -18,6 +18,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test" "github.com/cockroachdb/cockroach/pkg/roachprod/install" + "github.com/cockroachdb/cockroach/pkg/roachprod/logger" "github.com/cockroachdb/cockroach/pkg/server/authserver" "github.com/cockroachdb/cockroach/pkg/server/serverpb" "github.com/cockroachdb/cockroach/pkg/util/httputil" @@ -87,11 +88,11 @@ func runClusterInit(ctx context.Context, t test.Test, c cluster.Cluster) { t.L().Printf("checking that the SQL conns are not failing immediately") errCh := make(chan error, len(dbs)) for _, db := range dbs { - db := db - go func() { + t.Go(func(taskCtx context.Context, _ *logger.Logger) error { var val int - errCh <- db.QueryRow("SELECT 1").Scan(&val) - }() + errCh <- db.QueryRowContext(taskCtx, "SELECT 1").Scan(&val) + return nil + }) } // Give them time to get a "connection refused" or similar error if diff --git a/pkg/cmd/roachtest/tests/decommissionbench.go b/pkg/cmd/roachtest/tests/decommissionbench.go index 3b036b6d675e..d5d512408a51 100644 --- a/pkg/cmd/roachtest/tests/decommissionbench.go +++ b/pkg/cmd/roachtest/tests/decommissionbench.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil" + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/task" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test" "github.com/cockroachdb/cockroach/pkg/roachprod/install" @@ -313,20 +314,22 @@ func registerDecommissionBenchSpec(r registry.Registry, benchSpec decommissionBe }) } -// fireAfter executes fn after the duration elapses. If the context expires -// first, it will not be executed. -func fireAfter(ctx context.Context, duration time.Duration, fn func()) { - go func() { +// fireAfter executes fn after the duration elapses. If the passed context, or +// tasker context, expires first, it will not be executed. +func fireAfter(ctx context.Context, t task.Tasker, duration time.Duration, fn func()) { + t.Go(func(taskCtx context.Context, _ *logger.Logger) error { var fireTimer timeutil.Timer defer fireTimer.Stop() fireTimer.Reset(duration) select { case <-ctx.Done(): + case <-taskCtx.Done(): case <-fireTimer.C: fireTimer.Read = true fn() } - }() + return nil + }) } // createDecommissionBenchPerfArtifacts initializes a histogram registry for @@ -992,7 +995,7 @@ func runSingleDecommission( if estimateDuration { estimateDecommissionDuration( - ctx, h.t.L(), tickByName, snapshotRateMb, bytesUsed, candidateStores, + ctx, h.t, h.t.L(), tickByName, snapshotRateMb, bytesUsed, candidateStores, rangeCount, avgBytesPerReplica, ) } @@ -1148,6 +1151,7 @@ func logLSMHealth(ctx context.Context, l *logger.Logger, c cluster.Cluster, targ // recorded perf artifacts as ticks. func estimateDecommissionDuration( ctx context.Context, + t task.Tasker, log *logger.Logger, tickByName func(name string), snapshotRateMb int, @@ -1185,7 +1189,7 @@ func estimateDecommissionDuration( rangeCount, humanizeutil.IBytes(avgBytesPerReplica), minDuration, estDuration, ) - fireAfter(ctx, estDuration, func() { + fireAfter(ctx, t, estDuration, func() { tickByName(estimatedMetric) }) } diff --git a/pkg/cmd/roachtest/tests/drt.go b/pkg/cmd/roachtest/tests/drt.go index a85e7b252df8..67166dca230e 100644 --- a/pkg/cmd/roachtest/tests/drt.go +++ b/pkg/cmd/roachtest/tests/drt.go @@ -11,6 +11,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/task" "github.com/cockroachdb/cockroach/pkg/roachprod/logger" "github.com/cockroachdb/cockroach/pkg/roachprod/prometheus" "github.com/cockroachdb/cockroach/pkg/util/retry" @@ -233,8 +234,8 @@ func (ep *tpccChaosEventProcessor) err() error { return err } -func (ep *tpccChaosEventProcessor) listen(ctx context.Context, l *logger.Logger) { - go func() { +func (ep *tpccChaosEventProcessor) listen(ctx context.Context, t task.Tasker, l *logger.Logger) { + t.Go(func(context.Context, *logger.Logger) error { var prevTime time.Time started := false for ev := range ep.ch { @@ -294,5 +295,6 @@ func (ep *tpccChaosEventProcessor) listen(ctx context.Context, l *logger.Logger) } prevTime = ev.Time } - }() + return nil + }) } diff --git a/pkg/cmd/roachtest/tests/drt_test.go b/pkg/cmd/roachtest/tests/drt_test.go index 9c64ee3c5a3d..971d423f4f6b 100644 --- a/pkg/cmd/roachtest/tests/drt_test.go +++ b/pkg/cmd/roachtest/tests/drt_test.go @@ -12,6 +12,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option" + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/task" "github.com/cockroachdb/cockroach/pkg/roachprod/logger" "github.com/cockroachdb/cockroach/pkg/roachprod/prometheus" gomock "github.com/golang/mock/gomock" @@ -530,7 +531,8 @@ func TestTPCCChaosEventProcessor(t *testing.T) { l, err := (&logger.Config{}).NewLogger("") require.NoError(t, err) - ep.listen(ctx, l) + tasker := task.NewManager(ctx, l) + ep.listen(ctx, tasker, l) for _, chaosEvent := range tc.chaosEvents { ch <- chaosEvent } diff --git a/pkg/cmd/roachtest/tests/export_parquet.go b/pkg/cmd/roachtest/tests/export_parquet.go index eb7bd3e84a78..d2c5caf5089f 100644 --- a/pkg/cmd/roachtest/tests/export_parquet.go +++ b/pkg/cmd/roachtest/tests/export_parquet.go @@ -15,8 +15,10 @@ import ( "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/grafana" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry" + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/task" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test" + "github.com/cockroachdb/cockroach/pkg/roachprod/logger" "github.com/cockroachdb/cockroach/pkg/roachprod/prometheus" ) @@ -89,7 +91,8 @@ func registerExportParquet(r registry.Registry) { wg := sync.WaitGroup{} for i := 0; i < numConcurrentExports; i++ { wg.Add(1) - go func(i int, target string) { + target := allTpccTargets[i%len(allTpccTargets)] + t.Go(func(context.Context, *logger.Logger) error { t.Status(fmt.Sprintf("worker %d/%d starting export of target %s", i+1, numConcurrentExports, target)) fileNum := 0 db := c.Conn(ctx, t.L(), 1) @@ -103,7 +106,8 @@ func registerExportParquet(r registry.Registry) { } t.Status(fmt.Sprintf("worker %d/%d terminated", i+1, numConcurrentExports)) wg.Done() - }(i, allTpccTargets[i%len(allTpccTargets)]) + return nil + }) } wg.Wait() @@ -150,9 +154,10 @@ func registerExportParquet(r registry.Registry) { wg := sync.WaitGroup{} for i := 0; i < numWorkers; i++ { wg.Add(1) - go func(i int, target string) { + target := allTpccTargets[i] + t.Go(func(taskCtx context.Context, l *logger.Logger) error { t.Status(fmt.Sprintf("worker %d/%d starting export of target %s", i+1, numWorkers, target)) - db := c.Conn(ctx, t.L(), 1) + db := c.Conn(taskCtx, l, 1) _, err := db.Exec( fmt.Sprintf("EXPORT INTO PARQUET 'nodelocal://1/outputfile%d' FROM SELECT * FROM %s", i, target)) if err != nil { @@ -160,7 +165,8 @@ func registerExportParquet(r registry.Registry) { } t.Status(fmt.Sprintf("worker %d/%d terminated", i+1, numWorkers)) wg.Done() - }(i, allTpccTargets[i]) + return nil + }, task.Name(fmt.Sprintf("parquet-export-worker-%d", i+1))) } wg.Wait() }, diff --git a/pkg/cmd/roachtest/tests/jepsen.go b/pkg/cmd/roachtest/tests/jepsen.go index 1d577c377c87..6ec3978e3a5b 100644 --- a/pkg/cmd/roachtest/tests/jepsen.go +++ b/pkg/cmd/roachtest/tests/jepsen.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test" + "github.com/cockroachdb/cockroach/pkg/roachprod/logger" "github.com/cockroachdb/cockroach/pkg/util/retry" ) @@ -279,17 +280,19 @@ func (j jepsenConfig) startTest( } t.Fatalf("error installing Jepsen deps: %+v", err) } - go func() { + t.Go(func(context.Context, *logger.Logger) error { errCh <- run("bash", "-e", "-c", fmt.Sprintf( `"cd /mnt/data1/jepsen/cockroachdb && set -eo pipefail && ~/lein run %s > invoke.log 2>&1"`, testArgs)) - }() + return nil + }) } else { - go func() { + t.Go(func(context.Context, *logger.Logger) error { errCh <- run("bash", "-e", "-c", fmt.Sprintf( `"cd /mnt/data1/jepsen/cockroachdb && set -eo pipefail && java -jar %s %s > invoke.log 2>&1"`, j.binaryName(), testArgs)) - }() + return nil + }) } return errCh } diff --git a/pkg/cmd/roachtest/tests/mixed_version_c2c.go b/pkg/cmd/roachtest/tests/mixed_version_c2c.go index 467208ba198f..441786f3940e 100644 --- a/pkg/cmd/roachtest/tests/mixed_version_c2c.go +++ b/pkg/cmd/roachtest/tests/mixed_version_c2c.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/clusterupgrade" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/mixedversion" + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/task" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test" "github.com/cockroachdb/cockroach/pkg/roachprod" @@ -71,7 +72,7 @@ func runC2CMixedVersions(ctx context.Context, t test.Test, c cluster.Cluster, sp cm.WorkloadHook(ctx) cm.LatencyHook(ctx) cm.UpdateHook(ctx) - cm.Run(ctx, c) + cm.Run(t) } func InitC2CMixed( @@ -451,31 +452,33 @@ func (cm *c2cMixed) sourceFingerprintAndCompare( return nil } -func (cm *c2cMixed) Run(ctx context.Context, c cluster.Cluster) { +func (cm *c2cMixed) Run(t task.Tasker) { var wg sync.WaitGroup wg.Add(2) - go func() { + t.Go(func(_ context.Context, l *logger.Logger) error { defer func() { if r := recover(); r != nil { - cm.t.L().Printf("source cluster upgrade failed: %v", r) + l.Printf("source cluster upgrade failed: %v", r) } }() defer wg.Done() cm.sourceMvt.Run() - }() + return nil + }) - go func() { + t.Go(func(taskCtx context.Context, l *logger.Logger) error { defer func() { if r := recover(); r != nil { - cm.t.L().Printf("destination cluster upgrade failed: %v", r) + l.Printf("destination cluster upgrade failed: %v", r) } }() defer wg.Done() - chanReadCtx(ctx, cm.sourceStartedChan) + chanReadCtx(taskCtx, cm.sourceStartedChan) cm.destMvt.Run() - }() + return nil + }) wg.Wait() } diff --git a/pkg/cmd/roachtest/tests/multitenant_upgrade.go b/pkg/cmd/roachtest/tests/multitenant_upgrade.go index 0a002acd1274..8715ba417384 100644 --- a/pkg/cmd/roachtest/tests/multitenant_upgrade.go +++ b/pkg/cmd/roachtest/tests/multitenant_upgrade.go @@ -164,10 +164,11 @@ func runMultitenantUpgrade(ctx context.Context, t test.Test, c cluster.Cluster) } returnCh := make(chan struct{}) - go func() { + h.Go(func(context.Context, *logger.Logger) error { wg.Wait() close(returnCh) - }() + return nil + }) return returnCh } @@ -280,16 +281,18 @@ func runMultitenantUpgrade(ctx context.Context, t test.Test, c cluster.Cluster) var wg sync.WaitGroup wg.Add(2) // tpcc worklaod and upgrade finalization - go func() { + h.Go(func(_ context.Context, l *logger.Logger) error { defer wg.Done() <-tpccFinished l.Printf("tpcc workload finished running on tenants") - }() - go func() { + return nil + }) + h.Go(func(_ context.Context, l *logger.Logger) error { defer wg.Done() <-upgradeFinished l.Printf("tenant upgrades finished") - }() + return nil + }) wg.Wait() return nil diff --git a/pkg/cmd/roachtest/tests/mvcc_gc.go b/pkg/cmd/roachtest/tests/mvcc_gc.go index 69ba1288e965..74b4c2ade6b7 100644 --- a/pkg/cmd/roachtest/tests/mvcc_gc.go +++ b/pkg/cmd/roachtest/tests/mvcc_gc.go @@ -18,10 +18,12 @@ import ( "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil" + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/task" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/roachprod/install" + "github.com/cockroachdb/cockroach/pkg/roachprod/logger" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/util/protoutil" @@ -134,7 +136,7 @@ func runMVCCGC(ctx context.Context, t test.Test, c cluster.Cluster) { wlCtx, wlCancel := context.WithCancel(ctx) defer wlCancel() wlFailure := make(chan error, 1) - go func() { + t.Go(func(context.Context, *logger.Logger) error { defer close(wlFailure) cmd = roachtestutil.NewCommand("./cockroach workload run kv"). Flag("cycle-length", 20000). @@ -146,7 +148,8 @@ func runMVCCGC(ctx context.Context, t test.Test, c cluster.Cluster) { String() err := c.RunE(wlCtx, option.WithNodes(c.Node(1)), cmd) wlFailure <- err - }() + return nil + }, task.Name("workload")) m := queryTableMetaOrFatal(t, conn, "kv", "kv") diff --git a/pkg/cmd/roachtest/tests/query_comparison_util.go b/pkg/cmd/roachtest/tests/query_comparison_util.go index a0b319621bbd..68b6239d920f 100644 --- a/pkg/cmd/roachtest/tests/query_comparison_util.go +++ b/pkg/cmd/roachtest/tests/query_comparison_util.go @@ -411,7 +411,7 @@ func runOneRoundQueryComparison( // state of the database. if i < numInitialMutations || i%25 == 0 { mConn, mConnInfo := h.chooseConn() - runMutationStatement(mConn, mConnInfo, mutatingSmither, logStmt) + runMutationStatement(t, mConn, mConnInfo, mutatingSmither, logStmt) continue } diff --git a/pkg/cmd/roachtest/tests/schemachange.go b/pkg/cmd/roachtest/tests/schemachange.go index 30812ac90a1e..a703b89002eb 100644 --- a/pkg/cmd/roachtest/tests/schemachange.go +++ b/pkg/cmd/roachtest/tests/schemachange.go @@ -14,6 +14,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry" + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/task" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test" "github.com/cockroachdb/cockroach/pkg/roachprod/install" @@ -53,17 +54,10 @@ func registerSchemaChangeDuringKV(r registry.Registry) { c.Run(ctx, option.WithNodes(c.Node(1)), `./cockroach workload init kv --drop --db=test {pgurl:1}`) for node := 1; node <= c.Spec().NodeCount; node++ { node := node - // TODO(dan): Ideally, the test would fail if this queryload failed, - // but we can't put it in monitor as-is because the test deadlocks. - go func() { + t.Go(func(taskCtx context.Context, _ *logger.Logger) error { const cmd = `./cockroach workload run kv --tolerate-errors --min-block-bytes=8 --max-block-bytes=127 --db=test {pgurl%s}` - l, err := t.L().ChildLogger(fmt.Sprintf(`kv-%d`, node)) - if err != nil { - t.Fatal(err) - } - defer l.Close() - _ = c.RunE(ctx, option.WithNodes(c.Node(node)), fmt.Sprintf(cmd, c.Nodes(node))) - }() + return c.RunE(taskCtx, option.WithNodes(c.Node(node)), fmt.Sprintf(cmd, c.Nodes(node))) + }, task.Name(fmt.Sprintf(`kv-%d`, node))) } m = c.NewMonitor(ctx, c.All()) diff --git a/pkg/cmd/roachtest/tests/tlp.go b/pkg/cmd/roachtest/tests/tlp.go index a7e403782833..5ce86b2121c6 100644 --- a/pkg/cmd/roachtest/tests/tlp.go +++ b/pkg/cmd/roachtest/tests/tlp.go @@ -17,9 +17,11 @@ import ( "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry" + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/task" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test" "github.com/cockroachdb/cockroach/pkg/internal/sqlsmith" "github.com/cockroachdb/cockroach/pkg/roachprod/install" + "github.com/cockroachdb/cockroach/pkg/roachprod/logger" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/randutil" "github.com/cockroachdb/errors" @@ -167,11 +169,11 @@ func runOneTLP( // for a fraction of iterations after that to continually change the // state of the database. if i < 1000 || i%10 == 0 { - runMutationStatement(conn, "", mutSmither, logStmt) + runMutationStatement(t, conn, "", mutSmither, logStmt) continue } - if err := runTLPQuery(conn, tlpSmither, logStmt); err != nil { + if err := runTLPQuery(t, conn, tlpSmither, logStmt); err != nil { t.Fatal(err) } } @@ -180,7 +182,7 @@ func runOneTLP( // runMutationsStatement runs a random INSERT, UPDATE, or DELETE statement that // potentially modifies the state of the database. func runMutationStatement( - conn *gosql.DB, connInfo string, smither *sqlsmith.Smither, logStmt func(string), + t task.Tasker, conn *gosql.DB, connInfo string, smither *sqlsmith.Smither, logStmt func(string), ) { // Ignore panics from Generate. defer func() { @@ -193,7 +195,7 @@ func runMutationStatement( // Ignore timeouts. var err error - _ = runWithTimeout(func() error { + _ = runWithTimeout(t, func() error { // Ignore errors. Log successful statements. if _, err = conn.Exec(stmt); err == nil { logStmt(connInfo + stmt) @@ -207,7 +209,9 @@ func runMutationStatement( // returned. GenerateTLP also returns any placeholder arguments needed for the // partitioned query. See GenerateTLP for more information on TLP and the // generated queries. -func runTLPQuery(conn *gosql.DB, smither *sqlsmith.Smither, logStmt func(string)) error { +func runTLPQuery( + t task.Tasker, conn *gosql.DB, smither *sqlsmith.Smither, logStmt func(string), +) error { // Ignore panics from GenerateTLP. defer func() { if r := recover(); r != nil { @@ -218,7 +222,7 @@ func runTLPQuery(conn *gosql.DB, smither *sqlsmith.Smither, logStmt func(string) unpartitioned, partitioned, args := smither.GenerateTLP() combined := sqlsmith.CombinedTLP(unpartitioned, partitioned) - return runWithTimeout(func() error { + return runWithTimeout(t, func() error { counts := conn.QueryRow(combined, args...) var undiffCount, diffCount int if err := counts.Scan(&undiffCount, &diffCount); err != nil { @@ -268,12 +272,13 @@ func runTLPQuery(conn *gosql.DB, smither *sqlsmith.Smither, logStmt func(string) }) } -func runWithTimeout(f func() error) error { +func runWithTimeout(t task.Tasker, f func() error) error { done := make(chan error, 1) - go func() { + t.Go(func(context.Context, *logger.Logger) error { err := f() done <- err - }() + return nil + }) select { case <-time.After(statementTimeout + time.Second*5): // Ignore timeouts. diff --git a/pkg/cmd/roachtest/tests/tpcc.go b/pkg/cmd/roachtest/tests/tpcc.go index a99ea3e6278e..04ea2b8ed03d 100644 --- a/pkg/cmd/roachtest/tests/tpcc.go +++ b/pkg/cmd/roachtest/tests/tpcc.go @@ -267,7 +267,7 @@ func runTPCC( if err != nil { t.Fatal(err) } - cep.listen(ctx, l) + cep.listen(ctx, t, l) ep = &cep }