107939: roachtest: add changefeed workload benchmarks r=erikgrinaker a=erikgrinaker

This patch adds a set of benchmarks measuring the workload impact of a changefeed. The workloads are single-row KV, either read-only or write-only; the benchmarks record the workload's throughput and latencies both with and without a changefeed running, graphed via roachperf. The watermark lag is also logged, but not recorded or asserted.

```
cdc/workload/kv0/nodes=5/cpu=16/ranges=100/control [cdc]
cdc/workload/kv0/nodes=5/cpu=16/ranges=100/server=processor/protocol=mux/format=json/sink=null [cdc]
cdc/workload/kv0/nodes=5/cpu=16/ranges=100/server=processor/protocol=rangefeed/format=json/sink=null [cdc]
cdc/workload/kv0/nodes=5/cpu=16/ranges=100000/control [cdc]
cdc/workload/kv0/nodes=5/cpu=16/ranges=100000/server=processor/protocol=mux/format=json/sink=null [cdc]
cdc/workload/kv0/nodes=5/cpu=16/ranges=100000/server=processor/protocol=rangefeed/format=json/sink=null [cdc]
cdc/workload/kv100/nodes=5/cpu=16/ranges=100/control [cdc]
cdc/workload/kv100/nodes=5/cpu=16/ranges=100/server=processor/protocol=mux/format=json/sink=null [cdc]
cdc/workload/kv100/nodes=5/cpu=16/ranges=100/server=processor/protocol=rangefeed/format=json/sink=null [cdc]
cdc/workload/kv100/nodes=5/cpu=16/ranges=100000/control [cdc]
cdc/workload/kv100/nodes=5/cpu=16/ranges=100000/server=processor/protocol=mux/format=json/sink=null [cdc]
cdc/workload/kv100/nodes=5/cpu=16/ranges=100000/server=processor/protocol=rangefeed/format=json/sink=null [cdc]
```

Resolves cockroachdb#107441.
Release note: None

108080: upgrades: avoid crdb_internal.system_jobs in upgrade manager r=adityamaru a=stevendanna

crdb_internal.system_jobs is a virtual table that joins information from the system.jobs table and the system.job_info table. When given a job status predicate, it does this by running a query such as:

    WITH latestpayload AS (
      SELECT job_id, value
      FROM system.job_info AS payload
      WHERE info_key = 'legacy_payload'
      ORDER BY written DESC
    ),
    latestprogress AS (
      SELECT job_id, value
      FROM system.job_info AS progress
      WHERE info_key = 'legacy_progress'
      ORDER BY written DESC
    )
    SELECT
      distinct(id),  status, created,
      payload.value AS payload,
      progress.value AS progress,
      created_by_type, created_by_id,
      claim_session_id, claim_instance_id,
      num_runs, last_run, job_type
    FROM system.jobs AS j
    INNER JOIN latestpayload AS payload ON j.id = payload.job_id
    LEFT JOIN latestprogress AS progress ON j.id = progress.job_id
    WHERE j.status = 'cancel-requested';

This plan performs two full scans of the job_info table:

```
  • distinct
  │ distinct on: id, value, value
  │
  └── • merge join
      │ equality: (job_id) = (id)
      │
      ├── • render
      │   │
      │   └── • filter
      │       │ estimated row count: 2,787
      │       │ filter: info_key = 'legacy_payload'
      │       │
      │       └── • scan
      │             estimated row count: 5,597 (100% of the table; stats collected 27 minutes ago; using stats forecast for 17 minutes ago)
      │             table: job_info@primary
      │             spans: FULL SCAN
      │
      └── • merge join (right outer)
          │ equality: (job_id) = (id)
          │ right cols are key
          │
          ├── • render
          │   │
          │   └── • filter
          │       │ estimated row count: 2,787
          │       │ filter: info_key = 'legacy_progress'
          │       │
          │       └── • scan
          │             estimated row count: 5,597 (100% of the table; stats collected 27 minutes ago; using stats forecast for 17 minutes ago)
          │             table: job_info@primary
          │             spans: FULL SCAN
          │
          └── • index join
              │ table: jobs@primary
              │
              └── • sort
                  │ order: +id
                  │
                  └── • scan
                        missing stats
                        table: jobs@jobs_status_created_idx
                        spans: [/'cancel-requested' - /'cancel-requested']
```

Previously, the upgrade manager was using this virtual table as part of a larger query:

    SELECT id, status
    FROM (
      SELECT id, status,
        crdb_internal.pb_to_json(
          'cockroach.sql.jobs.jobspb.Payload', payload, false
        ) AS pl
      FROM crdb_internal.system_jobs
      WHERE status IN ('running', 'pending', 'cancel-requested', 'pause-requested', 'reverting', 'paused')
    )
    WHERE pl->'migration'->'clusterVersion' = $1::JSONB;

I believe the use of the IN operator causes the virtual index's populate function to be called once for each value. Perhaps the optimizer accounts for this in some way that avoids 2 * 6 full scans of the job_info table, but it is hard to confirm from the EXPLAIN output.
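
To make the concern concrete, here is an illustrative Go sketch of why per-value population would multiply the cost. This is an assumption about the virtual-index mechanics rather than code from this patch; the `jobsprobe` package, `probePerStatus`, and `db` are hypothetical stand-ins:

```go
// Hypothetical illustration only: querying the virtual table once per status
// roughly simulates what a per-value virtual-index probe would do. Each query
// runs the payload/progress join shown above, i.e. two more full scans of
// system.job_info per status in the IN list (6 statuses -> 2 * 6 scans).
package jobsprobe

import (
	"context"
	"database/sql"
)

func probePerStatus(ctx context.Context, db *sql.DB) error {
	statuses := []string{
		"running", "pending", "cancel-requested",
		"pause-requested", "reverting", "paused",
	}
	const query = `SELECT id, status FROM crdb_internal.system_jobs WHERE status = $1`
	for _, status := range statuses {
		rows, err := db.QueryContext(ctx, query, status)
		if err != nil {
			return err
		}
		// Each probe pays the full join cost again before any rows are read.
		if err := rows.Close(); err != nil {
			return err
		}
	}
	return nil
}
```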

In at least one recent escalation, we observed this query taking a substantial amount of time as it continually conflicted with other job system queries.

Here, we avoid using the virtual table. This lets us skip the full scans of the job_info table, since we only need the payload and not the progress. It also allows us to use the full `IN` predicate directly, avoiding any uncertainty.

```
      • root
      │
      ├── • hash join
      │   │ equality: (job_id) = (id)
      │   │ right cols are key
      │   │
      │   ├── • render
      │   │   │
      │   │   └── • lookup join
      │   │       │ table: job_info@primary
      │   │       │ equality: (id, lookup_join_const_col_@16) = (job_id,info_key)
      │   │       │
      │   │       └── • render
      │   │           │
      │   │           └── • scan buffer
      │   │                 label: buffer 1 (running_migration_jobs)
      │   │
      │   └── • scan buffer
      │         label: buffer 1 (running_migration_jobs)
      │
      └── • subquery
          │ id: `@S1`
          │ original sql: SELECT id, status FROM system.jobs WHERE (status IN ('running', 'pending', 'cancel-requested', 'pause-requested', 'reverting', 'paused')) AND (job_type = 'MIGRATION')
          │ exec mode: all rows
          │
          └── • buffer
              │ label: buffer 1 (running_migration_jobs)
              │
              └── • filter
                  │ filter: status IN ('cancel-requested', 'pause-requested', 'paused', 'pending', 'reverting', 'running')
                  │
                  └── • index join
                      │ table: jobs@primary
                      │
                      └── • scan
                            missing stats
                            table: jobs@jobs_job_type_idx
                            spans: [/'MIGRATION' - /'MIGRATION']
```

In a local example, this is substantially faster:

```
root@localhost:26257/defaultdb> SELECT id, status
                             ->     FROM (
                             ->         SELECT id,
                             ->         status,
                             ->         crdb_internal.pb_to_json(
                             ->             'cockroach.sql.jobs.jobspb.Payload',
                             ->             payload,
                             ->       false -- emit_defaults
                             ->         ) AS pl
                             ->     FROM crdb_internal.system_jobs
                             ->   WHERE status IN ('running', 'pending', 'cancel-requested', 'pause-requested', 'reverting', 'paused')
                             ->     )
                             ->     WHERE pl->'migration'->'clusterVersion' = '{"activeVersion": {"internal": 84, "majorVal": 22, "minorVal": 2}}'::JSONB;
  id | status
-----+---------
(0 rows)

Time: 384ms total (execution 384ms / network 0ms)


root@localhost:26257/defaultdb> WITH                                                                                                                                                                                                                        
                             -> running_migration_jobs AS (                                                                                                                                                                                                 
                             ->     SELECT id, status                                                                                                                                                                                                       
                             ->     FROM system.jobs                                                                                                                                                                                                        
                             ->     WHERE status IN ('running', 'pending', 'cancel-requested', 'pause-requested', 'reverting', 'paused')                                                                                                                    
                             ->     AND job_type = 'MIGRATION'                                                                                                                                                                                              
                             -> ),                                                                                                                                                                                                                          
                             -> payloads AS (                                                                                                                                                                                                               
                             ->     SELECT job_id, value                                                                                                                                                                                                    
                             ->     FROM system.job_info AS payload                                                                                                                                                                                         
                             ->     WHERE info_key = 'legacy_payload'                                                                                                                                                                                       
                             ->     AND job_id IN (SELECT id FROM running_migration_jobs)                                                                                                                                                                   
                             ->     ORDER BY written DESC                                                                                                                                                                                                   
                             -> )                                                                                                                                                                                                                           
                             -> SELECT id, status FROM (                                                                                                                                                                                                    
                             ->     SELECT id, status, crdb_internal.pb_to_json('cockroach.sql.jobs.jobspb.Payload', payloads.value, false) AS pl                                                                                                           
                             ->     FROM running_migration_jobs AS j                                                                                                                                                                                        
                             ->     INNER JOIN payloads ON j.id = payloads.job_id                                                                                                                                                                           
                             -> );                                                                                                                                                                                                                          
  id | status
-----+---------
(0 rows)

Time: 3ms total (execution 2ms / network 0ms)
```

Note that the new query will return two rows if we happen to have two legacy_payload keys for a given job, which will result in an assertion failure. I think this is acceptable, since we take care to only ever write a single legacy_payload row.
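
For illustration, here is a minimal Go sketch of a guard a caller could use to surface that case explicitly; the `jobsguard` package and `singleMigrationJob` helper are hypothetical and not part of this patch:

```go
// Hypothetical guard: fail loudly if the (id, status) result set contains more
// than one row, e.g. because a job has duplicate legacy_payload entries.
package jobsguard

import (
	"database/sql"

	"github.com/cockroachdb/errors"
)

func singleMigrationJob(rows *sql.Rows) (id int64, status string, err error) {
	var found bool
	for rows.Next() {
		var rowID int64
		var rowStatus string
		if err := rows.Scan(&rowID, &rowStatus); err != nil {
			return 0, "", err
		}
		if found {
			return 0, "", errors.AssertionFailedf(
				"multiple rows for migration job %d; duplicate legacy_payload entries?", rowID)
		}
		id, status, found = rowID, rowStatus, true
	}
	return id, status, rows.Err()
}
```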

We should do more work to understand contention within the job system, but perhaps speeding up this query will help a bit.

Epic: None
Release note: None

Co-authored-by: Erik Grinaker <[email protected]>
Co-authored-by: Steven Danna <[email protected]>
3 people committed Aug 8, 2023
3 parents b87852a + 4398dc4 + 612d49f commit 29c1a7c
Showing 3 changed files with 307 additions and 15 deletions.
259 changes: 259 additions & 0 deletions pkg/cmd/roachtest/tests/cdc_bench.go
@@ -17,6 +17,8 @@ import (
"encoding/json"
"fmt"
"path/filepath"
"strconv"
"sync/atomic"
"time"

"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
@@ -26,6 +28,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/roachprod/install"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/workload/histogram"
"github.com/cockroachdb/errors"
@@ -34,6 +37,7 @@
)

type cdcBenchScanType string
type cdcBenchServer string
type cdcBenchProtocol string

const (
@@ -56,6 +60,10 @@ const (
// practice it can.
cdcBenchColdCatchupScan cdcBenchScanType = "catchup-cold"

cdcBenchNoServer cdcBenchServer = ""
cdcBenchProcessorServer cdcBenchServer = "processor" // legacy processor
cdcBenchSchedulerServer cdcBenchServer = "scheduler" // new scheduler

cdcBenchNoProtocol cdcBenchProtocol = ""
cdcBenchRangefeedProtocol cdcBenchProtocol = "rangefeed" // basic rangefeed protocol
cdcBenchMuxProtocol cdcBenchProtocol = "mux" // multiplexing rangefeed protocol
@@ -64,6 +72,7 @@ const (
var (
cdcBenchScanTypes = []cdcBenchScanType{
cdcBenchInitialScan, cdcBenchCatchupScan, cdcBenchColdCatchupScan}
cdcBenchServers = []cdcBenchServer{cdcBenchProcessorServer} // TODO(erikgrinaker): scheduler
cdcBenchProtocols = []cdcBenchProtocol{cdcBenchRangefeedProtocol, cdcBenchMuxProtocol}
)

@@ -99,6 +108,53 @@ func registerCDCBench(r registry.Registry) {
}
}
}

// Workload impact benchmarks.
for _, readPercent := range []int{0, 100} {
for _, ranges := range []int64{100, 100000} {
readPercent, ranges := readPercent, ranges // pin loop variables
const (
nodes = 5 // excluding coordinator and workload nodes
cpus = 16
format = "json"
)

// Control run that only runs the workload, with no changefeed.
r.Add(registry.TestSpec{
Name: fmt.Sprintf(
"cdc/workload/kv%d/nodes=%d/cpu=%d/ranges=%d/control",
readPercent, nodes, cpus, ranges),
Owner: registry.OwnerCDC,
Benchmark: true,
Cluster: r.MakeClusterSpec(nodes+2, spec.CPU(cpus)),
RequiresLicense: true,
Timeout: time.Hour,
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
runCDCBenchWorkload(ctx, t, c, ranges, readPercent, "", "", "")
},
})

// Workloads with a concurrent changefeed running.
for _, server := range cdcBenchServers {
for _, protocol := range cdcBenchProtocols {
server, protocol := server, protocol // pin loop variables
r.Add(registry.TestSpec{
Name: fmt.Sprintf(
"cdc/workload/kv%d/nodes=%d/cpu=%d/ranges=%d/server=%s/protocol=%s/format=%s/sink=null",
readPercent, nodes, cpus, ranges, server, protocol, format),
Owner: registry.OwnerCDC,
Benchmark: true,
Cluster: r.MakeClusterSpec(nodes+2, spec.CPU(cpus)),
RequiresLicense: true,
Timeout: time.Hour,
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
runCDCBenchWorkload(ctx, t, c, ranges, readPercent, server, protocol, format)
},
})
}
}
}
}
}

func formatSI(num int64) string {
@@ -263,6 +319,209 @@ func runCDCBenchScan(
m.Wait()
}

// runCDCBenchWorkload runs a KV workload on top of a changefeed, measuring the
// workload throughput and latency. Rangefeeds are configured to backpressure
// writers, which yields reliable results for the full write+emission cost.
// The workload results (throughput and latency) can be compared to separate
// control runs that only run the workload without changefeeds and rangefeeds.
//
// It sets up a cluster with N-2 data nodes, and a separate changefeed
// coordinator node and workload runner.
func runCDCBenchWorkload(
ctx context.Context,
t test.Test,
c cluster.Cluster,
numRanges int64,
readPercent int,
server cdcBenchServer,
protocol cdcBenchProtocol,
format string,
) {
const sink = "null://"
var (
numNodes = c.Spec().NodeCount
nData = c.Range(1, numNodes-2)
nCoord = c.Node(numNodes - 1)
nWorkload = c.Node(numNodes)

workloadSeed = randutil.NewPseudoSeed()
concurrency = len(nData) * 64
duration = 20 * time.Minute
insertCount = int64(0)
cdcEnabled = true
)
if readPercent == 100 {
insertCount = 1_000_000 // ingest some data to read
}
// Either of these will disable changefeeds. Make sure they're all disabled.
if server == "" || protocol == "" || format == "" {
require.Empty(t, server)
require.Empty(t, protocol)
require.Empty(t, format)
cdcEnabled = false
}

// Start data nodes first to place data on them. We'll start the changefeed
// coordinator later, since we don't want any data on it.
opts, settings := makeCDCBenchOptions()
settings.ClusterSettings["kv.rangefeed.enabled"] = strconv.FormatBool(cdcEnabled)

switch protocol {
case cdcBenchMuxProtocol:
settings.ClusterSettings["changefeed.mux_rangefeed.enabled"] = "true"
case cdcBenchRangefeedProtocol:
settings.ClusterSettings["changefeed.mux_rangefeed.enabled"] = "false"
case cdcBenchNoProtocol:
default:
t.Fatalf("unknown protocol %q", protocol)
}

c.Put(ctx, t.Cockroach(), "./cockroach")
c.Start(ctx, t.L(), opts, settings, nData)
m := c.NewMonitor(ctx, nData.Merge(nCoord))

conn := c.Conn(ctx, t.L(), nData[0])
defer conn.Close()

// Prohibit ranges on the changefeed coordinator.
t.L().Printf("configuring zones")
for _, target := range getAllZoneTargets(ctx, t, conn) {
_, err := conn.ExecContext(ctx, fmt.Sprintf(
`ALTER %s CONFIGURE ZONE USING num_replicas=3, constraints='[-node%d]'`, target, nCoord[0]))
require.NoError(t, err)
}

// Wait for system ranges to upreplicate.
require.NoError(t, WaitFor3XReplication(ctx, t, conn))

// Create and split the workload table.
//
// NB: don't scatter -- the ranges end up fairly well-distributed anyway, and
// the scatter can often fail with 100k ranges.
t.L().Printf("creating table with %s ranges", humanize.Comma(numRanges))
c.Run(ctx, nWorkload, fmt.Sprintf(
`./cockroach workload init kv --splits %d {pgurl:%d}`, numRanges, nData[0]))
require.NoError(t, WaitFor3XReplication(ctx, t, conn))

// For read-only workloads, ingest some data. init --insert-count does not use
// the standard key generator that the read workload uses, so we have to write
// them with a separate write workload first, see:
// https://github.com/cockroachdb/cockroach/issues/107874
if insertCount > 0 {
const batchSize = 1000
batches := (insertCount-1)/batchSize + 1 // ceiling division
t.L().Printf("ingesting %s rows", humanize.Comma(insertCount))
c.Run(ctx, nWorkload, fmt.Sprintf(
`./cockroach workload run kv --seed %d --read-percent 0 --batch %d --max-ops %d {pgurl:%d}`,
workloadSeed, batchSize, batches, nData[0]))
}

// Now that the ranges are placed, start the changefeed coordinator.
t.L().Printf("starting coordinator node")
c.Start(ctx, t.L(), opts, settings, nCoord)

conn = c.Conn(ctx, t.L(), nCoord[0])
defer conn.Close()

// Start the changefeed if enabled. We disable the initial scan, since we
// don't care about the historical data.
var jobID int
var done atomic.Value // time.Time
if cdcEnabled {
t.L().Printf("starting changefeed")
require.NoError(t, conn.QueryRowContext(ctx, fmt.Sprintf(
`CREATE CHANGEFEED FOR kv.kv INTO '%s' WITH format = '%s', initial_scan = 'no'`,
sink, format)).
Scan(&jobID))

// Monitor the changefeed for failures. When the workload finishes, it will
// store the completion timestamp in done, and we'll wait for the
// changefeed's watermark to reach it.
//
// The watermark and lag isn't recorded by the benchmark, but we make sure
// all data is eventually emitted. It is also helpful for inspection, and we
// may want to track or assert on it later. Initially, this asserted that
// the changefeed wasn't lagging by more than 1-2 minutes, but with 100k
// ranges it was found to sometimes lag by over 8 minutes.
m.Go(func(ctx context.Context) error {
info, err := waitForChangefeed(ctx, conn, jobID, func(info changefeedInfo) (bool, error) {
switch jobs.Status(info.status) {
case jobs.StatusPending, jobs.StatusRunning:
doneValue := done.Load()
return doneValue != nil && info.highwaterTime.After(doneValue.(time.Time)), nil
default:
return false, errors.Errorf("unexpected changefeed status %s", info.status)
}
})
if err != nil {
return err
}
t.L().Printf("changefeed watermark is %s", info.highwaterTime.Format(time.RFC3339))
return nil
})

// Wait for a stable changefeed before starting the workload, by waiting for
// the watermark to reach the current time.
now := timeutil.Now()
t.L().Printf("waiting for changefeed watermark to reach current time (%s)",
now.Format(time.RFC3339))
info, err := waitForChangefeed(ctx, conn, jobID, func(info changefeedInfo) (bool, error) {
switch jobs.Status(info.status) {
case jobs.StatusPending, jobs.StatusRunning:
return info.highwaterTime.After(now), nil
default:
return false, errors.Errorf("unexpected changefeed status %s", info.status)
}
})
require.NoError(t, err)
t.L().Printf("changefeed watermark is %s", info.highwaterTime.Format(time.RFC3339))

} else {
t.L().Printf("control run, not starting changefeed")
}

// Run the workload and record stats. Make sure to use the same seed, so we
// read any rows we wrote above.
m.Go(func(ctx context.Context) error {
// If there's more than 10,000 replicas per node they may struggle to
// maintain RPC connections or liveness, which occasionally fails client
// write requests with ambiguous errors. We tolerate errors in this case
// until we optimize rangefeeds.
//
// TODO(erikgrinaker): remove this when benchmarks are stable.
var extra string
if readPercent < 100 && (numRanges/int64(len(nData))) >= 10000 {
extra += ` --tolerate-errors`
}
t.L().Printf("running workload")
err := c.RunE(ctx, nWorkload, fmt.Sprintf(
`./cockroach workload run kv --seed %d --histograms=%s/stats.json `+
`--concurrency %d --duration %s --write-seq R%d --read-percent %d %s {pgurl:%d-%d}`,
workloadSeed, t.PerfArtifactsDir(), concurrency, duration, insertCount, readPercent, extra,
nData[0], nData[len(nData)-1]))
if err != nil {
return err
}
t.L().Printf("workload completed")

// When the workload completes, signal the completion time to the changefeed
// monitor via done, which will wait for it to fully catch up.
if cdcEnabled {
now := timeutil.Now()
done.Store(now)
info, err := getChangefeedInfo(conn, jobID)
if err != nil {
return err
}
t.L().Printf("waiting for changefeed watermark to reach %s (lagging by %s)",
now.Format(time.RFC3339), now.Sub(info.highwaterTime).Truncate(time.Second))
}
return nil
})

m.Wait()
}

// getAllZoneTargets returns all zone targets (e.g. "RANGE default", "DATABASE
// system", etc).
func getAllZoneTargets(ctx context.Context, t test.Test, conn *gosql.DB) []string {
58 changes: 44 additions & 14 deletions pkg/upgrade/upgrademanager/manager.go
@@ -778,26 +778,56 @@ func (m *Manager) getOrCreateMigrationJob(
return alreadyCompleted, alreadyExisting, jobID, nil
}

const (
preJobInfoTableQuery = `
SELECT id, status
FROM (
SELECT id, status,
crdb_internal.pb_to_json(
'cockroach.sql.jobs.jobspb.Payload',
payload,
false -- emit_defaults
) AS pl
FROM system.jobs
WHERE status IN ` + jobs.NonTerminalStatusTupleString + `
)
WHERE ((pl->'migration')->'clusterVersion') = $1::JSONB`
// postJobInfoTableQuery avoids the crdb_internal.system_jobs virtual table,
// since its backing query requires expensive full scans of system.job_info.
postJobInfoTableQuery = `
WITH
running_migration_jobs AS (
SELECT id, status
FROM system.jobs
WHERE status IN ` + jobs.NonTerminalStatusTupleString + `
AND job_type = 'MIGRATION'
),
payloads AS (
SELECT job_id, value
FROM system.job_info AS payload
WHERE info_key = 'legacy_payload'
AND job_id IN (SELECT id FROM running_migration_jobs)
ORDER BY written DESC
)
SELECT id, status FROM (
SELECT id, status, crdb_internal.pb_to_json('cockroach.sql.jobs.jobspb.Payload', payloads.value, false) AS pl
FROM running_migration_jobs AS j
INNER JOIN payloads ON j.id = payloads.job_id
) WHERE ((pl->'migration')->'clusterVersion') = $1::JSONB`
)

func (m *Manager) getRunningMigrationJob(
ctx context.Context, txn isql.Txn, version roachpb.Version,
) (found bool, jobID jobspb.JobID, _ error) {
// Wrap the version into a ClusterVersion so that the JSON looks like what the
// Payload proto has inside.
cv := clusterversion.ClusterVersion{Version: version}
const query = `
SELECT id, status
FROM (
SELECT id,
status,
crdb_internal.pb_to_json(
'cockroach.sql.jobs.jobspb.Payload',
payload,
false -- emit_defaults
) AS pl
FROM crdb_internal.system_jobs
WHERE status IN ` + jobs.NonTerminalStatusTupleString + `
)
WHERE pl->'migration'->'clusterVersion' = $1::JSON;`
var query string
if m.settings.Version.IsActive(ctx, clusterversion.V23_1JobInfoTableIsBackfilled) {
query = postJobInfoTableQuery
} else {
query = preJobInfoTableQuery
}
jsonMsg, err := protoreflect.MessageToJSON(&cv, protoreflect.FmtFlags{EmitDefaults: false})
if err != nil {
return false, 0, errors.Wrap(err, "failed to marshal version to JSON")
5 changes: 4 additions & 1 deletion pkg/upgrade/upgrademanager/manager_external_test.go
@@ -155,7 +155,10 @@ func TestAlreadyRunningJobsAreHandledProperly(t *testing.T) {
created_by_type,
created_by_id,
claim_session_id,
claim_instance_id
claim_instance_id,
0,
NULL,
job_type
FROM crdb_internal.system_jobs
WHERE id = $1
)
