Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
111018: sql: fix index corruption in udfs with fk cascades r=rharding6373 a=rharding6373

This PR fixes an issue in which UDFs making a FK cascade could cause index corruption. It adds inbound foreign key origin tables to the statement tree, which is used to test for tables with multiple mutations.

No release note is necessary, since UDF mutation and FK cascade support is new in this release.

Epic: CRDB-25388
Informs: cockroachdb#87289

Release note: None

Co-authored-by: rharding6373 <[email protected]>
  • Loading branch information
craig[bot] and rharding6373 committed Sep 29, 2023
2 parents 26663e0 + c6bf7ed commit fad649d
Show file tree
Hide file tree
Showing 5 changed files with 255 additions and 9 deletions.
5 changes: 5 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/fk
Original file line number Diff line number Diff line change
Expand Up @@ -3748,6 +3748,9 @@ INSERT INTO nonunique_idx_child VALUES (0, 1, 10)
# stepping which caused issues when executing postqueries, so a query which
# involved a mixed situation would error out.

statement ok
SET enable_multiple_modifications_of_table = true

statement ok
CREATE TABLE x (
k INT PRIMARY KEY
Expand All @@ -3768,6 +3771,8 @@ SELECT
FROM
a

statement ok
RESET enable_multiple_modifications_of_table

# Test that reversing a constraint addition after adding a self foreign key
# works correctly.
Expand Down
119 changes: 119 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/udf_fk
Original file line number Diff line number Diff line change
Expand Up @@ -595,3 +595,122 @@ SELECT * FROM selfref;
2 2

subtest end


subtest corruption_check

statement ok
DROP TABLE IF EXISTS parent CASCADE;

statement ok
DROP TABLE IF EXISTS child CASCADE;

statement ok
DROP TABLE IF EXISTS grandchild CASCADE;

statement ok
CREATE TABLE parent (j INT PRIMARY KEY);

statement ok
CREATE TABLE child (i INT PRIMARY KEY, j INT REFERENCES parent (j) ON UPDATE CASCADE ON DELETE CASCADE, INDEX (j));

statement ok
INSERT INTO parent VALUES (0), (2), (4);

statement ok
INSERT INTO child VALUES (0, 0);

statement ok
CREATE OR REPLACE FUNCTION f(k INT) RETURNS INT AS $$
UPDATE parent SET j = j + 1 WHERE j = k RETURNING j
$$ LANGUAGE SQL;

# Check 1 level of cascades.
statement error pgcode 0A000 pq: multiple mutations of the same table "child" are not supported unless they all use INSERT without ON CONFLICT; this is to prevent data corruption, see documentation of sql.multiple_modifications_of_table.enabled
WITH x AS (SELECT f(0) AS j), y AS (UPDATE child SET j = 2 WHERE i = 0 RETURNING j) SELECT * FROM x;

query II rowsort
SELECT i, j FROM child@primary;
----
0 0

query II rowsort
SELECT i, j FROM child@child_j_idx;
----
0 0

statement ok
CREATE FUNCTION f2(old INT, new INT) RETURNS INT AS $$
UPDATE child SET j = new WHERE i = old RETURNING i
$$ LANGUAGE SQL;

# Test that we allow mutations in cases were the cascade happens after the
# function call.
# this should not cause corruption, and should be allowed
# (the cascade to cookie will always be strictly after the function call)
statement ok
UPDATE parent SET j = j + 1 WHERE j = f2(0, 2);

query II rowsort
SELECT i, j FROM child@primary;
----
0 2

query II rowsort
SELECT i, j FROM child@child_j_idx;
----
0 2

statement ok
DROP TABLE IF EXISTS child CASCADE;

statement ok
TRUNCATE TABLE parent;

statement ok
CREATE TABLE child (i INT PRIMARY KEY, j INT UNIQUE REFERENCES parent (j) ON UPDATE CASCADE ON DELETE CASCADE, INDEX (j));

statement ok
CREATE TABLE grandchild (i INT PRIMARY KEY, j INT REFERENCES child (j) ON UPDATE CASCADE ON DELETE CASCADE, INDEX (j));

statement ok
INSERT INTO parent VALUES (0), (2), (4);

statement ok
INSERT INTO child VALUES (0, 0);

statement ok
INSERT INTO grandchild VALUES (0,0)

# Check 2 levels of cascades.
statement error pgcode 0A000 pq: multiple mutations of the same table "grandchild" are not supported unless they all use INSERT without ON CONFLICT; this is to prevent data corruption, see documentation of sql.multiple_modifications_of_table.enabled
WITH x AS (SELECT f(0) AS j), y AS (UPDATE grandchild SET j = 2 WHERE i = 0 RETURNING j) SELECT * FROM x;

statement ok
DROP TABLE IF EXISTS child CASCADE;

statement ok
DROP TABLE IF EXISTS grandchild CASCADE;

statement ok
CREATE TABLE child (i INT PRIMARY KEY, j INT UNIQUE REFERENCES parent (j), k INT UNIQUE REFERENCES parent (j) ON UPDATE RESTRICT, INDEX (j));

statement ok
INSERT INTO child VALUES (0,4)

# Check that we can mutate if there are no actions.
statement ok
WITH x AS (SELECT f(0) AS j), y AS (UPDATE child SET j = 2, k = 2 WHERE i = 0 RETURNING j) SELECT * FROM x;

query II rowsort
SELECT i, j FROM child@primary;
----
0 2

query II rowsort
SELECT i, j FROM child@child_j_idx;
----
0 2


subtest end
40 changes: 35 additions & 5 deletions pkg/sql/opt/optbuilder/statement_tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,17 +136,39 @@ func (st *statementTree) Pop() {
// statement1: UPDATE t1
// ├── statement2: UPDATE t2
// └── statement3: UPDATE t1
func (st *statementTree) CanMutateTable(tabID cat.StableID, typ mutationType) bool {
//
// isPostStmt indicates that this mutation will be evaluated at the end of the
// statement (e.g., as part of a foreign key constraint).
func (st *statementTree) CanMutateTable(
tabID cat.StableID, typ mutationType, isPostStmt bool,
) bool {
if len(st.stmts) == 0 {
panic(errors.AssertionFailedf("unexpected empty tree"))
}
curr := &st.stmts[len(st.stmts)-1]
if isPostStmt && len(st.stmts) == 1 {
return true
}
offset := 1
if isPostStmt {
// If this mutation will be evaluated at the end of the current statement,
// the mutation should be added to the parent statement. This is because
// during execution, we step the transaction sequence point before starting
// check evaluations, so all updates in the current statement will be
// visible.
offset = 2
}
curr := &st.stmts[len(st.stmts)-offset]
// Check the children of the current statement for a conflict.
if curr.childrenConflictWithMutation(tabID, typ) {
if !isPostStmt && curr.childrenConflictWithMutation(tabID, typ) {
return false
}
// Check the current statement and all parent statements for a conflict.
for i := range st.stmts {
if isPostStmt && i == len(st.stmts)-1 {
// Don't check against the originating statement since we're adding this
// mutation to the parent statement.
break
}
n := &st.stmts[i]
if n.conflictsWithMutation(tabID, typ) {
return false
Expand All @@ -155,9 +177,17 @@ func (st *statementTree) CanMutateTable(tabID cat.StableID, typ mutationType) bo
// The new mutation is valid, so track it.
switch typ {
case simpleInsert:
curr.simpleInsertTables.Add(int(tabID))
if isPostStmt {
curr.childrenSimpleInsertTables.Add(int(tabID))
} else {
curr.simpleInsertTables.Add(int(tabID))
}
case generalMutation:
curr.generalMutationTables.Add(int(tabID))
if isPostStmt {
curr.childrenGeneralMutationTables.Add(int(tabID))
} else {
curr.generalMutationTables.Add(int(tabID))
}
}
return true
}
57 changes: 56 additions & 1 deletion pkg/sql/opt/optbuilder/statement_tree_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ func TestStatementTree(t *testing.T) {
pop
mut
simple
post
t1
t2
fail
Expand Down Expand Up @@ -374,6 +375,55 @@ func TestStatementTree(t *testing.T) {
mut | t1 | fail,
},
},
// 21.
// Push
// CanMutateTable(t1, simpleInsert)
// Push
// CanMutateTable(t2, default)
// CanMutateTable(t1, default, post) FAIL
{
cmds: []cmd{
push,
mut | t1 | simple,
push,
mut | t2,
mut | t1 | post | fail,
},
},
// 22.
// Push
// Push
// CanMutateTable(t1, default)
// CanMutateTable(t2, default, post)
// Pop
// CanMutateTable(t2, simpleInsert) FAIL
{
cmds: []cmd{
push,
push,
mut | t1,
mut | t2 | post,
pop,
mut | t1 | simple | fail,
},
},
// 23.
// Push
// Push
// CanMutateTable(t1, default)
// CanMutateTable(t2, default, post)
// Pop
// Pop
{
cmds: []cmd{
push,
push,
mut | t1,
mut | t2 | post,
pop,
pop,
},
},
}

for i, tc := range testCases {
Expand All @@ -400,7 +450,12 @@ func TestStatementTree(t *testing.T) {
typ = simpleInsert
}

res := mt.CanMutateTable(tabID, typ)
isPost := false
if c&post == post {
isPost = true
}

res := mt.CanMutateTable(tabID, typ, isPost)

expected := c&fail != fail
if res != expected {
Expand Down
43 changes: 40 additions & 3 deletions pkg/sql/opt/optbuilder/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlerrors"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/intsets"
"github.com/cockroachdb/errors"
)

Expand Down Expand Up @@ -513,16 +514,52 @@ func (b *Builder) resolveSchemaForCreate(
}

func (b *Builder) checkMultipleMutations(tab cat.Table, typ mutationType) {
if !b.stmtTree.CanMutateTable(tab.ID(), typ) &&
!multipleModificationsOfTableEnabled.Get(&b.evalCtx.Settings.SV) &&
!b.evalCtx.SessionData().MultipleModificationsOfTable {
if multipleModificationsOfTableEnabled.Get(&b.evalCtx.Settings.SV) ||
b.evalCtx.SessionData().MultipleModificationsOfTable {
return
}
if !b.stmtTree.CanMutateTable(tab.ID(), typ, false /* isPostStmt */) {
panic(pgerror.Newf(
pgcode.FeatureNotSupported,
"multiple mutations of the same table %q are not supported unless they all "+
"use INSERT without ON CONFLICT; this is to prevent data corruption, see "+
"documentation of sql.multiple_modifications_of_table.enabled", tab.Name(),
))
}
if tab.InboundForeignKeyCount() > 0 {
var visited intsets.Fast
b.checkMultipleMutationsCascade(tab, typ, visited)
}
}

func (b *Builder) checkMultipleMutationsCascade(
tab cat.Table, typ mutationType, visited intsets.Fast,
) {
// If this table references foreign keys that will also be mutated, then add
// them to the statement tree via a recursive call. We only need to check each
// table once even if there are multiple references to it.
for i := 0; i < tab.InboundForeignKeyCount(); i++ {
fk := tab.InboundForeignKey(i)
if (fk.DeleteReferenceAction() != tree.NoAction && fk.DeleteReferenceAction() != tree.Restrict && typ != simpleInsert) ||
(fk.UpdateReferenceAction() != tree.NoAction && fk.UpdateReferenceAction() != tree.Restrict) {
fkTab := resolveTable(b.ctx, b.catalog, fk.OriginTableID())
// If the origin table is still being added, it will be nil. It's safe to
// do the mutation in this case.
if fkTab == nil || visited.Contains(int(fkTab.ID())) {
continue
}
if !b.stmtTree.CanMutateTable(fkTab.ID(), typ, true /* isPostStmt */) {
panic(pgerror.Newf(
pgcode.FeatureNotSupported,
"multiple mutations of the same table %q are not supported unless they all "+
"use INSERT without ON CONFLICT; this is to prevent data corruption, see "+
"documentation of sql.multiple_modifications_of_table.enabled", fkTab.Name(),
))
}
visited.Add(int(fkTab.ID()))
b.checkMultipleMutationsCascade(fkTab, typ, visited)
}
}
}

// resolveTableForMutation is a helper method for building mutations. It returns
Expand Down

0 comments on commit fad649d

Please sign in to comment.