From e147a555493ae9f6d75ae014622398e46dbe369a Mon Sep 17 00:00:00 2001 From: Steven Danna Date: Thu, 3 Aug 2023 00:26:37 +0100 Subject: [PATCH 1/3] upgrades: avoid crdb_internal.system_jobs in upgrade manager MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The crdb_internal.system_jobs is a virtual table that joins information from the jobs table and the jobs_info table. When given a job status predicate it does this by running a query such as: WITH latestpayload AS ( SELECT job_id, value FROM system.job_info AS payload WHERE info_key = 'legacy_payload' ORDER BY written DESC ), latestprogress AS ( SELECT job_id, value FROM system.job_info AS progress WHERE info_key = 'legacy_progress' ORDER BY written DESC ) SELECT distinct(id), status, created, payload.value AS payload, progress.value AS progress, created_by_type, created_by_id, claim_session_id, claim_instance_id, num_runs, last_run,job_type FROM system.jobs AS j INNER JOIN latestpayload AS payload ON j.id = payload.job_id LEFT JOIN latestprogress AS progress ON j.id = progress.job_id WHERE j.status = 'cancel-requested'; This uses 2 full scans of the job_info table: ``` • distinct │ distinct on: id, value, value │ └── • merge join │ equality: (job_id) = (id) │ ├── • render │ │ │ └── • filter │ │ estimated row count: 2,787 │ │ filter: info_key = 'legacy_payload' │ │ │ └── • scan │ estimated row count: 5,597 (100% of the table; stats collected 27 minutes ago; using stats forecast for 17 minutes ago) │ table: job_info@primary │ spans: FULL SCAN │ └── • merge join (right outer) │ equality: (job_id) = (id) │ right cols are key │ ├── • render │ │ │ └── • filter │ │ estimated row count: 2,787 │ │ filter: info_key = 'legacy_progress' │ │ │ └── • scan │ estimated row count: 5,597 (100% of the table; stats collected 27 minutes ago; using stats forecast for 17 minutes ago) │ table: job_info@primary │ spans: FULL SCAN │ └── • index join │ table: jobs@primary │ └── • sort │ order: +id │ └── • scan missing stats table: jobs@jobs_status_created_idx spans: [/'cancel-requested' - /'cancel-requested'] ``` Previously, the upgrade manager was using this virtual table as part of a larger query: SELECT id, status FROM ( SELECT id, status, crdb_internal.pb_to_json( 'cockroach.sql.jobs.jobspb.Payload', payload, false ) AS pl FROM crdb_internal.system_jobs WHERE status IN ('running', 'pending', 'cancel-requested', 'pause-requested', 'reverting', 'paused') ) WHERE pl->'migration'->'clusterVersion' = $1::JSONB; I believe the use of the IN operator causes the virtual index's populate function to be called for each value. Perhaps the optimizer accounts for this in some way to avoid this resulting in 2 * 6 full scans of the job table, but it is hard to confirm with the explain output. In at least one recent escalation, we observed this query taking a substantial amount of time as it continually conflicted with other job system queries. Here, we avoid using the virtual table. This allows us to avoid one full scan of the info table since we don't need the progress (only the payload). It also allows us to use the full `IN` predicate directly, avoiding any uncertainty. In a local example, this is substantially faster ``` root@localhost:26257/defaultdb> SELECT id, status -> FROM ( -> SELECT id, -> status, -> crdb_internal.pb_to_json( -> 'cockroach.sql.jobs.jobspb.Payload', -> payload, -> false -- emit_defaults -> ) AS pl -> FROM crdb_internal.system_jobs -> WHERE status IN ('running', 'pending', 'cancel-requested', 'pause-requested', 'reverting', 'paused') -> ) -> WHERE pl->'migration'->'clusterVersion' = '{"activeVersion": {"internal": 84, "majorVal": 22, "minorVal": 2}}'::JSONB; id | status -----+--------- (0 rows) Time: 384ms total (execution 384ms / network 0ms) root@localhost:26257/defaultdb> WITH latestpayload AS (SELECT job_id, value FROM system.job_info AS payload WHERE info_key = 'legacy_payload' ORDER BY written DESC) -> SELECT id, status FROM ( -> SELECT distinct(id), status, crdb_internal.pb_to_json('cockroach.sql.jobs.jobspb.Payload', payload.value, false) AS pl -> FROM system.jobs AS j -> INNER JOIN latestpayload AS payload ON j.id = payload.job_id -> WHERE status IN ('running', 'pending', 'cancel-requested', 'pause-requested', 'reverting', 'paused') -> AND job_type = 'MIGRATION' -> ) WHERE ((pl->'migration')->'clusterVersion') = '{"activeVersion": {"internal": 84, "majorVal": 22, "minorVal": 2}}'::JSONB; id | status -----+--------- (0 rows) Time: 26ms total (execution 26ms / network 0ms) ``` We should do more work to understand contention within the job system, but perhaps speeding up this query will help a bit. Epic: None Release note: None --- pkg/upgrade/upgrademanager/manager.go | 58 ++++++++++++++----- .../upgrademanager/manager_external_test.go | 5 +- 2 files changed, 48 insertions(+), 15 deletions(-) diff --git a/pkg/upgrade/upgrademanager/manager.go b/pkg/upgrade/upgrademanager/manager.go index e96f2a994e39..0cb8210b0632 100644 --- a/pkg/upgrade/upgrademanager/manager.go +++ b/pkg/upgrade/upgrademanager/manager.go @@ -778,26 +778,56 @@ func (m *Manager) getOrCreateMigrationJob( return alreadyCompleted, alreadyExisting, jobID, nil } +const ( + preJobInfoTableQuery = ` +SELECT id, status +FROM ( + SELECT id, status, + crdb_internal.pb_to_json( + 'cockroach.sql.jobs.jobspb.Payload', + payload, + false -- emit_defaults + ) AS pl + FROM system.jobs + WHERE status IN ` + jobs.NonTerminalStatusTupleString + ` +) +WHERE ((pl->'migration')->'clusterVersion') = $1::JSONB` + postJobInfoTableQuery = ` +WITH latestpayload AS ( + SELECT job_id, value FROM system.job_info AS payload + WHERE info_key = 'legacy_payload' + ORDER BY written DESC +) +SELECT id, status +FROM ( + SELECT + distinct(id), + status, + crdb_internal.pb_to_json( + 'cockroach.sql.jobs.jobspb.Payload', + payload.value, + false -- emit_defaults + ) AS pl + FROM system.jobs AS j + INNER JOIN latestpayload AS payload ON j.id = payload.job_id + WHERE j.status IN ` + jobs.NonTerminalStatusTupleString + ` + AND j.job_type = 'MIGRATION' +) +WHERE ((pl->'migration')->'clusterVersion') = $1::JSONB` +) + func (m *Manager) getRunningMigrationJob( ctx context.Context, txn isql.Txn, version roachpb.Version, ) (found bool, jobID jobspb.JobID, _ error) { // Wrap the version into a ClusterVersion so that the JSON looks like what the // Payload proto has inside. cv := clusterversion.ClusterVersion{Version: version} - const query = ` -SELECT id, status - FROM ( - SELECT id, - status, - crdb_internal.pb_to_json( - 'cockroach.sql.jobs.jobspb.Payload', - payload, - false -- emit_defaults - ) AS pl - FROM crdb_internal.system_jobs - WHERE status IN ` + jobs.NonTerminalStatusTupleString + ` - ) - WHERE pl->'migration'->'clusterVersion' = $1::JSON;` + var query string + if m.settings.Version.IsActive(ctx, clusterversion.V23_1JobInfoTableIsBackfilled) { + query = postJobInfoTableQuery + } else { + query = preJobInfoTableQuery + } jsonMsg, err := protoreflect.MessageToJSON(&cv, protoreflect.FmtFlags{EmitDefaults: false}) if err != nil { return false, 0, errors.Wrap(err, "failed to marshal version to JSON") diff --git a/pkg/upgrade/upgrademanager/manager_external_test.go b/pkg/upgrade/upgrademanager/manager_external_test.go index 5cd231e9492f..5392ab8608a2 100644 --- a/pkg/upgrade/upgrademanager/manager_external_test.go +++ b/pkg/upgrade/upgrademanager/manager_external_test.go @@ -155,7 +155,10 @@ func TestAlreadyRunningJobsAreHandledProperly(t *testing.T) { created_by_type, created_by_id, claim_session_id, - claim_instance_id + claim_instance_id, + 0, + NULL, + job_type FROM crdb_internal.system_jobs WHERE id = $1 ) From 612d49f867fe7784e8e484a13398d84e340db69b Mon Sep 17 00:00:00 2001 From: Steven Danna Date: Mon, 7 Aug 2023 13:47:46 +0100 Subject: [PATCH 2/3] upgrade: remove last full scan from get jobs query MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This removes the last full scan from the query used to find migration jobs. ``` • root │ ├── • hash join │ │ equality: (job_id) = (id) │ │ right cols are key │ │ │ ├── • render │ │ │ │ │ └── • lookup join │ │ │ table: job_info@primary │ │ │ equality: (id, lookup_join_const_col_@16) = (job_id,info_key) │ │ │ │ │ └── • render │ │ │ │ │ └── • scan buffer │ │ label: buffer 1 (running_migration_jobs) │ │ │ └── • scan buffer │ label: buffer 1 (running_migration_jobs) │ └── • subquery │ id: @S1 │ original sql: SELECT id, status FROM system.jobs WHERE (status IN ('running', 'pending', 'cancel-requested', 'pause-requested', 'reverting', 'paused')) AND (job_type = 'MIGRATION') │ exec mode: all rows │ └── • buffer │ label: buffer 1 (running_migration_jobs) │ └── • filter │ filter: status IN ('cancel-requested', 'pause-requested', 'paused', 'pending', 'reverting', 'running') │ └── • index join │ table: jobs@primary │ └── • scan missing stats table: jobs@jobs_job_type_idx spans: [/'MIGRATION' - /'MIGRATION'] ``` Note that if a migration job has more than 1 legacy_payload key, it will cause an assertion failure error since the query will produce multiple rows. We should not have multiple legacy_payload keys because we delete any existing payload keys anytime we write a new payload key. Epic: none Release note: None --- pkg/upgrade/upgrademanager/manager.go | 40 +++++++++++++-------------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/pkg/upgrade/upgrademanager/manager.go b/pkg/upgrade/upgrademanager/manager.go index 0cb8210b0632..6260d8b9dadd 100644 --- a/pkg/upgrade/upgrademanager/manager.go +++ b/pkg/upgrade/upgrademanager/manager.go @@ -792,28 +792,28 @@ FROM ( WHERE status IN ` + jobs.NonTerminalStatusTupleString + ` ) WHERE ((pl->'migration')->'clusterVersion') = $1::JSONB` + // postJobInfoQuery avoids the crdb_internal.system_jobs table + // to avoid expensive full scans. postJobInfoTableQuery = ` -WITH latestpayload AS ( - SELECT job_id, value FROM system.job_info AS payload - WHERE info_key = 'legacy_payload' - ORDER BY written DESC +WITH +running_migration_jobs AS ( + SELECT id, status + FROM system.jobs + WHERE status IN ` + jobs.NonTerminalStatusTupleString + ` + AND job_type = 'MIGRATION' +), +payloads AS ( + SELECT job_id, value + FROM system.job_info AS payload + WHERE info_key = 'legacy_payload' + AND job_id IN (SELECT id FROM running_migration_jobs) + ORDER BY written DESC ) -SELECT id, status -FROM ( - SELECT - distinct(id), - status, - crdb_internal.pb_to_json( - 'cockroach.sql.jobs.jobspb.Payload', - payload.value, - false -- emit_defaults - ) AS pl - FROM system.jobs AS j - INNER JOIN latestpayload AS payload ON j.id = payload.job_id - WHERE j.status IN ` + jobs.NonTerminalStatusTupleString + ` - AND j.job_type = 'MIGRATION' -) -WHERE ((pl->'migration')->'clusterVersion') = $1::JSONB` +SELECT id, status FROM ( + SELECT id, status, crdb_internal.pb_to_json('cockroach.sql.jobs.jobspb.Payload', payloads.value, false) AS pl + FROM running_migration_jobs AS j + INNER JOIN payloads ON j.id = payloads.job_id +) WHERE ((pl->'migration')->'clusterVersion') = $1::JSONB` ) func (m *Manager) getRunningMigrationJob( From 4398dc46c120cdd4d98760a16668acd6fbdd3ddf Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Mon, 31 Jul 2023 17:54:25 +0000 Subject: [PATCH 3/3] roachtest: add changefeed workload benchmarks This patch adds a set of benchmarks measuring the workload impact of a changefeed. The workload is single-row KV read-only and write-only, recording the throughput and latencies of the workload both without and with a changefeed running, graphed via roachperf. The watermark lag is also logged, but not recorded or asserted. ``` cdc/workload/kv0/nodes=5/cpu=16/ranges=100/control [cdc] cdc/workload/kv0/nodes=5/cpu=16/ranges=100/server=processor/protocol=mux/format=json/sink=null [cdc] cdc/workload/kv0/nodes=5/cpu=16/ranges=100/server=processor/protocol=rangefeed/format=json/sink=null [cdc] cdc/workload/kv0/nodes=5/cpu=16/ranges=100000/control [cdc] cdc/workload/kv0/nodes=5/cpu=16/ranges=100000/server=processor/protocol=mux/format=json/sink=null [cdc] cdc/workload/kv0/nodes=5/cpu=16/ranges=100000/server=processor/protocol=rangefeed/format=json/sink=null [cdc] cdc/workload/kv100/nodes=5/cpu=16/ranges=100/control [cdc] cdc/workload/kv100/nodes=5/cpu=16/ranges=100/server=processor/protocol=mux/format=json/sink=null [cdc] cdc/workload/kv100/nodes=5/cpu=16/ranges=100/server=processor/protocol=rangefeed/format=json/sink=null [cdc] cdc/workload/kv100/nodes=5/cpu=16/ranges=100000/control [cdc] cdc/workload/kv100/nodes=5/cpu=16/ranges=100000/server=processor/protocol=mux/format=json/sink=null [cdc] cdc/workload/kv100/nodes=5/cpu=16/ranges=100000/server=processor/protocol=rangefeed/format=json/sink=null [cdc] ``` Epic: none Release note: None --- pkg/cmd/roachtest/tests/cdc_bench.go | 259 +++++++++++++++++++++++++++ 1 file changed, 259 insertions(+) diff --git a/pkg/cmd/roachtest/tests/cdc_bench.go b/pkg/cmd/roachtest/tests/cdc_bench.go index 394e8bee5119..377cb4dfd9c3 100644 --- a/pkg/cmd/roachtest/tests/cdc_bench.go +++ b/pkg/cmd/roachtest/tests/cdc_bench.go @@ -17,6 +17,8 @@ import ( "encoding/json" "fmt" "path/filepath" + "strconv" + "sync/atomic" "time" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster" @@ -26,6 +28,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/roachprod/install" + "github.com/cockroachdb/cockroach/pkg/util/randutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/workload/histogram" "github.com/cockroachdb/errors" @@ -34,6 +37,7 @@ import ( ) type cdcBenchScanType string +type cdcBenchServer string type cdcBenchProtocol string const ( @@ -56,6 +60,10 @@ const ( // practice it can. cdcBenchColdCatchupScan cdcBenchScanType = "catchup-cold" + cdcBenchNoServer cdcBenchServer = "" + cdcBenchProcessorServer cdcBenchServer = "processor" // legacy processor + cdcBenchSchedulerServer cdcBenchServer = "scheduler" // new scheduler + cdcBenchNoProtocol cdcBenchProtocol = "" cdcBenchRangefeedProtocol cdcBenchProtocol = "rangefeed" // basic rangefeed protocol cdcBenchMuxProtocol cdcBenchProtocol = "mux" // multiplexing rangefeed protocol @@ -64,6 +72,7 @@ const ( var ( cdcBenchScanTypes = []cdcBenchScanType{ cdcBenchInitialScan, cdcBenchCatchupScan, cdcBenchColdCatchupScan} + cdcBenchServers = []cdcBenchServer{cdcBenchProcessorServer} // TODO(erikgrinaker): scheduler cdcBenchProtocols = []cdcBenchProtocol{cdcBenchRangefeedProtocol, cdcBenchMuxProtocol} ) @@ -99,6 +108,53 @@ func registerCDCBench(r registry.Registry) { } } } + + // Workload impact benchmarks. + for _, readPercent := range []int{0, 100} { + for _, ranges := range []int64{100, 100000} { + readPercent, ranges := readPercent, ranges // pin loop variables + const ( + nodes = 5 // excluding coordinator and workload nodes + cpus = 16 + format = "json" + ) + + // Control run that only runs the workload, with no changefeed. + r.Add(registry.TestSpec{ + Name: fmt.Sprintf( + "cdc/workload/kv%d/nodes=%d/cpu=%d/ranges=%d/control", + readPercent, nodes, cpus, ranges), + Owner: registry.OwnerCDC, + Benchmark: true, + Cluster: r.MakeClusterSpec(nodes+2, spec.CPU(cpus)), + RequiresLicense: true, + Timeout: time.Hour, + Run: func(ctx context.Context, t test.Test, c cluster.Cluster) { + runCDCBenchWorkload(ctx, t, c, ranges, readPercent, "", "", "") + }, + }) + + // Workloads with a concurrent changefeed running. + for _, server := range cdcBenchServers { + for _, protocol := range cdcBenchProtocols { + server, protocol := server, protocol // pin loop variables + r.Add(registry.TestSpec{ + Name: fmt.Sprintf( + "cdc/workload/kv%d/nodes=%d/cpu=%d/ranges=%d/server=%s/protocol=%s/format=%s/sink=null", + readPercent, nodes, cpus, ranges, server, protocol, format), + Owner: registry.OwnerCDC, + Benchmark: true, + Cluster: r.MakeClusterSpec(nodes+2, spec.CPU(cpus)), + RequiresLicense: true, + Timeout: time.Hour, + Run: func(ctx context.Context, t test.Test, c cluster.Cluster) { + runCDCBenchWorkload(ctx, t, c, ranges, readPercent, server, protocol, format) + }, + }) + } + } + } + } } func formatSI(num int64) string { @@ -263,6 +319,209 @@ func runCDCBenchScan( m.Wait() } +// runCDCBenchWorkload runs a KV workload on top of a changefeed, measuring the +// workload throughput and latency. Rangefeeds are configured to backpressure +// writers, which yields reliable results for the full write+emission cost. +// The workload results (throughput and latency) can be compared to separate +// control runs that only run the workload without changefeeds and rangefeeds. +// +// It sets up a cluster with N-2 data nodes, and a separate changefeed +// coordinator node and workload runner. +func runCDCBenchWorkload( + ctx context.Context, + t test.Test, + c cluster.Cluster, + numRanges int64, + readPercent int, + server cdcBenchServer, + protocol cdcBenchProtocol, + format string, +) { + const sink = "null://" + var ( + numNodes = c.Spec().NodeCount + nData = c.Range(1, numNodes-2) + nCoord = c.Node(numNodes - 1) + nWorkload = c.Node(numNodes) + + workloadSeed = randutil.NewPseudoSeed() + concurrency = len(nData) * 64 + duration = 20 * time.Minute + insertCount = int64(0) + cdcEnabled = true + ) + if readPercent == 100 { + insertCount = 1_000_000 // ingest some data to read + } + // Either of these will disable changefeeds. Make sure they're all disabled. + if server == "" || protocol == "" || format == "" { + require.Empty(t, server) + require.Empty(t, protocol) + require.Empty(t, format) + cdcEnabled = false + } + + // Start data nodes first to place data on them. We'll start the changefeed + // coordinator later, since we don't want any data on it. + opts, settings := makeCDCBenchOptions() + settings.ClusterSettings["kv.rangefeed.enabled"] = strconv.FormatBool(cdcEnabled) + + switch protocol { + case cdcBenchMuxProtocol: + settings.ClusterSettings["changefeed.mux_rangefeed.enabled"] = "true" + case cdcBenchRangefeedProtocol: + settings.ClusterSettings["changefeed.mux_rangefeed.enabled"] = "false" + case cdcBenchNoProtocol: + default: + t.Fatalf("unknown protocol %q", protocol) + } + + c.Put(ctx, t.Cockroach(), "./cockroach") + c.Start(ctx, t.L(), opts, settings, nData) + m := c.NewMonitor(ctx, nData.Merge(nCoord)) + + conn := c.Conn(ctx, t.L(), nData[0]) + defer conn.Close() + + // Prohibit ranges on the changefeed coordinator. + t.L().Printf("configuring zones") + for _, target := range getAllZoneTargets(ctx, t, conn) { + _, err := conn.ExecContext(ctx, fmt.Sprintf( + `ALTER %s CONFIGURE ZONE USING num_replicas=3, constraints='[-node%d]'`, target, nCoord[0])) + require.NoError(t, err) + } + + // Wait for system ranges to upreplicate. + require.NoError(t, WaitFor3XReplication(ctx, t, conn)) + + // Create and split the workload table. + // + // NB: don't scatter -- the ranges end up fairly well-distributed anyway, and + // the scatter can often fail with 100k ranges. + t.L().Printf("creating table with %s ranges", humanize.Comma(numRanges)) + c.Run(ctx, nWorkload, fmt.Sprintf( + `./cockroach workload init kv --splits %d {pgurl:%d}`, numRanges, nData[0])) + require.NoError(t, WaitFor3XReplication(ctx, t, conn)) + + // For read-only workloads, ingest some data. init --insert-count does not use + // the standard key generator that the read workload uses, so we have to write + // them with a separate write workload first, see: + // https://github.com/cockroachdb/cockroach/issues/107874 + if insertCount > 0 { + const batchSize = 1000 + batches := (insertCount-1)/batchSize + 1 // ceiling division + t.L().Printf("ingesting %s rows", humanize.Comma(insertCount)) + c.Run(ctx, nWorkload, fmt.Sprintf( + `./cockroach workload run kv --seed %d --read-percent 0 --batch %d --max-ops %d {pgurl:%d}`, + workloadSeed, batchSize, batches, nData[0])) + } + + // Now that the ranges are placed, start the changefeed coordinator. + t.L().Printf("starting coordinator node") + c.Start(ctx, t.L(), opts, settings, nCoord) + + conn = c.Conn(ctx, t.L(), nCoord[0]) + defer conn.Close() + + // Start the changefeed if enabled. We disable the initial scan, since we + // don't care about the historical data. + var jobID int + var done atomic.Value // time.Time + if cdcEnabled { + t.L().Printf("starting changefeed") + require.NoError(t, conn.QueryRowContext(ctx, fmt.Sprintf( + `CREATE CHANGEFEED FOR kv.kv INTO '%s' WITH format = '%s', initial_scan = 'no'`, + sink, format)). + Scan(&jobID)) + + // Monitor the changefeed for failures. When the workload finishes, it will + // store the completion timestamp in done, and we'll wait for the + // changefeed's watermark to reach it. + // + // The watermark and lag isn't recorded by the benchmark, but we make sure + // all data is eventually emitted. It is also helpful for inspection, and we + // may want to track or assert on it later. Initially, this asserted that + // the changefeed wasn't lagging by more than 1-2 minutes, but with 100k + // ranges it was found to sometimes lag by over 8 minutes. + m.Go(func(ctx context.Context) error { + info, err := waitForChangefeed(ctx, conn, jobID, func(info changefeedInfo) (bool, error) { + switch jobs.Status(info.status) { + case jobs.StatusPending, jobs.StatusRunning: + doneValue := done.Load() + return doneValue != nil && info.highwaterTime.After(doneValue.(time.Time)), nil + default: + return false, errors.Errorf("unexpected changefeed status %s", info.status) + } + }) + if err != nil { + return err + } + t.L().Printf("changefeed watermark is %s", info.highwaterTime.Format(time.RFC3339)) + return nil + }) + + // Wait for a stable changefeed before starting the workload, by waiting for + // the watermark to reach the current time. + now := timeutil.Now() + t.L().Printf("waiting for changefeed watermark to reach current time (%s)", + now.Format(time.RFC3339)) + info, err := waitForChangefeed(ctx, conn, jobID, func(info changefeedInfo) (bool, error) { + switch jobs.Status(info.status) { + case jobs.StatusPending, jobs.StatusRunning: + return info.highwaterTime.After(now), nil + default: + return false, errors.Errorf("unexpected changefeed status %s", info.status) + } + }) + require.NoError(t, err) + t.L().Printf("changefeed watermark is %s", info.highwaterTime.Format(time.RFC3339)) + + } else { + t.L().Printf("control run, not starting changefeed") + } + + // Run the workload and record stats. Make sure to use the same seed, so we + // read any rows we wrote above. + m.Go(func(ctx context.Context) error { + // If there's more than 10,000 replicas per node they may struggle to + // maintain RPC connections or liveness, which occasionally fails client + // write requests with ambiguous errors. We tolerate errors in this case + // until we optimize rangefeeds. + // + // TODO(erikgrinaker): remove this when benchmarks are stable. + var extra string + if readPercent < 100 && (numRanges/int64(len(nData))) >= 10000 { + extra += ` --tolerate-errors` + } + t.L().Printf("running workload") + err := c.RunE(ctx, nWorkload, fmt.Sprintf( + `./cockroach workload run kv --seed %d --histograms=%s/stats.json `+ + `--concurrency %d --duration %s --write-seq R%d --read-percent %d %s {pgurl:%d-%d}`, + workloadSeed, t.PerfArtifactsDir(), concurrency, duration, insertCount, readPercent, extra, + nData[0], nData[len(nData)-1])) + if err != nil { + return err + } + t.L().Printf("workload completed") + + // When the workload completes, signal the completion time to the changefeed + // monitor via done, which will wait for it to fully catch up. + if cdcEnabled { + now := timeutil.Now() + done.Store(now) + info, err := getChangefeedInfo(conn, jobID) + if err != nil { + return err + } + t.L().Printf("waiting for changefeed watermark to reach %s (lagging by %s)", + now.Format(time.RFC3339), now.Sub(info.highwaterTime).Truncate(time.Second)) + } + return nil + }) + + m.Wait() +} + // getAllZoneTargets returns all zone targets (e.g. "RANGE default", "DATABASE // system", etc). func getAllZoneTargets(ctx context.Context, t test.Test, conn *gosql.DB) []string {