Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
113639: sql/schemachanger: prevent concurrent type desc changes r=fqazi a=fqazi

Previously, the declarative schema changer only waited for legacy schema changer mutations/jobs on relations and did not correctly account for type schema changes.  As a result, it was possible to drop a type concurrently while a schema change was trying to mutate a type, which could lead to type change jobs hanging. To address this, this patch causes declarative schema changes to wait for type modifications.

Fixes: cockroachdb#112390, cockroachdb#111540

Release note (bug fix): ALTER TYPE could get stuck if DROP TYPE
 was executed concurrently.

Co-authored-by: Faizan Qazi <[email protected]>
  • Loading branch information
craig[bot] and fqazi committed Nov 2, 2023
2 parents f515f70 + b443797 commit cedd409
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 0 deletions.
7 changes: 7 additions & 0 deletions pkg/sql/catalog/typedesc/type_desc.go
Original file line number Diff line number Diff line change
Expand Up @@ -866,6 +866,13 @@ func (desc *immutable) HasConcurrentSchemaChanges() bool {
desc.DeclarativeSchemaChangerState.JobID != catpb.InvalidJobID {
return true
}
// Check if any enum members are transitioning, which should
// block declarative jobs.
for _, member := range desc.EnumMembers {
if member.Direction != descpb.TypeDescriptor_EnumMember_NONE {
return true
}
}
// TODO(fqazi): In the future we may not have concurrent declarative schema
// changes without a job ID. So, we should scan the elements involved for
// types.
Expand Down
65 changes: 65 additions & 0 deletions pkg/sql/schemachanger/schemachanger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,67 @@ func TestSchemaChangeWaitsForConcurrentSchemaChanges(t *testing.T) {
"defaultdb", "t", expectedKeyCount)
}

typeSchemaChangeFunc := func(t *testing.T, modeFor1stStmt, modeFor2ndStmt sessiondatapb.NewSchemaChangerMode) {
ctx, cancel := context.WithCancel(context.Background())
addTypeStartedChan := make(chan struct{})
resumeAddTypeJobChan := make(chan struct{})
var closeAddTypeValueChanOnce sync.Once
schemaChangeWaitCompletedChan := make(chan struct{})

var params base.TestServerArgs
params.Knobs = base.TestingKnobs{
SQLDeclarativeSchemaChanger: &scexec.TestingKnobs{
// If the blocked schema changer is from legacy schema changer, we let
// it hijack this knob (which is originally design for declarative
// schema changer) if `stmt` is nil.
WhileWaitingForConcurrentSchemaChanges: func(stmts []string) {
if (len(stmts) == 1 && strings.Contains(stmts[0], "DROP TYPE")) ||
stmts == nil {
closeAddTypeValueChanOnce.Do(func() {
close(resumeAddTypeJobChan)
close(schemaChangeWaitCompletedChan)
})
}
},
},
// Decrease the adopt loop interval so that retries happen quickly.
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
// Prevent the GC job from running so we ensure that all the keys which
// were written remain.
GCJob: &sql.GCJobTestingKnobs{RunBeforeResume: func(jobID jobspb.JobID) error {
<-ctx.Done()
return ctx.Err()
}},
}
s, sqlDB, _ := serverutils.StartServer(t, params)
defer s.Stopper().Stop(ctx)
defer cancel()
tdb := sqlutils.MakeSQLRunner(sqlDB)

tdb.Exec(t, "CREATE TYPE status AS ENUM ('open', 'closed', 'inactive');")

// Execute 1st DDL asynchronously and block until it's executing.
tdb.Exec(t, `SET use_declarative_schema_changer = $1`, modeFor1stStmt.String())
go func() {
tdb.Exec(t, `SET CLUSTER SETTING jobs.debug.pausepoints="typeschemachanger.before.exec"`)
tdb.ExpectErr(t,
".*was paused before it completed with reason: pause point \"typeschemachanger.before.exec\" hit",
`ALTER TYPE status ADD VALUE 'unknown';`)
close(addTypeStartedChan)
<-resumeAddTypeJobChan // wait for DROP TYPE to unblock me
tdb.Exec(t, `SET CLUSTER SETTING jobs.debug.pausepoints=""`)
tdb.Exec(t, "RESUME JOB (SELECT job_id FROM crdb_internal.jobs WHERE status='paused' FETCH FIRST 1 ROWS ONLY);\n")
}()
<-addTypeStartedChan

// Execute 2st DDL synchronously. During waiting, it will unblock 1st DDL so
// it will eventually be able to proceed after waiting for a while.
tdb.Exec(t, `SET use_declarative_schema_changer = $1`, modeFor2ndStmt.String())
tdb.Exec(t, `DROP TYPE STATUS;`)
// After completion make sure we actually waited for schema changes.
<-schemaChangeWaitCompletedChan
}

t.Run("declarative-then-declarative", func(t *testing.T) {
tf(t, sessiondatapb.UseNewSchemaChangerUnsafeAlways, sessiondatapb.UseNewSchemaChangerUnsafeAlways)
})
Expand All @@ -502,6 +563,10 @@ func TestSchemaChangeWaitsForConcurrentSchemaChanges(t *testing.T) {
tf(t, sessiondatapb.UseNewSchemaChangerOff, sessiondatapb.UseNewSchemaChangerUnsafeAlways)
})

t.Run("typedesc legacy-then-declarative", func(t *testing.T) {
typeSchemaChangeFunc(t, sessiondatapb.UseNewSchemaChangerOff, sessiondatapb.UseNewSchemaChangerUnsafeAlways)
})

// legacy + legacy case is tested in TestLegacySchemaChangerWaitsForOtherSchemaChanges
// because the waiting occurred under a different code path.
}
Expand Down

0 comments on commit cedd409

Please sign in to comment.