111178: streamingccl: split destination based on source r=adityamaru a=stevendanna

This adds initial splits and scatters to the stream ingestion job. We split based on the topology delivered by the source so that our initial splits align with the splits that are likely to arrive from the source cluster.

These splits substantially improve the throughput of a stream's initial scan.

Careful reviewers may note that we are not calling EnsureSafeSplitKey here. That is because the Spans in the source topology should already be safe split keys. Since EnsureSafeSplitKey isn't idempotent, if we were to call it here, we would end up creating splits at rather poor locations: any key that ends in an integer would have components erroneously trimmed from it.
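
For readers less familiar with the key encoding, here is a minimal sketch of the failure mode described above. It is not part of the change; the table ID and datum values are hypothetical, and it assumes the usual helpers in pkg/keys and pkg/util/encoding:

package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/keys"
	"github.com/cockroachdb/cockroach/pkg/util/encoding"
)

func main() {
	// A span boundary as the source topology might deliver it for a
	// TPCC-style table: /Table/104/1/5/2 (table 104, index 1, integer key
	// columns 5 and 2). This is already a perfectly good split point.
	splitKey := keys.SystemSQLCodec.IndexPrefix(104 /* tableID */, 1 /* indexID */)
	splitKey = encoding.EncodeVarintAscending(splitKey, 5)
	splitKey = encoding.EncodeVarintAscending(splitKey, 2)

	// EnsureSafeSplitKey cannot distinguish a trailing integer key column
	// from a column-family length suffix, so it treats the final 2 as a
	// suffix and trims the key well past the row boundary. Distinct source
	// split points can collapse onto the same short prefix this way.
	trimmed, err := keys.EnsureSafeSplitKey(splitKey)
	fmt.Printf("before: %s after: %s err: %v\n", splitKey, trimmed, err)
}

This is why the change trusts the source topology spans to already be safe split keys rather than re-deriving them.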

An alternative here would be to do what the buffering adder does and create an initial set of splits by distributing them over the keyspace of the first buffer. That has the advantage of allowing us to call EnsureSafeSplitKey at the cost of less effective splits.
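
To make that alternative concrete, here is a rough sketch of what it might look like, reusing the splitAndScatterer interface and splitAndScatter helper added below; the function name, parameters, and split count are hypothetical and the usual imports (context, sort, keys, roachpb) are assumed:

// splitsFromFirstBuffer is a sketch of the buffering-adder-style
// alternative: pick evenly spaced keys out of the first buffered batch
// and split at a safe key near each of them.
func splitsFromFirstBuffer(
	ctx context.Context, bufferedKeys []roachpb.Key, numSplits int, splitter splitAndScatterer,
) error {
	sort.Slice(bufferedKeys, func(i, j int) bool {
		return bufferedKeys[i].Compare(bufferedKeys[j]) < 0
	})
	stride := len(bufferedKeys) / (numSplits + 1)
	if stride == 0 {
		return nil
	}
	for i := stride; i < len(bufferedKeys); i += stride {
		// These keys come from actual row data, so EnsureSafeSplitKey is
		// both needed and safe: it trims the column-family suffix so the
		// split lands on a row boundary.
		splitKey, err := keys.EnsureSafeSplitKey(bufferedKeys[i])
		if err != nil {
			continue // skip keys that don't decode as table row keys
		}
		if err := splitAndScatter(ctx, splitKey, splitter); err != nil {
			return err
		}
	}
	return nil
}

The cost, as noted above, is that these split points reflect whatever data happens to arrive first rather than the source cluster's actual range boundaries.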

Note that we face this same problem during bulk operations. Perhaps in the future the source cluster should send some metadata about manual splits that are issued, or _something_ that lets the destination know that a large amount of data is expected in a particular span.

Epic: none

Release note: None

Co-authored-by: Steven Danna <[email protected]>
craig[bot] and stevendanna committed Sep 30, 2023
2 parents fad649d + 27e87b6 commit 757c17f
Showing 4 changed files with 263 additions and 8 deletions.
15 changes: 9 additions & 6 deletions pkg/ccl/streamingccl/streamclient/random_stream_client.go
@@ -206,6 +206,7 @@ type randomEventGenerator struct {
config randomStreamConfig
numEventsSinceLastResolved int
sstMaker SSTableMakerFn
codec keys.SQLCodec
tableDesc *tabledesc.Mutable
systemKVs []roachpb.KeyValue
}
@@ -228,6 +229,7 @@ func newRandomEventGenerator(
numEventsSinceLastResolved: 0,
sstMaker: fn,
tableDesc: tableDesc,
codec: keys.MakeSQLCodec(config.tenantID),
systemKVs: systemKVs,
}, nil
}
@@ -238,7 +240,7 @@ func (r *randomEventGenerator) generateNewEvent() streamingccl.Event {
// Emit a CheckpointEvent.
resolvedTime := timeutil.Now()
hlcResolvedTime := hlc.Timestamp{WallTime: resolvedTime.UnixNano()}
resolvedSpan := jobspb.ResolvedSpan{Span: r.tableDesc.TableSpan(keys.SystemSQLCodec), Timestamp: hlcResolvedTime}
resolvedSpan := jobspb.ResolvedSpan{Span: r.tableDesc.TableSpan(r.codec), Timestamp: hlcResolvedTime}
event = streamingccl.MakeCheckpointEvent([]jobspb.ResolvedSpan{resolvedSpan})
r.numEventsSinceLastResolved = 0
} else {
@@ -257,11 +259,11 @@ func (r *randomEventGenerator) generateNewEvent() streamingccl.Event {
size := 10 + r.rng.Intn(30)
keyVals := make([]roachpb.KeyValue, 0, size)
for i := 0; i < size; i++ {
keyVals = append(keyVals, makeRandomKey(r.rng, r.config, r.tableDesc))
keyVals = append(keyVals, makeRandomKey(r.rng, r.config, r.codec, r.tableDesc))
}
event = streamingccl.MakeSSTableEvent(r.sstMaker(keyVals))
} else {
event = streamingccl.MakeKVEvent(makeRandomKey(r.rng, r.config, r.tableDesc))
event = streamingccl.MakeKVEvent(makeRandomKey(r.rng, r.config, r.codec, r.tableDesc))
}
r.numEventsSinceLastResolved++
}
@@ -360,6 +362,7 @@ func (m *RandomStreamClient) Plan(ctx context.Context, _ streampb.StreamID) (Top
log.Infof(ctx, "planning random stream for tenant %d", m.config.tenantID)

// Allocate table IDs and return one per partition address in the topology.
srcCodec := keys.MakeSQLCodec(m.config.tenantID)
for i := 0; i < m.config.numPartitions; i++ {
tableID := m.getNextTableID()
tableDesc, err := m.tableDescForID(tableID)
@@ -375,7 +378,7 @@ func (m *RandomStreamClient) Plan(ctx context.Context, _ streampb.StreamID) (Top
ID: strconv.Itoa(i),
SrcAddr: streamingccl.PartitionAddress(partitionURI),
SubscriptionToken: []byte(partitionURI),
Spans: []roachpb.Span{tableDesc.TableSpan(keys.SystemSQLCodec)},
Spans: []roachpb.Span{tableDesc.TableSpan(srcCodec)},
})
}

@@ -579,7 +582,7 @@ func rekey(tenantID roachpb.TenantID, k roachpb.Key) roachpb.Key {
}

func makeRandomKey(
r *rand.Rand, config randomStreamConfig, tableDesc *tabledesc.Mutable,
r *rand.Rand, config randomStreamConfig, codec keys.SQLCodec, tableDesc *tabledesc.Mutable,
) roachpb.KeyValue {
// Create a key holding a random integer.
keyDatum := tree.NewDInt(tree.DInt(r.Intn(config.valueRange)))
@@ -590,7 +593,7 @@ func makeRandomKey(
var colIDToRowIndex catalog.TableColMap
colIDToRowIndex.Set(index.GetKeyColumnID(0), 0)

keyPrefix := rowenc.MakeIndexKeyPrefix(keys.SystemSQLCodec, tableDesc.GetID(), index.GetID())
keyPrefix := rowenc.MakeIndexKeyPrefix(codec, tableDesc.GetID(), index.GetID())
k, _, err := rowenc.EncodeIndexKey(tableDesc, index, colIDToRowIndex, tree.Datums{keyDatum}, keyPrefix)
if err != nil {
panic(err)
156 changes: 155 additions & 1 deletion pkg/ccl/streamingccl/streamingest/stream_ingestion_dist.go
@@ -15,12 +15,15 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/ccl/backupccl"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/jobs/jobsprofiler"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
@@ -33,6 +36,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
"github.com/cockroachdb/redact"
@@ -211,14 +215,162 @@ func startDistIngestion(
return rw.Err()
}

updateRunningStatus(ctx, ingestionJob, jobspb.Replicating, "physical replication running")
// We now attempt to create initial splits. We currently do
// this once during initial planning to avoid re-splitting on
// resume since it isn't clear to us at the moment whether
// re-splitting is always going to be useful.
if !streamProgress.InitialSplitComplete {
codec := execCtx.ExtendedEvalContext().Codec
splitter := &dbSplitAndScatter{db: execCtx.ExecCfg().DB}
if err := createInitialSplits(ctx, codec, splitter, planner.initialTopology, details.DestinationTenantID); err != nil {
return err
}
} else {
log.Infof(ctx, "initial splits already complete")
}

if err := ingestionJob.NoTxn().Update(ctx, func(txn isql.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error {
md.Progress.GetStreamIngest().ReplicationStatus = jobspb.Replicating
md.Progress.GetStreamIngest().InitialSplitComplete = true
md.Progress.RunningStatus = "physical replication running"
ju.UpdateProgress(md.Progress)
return nil
}); err != nil {
return err
}

err = ctxgroup.GoAndWait(ctx, execInitialPlan, replanner, tracingAggLoop, streamSpanConfigs)
if errors.Is(err, sql.ErrPlanChanged) {
execCtx.ExecCfg().JobRegistry.MetricsStruct().StreamIngest.(*Metrics).ReplanCount.Inc(1)
}
return err
}

// TODO(ssd): This is duplicative with the split_and_scatter processor in
// backupccl.
type splitAndScatterer interface {
split(
ctx context.Context,
splitKey roachpb.Key,
expirationTime hlc.Timestamp,
) error

scatter(
ctx context.Context,
scatterKey roachpb.Key,
) error

now() hlc.Timestamp
}

type dbSplitAndScatter struct {
db *kv.DB
}

func (s *dbSplitAndScatter) split(
ctx context.Context, splitKey roachpb.Key, expirationTime hlc.Timestamp,
) error {
return s.db.AdminSplit(ctx, splitKey, expirationTime)
}

func (s *dbSplitAndScatter) scatter(ctx context.Context, scatterKey roachpb.Key) error {
_, pErr := kv.SendWrapped(ctx, s.db.NonTransactionalSender(), &kvpb.AdminScatterRequest{
RequestHeader: kvpb.RequestHeaderFromSpan(roachpb.Span{
Key: scatterKey,
EndKey: scatterKey.Next(),
}),
RandomizeLeases: true,
MaxSize: 1, // don't scatter non-empty ranges on resume.
})
return pErr.GoError()
}

func (s *dbSplitAndScatter) now() hlc.Timestamp {
return s.db.Clock().Now()
}

// createInitialSplits creates splits based on the given topology from the
// source.
//
// The idea here is to use the information from the source cluster about
// the distribution of the data to produce split points to help prevent
// ingestion processors from pushing data into the same ranges during
// the initial scan.
func createInitialSplits(
ctx context.Context,
codec keys.SQLCodec,
splitter splitAndScatterer,
topology streamclient.Topology,
destTenantID roachpb.TenantID,
) error {
ctx, sp := tracing.ChildSpan(ctx, "streamingest.createInitialSplits")
defer sp.Finish()

rekeyer, err := backupccl.MakeKeyRewriterFromRekeys(codec,
nil /* tableRekeys */, []execinfrapb.TenantRekey{
{
OldID: topology.SourceTenantID,
NewID: destTenantID,
}},
true /* restoreTenantFromStream */)
if err != nil {
return err
}
for _, partition := range topology.Partitions {
for _, span := range partition.Spans {
startKey := span.Key.Clone()
splitKey, _, err := rekeyer.RewriteKey(startKey, 0 /* walltimeForImportElision */)
if err != nil {
return err
}

// NOTE(ssd): EnsureSafeSplitKey called on an arbitrary
// key unfortunately results in many of our split keys
// mapping to the same key for workloads like TPCC where
// the schema of the table includes integers that will
// get erroneously treated as the column family length.
//
// Since the partitions are generated from a call to
// PartitionSpans on the source cluster, they should be
// aligned with the split points in the original cluster
// and thus should be valid split keys. But, we are
// opening ourselves up to replicating bad splits from
// the original cluster.
//
// if newSplitKey, err := keys.EnsureSafeSplitKey(splitKey); err != nil {
// // Ignore the error since keys such as
// // /Tenant/2/Table/13 is an OK start key but
// // returns an error.
// } else if len(newSplitKey) != 0 {
// splitKey = newSplitKey
// }
//
if err := splitAndScatter(ctx, roachpb.Key(splitKey), splitter); err != nil {
return err
}

}
}
return nil
}

var splitAndScatterSitckyBitDuration = time.Hour

func splitAndScatter(
ctx context.Context, splitAndScatterKey roachpb.Key, s splitAndScatterer,
) error {
log.Infof(ctx, "splitting and scattering at %s", splitAndScatterKey)
expirationTime := s.now().AddDuration(splitAndScatterSitckyBitDuration)
if err := s.split(ctx, splitAndScatterKey, expirationTime); err != nil {
return err
}
if err := s.scatter(ctx, splitAndScatterKey); err != nil {
log.Warningf(ctx, "failed to scatter span starting at %s: %v",
splitAndScatterKey, err)
}
return nil
}

// makeReplicationFlowPlanner creates a replicationFlowPlanner and the initial physical plan.
func makeReplicationFlowPlanner(
ctx context.Context,
@@ -254,6 +406,7 @@ type replicationFlowPlanner struct {
initialPlanCtx *sql.PlanningCtx

initialStreamAddresses []string
initialTopology streamclient.Topology

srcTenantID roachpb.TenantID
}
@@ -287,6 +440,7 @@ func (p *replicationFlowPlanner) constructPlanGenerator(
return nil, nil, err
}
if !p.containsInitialStreamAddresses() {
p.initialTopology = topology
p.initialStreamAddresses = topology.StreamAddresses()
}

93 changes: 93 additions & 0 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_dist_test.go
@@ -18,6 +18,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql"
@@ -27,6 +28,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

@@ -324,3 +326,94 @@ func TestSourceDestMatching(t *testing.T) {
})
}
}

type testSplitter struct {
splits []roachpb.Key
scatters []roachpb.Key
splitErr func(key roachpb.Key) error
scatterErr func(key roachpb.Key) error
}

func (ts *testSplitter) split(_ context.Context, splitKey roachpb.Key, _ hlc.Timestamp) error {
ts.splits = append(ts.splits, splitKey)
if ts.splitErr != nil {
return ts.splitErr(splitKey)
}
return nil
}

func (ts *testSplitter) scatter(_ context.Context, scatterKey roachpb.Key) error {
ts.scatters = append(ts.scatters, scatterKey)
if ts.scatterErr != nil {
return ts.scatterErr(scatterKey)
}
return nil
}

func (ts *testSplitter) now() hlc.Timestamp {
return hlc.Timestamp{}
}

func TestCreateInitialSplits(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
sourceTenantID := roachpb.MustMakeTenantID(11)
inputCodec := keys.MakeSQLCodec(sourceTenantID)
destTenantID := roachpb.MustMakeTenantID(12)
outputCodec := keys.MakeSQLCodec(destTenantID)

testSpan := func(codec keys.SQLCodec, tableID uint32) roachpb.Span {
return roachpb.Span{
Key: codec.IndexPrefix(tableID, 1),
EndKey: codec.IndexPrefix(tableID, 2),
}
}
inputSpans := []roachpb.Span{
testSpan(inputCodec, 100),
testSpan(inputCodec, 200),
testSpan(inputCodec, 300),
testSpan(inputCodec, 400),
}
outputSpans := []roachpb.Span{
testSpan(outputCodec, 100),
testSpan(outputCodec, 200),
testSpan(outputCodec, 300),
testSpan(outputCodec, 400),
}
topo := streamclient.Topology{
SourceTenantID: sourceTenantID,
Partitions: []streamclient.PartitionInfo{
{Spans: inputSpans},
},
}

t.Run("rekeys before splitting", func(t *testing.T) {
ts := &testSplitter{}
err := createInitialSplits(ctx, keys.SystemSQLCodec, ts, topo, destTenantID)
require.NoError(t, err)
expectedSplitsAndScatters := make([]roachpb.Key, 0, len(outputSpans))
for _, sp := range outputSpans {
expectedSplitsAndScatters = append(expectedSplitsAndScatters, sp.Key)
}

require.Equal(t, expectedSplitsAndScatters, ts.splits)
require.Equal(t, expectedSplitsAndScatters, ts.scatters)

})
t.Run("split errors are fatal", func(t *testing.T) {
require.Error(t, createInitialSplits(ctx, keys.SystemSQLCodec, &testSplitter{
splitErr: func(_ roachpb.Key) error {
return errors.New("test error")
},
}, topo, destTenantID))
})
t.Run("ignores scatter errors", func(t *testing.T) {
require.NoError(t, createInitialSplits(ctx, keys.SystemSQLCodec, &testSplitter{
scatterErr: func(_ roachpb.Key) error {
return errors.New("test error")
},
}, topo, destTenantID))
})
}