diff --git a/pkg/cmd/roachtest/tests/cdc_bench.go b/pkg/cmd/roachtest/tests/cdc_bench.go
index 394e8bee5119..377cb4dfd9c3 100644
--- a/pkg/cmd/roachtest/tests/cdc_bench.go
+++ b/pkg/cmd/roachtest/tests/cdc_bench.go
@@ -17,6 +17,8 @@ import (
 	"encoding/json"
 	"fmt"
 	"path/filepath"
+	"strconv"
+	"sync/atomic"
 	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
@@ -26,6 +28,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
 	"github.com/cockroachdb/cockroach/pkg/jobs"
 	"github.com/cockroachdb/cockroach/pkg/roachprod/install"
+	"github.com/cockroachdb/cockroach/pkg/util/randutil"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 	"github.com/cockroachdb/cockroach/pkg/workload/histogram"
 	"github.com/cockroachdb/errors"
@@ -34,6 +37,7 @@ import (
 )
 
 type cdcBenchScanType string
+type cdcBenchServer string
 type cdcBenchProtocol string
 
 const (
@@ -56,6 +60,10 @@ const (
 	// practice it can.
 	cdcBenchColdCatchupScan cdcBenchScanType = "catchup-cold"
 
+	cdcBenchNoServer        cdcBenchServer = ""
+	cdcBenchProcessorServer cdcBenchServer = "processor" // legacy processor
+	cdcBenchSchedulerServer cdcBenchServer = "scheduler" // new scheduler
+
 	cdcBenchNoProtocol        cdcBenchProtocol = ""
 	cdcBenchRangefeedProtocol cdcBenchProtocol = "rangefeed" // basic rangefeed protocol
 	cdcBenchMuxProtocol       cdcBenchProtocol = "mux"       // multiplexing rangefeed protocol
@@ -64,6 +72,7 @@ const (
 var (
 	cdcBenchScanTypes = []cdcBenchScanType{
 		cdcBenchInitialScan, cdcBenchCatchupScan, cdcBenchColdCatchupScan}
+	cdcBenchServers   = []cdcBenchServer{cdcBenchProcessorServer} // TODO(erikgrinaker): scheduler
 	cdcBenchProtocols = []cdcBenchProtocol{cdcBenchRangefeedProtocol, cdcBenchMuxProtocol}
 )
 
@@ -99,6 +108,53 @@ func registerCDCBench(r registry.Registry) {
 			}
 		}
 	}
+
+	// Workload impact benchmarks.
+	for _, readPercent := range []int{0, 100} {
+		for _, ranges := range []int64{100, 100000} {
+			readPercent, ranges := readPercent, ranges // pin loop variables
+			const (
+				nodes  = 5 // excluding coordinator and workload nodes
+				cpus   = 16
+				format = "json"
+			)
+
+			// Control run that only runs the workload, with no changefeed.
+			r.Add(registry.TestSpec{
+				Name: fmt.Sprintf(
+					"cdc/workload/kv%d/nodes=%d/cpu=%d/ranges=%d/control",
+					readPercent, nodes, cpus, ranges),
+				Owner:           registry.OwnerCDC,
+				Benchmark:       true,
+				Cluster:         r.MakeClusterSpec(nodes+2, spec.CPU(cpus)),
+				RequiresLicense: true,
+				Timeout:         time.Hour,
+				Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
+					runCDCBenchWorkload(ctx, t, c, ranges, readPercent, "", "", "")
+				},
+			})
+
+			// Workloads with a concurrent changefeed running.
+			for _, server := range cdcBenchServers {
+				for _, protocol := range cdcBenchProtocols {
+					server, protocol := server, protocol // pin loop variables
+					r.Add(registry.TestSpec{
+						Name: fmt.Sprintf(
+							"cdc/workload/kv%d/nodes=%d/cpu=%d/ranges=%d/server=%s/protocol=%s/format=%s/sink=null",
+							readPercent, nodes, cpus, ranges, server, protocol, format),
+						Owner:           registry.OwnerCDC,
+						Benchmark:       true,
+						Cluster:         r.MakeClusterSpec(nodes+2, spec.CPU(cpus)),
+						RequiresLicense: true,
+						Timeout:         time.Hour,
+						Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
+							runCDCBenchWorkload(ctx, t, c, ranges, readPercent, server, protocol, format)
+						},
+					})
+				}
+			}
+		}
+	}
 }
 
 func formatSI(num int64) string {
@@ -263,6 +319,209 @@ func runCDCBenchScan(
 	m.Wait()
 }
 
+// runCDCBenchWorkload runs a KV workload on top of a changefeed, measuring the
+// workload throughput and latency. Rangefeeds are configured to backpressure
+// writers, which yields reliable results for the full write+emission cost.
+// The workload results (throughput and latency) can be compared to separate
+// control runs that only run the workload without changefeeds and rangefeeds.
+//
+// It sets up a cluster with N-2 data nodes, and a separate changefeed
+// coordinator node and workload runner.
+func runCDCBenchWorkload(
+	ctx context.Context,
+	t test.Test,
+	c cluster.Cluster,
+	numRanges int64,
+	readPercent int,
+	server cdcBenchServer,
+	protocol cdcBenchProtocol,
+	format string,
+) {
+	const sink = "null://"
+	var (
+		numNodes  = c.Spec().NodeCount
+		nData     = c.Range(1, numNodes-2)
+		nCoord    = c.Node(numNodes - 1)
+		nWorkload = c.Node(numNodes)
+
+		workloadSeed = randutil.NewPseudoSeed()
+		concurrency  = len(nData) * 64
+		duration     = 20 * time.Minute
+		insertCount  = int64(0)
+		cdcEnabled   = true
+	)
+	if readPercent == 100 {
+		insertCount = 1_000_000 // ingest some data to read
+	}
+	// Any of these being empty disables changefeeds entirely; in that case, make
+	// sure they're all empty.
+	if server == "" || protocol == "" || format == "" {
+		require.Empty(t, server)
+		require.Empty(t, protocol)
+		require.Empty(t, format)
+		cdcEnabled = false
+	}
+
+	// Start data nodes first to place data on them. We'll start the changefeed
+	// coordinator later, since we don't want any data on it.
+	opts, settings := makeCDCBenchOptions()
+	settings.ClusterSettings["kv.rangefeed.enabled"] = strconv.FormatBool(cdcEnabled)
+
+	switch protocol {
+	case cdcBenchMuxProtocol:
+		settings.ClusterSettings["changefeed.mux_rangefeed.enabled"] = "true"
+	case cdcBenchRangefeedProtocol:
+		settings.ClusterSettings["changefeed.mux_rangefeed.enabled"] = "false"
+	case cdcBenchNoProtocol:
+	default:
+		t.Fatalf("unknown protocol %q", protocol)
+	}
+
+	c.Put(ctx, t.Cockroach(), "./cockroach")
+	c.Start(ctx, t.L(), opts, settings, nData)
+	m := c.NewMonitor(ctx, nData.Merge(nCoord))
+
+	conn := c.Conn(ctx, t.L(), nData[0])
+	defer conn.Close()
+
+	// Prohibit ranges on the changefeed coordinator.
+	t.L().Printf("configuring zones")
+	for _, target := range getAllZoneTargets(ctx, t, conn) {
+		_, err := conn.ExecContext(ctx, fmt.Sprintf(
+			`ALTER %s CONFIGURE ZONE USING num_replicas=3, constraints='[-node%d]'`, target, nCoord[0]))
+		require.NoError(t, err)
+	}
+
+	// Wait for system ranges to upreplicate.
+	require.NoError(t, WaitFor3XReplication(ctx, t, conn))
+
+	// Create and split the workload table.
+	//
+	// NB: don't scatter -- the ranges end up fairly well-distributed anyway, and
+	// the scatter can often fail with 100k ranges.
+	t.L().Printf("creating table with %s ranges", humanize.Comma(numRanges))
+	c.Run(ctx, nWorkload, fmt.Sprintf(
+		`./cockroach workload init kv --splits %d {pgurl:%d}`, numRanges, nData[0]))
+	require.NoError(t, WaitFor3XReplication(ctx, t, conn))
+
+	// For read-only workloads, ingest some data. init --insert-count does not use
+	// the standard key generator that the read workload uses, so we have to write
+	// them with a separate write workload first, see:
+	// https://github.com/cockroachdb/cockroach/issues/107874
+	if insertCount > 0 {
+		const batchSize = 1000
+		batches := (insertCount-1)/batchSize + 1 // ceiling division
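+		// Each workload op below writes a full --batch of rows, so capping the run
+		// at `batches` ops ingests roughly insertCount rows.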
+		t.L().Printf("ingesting %s rows", humanize.Comma(insertCount))
+		c.Run(ctx, nWorkload, fmt.Sprintf(
+			`./cockroach workload run kv --seed %d --read-percent 0 --batch %d --max-ops %d {pgurl:%d}`,
+			workloadSeed, batchSize, batches, nData[0]))
+	}
+
+	// Now that the ranges are placed, start the changefeed coordinator.
+	t.L().Printf("starting coordinator node")
+	c.Start(ctx, t.L(), opts, settings, nCoord)
+
+	conn = c.Conn(ctx, t.L(), nCoord[0])
+	defer conn.Close()
+
+	// Start the changefeed if enabled. We disable the initial scan, since we
+	// don't care about the historical data.
+	var jobID int
+	var done atomic.Value // time.Time
+	if cdcEnabled {
+		t.L().Printf("starting changefeed")
+		require.NoError(t, conn.QueryRowContext(ctx, fmt.Sprintf(
+			`CREATE CHANGEFEED FOR kv.kv INTO '%s' WITH format = '%s', initial_scan = 'no'`,
+			sink, format)).
+			Scan(&jobID))
+
+		// Monitor the changefeed for failures. When the workload finishes, it will
+		// store the completion timestamp in done, and we'll wait for the
+		// changefeed's watermark to reach it.
+		//
+		// The watermark and lag aren't recorded by the benchmark, but we make sure
+		// all data is eventually emitted. It is also helpful for inspection, and we
+		// may want to track or assert on it later. Initially, this asserted that
+		// the changefeed wasn't lagging by more than 1-2 minutes, but with 100k
+		// ranges it was found to sometimes lag by over 8 minutes.
+		m.Go(func(ctx context.Context) error {
+			info, err := waitForChangefeed(ctx, conn, jobID, func(info changefeedInfo) (bool, error) {
+				switch jobs.Status(info.status) {
+				case jobs.StatusPending, jobs.StatusRunning:
+					doneValue := done.Load()
+					return doneValue != nil && info.highwaterTime.After(doneValue.(time.Time)), nil
+				default:
+					return false, errors.Errorf("unexpected changefeed status %s", info.status)
+				}
+			})
+			if err != nil {
+				return err
+			}
+			t.L().Printf("changefeed watermark is %s", info.highwaterTime.Format(time.RFC3339))
+			return nil
+		})
+
+		// Wait for a stable changefeed before starting the workload, by waiting for
+		// the watermark to reach the current time.
+		now := timeutil.Now()
+		t.L().Printf("waiting for changefeed watermark to reach current time (%s)",
+			now.Format(time.RFC3339))
+		info, err := waitForChangefeed(ctx, conn, jobID, func(info changefeedInfo) (bool, error) {
+			switch jobs.Status(info.status) {
+			case jobs.StatusPending, jobs.StatusRunning:
+				return info.highwaterTime.After(now), nil
+			default:
+				return false, errors.Errorf("unexpected changefeed status %s", info.status)
+			}
+		})
+		require.NoError(t, err)
+		t.L().Printf("changefeed watermark is %s", info.highwaterTime.Format(time.RFC3339))
+
+	} else {
+		t.L().Printf("control run, not starting changefeed")
+	}
+
+	// Run the workload and record stats. Make sure to use the same seed, so we
+	// read any rows we wrote above.
+	m.Go(func(ctx context.Context) error {
+		// If there are more than 10,000 replicas per node, nodes may struggle to
+		// maintain RPC connections or liveness, which occasionally fails client
+		// write requests with ambiguous errors. We tolerate errors in this case
+		// until we optimize rangefeeds.
+		//
+		// TODO(erikgrinaker): remove this when benchmarks are stable.
+		var extra string
+		if readPercent < 100 && (numRanges/int64(len(nData))) >= 10000 {
+			extra += ` --tolerate-errors`
+		}
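+		// NB: --write-seq R<insertCount> starts the key sequence at the number of
+		// rows ingested above (using the random-order key generator), so reads
+		// should target keys that already exist.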
+		t.L().Printf("running workload")
+		err := c.RunE(ctx, nWorkload, fmt.Sprintf(
+			`./cockroach workload run kv --seed %d --histograms=%s/stats.json `+
+				`--concurrency %d --duration %s --write-seq R%d --read-percent %d %s {pgurl:%d-%d}`,
+			workloadSeed, t.PerfArtifactsDir(), concurrency, duration, insertCount, readPercent, extra,
+			nData[0], nData[len(nData)-1]))
+		if err != nil {
+			return err
+		}
+		t.L().Printf("workload completed")
+
+		// When the workload completes, signal the completion time to the changefeed
+		// monitor via done, which will wait for it to fully catch up.
+		if cdcEnabled {
+			now := timeutil.Now()
+			done.Store(now)
+			info, err := getChangefeedInfo(conn, jobID)
+			if err != nil {
+				return err
+			}
+			t.L().Printf("waiting for changefeed watermark to reach %s (lagging by %s)",
+				now.Format(time.RFC3339), now.Sub(info.highwaterTime).Truncate(time.Second))
+		}
+		return nil
+	})
+
+	m.Wait()
+}
+
 // getAllZoneTargets returns all zone targets (e.g. "RANGE default", "DATABASE
 // system", etc).
 func getAllZoneTargets(ctx context.Context, t test.Test, conn *gosql.DB) []string {
diff --git a/pkg/upgrade/upgrademanager/manager.go b/pkg/upgrade/upgrademanager/manager.go
index e96f2a994e39..6260d8b9dadd 100644
--- a/pkg/upgrade/upgrademanager/manager.go
+++ b/pkg/upgrade/upgrademanager/manager.go
@@ -778,26 +778,56 @@ func (m *Manager) getOrCreateMigrationJob(
 	return alreadyCompleted, alreadyExisting, jobID, nil
 }
 
+const (
+	preJobInfoTableQuery = `
+SELECT id, status
+FROM (
+	SELECT id, status,
+		crdb_internal.pb_to_json(
+			'cockroach.sql.jobs.jobspb.Payload',
+			payload,
+			false -- emit_defaults
+		) AS pl
+	FROM system.jobs
+	WHERE status IN ` + jobs.NonTerminalStatusTupleString + `
+)
+WHERE ((pl->'migration')->'clusterVersion') = $1::JSONB`
+	// postJobInfoTableQuery avoids the crdb_internal.system_jobs table, which
+	// requires expensive full scans.
+	postJobInfoTableQuery = `
+WITH
+running_migration_jobs AS (
+	SELECT id, status
+	FROM system.jobs
+	WHERE status IN ` + jobs.NonTerminalStatusTupleString + `
+	AND job_type = 'MIGRATION'
+),
+payloads AS (
+	SELECT job_id, value
+	FROM system.job_info AS payload
+	WHERE info_key = 'legacy_payload'
+	AND job_id IN (SELECT id FROM running_migration_jobs)
+	ORDER BY written DESC
+)
+SELECT id, status FROM (
+	SELECT id, status, crdb_internal.pb_to_json('cockroach.sql.jobs.jobspb.Payload', payloads.value, false) AS pl
+	FROM running_migration_jobs AS j
+	INNER JOIN payloads ON j.id = payloads.job_id
+) WHERE ((pl->'migration')->'clusterVersion') = $1::JSONB`
+)
+
 func (m *Manager) getRunningMigrationJob(
 	ctx context.Context, txn isql.Txn, version roachpb.Version,
 ) (found bool, jobID jobspb.JobID, _ error) {
 	// Wrap the version into a ClusterVersion so that the JSON looks like what the
 	// Payload proto has inside.
 	cv := clusterversion.ClusterVersion{Version: version}
-	const query = `
-SELECT id, status
-	FROM (
-		SELECT id,
-			status,
-			crdb_internal.pb_to_json(
-				'cockroach.sql.jobs.jobspb.Payload',
-				payload,
-				false -- emit_defaults
-			) AS pl
-	FROM crdb_internal.system_jobs
-	WHERE status IN ` + jobs.NonTerminalStatusTupleString + `
-	)
-	WHERE pl->'migration'->'clusterVersion' = $1::JSON;`
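+	// Choose the query based on whether the job_info table has been backfilled:
+	// before V23_1JobInfoTableIsBackfilled the payload is still read directly
+	// from system.jobs, afterwards it is read from system.job_info (the
+	// 'legacy_payload' rows), restricted to non-terminal MIGRATION jobs.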
+	var query string
+	if m.settings.Version.IsActive(ctx, clusterversion.V23_1JobInfoTableIsBackfilled) {
+		query = postJobInfoTableQuery
+	} else {
+		query = preJobInfoTableQuery
+	}
 	jsonMsg, err := protoreflect.MessageToJSON(&cv, protoreflect.FmtFlags{EmitDefaults: false})
 	if err != nil {
 		return false, 0, errors.Wrap(err, "failed to marshal version to JSON")
diff --git a/pkg/upgrade/upgrademanager/manager_external_test.go b/pkg/upgrade/upgrademanager/manager_external_test.go
index 5cd231e9492f..5392ab8608a2 100644
--- a/pkg/upgrade/upgrademanager/manager_external_test.go
+++ b/pkg/upgrade/upgrademanager/manager_external_test.go
@@ -155,7 +155,10 @@ func TestAlreadyRunningJobsAreHandledProperly(t *testing.T) {
 		created_by_type,
 		created_by_id,
 		claim_session_id,
-		claim_instance_id
+		claim_instance_id,
+		0,
+		NULL,
+		job_type
 	FROM crdb_internal.system_jobs
 	WHERE id = $1
 )