diff --git a/pkg/ccl/streamingccl/streamclient/random_stream_client.go b/pkg/ccl/streamingccl/streamclient/random_stream_client.go
index 49934b65684c..d2cafd7dcf16 100644
--- a/pkg/ccl/streamingccl/streamclient/random_stream_client.go
+++ b/pkg/ccl/streamingccl/streamclient/random_stream_client.go
@@ -206,6 +206,7 @@ type randomEventGenerator struct {
 	config                     randomStreamConfig
 	numEventsSinceLastResolved int
 	sstMaker                   SSTableMakerFn
+	codec                      keys.SQLCodec
 	tableDesc                  *tabledesc.Mutable
 	systemKVs                  []roachpb.KeyValue
 }
@@ -228,6 +229,7 @@ func newRandomEventGenerator(
 		numEventsSinceLastResolved: 0,
 		sstMaker:                   fn,
 		tableDesc:                  tableDesc,
+		codec:                      keys.MakeSQLCodec(config.tenantID),
 		systemKVs:                  systemKVs,
 	}, nil
 }
@@ -238,7 +240,7 @@ func (r *randomEventGenerator) generateNewEvent() streamingccl.Event {
 		// Emit a CheckpointEvent.
 		resolvedTime := timeutil.Now()
 		hlcResolvedTime := hlc.Timestamp{WallTime: resolvedTime.UnixNano()}
-		resolvedSpan := jobspb.ResolvedSpan{Span: r.tableDesc.TableSpan(keys.SystemSQLCodec), Timestamp: hlcResolvedTime}
+		resolvedSpan := jobspb.ResolvedSpan{Span: r.tableDesc.TableSpan(r.codec), Timestamp: hlcResolvedTime}
 		event = streamingccl.MakeCheckpointEvent([]jobspb.ResolvedSpan{resolvedSpan})
 		r.numEventsSinceLastResolved = 0
 	} else {
@@ -257,11 +259,11 @@ func (r *randomEventGenerator) generateNewEvent() streamingccl.Event {
 			size := 10 + r.rng.Intn(30)
 			keyVals := make([]roachpb.KeyValue, 0, size)
 			for i := 0; i < size; i++ {
-				keyVals = append(keyVals, makeRandomKey(r.rng, r.config, r.tableDesc))
+				keyVals = append(keyVals, makeRandomKey(r.rng, r.config, r.codec, r.tableDesc))
 			}
 			event = streamingccl.MakeSSTableEvent(r.sstMaker(keyVals))
 		} else {
-			event = streamingccl.MakeKVEvent(makeRandomKey(r.rng, r.config, r.tableDesc))
+			event = streamingccl.MakeKVEvent(makeRandomKey(r.rng, r.config, r.codec, r.tableDesc))
 		}
 		r.numEventsSinceLastResolved++
 	}
@@ -360,6 +362,7 @@ func (m *RandomStreamClient) Plan(ctx context.Context, _ streampb.StreamID) (Top
 	log.Infof(ctx, "planning random stream for tenant %d", m.config.tenantID)
 
 	// Allocate table IDs and return one per partition address in the topology.
+	srcCodec := keys.MakeSQLCodec(m.config.tenantID)
 	for i := 0; i < m.config.numPartitions; i++ {
 		tableID := m.getNextTableID()
 		tableDesc, err := m.tableDescForID(tableID)
@@ -375,7 +378,7 @@ func (m *RandomStreamClient) Plan(ctx context.Context, _ streampb.StreamID) (Top
 			ID:                strconv.Itoa(i),
 			SrcAddr:           streamingccl.PartitionAddress(partitionURI),
 			SubscriptionToken: []byte(partitionURI),
-			Spans:             []roachpb.Span{tableDesc.TableSpan(keys.SystemSQLCodec)},
+			Spans:             []roachpb.Span{tableDesc.TableSpan(srcCodec)},
 		})
 	}
 
@@ -579,7 +582,7 @@ func rekey(tenantID roachpb.TenantID, k roachpb.Key) roachpb.Key {
 }
 
 func makeRandomKey(
-	r *rand.Rand, config randomStreamConfig, tableDesc *tabledesc.Mutable,
+	r *rand.Rand, config randomStreamConfig, codec keys.SQLCodec, tableDesc *tabledesc.Mutable,
 ) roachpb.KeyValue {
 	// Create a key holding a random integer.
 	keyDatum := tree.NewDInt(tree.DInt(r.Intn(config.valueRange)))
@@ -590,7 +593,7 @@ func makeRandomKey(
 	var colIDToRowIndex catalog.TableColMap
 	colIDToRowIndex.Set(index.GetKeyColumnID(0), 0)
 
-	keyPrefix := rowenc.MakeIndexKeyPrefix(keys.SystemSQLCodec, tableDesc.GetID(), index.GetID())
+	keyPrefix := rowenc.MakeIndexKeyPrefix(codec, tableDesc.GetID(), index.GetID())
 	k, _, err := rowenc.EncodeIndexKey(tableDesc, index, colIDToRowIndex, tree.Datums{keyDatum}, keyPrefix)
 	if err != nil {
 		panic(err)
diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_dist.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_dist.go
index 9f852d1b961d..d685bb65e873 100644
--- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_dist.go
+++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_dist.go
@@ -15,12 +15,15 @@ import (
 	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/base"
+	"github.com/cockroachdb/cockroach/pkg/ccl/backupccl"
 	"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
 	"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient"
 	"github.com/cockroachdb/cockroach/pkg/jobs"
 	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
 	"github.com/cockroachdb/cockroach/pkg/jobs/jobsprofiler"
+	"github.com/cockroachdb/cockroach/pkg/keys"
 	"github.com/cockroachdb/cockroach/pkg/kv"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
 	"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/settings"
@@ -33,6 +36,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/retry"
+	"github.com/cockroachdb/cockroach/pkg/util/tracing"
 	"github.com/cockroachdb/errors"
 	"github.com/cockroachdb/logtags"
 	"github.com/cockroachdb/redact"
@@ -211,7 +215,30 @@ func startDistIngestion(
 		return rw.Err()
 	}
 
-	updateRunningStatus(ctx, ingestionJob, jobspb.Replicating, "physical replication running")
+	// We now attempt to create initial splits. We currently do
+	// this once during initial planning to avoid re-splitting on
+	// resume since it isn't clear to us at the moment whether
+	// re-splitting is always going to be useful.
+	if !streamProgress.InitialSplitComplete {
+		codec := execCtx.ExtendedEvalContext().Codec
+		splitter := &dbSplitAndScatter{db: execCtx.ExecCfg().DB}
+		if err := createInitialSplits(ctx, codec, splitter, planner.initialTopology, details.DestinationTenantID); err != nil {
+			return err
+		}
+	} else {
+		log.Infof(ctx, "initial splits already complete")
+	}
+
+	if err := ingestionJob.NoTxn().Update(ctx, func(txn isql.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error {
+		md.Progress.GetStreamIngest().ReplicationStatus = jobspb.Replicating
+		md.Progress.GetStreamIngest().InitialSplitComplete = true
+		md.Progress.RunningStatus = "physical replication running"
+		ju.UpdateProgress(md.Progress)
+		return nil
+	}); err != nil {
+		return err
+	}
+
 	err = ctxgroup.GoAndWait(ctx, execInitialPlan, replanner, tracingAggLoop, streamSpanConfigs)
 	if errors.Is(err, sql.ErrPlanChanged) {
 		execCtx.ExecCfg().JobRegistry.MetricsStruct().StreamIngest.(*Metrics).ReplanCount.Inc(1)
@@ -219,6 +246,131 @@ func startDistIngestion(
 	return err
 }
 
+// TODO(ssd): This is duplicative of the split_and_scatter processor in
+// backupccl.
+type splitAndScatterer interface {
+	split(
+		ctx context.Context,
+		splitKey roachpb.Key,
+		expirationTime hlc.Timestamp,
+	) error
+
+	scatter(
+		ctx context.Context,
+		scatterKey roachpb.Key,
+	) error
+
+	now() hlc.Timestamp
+}
+
+type dbSplitAndScatter struct {
+	db *kv.DB
+}
+
+func (s *dbSplitAndScatter) split(
+	ctx context.Context, splitKey roachpb.Key, expirationTime hlc.Timestamp,
+) error {
+	return s.db.AdminSplit(ctx, splitKey, expirationTime)
+}
+
+func (s *dbSplitAndScatter) scatter(ctx context.Context, scatterKey roachpb.Key) error {
+	_, pErr := kv.SendWrapped(ctx, s.db.NonTransactionalSender(), &kvpb.AdminScatterRequest{
+		RequestHeader: kvpb.RequestHeaderFromSpan(roachpb.Span{
+			Key:    scatterKey,
+			EndKey: scatterKey.Next(),
+		}),
+		RandomizeLeases: true,
+		MaxSize:         1, // don't scatter non-empty ranges on resume.
+	})
+	return pErr.GoError()
+}
+
+func (s *dbSplitAndScatter) now() hlc.Timestamp {
+	return s.db.Clock().Now()
+}
+
+// createInitialSplits creates splits based on the given topology from the
+// source.
+//
+// The idea here is to use the information from the source cluster about
+// the distribution of the data to produce split points to help prevent
+// ingestion processors from pushing data into the same ranges during
+// the initial scan.
+func createInitialSplits(
+	ctx context.Context,
+	codec keys.SQLCodec,
+	splitter splitAndScatterer,
+	topology streamclient.Topology,
+	destTenantID roachpb.TenantID,
+) error {
+	ctx, sp := tracing.ChildSpan(ctx, "streamingest.createInitialSplits")
+	defer sp.Finish()
+
+	rekeyer, err := backupccl.MakeKeyRewriterFromRekeys(codec,
+		nil /* tableRekeys */, []execinfrapb.TenantRekey{
+			{
+				OldID: topology.SourceTenantID,
+				NewID: destTenantID,
+			}},
+		true /* restoreTenantFromStream */)
+	if err != nil {
+		return err
+	}
+	for _, partition := range topology.Partitions {
+		for _, span := range partition.Spans {
+			startKey := span.Key.Clone()
+			splitKey, _, err := rekeyer.RewriteKey(startKey, 0 /* walltimeForImportElision */)
+			if err != nil {
+				return err
+			}
+
+			// NOTE(ssd): EnsureSafeSplitKey called on an arbitrary
+			// key unfortunately results in many of our split keys
+			// mapping to the same key for workloads like TPCC where
+			// the schema of the table includes integers that will
+			// get erroneously treated as the column family length.
+			//
+			// Since the partitions are generated from a call to
+			// PartitionSpans on the source cluster, they should be
+			// aligned with the split points in the original cluster
+			// and thus should be valid split keys. But we are
+			// opening ourselves up to replicating bad splits from
+			// the original cluster.
+			//
+			// if newSplitKey, err := keys.EnsureSafeSplitKey(splitKey); err != nil {
+			// 	// Ignore the error, since a key such as
+			// 	// /Tenant/2/Table/13 is an OK start key but
+			// 	// returns an error.
+			// } else if len(newSplitKey) != 0 {
+			// 	splitKey = newSplitKey
+			// }
+			//
+			if err := splitAndScatter(ctx, roachpb.Key(splitKey), splitter); err != nil {
+				return err
+			}
+
+		}
+	}
+	return nil
+}
+
+var splitAndScatterStickyBitDuration = time.Hour
+
+func splitAndScatter(
+	ctx context.Context, splitAndScatterKey roachpb.Key, s splitAndScatterer,
+) error {
+	log.Infof(ctx, "splitting and scattering at %s", splitAndScatterKey)
+	expirationTime := s.now().AddDuration(splitAndScatterStickyBitDuration)
+	if err := s.split(ctx, splitAndScatterKey, expirationTime); err != nil {
+		return err
+	}
+	if err := s.scatter(ctx, splitAndScatterKey); err != nil {
+		log.Warningf(ctx, "failed to scatter span starting at %s: %v",
+			splitAndScatterKey, err)
+	}
+	return nil
+}
+
 // makeReplicationFlowPlanner creates a replicationFlowPlanner and the initial physical plan.
 func makeReplicationFlowPlanner(
 	ctx context.Context,
@@ -254,6 +406,7 @@ type replicationFlowPlanner struct {
 	initialPlanCtx *sql.PlanningCtx
 
 	initialStreamAddresses []string
+	initialTopology        streamclient.Topology
 
 	srcTenantID roachpb.TenantID
 }
@@ -287,6 +440,7 @@ func (p *replicationFlowPlanner) constructPlanGenerator(
 			return nil, nil, err
 		}
 		if !p.containsInitialStreamAddresses() {
+			p.initialTopology = topology
 			p.initialStreamAddresses = topology.StreamAddresses()
 		}
 
diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_dist_test.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_dist_test.go
index a2fa6624c064..1770a1dd9ec2 100644
--- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_dist_test.go
+++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_dist_test.go
@@ -18,6 +18,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
 	"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient"
 	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
+	"github.com/cockroachdb/cockroach/pkg/keys"
 	"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/sql"
@@ -27,6 +28,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/randutil"
+	"github.com/cockroachdb/errors"
 	"github.com/stretchr/testify/require"
 )
 
@@ -324,3 +326,94 @@ func TestSourceDestMatching(t *testing.T) {
 		})
 	}
 }
+
+type testSplitter struct {
+	splits     []roachpb.Key
+	scatters   []roachpb.Key
+	splitErr   func(key roachpb.Key) error
+	scatterErr func(key roachpb.Key) error
+}
+
+func (ts *testSplitter) split(_ context.Context, splitKey roachpb.Key, _ hlc.Timestamp) error {
+	ts.splits = append(ts.splits, splitKey)
+	if ts.splitErr != nil {
+		return ts.splitErr(splitKey)
+	}
+	return nil
+}
+
+func (ts *testSplitter) scatter(_ context.Context, scatterKey roachpb.Key) error {
+	ts.scatters = append(ts.scatters, scatterKey)
+	if ts.scatterErr != nil {
+		return ts.scatterErr(scatterKey)
+	}
+	return nil
+}
+
+func (ts *testSplitter) now() hlc.Timestamp {
+	return hlc.Timestamp{}
+}
+
+func TestCreateInitialSplits(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	ctx := context.Background()
+	sourceTenantID := roachpb.MustMakeTenantID(11)
+	inputCodec := keys.MakeSQLCodec(sourceTenantID)
+	destTenantID := roachpb.MustMakeTenantID(12)
+	outputCodec := keys.MakeSQLCodec(destTenantID)
+
+	testSpan := func(codec keys.SQLCodec, tableID uint32) roachpb.Span {
+		return roachpb.Span{
+			Key:    codec.IndexPrefix(tableID, 1),
+			EndKey: codec.IndexPrefix(tableID, 2),
+		}
+	}
+	inputSpans := []roachpb.Span{
+		testSpan(inputCodec, 100),
+		testSpan(inputCodec, 200),
+		testSpan(inputCodec, 300),
+		testSpan(inputCodec, 400),
+	}
+	outputSpans := []roachpb.Span{
+		testSpan(outputCodec, 100),
+		testSpan(outputCodec, 200),
+		testSpan(outputCodec, 300),
+		testSpan(outputCodec, 400),
+	}
+	topo := streamclient.Topology{
+		SourceTenantID: sourceTenantID,
+		Partitions: []streamclient.PartitionInfo{
+			{Spans: inputSpans},
+		},
+	}
+
+	t.Run("rekeys before splitting", func(t *testing.T) {
+		ts := &testSplitter{}
+		err := createInitialSplits(ctx, keys.SystemSQLCodec, ts, topo, destTenantID)
+		require.NoError(t, err)
+		expectedSplitsAndScatters := make([]roachpb.Key, 0, len(outputSpans))
+		for _, sp := range outputSpans {
+			expectedSplitsAndScatters = append(expectedSplitsAndScatters, sp.Key)
+		}
+
+		require.Equal(t, expectedSplitsAndScatters, ts.splits)
+		require.Equal(t, expectedSplitsAndScatters, ts.scatters)
+
+	})
+	t.Run("split errors are fatal", func(t *testing.T) {
+		require.Error(t, createInitialSplits(ctx, keys.SystemSQLCodec, &testSplitter{
+			splitErr: func(_ roachpb.Key) error {
+				return errors.New("test error")
+			},
+		}, topo, destTenantID))
+	})
+	t.Run("ignores scatter errors", func(t *testing.T) {
+		require.NoError(t, createInitialSplits(ctx, keys.SystemSQLCodec, &testSplitter{
+			scatterErr: func(_ roachpb.Key) error {
+				return errors.New("test error")
+			},
+		}, topo, destTenantID))
+	})
+}
diff --git a/pkg/jobs/jobspb/jobs.proto b/pkg/jobs/jobspb/jobs.proto
index a65b996f0b69..3e0678a78cd4 100644
--- a/pkg/jobs/jobspb/jobs.proto
+++ b/pkg/jobs/jobspb/jobs.proto
@@ -179,7 +179,12 @@ message StreamIngestionProgress {
   // the cutover time gets set.
   repeated roachpb.Span remaining_cutover_spans = 8 [(gogoproto.nullable) = false];
 
-  // Next Id: 9
+  // InitialSplitComplete is true if the stream ingestion job has
+  // already split the tenant's keyspace according to the plan from
+  // the source tenant.
+  bool initial_split_complete = 9;
+
+  // Next Id: 10
 }
 
 message StreamReplicationDetails {
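
The split-then-scatter policy introduced in this patch (a failed split aborts the initial-split phase, a failed scatter is only logged) can be illustrated outside the cluster. The following is a minimal, self-contained sketch and not the CockroachDB implementation: splitScatterer, fakeSplitter, and splitAndScatterOnce are hypothetical stand-ins for the patch's splitAndScatterer interface, its test fake, and the splitAndScatter helper, using plain byte slices and time.Time in place of roachpb.Key and hlc.Timestamp.

package main

import (
	"context"
	"errors"
	"fmt"
	"log"
	"time"
)

// splitScatterer mirrors the shape of the splitAndScatterer interface in the
// patch above, but with plain types so the sketch stays self-contained.
type splitScatterer interface {
	split(ctx context.Context, splitKey []byte, expiration time.Time) error
	scatter(ctx context.Context, scatterKey []byte) error
	now() time.Time
}

// fakeSplitter is a hypothetical in-memory implementation used only for this
// sketch; it records split keys and can be made to fail every scatter.
type fakeSplitter struct {
	splits      [][]byte
	scatterFail bool
}

func (f *fakeSplitter) split(_ context.Context, key []byte, _ time.Time) error {
	f.splits = append(f.splits, key)
	return nil
}

func (f *fakeSplitter) scatter(_ context.Context, _ []byte) error {
	if f.scatterFail {
		return errors.New("scatter refused")
	}
	return nil
}

func (f *fakeSplitter) now() time.Time { return time.Now() }

// splitAndScatterOnce applies the same policy as the splitAndScatter helper in
// the patch: a split error is fatal, a scatter error is only logged.
func splitAndScatterOnce(ctx context.Context, s splitScatterer, key []byte, stickyBit time.Duration) error {
	if err := s.split(ctx, key, s.now().Add(stickyBit)); err != nil {
		return err
	}
	if err := s.scatter(ctx, key); err != nil {
		log.Printf("ignoring scatter error for %q: %v", key, err)
	}
	return nil
}

func main() {
	ctx := context.Background()
	s := &fakeSplitter{scatterFail: true}
	// Pretend these are the rekeyed start keys of the source partitions.
	for _, key := range [][]byte{[]byte("/Tenant/12/Table/100"), []byte("/Tenant/12/Table/200")} {
		if err := splitAndScatterOnce(ctx, s, key, time.Hour); err != nil {
			log.Fatal(err)
		}
	}
	fmt.Printf("created %d splits despite scatter errors\n", len(s.splits))
}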