Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
107504: sql: Add observability when schema change jobs are blocked by concurrent ones r=Xiang-Gu a=Xiang-Gu

Previously, when using legacy schema changes, concurrent schema changes will wait for preceding ones if it's not first in line but we didn't provide the blocking schema change job ID in the log nor error. This is inadequate in debugging and troubling shooting support ticket. This commit fixes that by providing the blocking schema change job ID if this happens.

Fix cockroachdb#103733
Release note (sql): When there are concurrent legacy schema change jobs, we will be able to know what the blocking schema change job ID is from the relevant log entries.

Co-authored-by: Xiang Gu <[email protected]>
  • Loading branch information
craig[bot] and Xiang-Gu committed Jul 26, 2023
2 parents 284b6c0 + da746bf commit bac4c79
Show file tree
Hide file tree
Showing 12 changed files with 233 additions and 6 deletions.
9 changes: 9 additions & 0 deletions pkg/sql/catalog/dbdesc/database_desc.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,15 @@ func (desc *immutable) HasConcurrentSchemaChanges() bool {
desc.DeclarativeSchemaChangerState.JobID != catpb.InvalidJobID
}

// ConcurrentSchemaChangeJobIDs implements catalog.Descriptor.
func (desc *immutable) ConcurrentSchemaChangeJobIDs() (ret []catpb.JobID) {
if desc.DeclarativeSchemaChangerState != nil &&
desc.DeclarativeSchemaChangerState.JobID != catpb.InvalidJobID {
ret = append(ret, desc.DeclarativeSchemaChangerState.JobID)
}
return ret
}

