Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
110945: sql: plumb locking durability into Get/Scan/ReverseScan requests r=arulajmani,nvanbenschoten,yuzefovich a=michae2

**execinfrapb: remove IndexSkipTableReaderSpec**

The `indexSkipTableReader` was deleted in cockroachdb#51178 but the distsql spec has stuck around for a few versions. I think we can delete the spec now, too. If someone wants to dust off cockroachdb#39668 they can bring it back.

Release note: None

**sql: plumb locking durability into Get/Scan/ReverseScan requests**

Building on top of cockroachdb#110201, plumb locking durability from the optimizer to creation of Get/Scan/ReverseScan requests in txnKVFetcher and txnKVStreamer.

Also add a version gate for usage of guaranteed-durable locking.

Fixes: cockroachdb#100194

Release note: None

Co-authored-by: Michael Erickson <[email protected]>
  • Loading branch information
craig[bot] and michae2 committed Oct 2, 2023
2 parents fed4d7a + 3d593e1 commit f6f355b
Show file tree
Hide file tree
Showing 41 changed files with 841 additions and 318 deletions.
29 changes: 17 additions & 12 deletions pkg/kv/kvclient/kvstreamer/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,11 +221,12 @@ type Streamer struct {
distSender *kvcoord.DistSender
stopper *stop.Stopper

mode OperationMode
hints Hints
maxKeysPerRow int32
budget *budget
keyLocking lock.Strength
mode OperationMode
hints Hints
maxKeysPerRow int32
budget *budget
lockStrength lock.Strength
lockDurability lock.Durability

streamerStatistics

Expand Down Expand Up @@ -370,16 +371,18 @@ func NewStreamer(
acc *mon.BoundAccount,
kvPairsRead *int64,
batchRequestsIssued *int64,
keyLocking lock.Strength,
lockStrength lock.Strength,
lockDurability lock.Durability,
) *Streamer {
if txn.Type() != kv.LeafTxn {
panic(errors.AssertionFailedf("RootTxn is given to the Streamer"))
}
s := &Streamer{
distSender: distSender,
stopper: stopper,
budget: newBudget(acc, limitBytes),
keyLocking: keyLocking,
distSender: distSender,
stopper: stopper,
budget: newBudget(acc, limitBytes),
lockStrength: lockStrength,
lockDurability: lockDurability,
}

if kvPairsRead == nil {
Expand Down Expand Up @@ -1744,7 +1747,8 @@ func buildResumeSingleRangeBatch(
newGet := gets[0]
gets = gets[1:]
newGet.req.SetSpan(*get.ResumeSpan)
newGet.req.KeyLockingStrength = s.keyLocking
newGet.req.KeyLockingStrength = s.lockStrength
newGet.req.KeyLockingDurability = s.lockDurability
newGet.union.Get = &newGet.req
resumeReq.reqs[resumeReqIdx].Value = &newGet.union
resumeReq.positions = append(resumeReq.positions, position)
Expand Down Expand Up @@ -1772,7 +1776,8 @@ func buildResumeSingleRangeBatch(
scans = scans[1:]
newScan.req.SetSpan(*scan.ResumeSpan)
newScan.req.ScanFormat = kvpb.BATCH_RESPONSE
newScan.req.KeyLockingStrength = s.keyLocking
newScan.req.KeyLockingStrength = s.lockStrength
newScan.req.KeyLockingDurability = s.lockDurability
newScan.union.Scan = &newScan.req
resumeReq.reqs[resumeReqIdx].Value = &newScan.union
resumeReq.positions = append(resumeReq.positions, position)
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvclient/kvstreamer/streamer_accounting_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ func TestStreamerMemoryAccounting(t *testing.T) {
nil, /* kvPairsRead */
nil, /* batchRequestsIssued */
lock.None,
lock.Unreplicated,
)
s.Init(OutOfOrder, Hints{UniqueRequests: true, SingleRowLookup: singleRowLookup}, 1 /* maxKeysPerRow */, nil /* diskBuffer */)
return s
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvclient/kvstreamer/streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func getStreamer(
nil, /* kvPairsRead */
nil, /* batchRequestsIssued */
lock.None,
lock.Unreplicated,
)
}

Expand Down Expand Up @@ -115,6 +116,7 @@ func TestStreamerLimitations(t *testing.T) {
nil, /* kvPairsRead */
nil, /* batchRequestsIssued */
lock.None,
lock.Unreplicated,
)
})
})
Expand Down
25 changes: 25 additions & 0 deletions pkg/sql/catalog/descpb/locking.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,28 @@ func ToScanLockingWaitPolicy(wp tree.LockingWaitPolicy) ScanLockingWaitPolicy {
panic(errors.AssertionFailedf("unknown locking wait policy %s", wp))
}
}

