From 27e87b6ffbaf381ef2960debc7d01d7802606fbe Mon Sep 17 00:00:00 2001
From: Steven Danna
Date: Thu, 14 Sep 2023 19:28:25 +0100
Subject: [PATCH] streamingccl: split destination based on source

This adds initial splits and scatters to the stream ingestion job. We
split based on the topology delivered by the source so that our initial
splits align with the splits that are likely coming in from the source
cluster. These splits substantially improve the throughput of a
stream's initial scan.

Careful reviewers may note that we are not calling EnsureSafeSplitKey
here. That is because the Spans in the source topology should already
be safe split keys. Since EnsureSafeSplitKey isn't idempotent, calling
it here would create splits at rather poor locations, since any key
that ends in an integer would have components erroneously trimmed from
it. (An illustrative sketch of this failure mode is appended after the
diff.)

An alternative would be to do what the buffering adder does and create
an initial set of splits by distributing them over the keyspace of the
first buffer. That has the advantage of allowing us to call
EnsureSafeSplitKey, at the cost of less effective splits. (A sketch of
this alternative is also appended after the diff.)

Note that we face this same problem during bulk operations. Perhaps in
the future the source cluster should send metadata about manual splits
that are issued, or _something_ that lets the destination know to
expect a large amount of data in a particular span.

Epic: none

Release note: None
---
 .../streamclient/random_stream_client.go  |  15 +-
 .../streamingest/stream_ingestion_dist.go | 156 +++++++++++++++++-
 .../stream_ingestion_dist_test.go          |  93 +++++++++++
 pkg/jobs/jobspb/jobs.proto                 |   7 +-
 4 files changed, 263 insertions(+), 8 deletions(-)

diff --git a/pkg/ccl/streamingccl/streamclient/random_stream_client.go b/pkg/ccl/streamingccl/streamclient/random_stream_client.go
index 49934b65684c..d2cafd7dcf16 100644
--- a/pkg/ccl/streamingccl/streamclient/random_stream_client.go
+++ b/pkg/ccl/streamingccl/streamclient/random_stream_client.go
@@ -206,6 +206,7 @@ type randomEventGenerator struct {
 	config                     randomStreamConfig
 	numEventsSinceLastResolved int
 	sstMaker                   SSTableMakerFn
+	codec                      keys.SQLCodec
 	tableDesc                  *tabledesc.Mutable
 	systemKVs                  []roachpb.KeyValue
 }
@@ -228,6 +229,7 @@ func newRandomEventGenerator(
 		numEventsSinceLastResolved: 0,
 		sstMaker:                   fn,
 		tableDesc:                  tableDesc,
+		codec:                      keys.MakeSQLCodec(config.tenantID),
 		systemKVs:                  systemKVs,
 	}, nil
 }
@@ -238,7 +240,7 @@ func (r *randomEventGenerator) generateNewEvent() streamingccl.Event {
 		// Emit a CheckpointEvent.
 		resolvedTime := timeutil.Now()
 		hlcResolvedTime := hlc.Timestamp{WallTime: resolvedTime.UnixNano()}
-		resolvedSpan := jobspb.ResolvedSpan{Span: r.tableDesc.TableSpan(keys.SystemSQLCodec), Timestamp: hlcResolvedTime}
+		resolvedSpan := jobspb.ResolvedSpan{Span: r.tableDesc.TableSpan(r.codec), Timestamp: hlcResolvedTime}
 		event = streamingccl.MakeCheckpointEvent([]jobspb.ResolvedSpan{resolvedSpan})
 		r.numEventsSinceLastResolved = 0
 	} else {
@@ -257,11 +259,11 @@ func (r *randomEventGenerator) generateNewEvent() streamingccl.Event {
 			size := 10 + r.rng.Intn(30)
 			keyVals := make([]roachpb.KeyValue, 0, size)
 			for i := 0; i < size; i++ {
-				keyVals = append(keyVals, makeRandomKey(r.rng, r.config, r.tableDesc))
+				keyVals = append(keyVals, makeRandomKey(r.rng, r.config, r.codec, r.tableDesc))
 			}
 			event = streamingccl.MakeSSTableEvent(r.sstMaker(keyVals))
 		} else {
-			event = streamingccl.MakeKVEvent(makeRandomKey(r.rng, r.config, r.tableDesc))
+			event = streamingccl.MakeKVEvent(makeRandomKey(r.rng, r.config, r.codec, r.tableDesc))
 		}
 		r.numEventsSinceLastResolved++
 	}
@@ -360,6 +362,7 @@ func (m *RandomStreamClient) Plan(ctx context.Context, _ streampb.StreamID) (Top
 	log.Infof(ctx, "planning random stream for tenant %d", m.config.tenantID)
 
 	// Allocate table IDs and return one per partition address in the topology.
+	srcCodec := keys.MakeSQLCodec(m.config.tenantID)
 	for i := 0; i < m.config.numPartitions; i++ {
 		tableID := m.getNextTableID()
 		tableDesc, err := m.tableDescForID(tableID)
@@ -375,7 +378,7 @@ func (m *RandomStreamClient) Plan(ctx context.Context, _ streampb.StreamID) (Top
 			ID:                strconv.Itoa(i),
 			SrcAddr:           streamingccl.PartitionAddress(partitionURI),
 			SubscriptionToken: []byte(partitionURI),
-			Spans:             []roachpb.Span{tableDesc.TableSpan(keys.SystemSQLCodec)},
+			Spans:             []roachpb.Span{tableDesc.TableSpan(srcCodec)},
 		})
 	}
 
@@ -579,7 +582,7 @@ func rekey(tenantID roachpb.TenantID, k roachpb.Key) roachpb.Key {
 }
 
 func makeRandomKey(
-	r *rand.Rand, config randomStreamConfig, tableDesc *tabledesc.Mutable,
+	r *rand.Rand, config randomStreamConfig, codec keys.SQLCodec, tableDesc *tabledesc.Mutable,
 ) roachpb.KeyValue {
 	// Create a key holding a random integer.
 	keyDatum := tree.NewDInt(tree.DInt(r.Intn(config.valueRange)))
@@ -590,7 +593,7 @@ func makeRandomKey(
 	var colIDToRowIndex catalog.TableColMap
 	colIDToRowIndex.Set(index.GetKeyColumnID(0), 0)
 
-	keyPrefix := rowenc.MakeIndexKeyPrefix(keys.SystemSQLCodec, tableDesc.GetID(), index.GetID())
+	keyPrefix := rowenc.MakeIndexKeyPrefix(codec, tableDesc.GetID(), index.GetID())
 	k, _, err := rowenc.EncodeIndexKey(tableDesc, index, colIDToRowIndex, tree.Datums{keyDatum}, keyPrefix)
 	if err != nil {
 		panic(err)
diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_dist.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_dist.go
index 5c79614db213..a33351064a64 100644
--- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_dist.go
+++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_dist.go
@@ -15,12 +15,15 @@ import (
 	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/base"
+	"github.com/cockroachdb/cockroach/pkg/ccl/backupccl"
 	"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
 	"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient"
 	"github.com/cockroachdb/cockroach/pkg/jobs"
 	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
 	"github.com/cockroachdb/cockroach/pkg/jobs/jobsprofiler"
+	"github.com/cockroachdb/cockroach/pkg/keys"
 	"github.com/cockroachdb/cockroach/pkg/kv"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
 	"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/settings"
@@ -33,6 +36,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/retry"
+	"github.com/cockroachdb/cockroach/pkg/util/tracing"
 	"github.com/cockroachdb/errors"
 	"github.com/cockroachdb/logtags"
 	"github.com/cockroachdb/redact"
@@ -211,7 +215,30 @@ func startDistIngestion(
 		return rw.Err()
 	}
 
-	updateRunningStatus(ctx, ingestionJob, jobspb.Replicating, "physical replication running")
+	// We now attempt to create initial splits. We currently do
+	// this once during initial planning to avoid re-splitting on
+	// resume since it isn't clear to us at the moment whether
+	// re-splitting is always going to be useful.
+	if !streamProgress.InitialSplitComplete {
+		codec := execCtx.ExtendedEvalContext().Codec
+		splitter := &dbSplitAndScatter{db: execCtx.ExecCfg().DB}
+		if err := createInitialSplits(ctx, codec, splitter, planner.initialTopology, details.DestinationTenantID); err != nil {
+			return err
+		}
+	} else {
+		log.Infof(ctx, "initial splits already complete")
+	}
+
+	if err := ingestionJob.NoTxn().Update(ctx, func(txn isql.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error {
+		md.Progress.GetStreamIngest().ReplicationStatus = jobspb.Replicating
+		md.Progress.GetStreamIngest().InitialSplitComplete = true
+		md.Progress.RunningStatus = "physical replication running"
+		ju.UpdateProgress(md.Progress)
+		return nil
+	}); err != nil {
+		return err
+	}
+
 	err = ctxgroup.GoAndWait(ctx, execInitialPlan, replanner, tracingAggLoop, streamSpanConfigs)
 	if errors.Is(err, sql.ErrPlanChanged) {
 		execCtx.ExecCfg().JobRegistry.MetricsStruct().StreamIngest.(*Metrics).ReplanCount.Inc(1)
@@ -219,6 +246,131 @@ func startDistIngestion(
 	return err
 }
 
+// TODO(ssd): This is duplicative of the split_and_scatter processor in
+// backupccl.
+type splitAndScatterer interface {
+	split(
+		ctx context.Context,
+		splitKey roachpb.Key,
+		expirationTime hlc.Timestamp,
+	) error
+
+	scatter(
+		ctx context.Context,
+		scatterKey roachpb.Key,
+	) error
+
+	now() hlc.Timestamp
+}
+
+type dbSplitAndScatter struct {
+	db *kv.DB
+}
+
+func (s *dbSplitAndScatter) split(
+	ctx context.Context, splitKey roachpb.Key, expirationTime hlc.Timestamp,
+) error {
+	return s.db.AdminSplit(ctx, splitKey, expirationTime)
+}
+
+func (s *dbSplitAndScatter) scatter(ctx context.Context, scatterKey roachpb.Key) error {
+	_, pErr := kv.SendWrapped(ctx, s.db.NonTransactionalSender(), &kvpb.AdminScatterRequest{
+		RequestHeader: kvpb.RequestHeaderFromSpan(roachpb.Span{
+			Key:    scatterKey,
+			EndKey: scatterKey.Next(),
+		}),
+		RandomizeLeases: true,
+		MaxSize:         1, // don't scatter non-empty ranges on resume.
+	})
+	return pErr.GoError()
+}
+
+func (s *dbSplitAndScatter) now() hlc.Timestamp {
+	return s.db.Clock().Now()
+}
+
+// createInitialSplits creates splits based on the given topology from the
+// source.
+//
+// The idea here is to use the information from the source cluster about
+// the distribution of the data to produce split points that help prevent
+// ingestion processors from pushing data into the same ranges during
+// the initial scan.
+func createInitialSplits(
+	ctx context.Context,
+	codec keys.SQLCodec,
+	splitter splitAndScatterer,
+	topology streamclient.Topology,
+	destTenantID roachpb.TenantID,
+) error {
+	ctx, sp := tracing.ChildSpan(ctx, "streamingest.createInitialSplits")
+	defer sp.Finish()
+
+	rekeyer, err := backupccl.MakeKeyRewriterFromRekeys(codec,
+		nil /* tableRekeys */, []execinfrapb.TenantRekey{
+			{
+				OldID: topology.SourceTenantID,
+				NewID: destTenantID,
+			}},
+		true /* restoreTenantFromStream */)
+	if err != nil {
+		return err
+	}
+	for _, partition := range topology.Partitions {
+		for _, span := range partition.Spans {
+			startKey := span.Key.Clone()
+			splitKey, _, err := rekeyer.RewriteKey(startKey, 0 /* walltimeForImportElision */)
+			if err != nil {
+				return err
+			}
+
+			// NOTE(ssd): EnsureSafeSplitKey called on an arbitrary
+			// key unfortunately results in many of our split keys
+			// mapping to the same key for workloads like TPCC where
+			// the schema of the table includes integers that will
+			// get erroneously treated as the column family length.
+			//
+			// Since the partitions are generated from a call to
+			// PartitionSpans on the source cluster, they should be
+			// aligned with the split points in the original cluster
+			// and thus should be valid split keys. But, we are
+			// opening ourselves up to replicating bad splits from
+			// the original cluster.
+			//
+			// if newSplitKey, err := keys.EnsureSafeSplitKey(splitKey); err != nil {
+			//	// Ignore the error since a key such as
+			//	// /Tenant/2/Table/13 is an OK start key but
+			//	// returns an error.
+			// } else if len(newSplitKey) != 0 {
+			//	splitKey = newSplitKey
+			// }
+			//
+			if err := splitAndScatter(ctx, roachpb.Key(splitKey), splitter); err != nil {
+				return err
+			}
+
+		}
+	}
+	return nil
+}
+
+var splitAndScatterStickyBitDuration = time.Hour
+
+func splitAndScatter(
+	ctx context.Context, splitAndScatterKey roachpb.Key, s splitAndScatterer,
+) error {
+	log.Infof(ctx, "splitting and scattering at %s", splitAndScatterKey)
+	expirationTime := s.now().AddDuration(splitAndScatterStickyBitDuration)
+	if err := s.split(ctx, splitAndScatterKey, expirationTime); err != nil {
+		return err
+	}
+	if err := s.scatter(ctx, splitAndScatterKey); err != nil {
+		log.Warningf(ctx, "failed to scatter span starting at %s: %v",
+			splitAndScatterKey, err)
+	}
+	return nil
+}
+
 // makeReplicationFlowPlanner creates a replicationFlowPlanner and the initial physical plan.
 func makeReplicationFlowPlanner(
 	ctx context.Context,
@@ -254,6 +406,7 @@ type replicationFlowPlanner struct {
 	initialPlanCtx *sql.PlanningCtx
 
 	initialStreamAddresses []string
+	initialTopology        streamclient.Topology
 
 	srcTenantID roachpb.TenantID
 }
@@ -287,6 +440,7 @@ func (p *replicationFlowPlanner) constructPlanGenerator(
 			return nil, nil, err
 		}
 		if !p.containsInitialStreamAddresses() {
+			p.initialTopology = topology
 			p.initialStreamAddresses = topology.StreamAddresses()
 		}
 
diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_dist_test.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_dist_test.go
index a2fa6624c064..1770a1dd9ec2 100644
--- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_dist_test.go
+++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_dist_test.go
@@ -18,6 +18,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
 	"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient"
 	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
+	"github.com/cockroachdb/cockroach/pkg/keys"
 	"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/sql"
@@ -27,6 +28,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/randutil"
+	"github.com/cockroachdb/errors"
 	"github.com/stretchr/testify/require"
 )
 
@@ -324,3 +326,94 @@ func TestSourceDestMatching(t *testing.T) {
 		})
 	}
 }
+
+type testSplitter struct {
+	splits     []roachpb.Key
+	scatters   []roachpb.Key
+	splitErr   func(key roachpb.Key) error
+	scatterErr func(key roachpb.Key) error
+}
+
+func (ts *testSplitter) split(_ context.Context, splitKey roachpb.Key, _ hlc.Timestamp) error {
+	ts.splits = append(ts.splits, splitKey)
+	if ts.splitErr != nil {
+		return ts.splitErr(splitKey)
+	}
+	return nil
+}
+
+func (ts *testSplitter) scatter(_ context.Context, scatterKey roachpb.Key) error {
+	ts.scatters = append(ts.scatters, scatterKey)
+	if ts.scatterErr != nil {
+		return ts.scatterErr(scatterKey)
+	}
+	return nil
+}
+
+func (ts *testSplitter) now() hlc.Timestamp {
+	return hlc.Timestamp{}
+}
+
+func TestCreateInitialSplits(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	ctx := context.Background()
+	sourceTenantID := roachpb.MustMakeTenantID(11)
+	inputCodec := keys.MakeSQLCodec(sourceTenantID)
+	destTenantID := roachpb.MustMakeTenantID(12)
+	outputCodec := keys.MakeSQLCodec(destTenantID)
+
+	testSpan := func(codec keys.SQLCodec, tableID uint32) roachpb.Span {
+		return roachpb.Span{
+			Key:    codec.IndexPrefix(tableID, 1),
+			EndKey: codec.IndexPrefix(tableID, 2),
+		}
+	}
+	inputSpans := []roachpb.Span{
+		testSpan(inputCodec, 100),
+		testSpan(inputCodec, 200),
+		testSpan(inputCodec, 300),
+		testSpan(inputCodec, 400),
+	}
+	outputSpans := []roachpb.Span{
+		testSpan(outputCodec, 100),
+		testSpan(outputCodec, 200),
+		testSpan(outputCodec, 300),
+		testSpan(outputCodec, 400),
+	}
+	topo := streamclient.Topology{
+		SourceTenantID: sourceTenantID,
+		Partitions: []streamclient.PartitionInfo{
+			{Spans: inputSpans},
+		},
+	}
+
+	t.Run("rekeys before splitting", func(t *testing.T) {
+		ts := &testSplitter{}
+		err := createInitialSplits(ctx, keys.SystemSQLCodec, ts, topo, destTenantID)
+		require.NoError(t, err)
+		expectedSplitsAndScatters := make([]roachpb.Key, 0, len(outputSpans))
+		for _, sp := range outputSpans {
+			expectedSplitsAndScatters = append(expectedSplitsAndScatters, sp.Key)
+		}
+
+		require.Equal(t, expectedSplitsAndScatters, ts.splits)
+		require.Equal(t, expectedSplitsAndScatters, ts.scatters)
+	})
+	t.Run("split errors are fatal", func(t *testing.T) {
+		require.Error(t, createInitialSplits(ctx, keys.SystemSQLCodec, &testSplitter{
+			splitErr: func(_ roachpb.Key) error {
+				return errors.New("test error")
+			},
+		}, topo, destTenantID))
+	})
+	t.Run("ignores scatter errors", func(t *testing.T) {
+		require.NoError(t, createInitialSplits(ctx, keys.SystemSQLCodec, &testSplitter{
+			scatterErr: func(_ roachpb.Key) error {
+				return errors.New("test error")
+			},
+		}, topo, destTenantID))
+	})
+}
diff --git a/pkg/jobs/jobspb/jobs.proto b/pkg/jobs/jobspb/jobs.proto
index a65b996f0b69..3e0678a78cd4 100644
--- a/pkg/jobs/jobspb/jobs.proto
+++ b/pkg/jobs/jobspb/jobs.proto
@@ -179,7 +179,12 @@ message StreamIngestionProgress {
   // the cutover time gets set.
   repeated roachpb.Span remaining_cutover_spans = 8 [(gogoproto.nullable) = false];
 
-  // Next Id: 9
+  // InitialSplitComplete is true if the stream ingestion job has
+  // already split the tenant's keyspace according to the plan from
+  // the source tenant.
+  bool initial_split_complete = 9;
+
+  // Next Id: 10
 }
 
 message StreamReplicationDetails {
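Illustrative sketch (not part of the patch): the commit message says EnsureSafeSplitKey is
deliberately not called on the rekeyed source spans. The program below, assuming a
TPCC-like index key /Tenant/2/Table/110/1/<warehouse>/<district> with made-up table ID,
index ID, and values, shows how keys.EnsureSafeSplitKey misreads the trailing integer as a
column-family suffix and trims real key components, so distinct partition boundaries can
collapse onto the same, much coarser split point (or simply return an error, as the
commented-out code in the diff notes for a key like /Tenant/2/Table/13).

// Sketch: why EnsureSafeSplitKey is skipped for topology-derived split keys.
// The key shape and IDs here are illustrative only.
package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/keys"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/util/encoding"
)

func main() {
	codec := keys.MakeSQLCodec(roachpb.MustMakeTenantID(2))

	// A partition boundary as delivered by the source topology: already a
	// perfectly good place to split.
	splitKey := codec.IndexPrefix(110, 1)
	splitKey = encoding.EncodeUvarintAscending(splitKey, 5) // warehouse
	splitKey = encoding.EncodeUvarintAscending(splitKey, 3) // district

	// EnsureSafeSplitKey assumes it is given a full row key: it reads the
	// trailing /3 as the column-family suffix length and trims that byte plus
	// the bytes before it, cutting into real key components instead of
	// leaving the already-safe boundary alone.
	trimmed, err := keys.EnsureSafeSplitKey(splitKey)
	fmt.Printf("original: %s\ntrimmed:  %s\nerr: %v\n", splitKey, trimmed, err)
}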
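For contrast, here is a rough, hypothetical sketch of the alternative the commit message
declines: derive split points by spreading them over the keyspace of the first buffered
batch, as the buffering adder does. That makes it safe to pass every candidate through
EnsureSafeSplitKey, at the cost of splits that only reflect the first buffer. The function
name and shape below are invented for illustration; nothing like it is added by this patch.

// Hypothetical sketch only: evenly spaced split points taken from the first
// buffer, each sanitized with keys.EnsureSafeSplitKey.
package splitsketch

import (
	"bytes"
	"sort"

	"github.com/cockroachdb/cockroach/pkg/keys"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// splitPointsFromFirstBuffer picks up to numSplits evenly spaced keys from the
// first buffered batch of the initial scan and turns each into a safe split
// key. Candidates that cannot be made safe, or that collapse onto the previous
// split point, are skipped.
func splitPointsFromFirstBuffer(buffer []roachpb.Key, numSplits int) []roachpb.Key {
	if len(buffer) == 0 || numSplits <= 0 {
		return nil
	}
	sorted := append([]roachpb.Key(nil), buffer...)
	sort.Slice(sorted, func(i, j int) bool {
		return bytes.Compare(sorted[i], sorted[j]) < 0
	})

	step := len(sorted) / (numSplits + 1)
	if step == 0 {
		step = 1
	}
	splits := make([]roachpb.Key, 0, numSplits)
	for i := step; i < len(sorted) && len(splits) < numSplits; i += step {
		safe, err := keys.EnsureSafeSplitKey(sorted[i])
		if err != nil || len(safe) == 0 {
			continue // not a table key we can split safely
		}
		if n := len(splits); n > 0 && bytes.Equal(splits[n-1], safe) {
			continue // EnsureSafeSplitKey can collapse nearby keys; dedupe
		}
		splits = append(splits, safe)
	}
	return splits
}

The trade-off mirrors the commit message: these splits are safe to create but only as
representative as the first buffer, whereas the topology-based splits in this patch follow
the source cluster's actual range boundaries.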