diff --git a/pkg/ccl/crosscluster/physical/alter_replication_job.go b/pkg/ccl/crosscluster/physical/alter_replication_job.go index 341b5b563491..6db9c29367c2 100644 --- a/pkg/ccl/crosscluster/physical/alter_replication_job.go +++ b/pkg/ccl/crosscluster/physical/alter_replication_job.go @@ -239,6 +239,7 @@ func alterReplicationJobHook( srcTenant, retentionTTLSeconds, alterTenantStmt, + options, ) } jobRegistry := p.ExecCfg().JobRegistry @@ -321,6 +322,7 @@ func alterTenantRestartReplication( srcTenant string, retentionTTLSeconds int32, alterTenantStmt *tree.AlterTenantReplication, + options *resolvedTenantReplicationOptions, ) error { dstTenantID, err := roachpb.MakeTenantID(tenInfo.ID) if err != nil { @@ -399,6 +401,11 @@ func alterTenantRestartReplication( revertTo = tenInfo.PreviousSourceTenant.CutoverAsOf } + readerID, err := createReaderTenant(ctx, p, tenInfo.Name, dstTenantID, options) + if err != nil { + return err + } + return errors.Wrap(createReplicationJob( ctx, p, @@ -416,7 +423,7 @@ func alterTenantRestartReplication( ReplicationSourceAddress: alterTenantStmt.ReplicationSourceAddress, Options: alterTenantStmt.Options, }, - roachpb.TenantID{}, + readerID, ), "creating replication job") } diff --git a/pkg/ccl/crosscluster/physical/standby_read_ts_poller_job_test.go b/pkg/ccl/crosscluster/physical/standby_read_ts_poller_job_test.go index f324c03efc78..d87a7a99ecb8 100644 --- a/pkg/ccl/crosscluster/physical/standby_read_ts_poller_job_test.go +++ b/pkg/ccl/crosscluster/physical/standby_read_ts_poller_job_test.go @@ -11,18 +11,25 @@ package physical import ( "context" "fmt" + "net/url" "testing" "time" + "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/replicationtestutils" "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/replicationutils" + "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/jobutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/skip" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/randutil" "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" ) @@ -89,3 +96,117 @@ INSERT INTO a VALUES (1); return nil }, time.Minute) } + +func TestFastFailbackWithReaderTenant(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + ctx := context.Background() + + skip.UnderRace(t, "test takes ~5 minutes under race") + + serverA, aDB, _ := serverutils.StartServer(t, base.TestServerArgs{ + DefaultTestTenant: base.TestControlsTenantsExplicitly, + Knobs: base.TestingKnobs{ + JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), + }, + }) + defer serverA.Stopper().Stop(ctx) + serverB, bDB, _ := serverutils.StartServer(t, base.TestServerArgs{ + DefaultTestTenant: base.TestControlsTenantsExplicitly, + Knobs: base.TestingKnobs{ + JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), + }, + }) + defer serverB.Stopper().Stop(ctx) + + sqlA := sqlutils.MakeSQLRunner(aDB) + sqlB := sqlutils.MakeSQLRunner(bDB) + + serverAURL, cleanupURLA := sqlutils.PGUrl(t, serverA.SQLAddr(), t.Name(), url.User(username.RootUser)) + defer cleanupURLA() + serverBURL, cleanupURLB := sqlutils.PGUrl(t, serverB.SQLAddr(), t.Name(), url.User(username.RootUser)) + defer cleanupURLB() + + for _, s := range []string{ + "SET CLUSTER SETTING kv.rangefeed.enabled = true", + "SET CLUSTER SETTING kv.rangefeed.closed_timestamp_refresh_interval = '200ms'", + "SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'", + "SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = '50ms'", + + "SET CLUSTER SETTING physical_replication.consumer.heartbeat_frequency = '1s'", + "SET CLUSTER SETTING physical_replication.consumer.job_checkpoint_frequency = '100ms'", + "SET CLUSTER SETTING physical_replication.consumer.minimum_flush_interval = '10ms'", + "SET CLUSTER SETTING physical_replication.consumer.cutover_signal_poll_interval = '100ms'", + "SET CLUSTER SETTING spanconfig.reconciliation_job.checkpoint_interval = '100ms'", + } { + sqlA.Exec(t, s) + sqlB.Exec(t, s) + } + + t.Logf("creating tenant f") + sqlA.Exec(t, "CREATE VIRTUAL CLUSTER f") + sqlA.Exec(t, "ALTER VIRTUAL CLUSTER f START SERVICE SHARED") + + t.Logf("starting replication f->g") + sqlB.Exec(t, "CREATE VIRTUAL CLUSTER g FROM REPLICATION OF f ON $1 WITH READ VIRTUAL CLUSTER", serverAURL.String()) + + // Verify that reader tenant has been created for g + waitForReaderTenant(t, sqlB, "g-readonly") + + // FAILOVER + _, consumerGJobID := replicationtestutils.GetStreamJobIds(t, ctx, sqlB, roachpb.TenantName("g")) + var ts1 string + sqlA.QueryRow(t, "SELECT cluster_logical_timestamp()").Scan(&ts1) + + rng, _ := randutil.NewPseudoRand() + if rng.Intn(2) == 0 { + t.Logf("waiting for g@%s", ts1) + replicationtestutils.WaitUntilReplicatedTime(t, + replicationtestutils.DecimalTimeToHLC(t, ts1), + sqlB, + jobspb.JobID(consumerGJobID)) + + t.Logf("completing replication on g@%s", ts1) + sqlB.Exec(t, fmt.Sprintf("ALTER VIRTUAL CLUSTER g COMPLETE REPLICATION TO SYSTEM TIME '%s'", ts1)) + } else { + t.Log("waiting for initial scan on g") + replicationtestutils.WaitUntilStartTimeReached(t, sqlB, jobspb.JobID(consumerGJobID)) + t.Log("completing replication on g to latest") + sqlB.Exec(t, "ALTER VIRTUAL CLUSTER g COMPLETE REPLICATION TO LATEST") + } + jobutils.WaitForJobToSucceed(t, sqlB, jobspb.JobID(consumerGJobID)) + + sqlB.Exec(t, "ALTER VIRTUAL CLUSTER g START SERVICE SHARED") + var ts2 string + sqlA.QueryRow(t, "SELECT cluster_logical_timestamp()").Scan(&ts2) + + sqlA.Exec(t, "ALTER VIRTUAL CLUSTER f STOP SERVICE") + waitUntilTenantServerStopped(t, serverA.SystemLayer(), "f") + t.Logf("starting replication g->f") + sqlA.Exec(t, "ALTER VIRTUAL CLUSTER f START REPLICATION OF g ON $1 WITH READ VIRTUAL CLUSTER", serverBURL.String()) + _, consumerFJobID := replicationtestutils.GetStreamJobIds(t, ctx, sqlA, roachpb.TenantName("f")) + t.Logf("waiting for f@%s", ts2) + replicationtestutils.WaitUntilReplicatedTime(t, + replicationtestutils.DecimalTimeToHLC(t, ts2), + sqlA, + jobspb.JobID(consumerFJobID)) + + // Verify that reader tenant has been created for f + waitForReaderTenant(t, sqlA, "f-readonly") +} + +func waitForReaderTenant(t *testing.T, db *sqlutils.SQLRunner, tenantName string) { + testutils.SucceedsSoon(t, func() error { + var numTenants int + db.QueryRow(t, ` +SELECT count(*) +FROM system.tenants +WHERE name = $1 +`, tenantName).Scan(&numTenants) + + if numTenants != 1 { + return errors.Errorf("expected 1 tenant, got %d", numTenants) + } + return nil + }) +} diff --git a/pkg/ccl/crosscluster/physical/stream_ingestion_planning.go b/pkg/ccl/crosscluster/physical/stream_ingestion_planning.go index dae9e46b7093..e47d76983d61 100644 --- a/pkg/ccl/crosscluster/physical/stream_ingestion_planning.go +++ b/pkg/ccl/crosscluster/physical/stream_ingestion_planning.go @@ -190,7 +190,7 @@ func ingestionPlanHook( return nil } - readerID, err := createReaderTenant(ctx, p, tenantInfo, destinationTenantID, options) + readerID, err := createReaderTenant(ctx, p, tenantInfo.Name, destinationTenantID, options) if err != nil { return err } @@ -302,7 +302,7 @@ func createReplicationJob( func createReaderTenant( ctx context.Context, p sql.PlanHookState, - tenantInfo mtinfopb.TenantInfoWithUsage, + tenantName roachpb.TenantName, destinationTenantID roachpb.TenantID, options *resolvedTenantReplicationOptions, ) (roachpb.TenantID, error) { @@ -310,7 +310,7 @@ func createReaderTenant( if options.ReaderTenantEnabled() { var readerInfo mtinfopb.TenantInfoWithUsage readerInfo.DataState = mtinfopb.DataStateAdd - readerInfo.Name = tenantInfo.Name + "-readonly" + readerInfo.Name = tenantName + "-readonly" readerInfo.ReadFromTenant = &destinationTenantID readerZcfg, err := sql.GetHydratedZoneConfigForTenantsRange(ctx, p.Txn(), p.ExtendedEvalContext().Descs)