// PrettyString returns the locking durability as a user-readable string.
func (s ScanLockingDurability) PrettyString() string {
switch s {
case ScanLockingDurability_BEST_EFFORT:
return "best effort"
case ScanLockingDurability_GUARANTEED:
return "guaranteed"
default:
panic(errors.AssertionFailedf("unexpected durability %s", s))
}
}

// ToScanLockingDurability converts a tree.LockingDurability to its
// corresponding ScanLockingDurability.
func ToScanLockingDurability(s tree.LockingDurability) ScanLockingDurability {
switch s {
case tree.LockDurabilityBestEffort:
return ScanLockingDurability_BEST_EFFORT
case tree.LockDurabilityGuaranteed:
return ScanLockingDurability_GUARANTEED
default:
panic(errors.AssertionFailedf("unknown locking durability %d", s))
}
}
6 changes: 6 additions & 0 deletions pkg/sql/catalog/descpb/locking.proto
Original file line number Diff line number Diff line change
Expand Up @@ -129,3 +129,9 @@ enum ScanLockingWaitPolicy {
// ERROR represents NOWAIT - raise an error if a row cannot be locked.
ERROR = 2;
}

// LockingDurability controls the durability of locks.
enum ScanLockingDurability {
BEST_EFFORT = 0;
GUARANTEED = 1;
}
1 change: 1 addition & 0 deletions pkg/sql/colfetcher/colbatch_direct_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ func NewColBatchDirectScan(
spec.Reverse,
spec.LockingStrength,
spec.LockingWaitPolicy,
spec.LockingDurability,
flowCtx.EvalCtx.SessionData().LockTimeout,
kvFetcherMemAcc,
flowCtx.EvalCtx.TestingKnobs.ForceProductionValues,
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/colfetcher/colbatch_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,7 @@ func NewColBatchScan(
spec.Reverse,
spec.LockingStrength,
spec.LockingWaitPolicy,
spec.LockingDurability,
flowCtx.EvalCtx.SessionData().LockTimeout,
kvFetcherMemAcc,
flowCtx.EvalCtx.TestingKnobs.ForceProductionValues,
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/colfetcher/index_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -563,6 +563,7 @@ func NewColIndexJoin(
flowCtx.EvalCtx.Settings,
spec.LockingWaitPolicy,
spec.LockingStrength,
spec.LockingDurability,
streamerBudgetLimit,
streamerBudgetAcc,
spec.MaintainOrdering,
Expand All @@ -580,6 +581,7 @@ func NewColIndexJoin(
false, /* reverse */
spec.LockingStrength,
spec.LockingWaitPolicy,
spec.LockingDurability,
flowCtx.EvalCtx.SessionData().LockTimeout,
kvFetcherMemAcc,
flowCtx.EvalCtx.TestingKnobs.ForceProductionValues,
Expand Down
6 changes: 6 additions & 0 deletions pkg/sql/distsql_physical_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -1665,6 +1665,7 @@ func initTableReaderSpecTemplate(
TableDescriptorModificationTime: n.desc.GetModificationTime(),
LockingStrength: n.lockingStrength,
LockingWaitPolicy: n.lockingWaitPolicy,
LockingDurability: n.lockingDurability,
}
if err := rowenc.InitIndexFetchSpec(&s.FetchSpec, codec, n.desc, n.index, colIDs); err != nil {
return nil, execinfrapb.PostProcessSpec{}, err
Expand Down Expand Up @@ -2740,6 +2741,7 @@ func (dsp *DistSQLPlanner) createPlanForIndexJoin(
Type: descpb.InnerJoin,
LockingStrength: n.table.lockingStrength,
LockingWaitPolicy: n.table.lockingWaitPolicy,
LockingDurability: n.table.lockingDurability,
MaintainOrdering: len(n.reqOrdering) > 0,
LimitHint: n.limitHint,
}
Expand Down Expand Up @@ -2817,6 +2819,7 @@ func (dsp *DistSQLPlanner) createPlanForLookupJoin(
Type: n.joinType,
LockingStrength: n.table.lockingStrength,
LockingWaitPolicy: n.table.lockingWaitPolicy,
LockingDurability: n.table.lockingDurability,
// TODO(sumeer): specifying ordering here using isFirstJoinInPairedJoiner
// is late in the sense that the cost of this has not been taken into
// account. Make this decision earlier in CustomFuncs.GenerateLookupJoins.
Expand Down Expand Up @@ -2949,6 +2952,7 @@ func (dsp *DistSQLPlanner) createPlanForInvertedJoin(
OutputGroupContinuationForLeftRow: n.isFirstJoinInPairedJoiner,
LockingStrength: n.table.lockingStrength,
LockingWaitPolicy: n.table.lockingWaitPolicy,
LockingDurability: n.table.lockingDurability,
}

fetchColIDs := make([]descpb.ColumnID, len(n.table.cols))
Expand Down Expand Up @@ -3045,6 +3049,7 @@ func (dsp *DistSQLPlanner) createPlanForZigzagJoin(
fixedValues: valuesSpec,
lockingStrength: side.scan.lockingStrength,
lockingWaitPolicy: side.scan.lockingWaitPolicy,
lockingDurability: side.scan.lockingDurability,
}
}

Expand All @@ -3064,6 +3069,7 @@ type zigzagPlanningSide struct {
fixedValues *execinfrapb.ValuesCoreSpec
lockingStrength descpb.ScanLockingStrength
lockingWaitPolicy descpb.ScanLockingWaitPolicy
lockingDurability descpb.ScanLockingDurability
}

type zigzagPlanningInfo struct {
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/distsql_spec_exec_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ func (e *distSQLSpecExecFactory) ConstructScan(
}
trSpec.LockingStrength = descpb.ToScanLockingStrength(params.Locking.Strength)
trSpec.LockingWaitPolicy = descpb.ToScanLockingWaitPolicy(params.Locking.WaitPolicy)
trSpec.LockingDurability = descpb.ToScanLockingDurability(params.Locking.Durability)
if trSpec.LockingStrength != descpb.ScanLockingStrength_FOR_NONE {
// Scans that are performing row-level locking cannot currently be
// distributed because their locks would not be propagated back to
Expand Down Expand Up @@ -736,6 +737,7 @@ func (e *distSQLSpecExecFactory) constructZigzagJoinSide(
fixedValues: valuesSpec,
lockingStrength: descpb.ToScanLockingStrength(locking.Strength),
lockingWaitPolicy: descpb.ToScanLockingWaitPolicy(locking.WaitPolicy),
lockingDurability: descpb.ToScanLockingDurability(locking.Durability),
}, nil
}

Expand Down
42 changes: 12 additions & 30 deletions pkg/sql/execinfrapb/processors_sql.proto
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,9 @@ message TableReaderSpec {
// to BLOCK when locking_strength is FOR_NONE.
optional sqlbase.ScanLockingWaitPolicy locking_wait_policy = 11 [(gogoproto.nullable) = false];

// Indicates the row-level locking durability to be used by the scan.
optional sqlbase.ScanLockingDurability locking_durability = 23 [(gogoproto.nullable) = false];

// Indicates that misplanned ranges metadata should not be sent back to the
// DistSQLReceiver. This will be set to true for the scan with a hard limit
// (in which case we create a single processor that is placed at the
Expand All @@ -138,36 +141,6 @@ message FiltererSpec {
optional Expression filter = 1 [(gogoproto.nullable) = false];
}

// IndexSkipTableReaderSpec is the specification for a table reader that
// is performing a loose index scan over rows in the table. This means that
// this reader will return distinct rows from the table while using the index
// to skip unnecessary rows. This reader is used for different optimizations
// when operating on a prefix of a compound key.
message IndexSkipTableReaderSpec {
optional sqlbase.TableDescriptor table = 1 [(gogoproto.nullable) = false];
// If 0, we use the primary index. If non-zero, we use the index_idx-th index,
// i.e. table.indexes[index_idx-1]
optional uint32 index_idx = 2 [(gogoproto.nullable) = false];

reserved 3;
repeated roachpb.Span spans = 8 [(gogoproto.nullable) = false];

// This field used to be a visibility level of the columns that should be
// produced. We now always produce all columns (public and not public).
reserved 4;

optional bool reverse = 5 [(gogoproto.nullable) = false];

// Indicates the row-level locking strength to be used by the scan. If set to
// FOR_NONE, no row-level locking should be performed.
optional sqlbase.ScanLockingStrength locking_strength = 6 [(gogoproto.nullable) = false];

// Indicates the policy to be used by the scan for handling conflicting locks
// held by other active transactions when attempting to lock rows. Always set
// to BLOCK when locking_strength is FOR_NONE.
optional sqlbase.ScanLockingWaitPolicy locking_wait_policy = 7 [(gogoproto.nullable) = false];
}

// JoinReaderSpec is the specification for a "join reader". A join reader
// performs KV operations to retrieve specific rows that correspond to the
// values in the input stream (join by lookup). The output can optionally
Expand Down Expand Up @@ -357,6 +330,9 @@ message JoinReaderSpec {
// to BLOCK when locking_strength is FOR_NONE.
optional sqlbase.ScanLockingWaitPolicy locking_wait_policy = 10 [(gogoproto.nullable) = false];

// Indicates the row-level locking durability to be used by the scan.
optional sqlbase.ScanLockingDurability locking_durability = 24 [(gogoproto.nullable) = false];

// Indicates that the join reader should maintain the ordering of the input
// stream. This is applicable to both lookup joins and index joins. For lookup
// joins, maintaining order is expensive because it requires buffering. For
Expand Down Expand Up @@ -499,6 +475,9 @@ message ZigzagJoinerSpec {
// held by other active transactions when attempting to lock rows. Always set
// to BLOCK when locking_strength is FOR_NONE.
optional sqlbase.ScanLockingWaitPolicy locking_wait_policy = 5 [(gogoproto.nullable) = false];

// Indicates the row-level locking durability to be used by the scan.
optional sqlbase.ScanLockingDurability locking_durability = 8 [(gogoproto.nullable) = false];
}

repeated Side sides = 7 [(gogoproto.nullable) = false];
Expand Down Expand Up @@ -747,6 +726,9 @@ message InvertedJoinerSpec {
// held by other active transactions when attempting to lock rows. Always set
// to BLOCK when locking_strength is FOR_NONE.
optional sqlbase.ScanLockingWaitPolicy locking_wait_policy = 13 [(gogoproto.nullable) = false];

// Indicates the row-level locking durability to be used by the scan.
optional sqlbase.ScanLockingDurability locking_durability = 14 [(gogoproto.nullable) = false];
}

// InvertedFiltererSpec is the specification of a processor that does filtering
Expand Down
7 changes: 4 additions & 3 deletions pkg/sql/insert_fast_path.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ func (r *insertFastPathRun) addFKChecks(
}
lockStrength := row.GetKeyLockingStrength(descpb.ToScanLockingStrength(c.Locking.Strength))
lockWaitPolicy := row.GetWaitPolicy(descpb.ToScanLockingWaitPolicy(c.Locking.WaitPolicy))
lockDurability := row.GetKeyLockingDurability(descpb.ToScanLockingDurability(c.Locking.Durability))
if r.fkBatch.Header.WaitPolicy != lockWaitPolicy {
return errors.AssertionFailedf(
"FK check lock wait policy %s did not match %s",
Expand All @@ -200,9 +201,9 @@ func (r *insertFastPathRun) addFKChecks(
reqIdx := len(r.fkBatch.Requests)
r.fkBatch.Requests = append(r.fkBatch.Requests, kvpb.RequestUnion{})
r.fkBatch.Requests[reqIdx].MustSetInner(&kvpb.ScanRequest{
RequestHeader: kvpb.RequestHeaderFromSpan(span),
KeyLockingStrength: lockStrength,
// TODO(michae2): Once #100193 is finished, also include c.Locking.Durability.
RequestHeader: kvpb.RequestHeaderFromSpan(span),
KeyLockingStrength: lockStrength,
KeyLockingDurability: lockDurability,
})
r.fkSpanInfo = append(r.fkSpanInfo, insertFastPathFKSpanInfo{
check: c,
Expand Down
21 changes: 3 additions & 18 deletions pkg/sql/logictest/testdata/logic_test/fk_read_committed
Original file line number Diff line number Diff line change
Expand Up @@ -3,47 +3,32 @@
statement ok
SET CLUSTER SETTING sql.txn.read_committed_syntax.enabled = true

# Some foreign key checks are prohibited under weaker isolation levels until we
# improve locking. See #80683, #100156, #100193.

statement ok
CREATE TABLE jars (j INT PRIMARY KEY)

statement ok
CREATE TABLE cookies (c INT PRIMARY KEY, j INT REFERENCES jars (j))
CREATE TABLE cookies (c INT PRIMARY KEY, j INT REFERENCES jars (j), FAMILY (c, j))

statement ok
SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL READ COMMITTED

statement ok
INSERT INTO jars VALUES (1), (2)

# Foreign key checks of the parent require durable shared locking under weaker
# isolation levels, and are not yet supported.
query error pgcode 0A000 guaranteed-durable locking not yet implemented
INSERT INTO cookies VALUES (1, 1)

statement ok
BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE

statement ok
INSERT INTO cookies VALUES (1, 1)

statement ok
COMMIT

query error pgcode 0A000 guaranteed-durable locking not yet implemented
UPDATE cookies SET j = 2 WHERE c = 1

# Foreign key checks of the child do not require locking.
query error violates foreign key constraint
UPDATE jars SET j = j + 4

query error violates foreign key constraint
DELETE FROM jars WHERE j = 1
DELETE FROM jars WHERE j = 2

statement ok
DELETE FROM cookies WHERE c = 1

statement ok
DELETE FROM jars WHERE j = 1
DELETE FROM jars WHERE j = 2
Loading

0 comments on commit f6f355b

Please sign in to comment.