diff --git a/br/pkg/restore/log_client/BUILD.bazel b/br/pkg/restore/log_client/BUILD.bazel index 13bfa3bac9334..73b70fcbae739 100644 --- a/br/pkg/restore/log_client/BUILD.bazel +++ b/br/pkg/restore/log_client/BUILD.bazel @@ -101,6 +101,7 @@ go_test( "//br/pkg/mock", "//br/pkg/restore", "//br/pkg/restore/internal/import_client", + "//br/pkg/restore/internal/rawkv", "//br/pkg/restore/split", "//br/pkg/restore/utils", "//br/pkg/storage", @@ -132,6 +133,7 @@ go_test( "@com_github_pingcap_kvproto//pkg/metapb", "@com_github_pingcap_log//:log", "@com_github_stretchr_testify//require", + "@com_github_tikv_client_go_v2//rawkv", "@org_golang_google_grpc//codes", "@org_golang_google_grpc//keepalive", "@org_golang_google_grpc//status", diff --git a/br/pkg/restore/log_client/client.go b/br/pkg/restore/log_client/client.go index daf535fc10d39..8e1389681cf44 100644 --- a/br/pkg/restore/log_client/client.go +++ b/br/pkg/restore/log_client/client.go @@ -1493,7 +1493,7 @@ func (rc *LogClient) restoreMetaKvEntries( failpoint.Inject("failed-to-restore-metakv", func(_ failpoint.Value) { failpoint.Return(0, 0, errors.Errorf("failpoint: failed to restore metakv")) }) - if err := rc.rawKVClient.Put(ctx, newEntry.Key, newEntry.Value, entry.Ts); err != nil { + if err := PutRawKvWithRetry(ctx, rc.rawKVClient, newEntry.Key, newEntry.Value, entry.Ts); err != nil { return 0, 0, errors.Trace(err) } // for failpoint, we need to flush the cache in rawKVClient every time @@ -2053,3 +2053,13 @@ func (rc *LogClient) FailpointDoChecksumForLogRestore( return eg.Wait() } + +func PutRawKvWithRetry(ctx context.Context, client *rawkv.RawKVBatchClient, key, value []byte, originTs uint64) error { + err := utils.WithRetry(ctx, func() error { + return client.Put(ctx, key, value, originTs) + }, utils.NewRawClientBackoffStrategy()) + if err != nil { + return errors.Errorf("failed to put raw kv after retry") + } + return nil +} diff --git a/br/pkg/restore/log_client/client_test.go b/br/pkg/restore/log_client/client_test.go index 58a34dcfa61bb..ecf9452fe4f7c 100644 --- a/br/pkg/restore/log_client/client_test.go +++ b/br/pkg/restore/log_client/client_test.go @@ -31,6 +31,7 @@ import ( "github.com/pingcap/tidb/br/pkg/gluetidb" "github.com/pingcap/tidb/br/pkg/mock" "github.com/pingcap/tidb/br/pkg/restore" + rawclient "github.com/pingcap/tidb/br/pkg/restore/internal/rawkv" logclient "github.com/pingcap/tidb/br/pkg/restore/log_client" "github.com/pingcap/tidb/br/pkg/restore/split" "github.com/pingcap/tidb/br/pkg/restore/utils" @@ -49,6 +50,7 @@ import ( "github.com/pingcap/tidb/pkg/util/sqlexec" filter "github.com/pingcap/tidb/pkg/util/table-filter" "github.com/stretchr/testify/require" + "github.com/tikv/client-go/v2/rawkv" "google.golang.org/grpc/keepalive" ) @@ -1986,3 +1988,69 @@ func fakeRowKey(tableID, rowID int64) kv.Key { func fakeRowRawKey(tableID, rowID int64) kv.Key { return tablecodec.EncodeRecordKey(tablecodec.GenTableRecordPrefix(tableID), kv.IntHandle(rowID)) } + +type mockRawKVClient struct { + rawkv.Client + putCount int + errThreshold int +} + +func (m *mockRawKVClient) BatchPut(ctx context.Context, keys, values [][]byte, options ...rawkv.RawOption) error { + m.putCount += 1 + if m.errThreshold >= m.putCount { + return errors.New("rpcClient is idle") + } + return nil +} + +func TestPutRawKvWithRetry(t *testing.T) { + tests := []struct { + name string + errThreshold int + cancelAfter time.Duration + wantErr string + wantPuts int + }{ + { + name: "success on first try", + errThreshold: 0, + wantPuts: 1, + }, + { + name: "success on after failure", + errThreshold: 2, + wantPuts: 3, + }, + { + name: "fails all retries", + errThreshold: 5, + wantErr: "failed to put raw kv after retry", + wantPuts: 5, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mockRawClient := &mockRawKVClient{ + errThreshold: tt.errThreshold, + } + client := rawclient.NewRawKVBatchClient(mockRawClient, 1) + + ctx := context.Background() + if tt.cancelAfter > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, tt.cancelAfter) + defer cancel() + } + + err := logclient.PutRawKvWithRetry(ctx, client, []byte("key"), []byte("value"), 1) + + if tt.wantErr != "" { + require.ErrorContains(t, err, tt.wantErr) + } else { + require.NoError(t, err) + } + require.Equal(t, tt.wantPuts, mockRawClient.putCount) + }) + } +} diff --git a/br/pkg/utils/backoff.go b/br/pkg/utils/backoff.go index 7de5e999654e4..eccc5efd9225c 100644 --- a/br/pkg/utils/backoff.go +++ b/br/pkg/utils/backoff.go @@ -53,6 +53,10 @@ const ( recoveryMaxAttempts = 16 recoveryDelayTime = 30 * time.Second recoveryMaxDelayTime = 4 * time.Minute + + rawClientMaxAttempts = 5 + rawClientDelayTime = 500 * time.Millisecond + rawClientMaxDelayTime = 5 * time.Second ) // BackoffStrategy implements a backoff strategy for retry operations. @@ -379,6 +383,17 @@ func NewChecksumBackoffStrategy() BackoffStrategy { ) } +func NewRawClientBackoffStrategy() BackoffStrategy { + return NewBackoffStrategy( + WithRemainingAttempts(rawClientMaxAttempts), + WithDelayTime(rawClientDelayTime), + WithMaxDelayTime(rawClientMaxDelayTime), + WithErrorContext(NewZeroRetryContext("raw client")), + WithRetryErrorFunc(alwaysTrueFunc()), + WithNonRetryErrorFunc(alwaysFalseFunc()), + ) +} + func (bo *backoffStrategyImpl) NextBackoff(err error) time.Duration { errs := multierr.Errors(err) lastErr := errs[len(errs)-1]