
This is an automated cherry-pick of pingcap#58963
Signed-off-by: ti-chi-bot <[email protected]>
Tristan1900 authored and ti-chi-bot committed Jan 21, 2025
1 parent f2f8d42 commit ada93ce
Showing 4 changed files with 2,650 additions and 0 deletions.
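The diff shown here covers only the test file; the function the new tests exercise, PutRawKvWithRetry, lives in br/pkg/restore/log_client. For orientation, here is a minimal sketch of what that function might look like, inferred from the tests below: the five-attempt budget matches the "fails all retries" case (wantPuts: 5), while the bounded-loop shape, the final originTs parameter, and the exact error annotation are assumptions rather than the actual implementation.

// Hypothetical sketch of PutRawKvWithRetry, inferred from TestPutRawKvWithRetry.
// Assumed: a fixed budget of five attempts and an originTs final parameter
// forwarded to the batch client; the real implementation may differ (e.g. it
// could delegate to a shared retry helper with backoff).
func PutRawKvWithRetry(
    ctx context.Context,
    client *rawclient.RawKVBatchClient,
    key, value []byte,
    originTs uint64,
) error {
    const maxAttempts = 5
    var lastErr error
    for attempt := 0; attempt < maxAttempts; attempt++ {
        if err := ctx.Err(); err != nil {
            // Give up immediately once the caller's context is cancelled.
            return errors.Annotate(err, "failed to put raw kv after retry")
        }
        if lastErr = client.Put(ctx, key, value, originTs); lastErr == nil {
            return nil
        }
        // A real implementation would likely back off here before retrying.
    }
    return errors.Annotate(lastErr, "failed to put raw kv after retry")
}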
323 changes: 323 additions & 0 deletions br/pkg/restore/client_test.go
@@ -23,7 +23,11 @@ import (
"github.com/pingcap/tidb/br/pkg/metautil"
"github.com/pingcap/tidb/br/pkg/mock"
"github.com/pingcap/tidb/br/pkg/restore"
rawclient "github.com/pingcap/tidb/br/pkg/restore/internal/rawkv"
logclient "github.com/pingcap/tidb/br/pkg/restore/log_client"
"github.com/pingcap/tidb/br/pkg/restore/split"
"github.com/pingcap/tidb/br/pkg/restore/tiflashrec"
"github.com/pingcap/tidb/br/pkg/restore/utils"
"github.com/pingcap/tidb/br/pkg/stream"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/br/pkg/utils/iter"
@@ -33,7 +37,8 @@ import (
"github.com/pingcap/tidb/pkg/tablecodec"
filter "github.com/pingcap/tidb/pkg/util/table-filter"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/rawkv"
pd "github.com/tikv/pd/client"
"google.golang.org/grpc/keepalive"
)

@@ -2007,3 +2012,312 @@ func TestCheckNewCollationEnable(t *testing.T) {
require.Equal(t, ca.newCollationEnableInCluster == "True", enabled)
}
}

func TestCompactedSplitStrategyWithCheckpoint(t *testing.T) {
ctx := context.Background()

rules := map[int64]*utils.RewriteRules{
1: {
Data: []*import_sstpb.RewriteRule{
{
OldKeyPrefix: tablecodec.GenTableRecordPrefix(1),
NewKeyPrefix: tablecodec.GenTableRecordPrefix(100),
},
},
},
2: {
Data: []*import_sstpb.RewriteRule{
{
OldKeyPrefix: tablecodec.GenTableRecordPrefix(2),
NewKeyPrefix: tablecodec.GenTableRecordPrefix(200),
},
},
},
}

oriRegions := [][]byte{
{},
codec.EncodeBytes(nil, tablecodec.EncodeTablePrefix(100)),
codec.EncodeBytes(nil, tablecodec.EncodeTablePrefix(200)),
codec.EncodeBytes(nil, tablecodec.EncodeTablePrefix(402)),
}

cases := []struct {
MockSubcompactionIter iter.TryNextor[logclient.SSTs]
CheckpointSet map[string]struct{}
ProcessedKVCount int
ProcessedSize int
ExpectRegionEndKeys [][]byte
}{
{
iter.FromSlice([]logclient.SSTs{
fakeSubCompactionWithOneSst(1, 100, 16*units.MiB, 100),
fakeSubCompactionWithOneSst(1, 200, 32*units.MiB, 200),
fakeSubCompactionWithOneSst(2, 100, 48*units.MiB, 300),
fakeSubCompactionWithOneSst(3, 100, 100*units.MiB, 100000),
}),
map[string]struct{}{
"1:100": {},
"1:200": {},
},
300,
48 * units.MiB,
// no split, checkpoint files came in order
[][]byte{
codec.EncodeBytes(nil, tablecodec.EncodeTablePrefix(100)),
codec.EncodeBytes(nil, tablecodec.EncodeTablePrefix(200)),
codec.EncodeBytes(nil, tablecodec.EncodeTablePrefix(402)),
},
},
{
iter.FromSlice([]logclient.SSTs{
fakeSubCompactionWithOneSst(1, 100, 16*units.MiB, 100),
fakeSubCompactionWithOneSst(1, 200, 32*units.MiB, 200),
fakeSubCompactionWithOneSst(1, 100, 32*units.MiB, 10),
fakeSubCompactionWithOneSst(2, 100, 48*units.MiB, 300),
}),
map[string]struct{}{
"1:100": {},
},
110,
48 * units.MiB,
// no split, checkpoint files came in a different order
[][]byte{
codec.EncodeBytes(nil, tablecodec.EncodeTablePrefix(100)),
codec.EncodeBytes(nil, tablecodec.EncodeTablePrefix(200)),
codec.EncodeBytes(nil, tablecodec.EncodeTablePrefix(402)),
},
},
{
iter.FromSlice([]logclient.SSTs{
fakeSubCompactionWithOneSst(1, 100, 16*units.MiB, 100),
fakeSubCompactionWithOneSst(1, 200, 32*units.MiB, 200),
fakeSubCompactionWithOneSst(2, 100, 32*units.MiB, 300),
fakeSubCompactionWithOneSst(3, 100, 10*units.MiB, 100000),
fakeSubCompactionWithOneSst(1, 300, 48*units.MiB, 13),
fakeSubCompactionWithOneSst(1, 400, 64*units.MiB, 14),
fakeSubCompactionWithOneSst(1, 100, 1*units.MiB, 15),
}),
map[string]struct{}{
"1:300": {},
"1:400": {},
},
27,
112 * units.MiB,
// no split, the main files were skipped due to the checkpoint.
[][]byte{
codec.EncodeBytes(nil, tablecodec.EncodeTablePrefix(100)),
codec.EncodeBytes(nil, tablecodec.EncodeTablePrefix(200)),
codec.EncodeBytes(nil, tablecodec.EncodeTablePrefix(402)),
},
},
{
iter.FromSlice([]logclient.SSTs{
fakeSubCompactionWithOneSst(1, 100, 16*units.MiB, 100),
fakeSubCompactionWithOneSst(1, 200, 32*units.MiB, 200),
fakeSubCompactionWithOneSst(2, 100, 32*units.MiB, 300),
fakeSubCompactionWithOneSst(3, 100, 10*units.MiB, 100000),
fakeSubCompactionWithOneSst(1, 300, 48*units.MiB, 13),
fakeSubCompactionWithOneSst(1, 400, 64*units.MiB, 14),
fakeSubCompactionWithOneSst(1, 100, 1*units.MiB, 15),
}),
map[string]struct{}{
"1:100": {},
"1:200": {},
},
315,
49 * units.MiB,
[][]byte{
codec.EncodeBytes(nil, tablecodec.EncodeTablePrefix(100)),
[]byte(fakeRowKey(100, 400)),
codec.EncodeBytes(nil, tablecodec.EncodeTablePrefix(200)),
codec.EncodeBytes(nil, tablecodec.EncodeTablePrefix(402)),
},
},
{
iter.FromSlice([]logclient.SSTs{
fakeSubCompactionWithOneSst(1, 100, 16*units.MiB, 100),
fakeSubCompactionWithMultiSsts(1, 200, 32*units.MiB, 200),
fakeSubCompactionWithOneSst(2, 100, 32*units.MiB, 300),
fakeSubCompactionWithOneSst(3, 100, 10*units.MiB, 100000),
fakeSubCompactionWithOneSst(1, 300, 48*units.MiB, 13),
fakeSubCompactionWithOneSst(1, 400, 64*units.MiB, 14),
fakeSubCompactionWithOneSst(1, 100, 1*units.MiB, 15),
}),
map[string]struct{}{
"1:100": {},
"1:200": {},
},
315,
49 * units.MiB,
[][]byte{
codec.EncodeBytes(nil, tablecodec.EncodeTablePrefix(100)),
[]byte(fakeRowKey(100, 300)),
codec.EncodeBytes(nil, tablecodec.EncodeTablePrefix(200)),
codec.EncodeBytes(nil, tablecodec.EncodeTablePrefix(402)),
},
},
}
for _, ca := range cases {
storesMap := make(map[uint64]*metapb.Store)
storesMap[1] = &metapb.Store{Id: 1}
mockPDCli := split.NewMockPDClientForSplit()
mockPDCli.SetStores(storesMap)
mockPDCli.SetRegions(oriRegions)

client := split.NewClient(mockPDCli, nil, nil, 100, 4)
wrapper := restore.PipelineRestorerWrapper[logclient.SSTs]{
PipelineRegionsSplitter: split.NewPipelineRegionsSplitter(client, 4*units.MB, 400),
}
totalSize := 0
totalKvCount := 0

strategy := logclient.NewCompactedFileSplitStrategy(rules, ca.CheckpointSet, func(u1, u2 uint64) {
totalKvCount += int(u1)
totalSize += int(u2)
})
mockStrategy := &mockCompactedStrategy{
CompactedFileSplitStrategy: strategy,
expectSplitCount: 3,
}

helper := wrapper.WithSplit(ctx, ca.MockSubcompactionIter, mockStrategy)

for i := helper.TryNext(ctx); !i.Finished; i = helper.TryNext(ctx) {
require.NoError(t, i.Err)
}

regions, err := mockPDCli.ScanRegions(ctx, []byte{}, []byte{}, 0)
require.NoError(t, err)
require.Len(t, regions, len(ca.ExpectRegionEndKeys))
for i, endKey := range ca.ExpectRegionEndKeys {
require.Equal(t, endKey, regions[i].Meta.EndKey)
}
require.Equal(t, totalKvCount, ca.ProcessedKVCount)
require.Equal(t, totalSize, ca.ProcessedSize)
}
}

func fakeSubCompactionWithMultiSsts(tableID, rowID int64, length uint64, num uint64) logclient.SSTs {
return &logclient.CompactedSSTs{&backuppb.LogFileSubcompaction{
Meta: &backuppb.LogFileSubcompactionMeta{
TableId: tableID,
},
SstOutputs: []*backuppb.File{
{
Name: fmt.Sprintf("%d:%d", tableID, rowID),
StartKey: fakeRowRawKey(tableID, rowID),
EndKey: fakeRowRawKey(tableID, rowID+1),
Size_: length,
TotalKvs: num,
},
{
Name: fmt.Sprintf("%d:%d", tableID, rowID+1),
StartKey: fakeRowRawKey(tableID, rowID+1),
EndKey: fakeRowRawKey(tableID, rowID+2),
Size_: length,
TotalKvs: num,
},
},
}}
}

func fakeSubCompactionWithOneSst(tableID, rowID int64, length uint64, num uint64) logclient.SSTs {
return &logclient.CompactedSSTs{&backuppb.LogFileSubcompaction{
Meta: &backuppb.LogFileSubcompactionMeta{
TableId: tableID,
},
SstOutputs: []*backuppb.File{
{
Name: fmt.Sprintf("%d:%d", tableID, rowID),
StartKey: fakeRowRawKey(tableID, rowID),
EndKey: fakeRowRawKey(tableID, rowID+1),
Size_: length,
TotalKvs: num,
},
},
}}
}

func fakeFile(tableID, rowID int64, length uint64, num int64) *backuppb.DataFileInfo {
return &backuppb.DataFileInfo{
StartKey: fakeRowKey(tableID, rowID),
EndKey: fakeRowKey(tableID, rowID+1),
TableId: tableID,
Length: length,
NumberOfEntries: num,
}
}

func fakeRowKey(tableID, rowID int64) kv.Key {
return codec.EncodeBytes(nil, fakeRowRawKey(tableID, rowID))
}

func fakeRowRawKey(tableID, rowID int64) kv.Key {
return tablecodec.EncodeRecordKey(tablecodec.GenTableRecordPrefix(tableID), kv.IntHandle(rowID))
}

type mockRawKVClient struct {
rawkv.Client
putCount int
errThreshold int
}

func (m *mockRawKVClient) BatchPut(ctx context.Context, keys, values [][]byte, options ...rawkv.RawOption) error {
m.putCount += 1
if m.errThreshold >= m.putCount {
return errors.New("rpcClient is idle")
}
return nil
}

func TestPutRawKvWithRetry(t *testing.T) {
tests := []struct {
name string
errThreshold int
cancelAfter time.Duration
wantErr string
wantPuts int
}{
{
name: "success on first try",
errThreshold: 0,
wantPuts: 1,
},
{
name: "success after failures",
errThreshold: 2,
wantPuts: 3,
},
{
name: "fails all retries",
errThreshold: 5,
wantErr: "failed to put raw kv after retry",
wantPuts: 5,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
mockRawClient := &mockRawKVClient{
errThreshold: tt.errThreshold,
}
client := rawclient.NewRawKVBatchClient(mockRawClient, 1)

ctx := context.Background()
if tt.cancelAfter > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, tt.cancelAfter)
defer cancel()
}

err := logclient.PutRawKvWithRetry(ctx, client, []byte("key"), []byte("value"), 1)

if tt.wantErr != "" {
require.ErrorContains(t, err, tt.wantErr)
} else {
require.NoError(t, err)
}
require.Equal(t, tt.wantPuts, mockRawClient.putCount)
})
}
}
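One wiring detail worth noting in TestPutRawKvWithRetry above: the batch client is built with a batch size of 1, so each Put flushes straight through to the underlying client's BatchPut, and the mock's putCount therefore counts retry attempts one-for-one. A usage sketch under that assumption follows; the Example form and printed output are illustrative, not part of the commit.

// Usage sketch for the retry path, assuming the wiring from the test above:
// with batch size 1, every Put triggers exactly one BatchPut on the mock.
func ExamplePutRawKvWithRetry() {
    mock := &mockRawKVClient{errThreshold: 2} // first two BatchPut calls fail
    client := rawclient.NewRawKVBatchClient(mock, 1)

    // Attempts 1 and 2 fail, the third succeeds, so putCount ends at 3.
    err := logclient.PutRawKvWithRetry(context.Background(), client, []byte("k"), []byte("v"), 1)
    fmt.Println(err == nil, mock.putCount)
    // Output: true 3
}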
