From b443797ecdae0a771c800255b6dc81327c3a636d Mon Sep 17 00:00:00 2001 From: Faizan Qazi Date: Wed, 1 Nov 2023 17:09:16 -0400 Subject: [PATCH] sql/schemachanger: prevent concurrent type desc changes Previously, the declarative schema changer only waited for legacy schema changer mutations/jobs on relations and did not correctly account for type schema changes. As a result, it was possible to drop a type concurrently while a schema change was trying to mutate a type, which could lead to type change jobs hanging. To address this, this patch causes declarative schema changes to wait for type modifications. Fixes: #112390, #111540 Release note (bug fix): ALTER TYPE could get stuck if DROP TYPE was executed concurrently. --- pkg/sql/catalog/typedesc/type_desc.go | 7 +++ pkg/sql/schemachanger/schemachanger_test.go | 65 +++++++++++++++++++++ 2 files changed, 72 insertions(+) diff --git a/pkg/sql/catalog/typedesc/type_desc.go b/pkg/sql/catalog/typedesc/type_desc.go index 3aac1ac8d95d..2ed3914defb6 100644 --- a/pkg/sql/catalog/typedesc/type_desc.go +++ b/pkg/sql/catalog/typedesc/type_desc.go @@ -866,6 +866,13 @@ func (desc *immutable) HasConcurrentSchemaChanges() bool { desc.DeclarativeSchemaChangerState.JobID != catpb.InvalidJobID { return true } + // Check if any enum members are transitioning, which should + // block declarative jobs. + for _, member := range desc.EnumMembers { + if member.Direction != descpb.TypeDescriptor_EnumMember_NONE { + return true + } + } // TODO(fqazi): In the future we may not have concurrent declarative schema // changes without a job ID. So, we should scan the elements involved for // types. diff --git a/pkg/sql/schemachanger/schemachanger_test.go b/pkg/sql/schemachanger/schemachanger_test.go index bce03f843f4d..0e935f20c3bd 100644 --- a/pkg/sql/schemachanger/schemachanger_test.go +++ b/pkg/sql/schemachanger/schemachanger_test.go @@ -490,6 +490,67 @@ func TestSchemaChangeWaitsForConcurrentSchemaChanges(t *testing.T) { "defaultdb", "t", expectedKeyCount) } + typeSchemaChangeFunc := func(t *testing.T, modeFor1stStmt, modeFor2ndStmt sessiondatapb.NewSchemaChangerMode) { + ctx, cancel := context.WithCancel(context.Background()) + addTypeStartedChan := make(chan struct{}) + resumeAddTypeJobChan := make(chan struct{}) + var closeAddTypeValueChanOnce sync.Once + schemaChangeWaitCompletedChan := make(chan struct{}) + + var params base.TestServerArgs + params.Knobs = base.TestingKnobs{ + SQLDeclarativeSchemaChanger: &scexec.TestingKnobs{ + // If the blocked schema changer is from legacy schema changer, we let + // it hijack this knob (which is originally design for declarative + // schema changer) if `stmt` is nil. + WhileWaitingForConcurrentSchemaChanges: func(stmts []string) { + if (len(stmts) == 1 && strings.Contains(stmts[0], "DROP TYPE")) || + stmts == nil { + closeAddTypeValueChanOnce.Do(func() { + close(resumeAddTypeJobChan) + close(schemaChangeWaitCompletedChan) + }) + } + }, + }, + // Decrease the adopt loop interval so that retries happen quickly. + JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), + // Prevent the GC job from running so we ensure that all the keys which + // were written remain. + GCJob: &sql.GCJobTestingKnobs{RunBeforeResume: func(jobID jobspb.JobID) error { + <-ctx.Done() + return ctx.Err() + }}, + } + s, sqlDB, _ := serverutils.StartServer(t, params) + defer s.Stopper().Stop(ctx) + defer cancel() + tdb := sqlutils.MakeSQLRunner(sqlDB) + + tdb.Exec(t, "CREATE TYPE status AS ENUM ('open', 'closed', 'inactive');") + + // Execute 1st DDL asynchronously and block until it's executing. + tdb.Exec(t, `SET use_declarative_schema_changer = $1`, modeFor1stStmt.String()) + go func() { + tdb.Exec(t, `SET CLUSTER SETTING jobs.debug.pausepoints="typeschemachanger.before.exec"`) + tdb.ExpectErr(t, + ".*was paused before it completed with reason: pause point \"typeschemachanger.before.exec\" hit", + `ALTER TYPE status ADD VALUE 'unknown';`) + close(addTypeStartedChan) + <-resumeAddTypeJobChan // wait for DROP TYPE to unblock me + tdb.Exec(t, `SET CLUSTER SETTING jobs.debug.pausepoints=""`) + tdb.Exec(t, "RESUME JOB (SELECT job_id FROM crdb_internal.jobs WHERE status='paused' FETCH FIRST 1 ROWS ONLY);\n") + }() + <-addTypeStartedChan + + // Execute 2st DDL synchronously. During waiting, it will unblock 1st DDL so + // it will eventually be able to proceed after waiting for a while. + tdb.Exec(t, `SET use_declarative_schema_changer = $1`, modeFor2ndStmt.String()) + tdb.Exec(t, `DROP TYPE STATUS;`) + // After completion make sure we actually waited for schema changes. + <-schemaChangeWaitCompletedChan + } + t.Run("declarative-then-declarative", func(t *testing.T) { tf(t, sessiondatapb.UseNewSchemaChangerUnsafeAlways, sessiondatapb.UseNewSchemaChangerUnsafeAlways) }) @@ -502,6 +563,10 @@ func TestSchemaChangeWaitsForConcurrentSchemaChanges(t *testing.T) { tf(t, sessiondatapb.UseNewSchemaChangerOff, sessiondatapb.UseNewSchemaChangerUnsafeAlways) }) + t.Run("typedesc legacy-then-declarative", func(t *testing.T) { + typeSchemaChangeFunc(t, sessiondatapb.UseNewSchemaChangerOff, sessiondatapb.UseNewSchemaChangerUnsafeAlways) + }) + // legacy + legacy case is tested in TestLegacySchemaChangerWaitsForOtherSchemaChanges // because the waiting occurred under a different code path. }