diff --git a/pkg/sql/schema_changer_test.go b/pkg/sql/schema_changer_test.go index b1662a466fe7..f7d62b23d866 100644 --- a/pkg/sql/schema_changer_test.go +++ b/pkg/sql/schema_changer_test.go @@ -4699,18 +4699,14 @@ ALTER TABLE t.test ADD COLUMN c INT AS (v + 4) STORED, ADD COLUMN d INT DEFAULT func TestCancelSchemaChange(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - skip.WithIssue(t, 51796) const ( - numNodes = 3 maxValue = 100 ) var sqlDB *sqlutils.SQLRunner - var db *gosql.DB params, _ := createTestServerParams() doCancel := false - var enableAsyncSchemaChanges uint32 params.Knobs = base.TestingKnobs{ SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{ WriteCheckpointInterval: time.Nanosecond, // checkpoint after every chunk. @@ -4721,52 +4717,41 @@ func TestCancelSchemaChange(t *testing.T) { if !doCancel { return nil } - if _, err := db.Exec(`CANCEL JOB ( + sqlDB.Exec(t, `CANCEL JOB ( SELECT job_id FROM [SHOW JOBS] WHERE job_type = 'SCHEMA CHANGE' AND status = $1 AND description NOT LIKE 'ROLL BACK%' - )`, jobs.StatusRunning); err != nil { - panic(err) - } + )`, jobs.StatusRunning) return nil }, }, } - tc := serverutils.StartNewTestCluster(t, numNodes, base.TestClusterArgs{ - ReplicationMode: base.ReplicationManual, - ServerArgs: params, - }) - defer tc.Stopper().Stop(context.Background()) - db = tc.ServerConn(0) - kvDB := tc.Server(0).DB() - codec := tc.Server(0).ApplicationLayer().Codec() + server, db, kvDB := serverutils.StartServer(t, params) sqlDB = sqlutils.MakeSQLRunner(db) + defer server.Stopper().Stop(context.Background()) + codec := server.ApplicationLayer().Codec() // Disable strict GC TTL enforcement because we're going to shove a zero-value // TTL into the system with AddImmediateGCZoneConfig. defer sqltestutils.DisableGCTTLStrictEnforcement(t, db)() sqlDB.Exec(t, ` + SET use_declarative_schema_changer = 'off'; CREATE DATABASE t; CREATE TABLE t.test (k INT PRIMARY KEY, v INT, pi DECIMAL DEFAULT (DECIMAL '3.14')); `) + sqlDB.Exec(t, `SET CLUSTER SETTING jobs.registry.interval.adopt = '1s';`) + // Bulk insert. if err := sqltestutils.BulkInsertIntoTable(db, maxValue); err != nil { t.Fatal(err) } tableDesc := desctestutils.TestingGetPublicTableDescriptor(kvDB, codec, "t", "test") - // Split the table into multiple ranges. - var sps []serverutils.SplitPoint - const numSplits = numNodes * 2 - for i := 1; i <= numSplits-1; i++ { - sps = append(sps, serverutils.SplitPoint{TargetNodeIdx: i % numNodes, Vals: []interface{}{maxValue / numSplits * i}}) - } - tc.SplitTable(t, tableDesc, sps) ctx := context.Background() if err := sqltestutils.CheckTableKeyCount(ctx, kvDB, codec, 1, maxValue); err != nil { @@ -4780,49 +4765,48 @@ func TestCancelSchemaChange(t *testing.T) { // Set to true if the rollback returns in a running, waiting status. isGC bool }{ - {`ALTER TABLE t.public.test ADD COLUMN x DECIMAL DEFAULT 1.4::DECIMAL CREATE FAMILY f2, ADD CHECK (x >= 0)`, + {`ALTER TABLE t.public.test ADD COLUMN x DECIMAL DEFAULT 1.4 CREATE FAMILY f2, ADD CHECK (x >= 0)`, true, false}, {`CREATE INDEX foo ON t.public.test (v)`, true, true}, - {`ALTER TABLE t.public.test ADD COLUMN x DECIMAL DEFAULT 1.2::DECIMAL CREATE FAMILY f3, ADD CHECK (x >= 0)`, + {`ALTER TABLE t.public.test ADD COLUMN x DECIMAL DEFAULT 1.2 CREATE FAMILY f3, ADD CHECK (x >= 0)`, false, false}, {`CREATE INDEX foo ON t.public.test (v)`, false, true}, } idx := 0 + gcIdx := 0 for _, tc := range testCases { doCancel = tc.cancel if doCancel { if _, err := db.Exec(tc.sql); !testutils.IsError(err, "job canceled") { t.Fatalf("unexpected %v", err) } - if err := jobutils.VerifySystemJob(t, sqlDB, idx, jobspb.TypeSchemaChange, jobs.StatusCanceled, jobs.Record{ - Username: username.RootUserName(), - Description: tc.sql, - DescriptorIDs: descpb.IDs{ - tableDesc.GetID(), - }, - }); err != nil { - t.Fatal(err) - } - jobID := jobutils.GetJobID(t, sqlDB, idx) - idx++ + testutils.SucceedsSoon(t, func() error { + return jobutils.VerifySystemJob( + t, sqlDB, idx, jobspb.TypeSchemaChange, jobs.StatusCanceled, + jobs.Record{ + Username: username.RootUserName(), + Description: tc.sql, + DescriptorIDs: descpb.IDs{ + tableDesc.GetID(), + }, + }, + ) + }) jobRecord := jobs.Record{ Username: username.RootUserName(), - Description: fmt.Sprintf("ROLL BACK JOB %d: %s", jobID, tc.sql), + Description: fmt.Sprintf("GC for ROLLBACK of %s", tc.sql), DescriptorIDs: descpb.IDs{ tableDesc.GetID(), }, } - var err error if tc.isGC { - err = jobutils.VerifyRunningSystemJob(t, sqlDB, idx, jobspb.TypeSchemaChange, sql.RunningStatusWaitingGC, jobRecord) - } else { - err = jobutils.VerifySystemJob(t, sqlDB, idx, jobspb.TypeSchemaChange, jobs.StatusSucceeded, jobRecord) - } - if err != nil { - t.Fatal(err) + testutils.SucceedsSoon(t, func() error { + return jobutils.VerifyRunningSystemJob(t, sqlDB, gcIdx*2, jobspb.TypeSchemaChangeGC, sql.RunningStatusWaitingForMVCCGC, jobRecord) + }) + gcIdx++ } } else { sqlDB.Exec(t, tc.sql) @@ -4841,7 +4825,7 @@ func TestCancelSchemaChange(t *testing.T) { // Verify that the index foo over v is consistent, and that column x has // been backfilled properly. - rows, err := db.Query(`SELECT v, x from t.test@foo`) + rows, err := db.Query(`SELECT v, x from t.test@foo ORDER BY v`) if err != nil { t.Fatal(err) } @@ -4871,7 +4855,6 @@ func TestCancelSchemaChange(t *testing.T) { } // Verify that the data from the canceled CREATE INDEX is cleaned up. - atomic.StoreUint32(&enableAsyncSchemaChanges, 1) // TODO (lucy): when this test is no longer canceled, have it correctly handle doing GC immediately if _, err := sqltestutils.AddImmediateGCZoneConfig(db, tableDesc.GetID()); err != nil { t.Fatal(err)