// GetDefaultPrivilegeDescriptor returns a DefaultPrivilegeDescriptor.
func (desc *immutable) GetDefaultPrivilegeDescriptor() catalog.DefaultPrivilegeDescriptor {
defaultPrivilegeDescriptor := desc.GetDefaultPrivileges()
Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/catalog/descriptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,10 @@ type Descriptor interface {
// in progress, either legacy or declarative.
HasConcurrentSchemaChanges() bool

// ConcurrentSchemaChangeJobIDs returns all in-progress schema change
// jobs, either legacy or declarative.
ConcurrentSchemaChangeJobIDs() []catpb.JobID

// SkipNamespace is true when a descriptor should not have a namespace record.
SkipNamespace() bool

Expand Down
9 changes: 9 additions & 0 deletions pkg/sql/catalog/funcdesc/func_desc.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,15 @@ func (desc *immutable) HasConcurrentSchemaChanges() bool {
desc.DeclarativeSchemaChangerState.JobID != catpb.InvalidJobID
}

// ConcurrentSchemaChangeJobIDs implements the catalog.Descriptor interface.
func (desc *immutable) ConcurrentSchemaChangeJobIDs() (ret []catpb.JobID) {
if desc.DeclarativeSchemaChangerState != nil &&
desc.DeclarativeSchemaChangerState.JobID != catpb.InvalidJobID {
ret = append(ret, desc.DeclarativeSchemaChangerState.JobID)
}
return ret
}

// SkipNamespace implements the catalog.Descriptor interface.
func (desc *immutable) SkipNamespace() bool {
return true
Expand Down
9 changes: 9 additions & 0 deletions pkg/sql/catalog/schemadesc/schema_desc.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,15 @@ func (desc *immutable) HasConcurrentSchemaChanges() bool {
desc.DeclarativeSchemaChangerState.JobID != catpb.InvalidJobID
}

// ConcurrentSchemaChangeJobIDs implements catalog.Descriptor.
func (desc *immutable) ConcurrentSchemaChangeJobIDs() (ret []catpb.JobID) {
if desc.DeclarativeSchemaChangerState != nil &&
desc.DeclarativeSchemaChangerState.JobID != catpb.InvalidJobID {
ret = append(ret, desc.DeclarativeSchemaChangerState.JobID)
}
return ret
}

// MaybeIncrementVersion implements the MutableDescriptor interface.
func (desc *Mutable) MaybeIncrementVersion() {
// Already incremented, no-op.
Expand Down
5 changes: 5 additions & 0 deletions pkg/sql/catalog/schemadesc/synthetic_schema_desc.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,11 @@ func (p synthetic) HasConcurrentSchemaChanges() bool {
return false
}

// ConcurrentSchemaChangeJobIDs implements catalog.Descriptor.
func (p synthetic) ConcurrentSchemaChangeJobIDs() []catpb.JobID {
return nil
}

// SkipNamespace implements the descriptor interface.
// We never store synthetic descriptors.
func (p synthetic) SkipNamespace() bool {
Expand Down
14 changes: 14 additions & 0 deletions pkg/sql/catalog/tabledesc/table_desc.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,20 @@ func (desc *wrapper) HasConcurrentSchemaChanges() bool {
len(desc.MutationJobs) > 0
}

// ConcurrentSchemaChangeJobIDs implements catalog.Descriptor.
func (desc *wrapper) ConcurrentSchemaChangeJobIDs() (ret []catpb.JobID) {
if desc.DeclarativeSchemaChangerState != nil &&
desc.DeclarativeSchemaChangerState.JobID != catpb.InvalidJobID {
ret = append(ret, desc.DeclarativeSchemaChangerState.JobID)
}
if len(desc.MutationJobs) > 0 {
for _, mutationJob := range desc.MutationJobs {
ret = append(ret, mutationJob.JobID)
}
}
return ret
}

// SkipNamespace implements the descriptor interface.
func (desc *wrapper) SkipNamespace() bool {
// Virtual tables are hard-coded and don't have entries in the
Expand Down
5 changes: 5 additions & 0 deletions pkg/sql/catalog/typedesc/table_implicit_record_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,11 @@ func (v *tableImplicitRecordType) HasConcurrentSchemaChanges() bool {
return false
}

// ConcurrentSchemaChangeJobIDs implements catalog.Descriptor.
func (v *tableImplicitRecordType) ConcurrentSchemaChangeJobIDs() []catpb.JobID {
return nil
}

// SkipNamespace implements catalog.Descriptor. We never store table implicit
// record type which is always constructed in memory.
func (v *tableImplicitRecordType) SkipNamespace() bool {
Expand Down
9 changes: 9 additions & 0 deletions pkg/sql/catalog/typedesc/type_desc.go
Original file line number Diff line number Diff line change
Expand Up @@ -872,6 +872,15 @@ func (desc *immutable) HasConcurrentSchemaChanges() bool {
return false
}

// ConcurrentSchemaChangeJobIDs implements catalog.Descriptor.
func (desc *immutable) ConcurrentSchemaChangeJobIDs() (ret []catpb.JobID) {
if desc.DeclarativeSchemaChangerState != nil &&
desc.DeclarativeSchemaChangerState.JobID != catpb.InvalidJobID {
ret = append(ret, desc.DeclarativeSchemaChangerState.JobID)
}
return ret
}

// SkipNamespace implements the descriptor interface.
func (desc *immutable) SkipNamespace() bool {
return false
Expand Down
7 changes: 5 additions & 2 deletions pkg/sql/schema_change_plan_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/descmetadata"
Expand Down Expand Up @@ -202,6 +203,7 @@ func (p *planner) waitForDescriptorSchemaChanges(
for r := retry.StartWithCtx(ctx, base.DefaultRetryOptions()); r.Next(); {
now := p.ExecCfg().Clock.Now()
var isBlocked bool
var blockingJobIDs []catpb.JobID
if err := p.ExecCfg().InternalDB.DescsTxn(ctx, func(
ctx context.Context, txn descs.Txn,
) error {
Expand All @@ -213,6 +215,7 @@ func (p *planner) waitForDescriptorSchemaChanges(
return err
}
isBlocked = desc.HasConcurrentSchemaChanges()
blockingJobIDs = desc.ConcurrentSchemaChangeJobIDs()
return nil
}); err != nil {
return err
Expand All @@ -222,8 +225,8 @@ func (p *planner) waitForDescriptorSchemaChanges(
}
if logEvery.ShouldLog() {
log.Infof(ctx,
"schema change waiting for concurrent schema changes on descriptor %d,"+
" waited %v so far", descID, timeutil.Since(start),
"schema change waiting for %v concurrent schema change job(s) %v on descriptor %d,"+
" waited %v so far", len(blockingJobIDs), blockingJobIDs, descID, timeutil.Since(start),
)
}
if knobs != nil && knobs.WhileWaitingForConcurrentSchemaChanges != nil {
Expand Down
38 changes: 34 additions & 4 deletions pkg/sql/schema_changer.go
Original file line number Diff line number Diff line change
Expand Up @@ -584,14 +584,29 @@ func (sc *SchemaChanger) notFirstInLine(ctx context.Context, desc catalog.Descri
// descriptor, it seems possible for a job to be resumed after the mutation
// has already been removed. If there's a mutation provided, we should check
// whether it actually exists on the table descriptor and exit the job if not.
for i, mutation := range tableDesc.AllMutations() {
allMutations := tableDesc.AllMutations()
for i, mutation := range allMutations {
if mutation.MutationID() == sc.mutationID {
if i != 0 {
blockingJobIDsAsSet := make(map[catpb.JobID]struct{})
for j := 0; j < i; j++ {
blockingJobID, err := mustGetJobIDWithMutationID(tableDesc, allMutations[j].MutationID())
if err != nil {
return err
}
blockingJobIDsAsSet[blockingJobID] = struct{}{}
}
blockingJobIDs := make([]catpb.JobID, 0, len(blockingJobIDsAsSet))
for jobID := range blockingJobIDsAsSet {
blockingJobIDs = append(blockingJobIDs, jobID)
}
log.Infof(ctx,
"schema change on %q (v%d): another change is still in progress",
desc.GetName(), desc.GetVersion(),
"schema change on %q (v%d): another %v schema change job(s) %v is still in progress "+
"and it is blocking this job from proceeding",
desc.GetName(), desc.GetVersion(), len(blockingJobIDs), blockingJobIDs,
)
return errSchemaChangeNotFirstInLine
return errors.Wrapf(errSchemaChangeNotFirstInLine, "schema change is "+
"blocked by %v other schema change job(s) %v", len(blockingJobIDs), blockingJobIDs)
}
break
}
Expand All @@ -600,6 +615,21 @@ func (sc *SchemaChanger) notFirstInLine(ctx context.Context, desc catalog.Descri
return nil
}

func mustGetJobIDWithMutationID(
tableDesc catalog.TableDescriptor, mutationID descpb.MutationID,
) (jobID catpb.JobID, err error) {
for _, mutationJob := range tableDesc.GetMutationJobs() {
if mutationJob.MutationID == mutationID {
jobID = mutationJob.JobID
}
}
if jobID == catpb.InvalidJobID {
return 0, errors.AssertionFailedf("mutation job with mutation ID %v is not found in table %q",
mutationID, tableDesc.GetName())
}
return jobID, nil
}

func (sc *SchemaChanger) getTargetDescriptor(ctx context.Context) (catalog.Descriptor, error) {
// Retrieve the descriptor that is being changed.
var desc catalog.Descriptor
Expand Down
91 changes: 91 additions & 0 deletions pkg/sql/schema_changer_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,18 @@ package sql

import (
"context"
"regexp"
"sort"
"strings"
"testing"

"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/backfill"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -88,3 +93,89 @@ func TestCalculateSplitAtShards(t *testing.T) {
})
}
}

// TestNotFirstInLine tests that if a schema change's mutation is not first in
// line, the error message clearly state what the blocking schema change job ID
// is.
func TestNotFirstInLine(t *testing.T) {
defer leaktest.AfterTest(t)()

// A helper to extract blocking schema change job IDs from the error message
// which is of the form "schema change is blocked by x other schema change
// job(s) [yyy zzz]".
extractSortedBlockingJobIDsFromNotFirstInLineErr := func(errMsg string) []string {
p := regexp.MustCompile(`\[(.*?)\]`)
m := p.FindStringSubmatch(errMsg)
require.NotNilf(t, m, "did not find any blocking job IDs")
blockingJobIDs := strings.Fields(m[1])
sort.Slice(blockingJobIDs, func(i, j int) bool {
return blockingJobIDs[i] <= blockingJobIDs[j]
})
return blockingJobIDs
}

ctx := context.Background()
desc := descpb.TableDescriptor{
Name: "t",
ID: 104,
Mutations: []descpb.DescriptorMutation{
{
Descriptor_: &descpb.DescriptorMutation_Index{},
MutationID: 1,
},
{
Descriptor_: &descpb.DescriptorMutation_Index{},
MutationID: 1,
},
{
Descriptor_: &descpb.DescriptorMutation_Column{},
MutationID: 2,
},
{
Descriptor_: &descpb.DescriptorMutation_Column{},
MutationID: 3,
},
},
MutationJobs: []descpb.TableDescriptor_MutationJob{
{
JobID: 11111,
MutationID: 1,
},
{
JobID: 22222,
MutationID: 2,
},
{
JobID: 33333,
MutationID: 3,
},
},
}
mut := tabledesc.NewBuilder(&desc).BuildExistingMutableTable()
{
sc := SchemaChanger{
descID: 104,
mutationID: 1,
}
err := sc.notFirstInLine(ctx, mut)
require.NoError(t, err)
}
{
sc := SchemaChanger{
descID: 104,
mutationID: 2,
}
err := sc.notFirstInLine(ctx, mut)
require.True(t, errors.Is(err, errSchemaChangeNotFirstInLine))
require.Equal(t, []string{"11111"}, extractSortedBlockingJobIDsFromNotFirstInLineErr(err.Error()))
}
{
sc := SchemaChanger{
descID: 104,
mutationID: 3,
}
err := sc.notFirstInLine(ctx, mut)
require.True(t, errors.Is(err, errSchemaChangeNotFirstInLine))
require.Equal(t, []string{"11111", "22222"}, extractSortedBlockingJobIDsFromNotFirstInLineErr(err.Error()))
}
}
39 changes: 39 additions & 0 deletions pkg/sql/schema_changer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"database/sql/driver"
"fmt"
"math/rand"
"regexp"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -8389,3 +8390,41 @@ SELECT fraction_completed > 0
}
}
}

// TestLegacySchemaChangerWaitsForOtherSchemaChanges tests concurrent legacy schema changes
// wait properly for preceding ones if it's not first in line.
func TestLegacySchemaChangerWaitsForOtherSchemaChanges(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()
params, _ := tests.CreateTestServerParams()
params.Knobs.JobsTestingKnobs = jobs.NewTestingKnobsWithShortIntervals()

s, sqlDB, _ := serverutils.StartServer(t, params)
defer s.Stopper().Stop(ctx)
tdb := sqlutils.MakeSQLRunner(sqlDB)

tdb.Exec(t, `SET use_declarative_schema_changer = off`)
tdb.Exec(t, `CREATE TABLE t (i INT PRIMARY KEY);`)
tdb.Exec(t, `SET CLUSTER SETTING jobs.debug.pausepoints = 'schemachanger.before.exec';`)

pattern, err := regexp.Compile(`\d+`)
require.NoError(t, err)
_, err = sqlDB.Exec(`CREATE INDEX idx ON t (i);`)
jobID1 := pattern.FindString(err.Error())
require.NotEmpty(t, jobID1)
_, err = sqlDB.Exec(`ALTER TABLE t ADD COLUMN j INT DEFAULT 30;`)
jobID2 := pattern.FindString(err.Error())
require.NotEmpty(t, jobID2)

tdb.Exec(t, `SET CLUSTER SETTING jobs.debug.pausepoints = '';`)
tdb.Exec(t, `RESUME JOB $1`, jobID2)
tdb.Exec(t, `RESUME JOB $1`, jobID1)
testutils.SucceedsSoon(t, func() error {
res := tdb.QueryStr(t, `SELECT status FROM [SHOW JOBS] WHERE job_id in ($1, $2)`, jobID1, jobID2)
if len(res) == 2 && res[0][0] == "succeeded" && res[1][0] == "succeeded" {
return nil
}
return errors.New("")
})
}

0 comments on commit bac4c79

Please sign in to comment.