Skip to content

Commit

Permalink
crosscluster: fast failback with reader tenant
Browse files Browse the repository at this point in the history
This commit ensures that a reader tenant is created when the
`WITH READ VIRTUAL CLUSTER` clause is present in a failback (cut-back) command:

```
ALTER VIRTUAL CLUSTER f START REPLICATION OF g ON $1 WITH READ VIRTUAL CLUSTER
```

Epic: CRDB-23575
Fixes: cockroachdb#130894
Release note: None
  • Loading branch information
azhu-crl committed Oct 1, 2024
1 parent 53be53d commit 0457470
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 4 deletions.
9 changes: 8 additions & 1 deletion pkg/ccl/crosscluster/physical/alter_replication_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ func alterReplicationJobHook(
srcTenant,
retentionTTLSeconds,
alterTenantStmt,
options,
)
}
jobRegistry := p.ExecCfg().JobRegistry
Expand Down Expand Up @@ -321,6 +322,7 @@ func alterTenantRestartReplication(
srcTenant string,
retentionTTLSeconds int32,
alterTenantStmt *tree.AlterTenantReplication,
options *resolvedTenantReplicationOptions,
) error {
dstTenantID, err := roachpb.MakeTenantID(tenInfo.ID)
if err != nil {
Expand Down Expand Up @@ -399,6 +401,11 @@ func alterTenantRestartReplication(
revertTo = tenInfo.PreviousSourceTenant.CutoverAsOf
}

readerID, err := createReaderTenant(ctx, p, tenInfo.Name, dstTenantID, options)
if err != nil {
return err
}

return errors.Wrap(createReplicationJob(
ctx,
p,
Expand All @@ -416,7 +423,7 @@ func alterTenantRestartReplication(
ReplicationSourceAddress: alterTenantStmt.ReplicationSourceAddress,
Options: alterTenantStmt.Options,
},
roachpb.TenantID{},
readerID,
), "creating replication job")
}

Expand Down
121 changes: 121 additions & 0 deletions pkg/ccl/crosscluster/physical/standby_read_ts_poller_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,25 @@ package physical
import (
"context"
"fmt"
"net/url"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/replicationtestutils"
"github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/replicationutils"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/jobutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -89,3 +96,117 @@ INSERT INTO a VALUES (1);
return nil
}, time.Minute)
}

