diff --git a/pkg/sql/catalog/dbdesc/database_desc.go b/pkg/sql/catalog/dbdesc/database_desc.go index 09560838b274..4898f9548e92 100644 --- a/pkg/sql/catalog/dbdesc/database_desc.go +++ b/pkg/sql/catalog/dbdesc/database_desc.go @@ -466,6 +466,15 @@ func (desc *immutable) HasConcurrentSchemaChanges() bool { desc.DeclarativeSchemaChangerState.JobID != catpb.InvalidJobID } +// ConcurrentSchemaChangeJobIDs implements catalog.Descriptor. +func (desc *immutable) ConcurrentSchemaChangeJobIDs() (ret []catpb.JobID) { + if desc.DeclarativeSchemaChangerState != nil && + desc.DeclarativeSchemaChangerState.JobID != catpb.InvalidJobID { + ret = append(ret, desc.DeclarativeSchemaChangerState.JobID) + } + return ret +} + // GetDefaultPrivilegeDescriptor returns a DefaultPrivilegeDescriptor. func (desc *immutable) GetDefaultPrivilegeDescriptor() catalog.DefaultPrivilegeDescriptor { defaultPrivilegeDescriptor := desc.GetDefaultPrivileges() diff --git a/pkg/sql/catalog/descriptor.go b/pkg/sql/catalog/descriptor.go index 7afe70f67113..32bcda3b0efc 100644 --- a/pkg/sql/catalog/descriptor.go +++ b/pkg/sql/catalog/descriptor.go @@ -246,6 +246,10 @@ type Descriptor interface { // in progress, either legacy or declarative. HasConcurrentSchemaChanges() bool + // ConcurrentSchemaChangeJobIDs returns all in-progress schema change + // jobs, either legacy or declarative. + ConcurrentSchemaChangeJobIDs() []catpb.JobID + // SkipNamespace is true when a descriptor should not have a namespace record. SkipNamespace() bool diff --git a/pkg/sql/catalog/funcdesc/func_desc.go b/pkg/sql/catalog/funcdesc/func_desc.go index ddee8fb12b12..ed732a75cc34 100644 --- a/pkg/sql/catalog/funcdesc/func_desc.go +++ b/pkg/sql/catalog/funcdesc/func_desc.go @@ -394,6 +394,15 @@ func (desc *immutable) HasConcurrentSchemaChanges() bool { desc.DeclarativeSchemaChangerState.JobID != catpb.InvalidJobID } +// ConcurrentSchemaChangeJobIDs implements the catalog.Descriptor interface. +func (desc *immutable) ConcurrentSchemaChangeJobIDs() (ret []catpb.JobID) { + if desc.DeclarativeSchemaChangerState != nil && + desc.DeclarativeSchemaChangerState.JobID != catpb.InvalidJobID { + ret = append(ret, desc.DeclarativeSchemaChangerState.JobID) + } + return ret +} + // SkipNamespace implements the catalog.Descriptor interface. func (desc *immutable) SkipNamespace() bool { return true diff --git a/pkg/sql/catalog/schemadesc/schema_desc.go b/pkg/sql/catalog/schemadesc/schema_desc.go index 75a2f9179417..f47bf94f6938 100644 --- a/pkg/sql/catalog/schemadesc/schema_desc.go +++ b/pkg/sql/catalog/schemadesc/schema_desc.go @@ -336,6 +336,15 @@ func (desc *immutable) HasConcurrentSchemaChanges() bool { desc.DeclarativeSchemaChangerState.JobID != catpb.InvalidJobID } +// ConcurrentSchemaChangeJobIDs implements catalog.Descriptor. +func (desc *immutable) ConcurrentSchemaChangeJobIDs() (ret []catpb.JobID) { + if desc.DeclarativeSchemaChangerState != nil && + desc.DeclarativeSchemaChangerState.JobID != catpb.InvalidJobID { + ret = append(ret, desc.DeclarativeSchemaChangerState.JobID) + } + return ret +} + // MaybeIncrementVersion implements the MutableDescriptor interface. func (desc *Mutable) MaybeIncrementVersion() { // Already incremented, no-op. diff --git a/pkg/sql/catalog/schemadesc/synthetic_schema_desc.go b/pkg/sql/catalog/schemadesc/synthetic_schema_desc.go index 6cc39dc334a5..57220a14b7f1 100644 --- a/pkg/sql/catalog/schemadesc/synthetic_schema_desc.go +++ b/pkg/sql/catalog/schemadesc/synthetic_schema_desc.go @@ -111,6 +111,11 @@ func (p synthetic) HasConcurrentSchemaChanges() bool { return false } +// ConcurrentSchemaChangeJobIDs implements catalog.Descriptor. +func (p synthetic) ConcurrentSchemaChangeJobIDs() []catpb.JobID { + return nil +} + // SkipNamespace implements the descriptor interface. // We never store synthetic descriptors. func (p synthetic) SkipNamespace() bool { diff --git a/pkg/sql/catalog/tabledesc/table_desc.go b/pkg/sql/catalog/tabledesc/table_desc.go index 3c755697f537..0a47d92d677a 100644 --- a/pkg/sql/catalog/tabledesc/table_desc.go +++ b/pkg/sql/catalog/tabledesc/table_desc.go @@ -73,6 +73,20 @@ func (desc *wrapper) HasConcurrentSchemaChanges() bool { len(desc.MutationJobs) > 0 } +// ConcurrentSchemaChangeJobIDs implements catalog.Descriptor. +func (desc *wrapper) ConcurrentSchemaChangeJobIDs() (ret []catpb.JobID) { + if desc.DeclarativeSchemaChangerState != nil && + desc.DeclarativeSchemaChangerState.JobID != catpb.InvalidJobID { + ret = append(ret, desc.DeclarativeSchemaChangerState.JobID) + } + if len(desc.MutationJobs) > 0 { + for _, mutationJob := range desc.MutationJobs { + ret = append(ret, mutationJob.JobID) + } + } + return ret +} + // SkipNamespace implements the descriptor interface. func (desc *wrapper) SkipNamespace() bool { // Virtual tables are hard-coded and don't have entries in the diff --git a/pkg/sql/catalog/typedesc/table_implicit_record_type.go b/pkg/sql/catalog/typedesc/table_implicit_record_type.go index 938473cf129b..61b9c916162d 100644 --- a/pkg/sql/catalog/typedesc/table_implicit_record_type.go +++ b/pkg/sql/catalog/typedesc/table_implicit_record_type.go @@ -281,6 +281,11 @@ func (v *tableImplicitRecordType) HasConcurrentSchemaChanges() bool { return false } +// ConcurrentSchemaChangeJobIDs implements catalog.Descriptor. +func (v *tableImplicitRecordType) ConcurrentSchemaChangeJobIDs() []catpb.JobID { + return nil +} + // SkipNamespace implements catalog.Descriptor. We never store table implicit // record type which is always constructed in memory. func (v *tableImplicitRecordType) SkipNamespace() bool { diff --git a/pkg/sql/catalog/typedesc/type_desc.go b/pkg/sql/catalog/typedesc/type_desc.go index 9f55d7cdd746..953b96cc0d8c 100644 --- a/pkg/sql/catalog/typedesc/type_desc.go +++ b/pkg/sql/catalog/typedesc/type_desc.go @@ -872,6 +872,15 @@ func (desc *immutable) HasConcurrentSchemaChanges() bool { return false } +// ConcurrentSchemaChangeJobIDs implements catalog.Descriptor. +func (desc *immutable) ConcurrentSchemaChangeJobIDs() (ret []catpb.JobID) { + if desc.DeclarativeSchemaChangerState != nil && + desc.DeclarativeSchemaChangerState.JobID != catpb.InvalidJobID { + ret = append(ret, desc.DeclarativeSchemaChangerState.JobID) + } + return ret +} + // SkipNamespace implements the descriptor interface. func (desc *immutable) SkipNamespace() bool { return false diff --git a/pkg/sql/schema_change_plan_node.go b/pkg/sql/schema_change_plan_node.go index 872db6684c18..1b94ed21330f 100644 --- a/pkg/sql/schema_change_plan_node.go +++ b/pkg/sql/schema_change_plan_node.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/server/telemetry" "github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" "github.com/cockroachdb/cockroach/pkg/sql/descmetadata" @@ -202,6 +203,7 @@ func (p *planner) waitForDescriptorSchemaChanges( for r := retry.StartWithCtx(ctx, base.DefaultRetryOptions()); r.Next(); { now := p.ExecCfg().Clock.Now() var isBlocked bool + var blockingJobIDs []catpb.JobID if err := p.ExecCfg().InternalDB.DescsTxn(ctx, func( ctx context.Context, txn descs.Txn, ) error { @@ -213,6 +215,7 @@ func (p *planner) waitForDescriptorSchemaChanges( return err } isBlocked = desc.HasConcurrentSchemaChanges() + blockingJobIDs = desc.ConcurrentSchemaChangeJobIDs() return nil }); err != nil { return err @@ -222,8 +225,8 @@ func (p *planner) waitForDescriptorSchemaChanges( } if logEvery.ShouldLog() { log.Infof(ctx, - "schema change waiting for concurrent schema changes on descriptor %d,"+ - " waited %v so far", descID, timeutil.Since(start), + "schema change waiting for %v concurrent schema change job(s) %v on descriptor %d,"+ + " waited %v so far", len(blockingJobIDs), blockingJobIDs, descID, timeutil.Since(start), ) } if knobs != nil && knobs.WhileWaitingForConcurrentSchemaChanges != nil { diff --git a/pkg/sql/schema_changer.go b/pkg/sql/schema_changer.go index d4fa2e03bce6..4144fc25f407 100644 --- a/pkg/sql/schema_changer.go +++ b/pkg/sql/schema_changer.go @@ -584,14 +584,29 @@ func (sc *SchemaChanger) notFirstInLine(ctx context.Context, desc catalog.Descri // descriptor, it seems possible for a job to be resumed after the mutation // has already been removed. If there's a mutation provided, we should check // whether it actually exists on the table descriptor and exit the job if not. - for i, mutation := range tableDesc.AllMutations() { + allMutations := tableDesc.AllMutations() + for i, mutation := range allMutations { if mutation.MutationID() == sc.mutationID { if i != 0 { + blockingJobIDsAsSet := make(map[catpb.JobID]struct{}) + for j := 0; j < i; j++ { + blockingJobID, err := mustGetJobIDWithMutationID(tableDesc, allMutations[j].MutationID()) + if err != nil { + return err + } + blockingJobIDsAsSet[blockingJobID] = struct{}{} + } + blockingJobIDs := make([]catpb.JobID, 0, len(blockingJobIDsAsSet)) + for jobID := range blockingJobIDsAsSet { + blockingJobIDs = append(blockingJobIDs, jobID) + } log.Infof(ctx, - "schema change on %q (v%d): another change is still in progress", - desc.GetName(), desc.GetVersion(), + "schema change on %q (v%d): another %v schema change job(s) %v is still in progress "+ + "and it is blocking this job from proceeding", + desc.GetName(), desc.GetVersion(), len(blockingJobIDs), blockingJobIDs, ) - return errSchemaChangeNotFirstInLine + return errors.Wrapf(errSchemaChangeNotFirstInLine, "schema change is "+ + "blocked by %v other schema change job(s) %v", len(blockingJobIDs), blockingJobIDs) } break } @@ -600,6 +615,21 @@ func (sc *SchemaChanger) notFirstInLine(ctx context.Context, desc catalog.Descri return nil } +func mustGetJobIDWithMutationID( + tableDesc catalog.TableDescriptor, mutationID descpb.MutationID, +) (jobID catpb.JobID, err error) { + for _, mutationJob := range tableDesc.GetMutationJobs() { + if mutationJob.MutationID == mutationID { + jobID = mutationJob.JobID + } + } + if jobID == catpb.InvalidJobID { + return 0, errors.AssertionFailedf("mutation job with mutation ID %v is not found in table %q", + mutationID, tableDesc.GetName()) + } + return jobID, nil +} + func (sc *SchemaChanger) getTargetDescriptor(ctx context.Context) (catalog.Descriptor, error) { // Retrieve the descriptor that is being changed. var desc catalog.Descriptor diff --git a/pkg/sql/schema_changer_helpers_test.go b/pkg/sql/schema_changer_helpers_test.go index bc1740472c99..079ed15eb0d6 100644 --- a/pkg/sql/schema_changer_helpers_test.go +++ b/pkg/sql/schema_changer_helpers_test.go @@ -12,13 +12,18 @@ package sql import ( "context" + "regexp" + "sort" + "strings" "testing" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/backfill" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" ) @@ -88,3 +93,89 @@ func TestCalculateSplitAtShards(t *testing.T) { }) } } + +// TestNotFirstInLine tests that if a schema change's mutation is not first in +// line, the error message clearly state what the blocking schema change job ID +// is. +func TestNotFirstInLine(t *testing.T) { + defer leaktest.AfterTest(t)() + + // A helper to extract blocking schema change job IDs from the error message + // which is of the form "schema change is blocked by x other schema change + // job(s) [yyy zzz]". + extractSortedBlockingJobIDsFromNotFirstInLineErr := func(errMsg string) []string { + p := regexp.MustCompile(`\[(.*?)\]`) + m := p.FindStringSubmatch(errMsg) + require.NotNilf(t, m, "did not find any blocking job IDs") + blockingJobIDs := strings.Fields(m[1]) + sort.Slice(blockingJobIDs, func(i, j int) bool { + return blockingJobIDs[i] <= blockingJobIDs[j] + }) + return blockingJobIDs + } + + ctx := context.Background() + desc := descpb.TableDescriptor{ + Name: "t", + ID: 104, + Mutations: []descpb.DescriptorMutation{ + { + Descriptor_: &descpb.DescriptorMutation_Index{}, + MutationID: 1, + }, + { + Descriptor_: &descpb.DescriptorMutation_Index{}, + MutationID: 1, + }, + { + Descriptor_: &descpb.DescriptorMutation_Column{}, + MutationID: 2, + }, + { + Descriptor_: &descpb.DescriptorMutation_Column{}, + MutationID: 3, + }, + }, + MutationJobs: []descpb.TableDescriptor_MutationJob{ + { + JobID: 11111, + MutationID: 1, + }, + { + JobID: 22222, + MutationID: 2, + }, + { + JobID: 33333, + MutationID: 3, + }, + }, + } + mut := tabledesc.NewBuilder(&desc).BuildExistingMutableTable() + { + sc := SchemaChanger{ + descID: 104, + mutationID: 1, + } + err := sc.notFirstInLine(ctx, mut) + require.NoError(t, err) + } + { + sc := SchemaChanger{ + descID: 104, + mutationID: 2, + } + err := sc.notFirstInLine(ctx, mut) + require.True(t, errors.Is(err, errSchemaChangeNotFirstInLine)) + require.Equal(t, []string{"11111"}, extractSortedBlockingJobIDsFromNotFirstInLineErr(err.Error())) + } + { + sc := SchemaChanger{ + descID: 104, + mutationID: 3, + } + err := sc.notFirstInLine(ctx, mut) + require.True(t, errors.Is(err, errSchemaChangeNotFirstInLine)) + require.Equal(t, []string{"11111", "22222"}, extractSortedBlockingJobIDsFromNotFirstInLineErr(err.Error())) + } +} diff --git a/pkg/sql/schema_changer_test.go b/pkg/sql/schema_changer_test.go index c9bb0fe548d8..8ca058cac8dc 100644 --- a/pkg/sql/schema_changer_test.go +++ b/pkg/sql/schema_changer_test.go @@ -16,6 +16,7 @@ import ( "database/sql/driver" "fmt" "math/rand" + "regexp" "strconv" "strings" "sync" @@ -8389,3 +8390,41 @@ SELECT fraction_completed > 0 } } } + +// TestLegacySchemaChangerWaitsForOtherSchemaChanges tests concurrent legacy schema changes +// wait properly for preceding ones if it's not first in line. +func TestLegacySchemaChangerWaitsForOtherSchemaChanges(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + ctx := context.Background() + params, _ := tests.CreateTestServerParams() + params.Knobs.JobsTestingKnobs = jobs.NewTestingKnobsWithShortIntervals() + + s, sqlDB, _ := serverutils.StartServer(t, params) + defer s.Stopper().Stop(ctx) + tdb := sqlutils.MakeSQLRunner(sqlDB) + + tdb.Exec(t, `SET use_declarative_schema_changer = off`) + tdb.Exec(t, `CREATE TABLE t (i INT PRIMARY KEY);`) + tdb.Exec(t, `SET CLUSTER SETTING jobs.debug.pausepoints = 'schemachanger.before.exec';`) + + pattern, err := regexp.Compile(`\d+`) + require.NoError(t, err) + _, err = sqlDB.Exec(`CREATE INDEX idx ON t (i);`) + jobID1 := pattern.FindString(err.Error()) + require.NotEmpty(t, jobID1) + _, err = sqlDB.Exec(`ALTER TABLE t ADD COLUMN j INT DEFAULT 30;`) + jobID2 := pattern.FindString(err.Error()) + require.NotEmpty(t, jobID2) + + tdb.Exec(t, `SET CLUSTER SETTING jobs.debug.pausepoints = '';`) + tdb.Exec(t, `RESUME JOB $1`, jobID2) + tdb.Exec(t, `RESUME JOB $1`, jobID1) + testutils.SucceedsSoon(t, func() error { + res := tdb.QueryStr(t, `SELECT status FROM [SHOW JOBS] WHERE job_id in ($1, $2)`, jobID1, jobID2) + if len(res) == 2 && res[0][0] == "succeeded" && res[1][0] == "succeeded" { + return nil + } + return errors.New("") + }) +}