// TestFastFailbackWithReaderTenant starts two test servers (A and B),
// replicates tenant f on A into tenant g on B with the
// `WITH READ VIRTUAL CLUSTER` option, completes that replication, and then
// restarts replication in the opposite direction (g -> f) with the same
// option. After each replication start it checks that a "<name>-readonly"
// row shows up in system.tenants on the destination cluster.
func TestFastFailbackWithReaderTenant(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)
	ctx := context.Background()

	skip.UnderRace(t, "test takes ~5 minutes under race")

	// Both servers are started with the same configuration. The args are
	// produced by a closure so that every server receives its own knobs value.
	newServerArgs := func() base.TestServerArgs {
		return base.TestServerArgs{
			DefaultTestTenant: base.TestControlsTenantsExplicitly,
			Knobs: base.TestingKnobs{
				JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
			},
		}
	}

	srvA, connA, _ := serverutils.StartServer(t, newServerArgs())
	defer srvA.Stopper().Stop(ctx)
	srvB, connB, _ := serverutils.StartServer(t, newServerArgs())
	defer srvB.Stopper().Stop(ctx)

	runnerA := sqlutils.MakeSQLRunner(connA)
	runnerB := sqlutils.MakeSQLRunner(connB)

	// Connection URLs that the replication statements use as the source
	// address of the other cluster.
	urlA, releaseURLA := sqlutils.PGUrl(t, srvA.SQLAddr(), t.Name(), url.User(username.RootUser))
	defer releaseURLA()
	urlB, releaseURLB := sqlutils.PGUrl(t, srvB.SQLAddr(), t.Name(), url.User(username.RootUser))
	defer releaseURLB()

	// The same settings are applied to both clusters (presumably to make
	// replication progress quickly enough for a test).
	clusterSettings := []string{
		"SET CLUSTER SETTING kv.rangefeed.enabled = true",
		"SET CLUSTER SETTING kv.rangefeed.closed_timestamp_refresh_interval = '200ms'",
		"SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'",
		"SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = '50ms'",

		"SET CLUSTER SETTING physical_replication.consumer.heartbeat_frequency = '1s'",
		"SET CLUSTER SETTING physical_replication.consumer.job_checkpoint_frequency = '100ms'",
		"SET CLUSTER SETTING physical_replication.consumer.minimum_flush_interval = '10ms'",
		"SET CLUSTER SETTING physical_replication.consumer.cutover_signal_poll_interval = '100ms'",
		"SET CLUSTER SETTING spanconfig.reconciliation_job.checkpoint_interval = '100ms'",
	}
	for _, stmt := range clusterSettings {
		runnerA.Exec(t, stmt)
		runnerB.Exec(t, stmt)
	}

	t.Logf("creating tenant f")
	runnerA.Exec(t, "CREATE VIRTUAL CLUSTER f")
	runnerA.Exec(t, "ALTER VIRTUAL CLUSTER f START SERVICE SHARED")

	t.Logf("starting replication f->g")
	runnerB.Exec(t, "CREATE VIRTUAL CLUSTER g FROM REPLICATION OF f ON $1 WITH READ VIRTUAL CLUSTER", urlA.String())

	// The initial replication must have produced g's reader tenant.
	waitForReaderTenant(t, runnerB, "g-readonly")

	// Failover: complete replication on g, either to a timestamp captured on
	// A or to LATEST, picked at random.
	_, rawJobG := replicationtestutils.GetStreamJobIds(t, ctx, runnerB, roachpb.TenantName("g"))
	ingestionJobG := jobspb.JobID(rawJobG)
	var failoverTS string
	runnerA.QueryRow(t, "SELECT cluster_logical_timestamp()").Scan(&failoverTS)

	rng, _ := randutil.NewPseudoRand()
	switch rng.Intn(2) {
	case 0:
		t.Logf("waiting for g@%s", failoverTS)
		target := replicationtestutils.DecimalTimeToHLC(t, failoverTS)
		replicationtestutils.WaitUntilReplicatedTime(t, target, runnerB, ingestionJobG)

		t.Logf("completing replication on g@%s", failoverTS)
		completeStmt := fmt.Sprintf("ALTER VIRTUAL CLUSTER g COMPLETE REPLICATION TO SYSTEM TIME '%s'", failoverTS)
		runnerB.Exec(t, completeStmt)
	default:
		t.Log("waiting for initial scan on g")
		replicationtestutils.WaitUntilStartTimeReached(t, runnerB, ingestionJobG)
		t.Log("completing replication on g to latest")
		runnerB.Exec(t, "ALTER VIRTUAL CLUSTER g COMPLETE REPLICATION TO LATEST")
	}
	jobutils.WaitForJobToSucceed(t, runnerB, ingestionJobG)

	runnerB.Exec(t, "ALTER VIRTUAL CLUSTER g START SERVICE SHARED")
	var failbackTS string
	runnerA.QueryRow(t, "SELECT cluster_logical_timestamp()").Scan(&failbackTS)

	// Failback: stop f's service, then restart replication into f from g.
	runnerA.Exec(t, "ALTER VIRTUAL CLUSTER f STOP SERVICE")
	waitUntilTenantServerStopped(t, srvA.SystemLayer(), "f")
	t.Logf("starting replication g->f")
	runnerA.Exec(t, "ALTER VIRTUAL CLUSTER f START REPLICATION OF g ON $1 WITH READ VIRTUAL CLUSTER", urlB.String())
	_, rawJobF := replicationtestutils.GetStreamJobIds(t, ctx, runnerA, roachpb.TenantName("f"))
	t.Logf("waiting for f@%s", failbackTS)
	failbackTarget := replicationtestutils.DecimalTimeToHLC(t, failbackTS)
	replicationtestutils.WaitUntilReplicatedTime(t, failbackTarget, runnerA, jobspb.JobID(rawJobF))

	// The restarted replication must have produced f's reader tenant.
	waitForReaderTenant(t, runnerA, "f-readonly")
}

// waitForReaderTenant retries, via testutils.SucceedsSoon, until exactly one
// row in system.tenants has the name tenantName. The query is issued through
// db; if the condition is never met, SucceedsSoon fails the test t.
func waitForReaderTenant(t *testing.T, db *sqlutils.SQLRunner, tenantName string) {
	// Mark this function as a helper so failures are attributed to the line
	// in the calling test rather than to this function.
	t.Helper()
	testutils.SucceedsSoon(t, func() error {
		var numTenants int
		db.QueryRow(t, `
SELECT count(*)
FROM system.tenants
WHERE name = $1
`, tenantName).Scan(&numTenants)

		if numTenants != 1 {
			// Include the name: the helper is used for more than one tenant,
			// so the bare count alone does not say which wait failed.
			return errors.Errorf("expected 1 tenant named %q, got %d", tenantName, numTenants)
		}
		return nil
	})
}
6 changes: 3 additions & 3 deletions pkg/ccl/crosscluster/physical/stream_ingestion_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func ingestionPlanHook(
return nil
}

readerID, err := createReaderTenant(ctx, p, tenantInfo, destinationTenantID, options)
readerID, err := createReaderTenant(ctx, p, tenantInfo.Name, destinationTenantID, options)
if err != nil {
return err
}
Expand Down Expand Up @@ -302,15 +302,15 @@ func createReplicationJob(
func createReaderTenant(
ctx context.Context,
p sql.PlanHookState,
tenantInfo mtinfopb.TenantInfoWithUsage,
tenantName roachpb.TenantName,
destinationTenantID roachpb.TenantID,
options *resolvedTenantReplicationOptions,
) (roachpb.TenantID, error) {
var readerID roachpb.TenantID
if options.ReaderTenantEnabled() {
var readerInfo mtinfopb.TenantInfoWithUsage
readerInfo.DataState = mtinfopb.DataStateAdd
readerInfo.Name = tenantInfo.Name + "-readonly"
readerInfo.Name = tenantName + "-readonly"
readerInfo.ReadFromTenant = &destinationTenantID

readerZcfg, err := sql.GetHydratedZoneConfigForTenantsRange(ctx, p.Txn(), p.ExtendedEvalContext().Descs)
Expand Down

0 comments on commit 0457470

Please sign in to comment.