diff --git a/pkg/kaytu-es-sdk/elasticsearch.go b/pkg/kaytu-es-sdk/elasticsearch.go index f2e9e26..670795f 100644 --- a/pkg/kaytu-es-sdk/elasticsearch.go +++ b/pkg/kaytu-es-sdk/elasticsearch.go @@ -629,9 +629,12 @@ func (p *BaseESPaginator) SearchWithLog(ctx context.Context, response any, doLog return nil } +func (p *BaseESPaginator) CreatePit(ctx context.Context) (err error) { + return p.CreatePitWithRetry(ctx, 0) +} // createPit, sets up the PointInTime for the search with more than 10000 limit -func (p *BaseESPaginator) CreatePit(ctx context.Context) (err error) { +func (p *BaseESPaginator) CreatePitWithRetry(ctx context.Context, retry int) (err error) { if p.limit <= p.pageSize { return nil } else if p.pitID != "" { @@ -667,6 +670,10 @@ func (p *BaseESPaginator) CreatePit(ctx context.Context) (err error) { return err } else if errIf := CheckErrorWithContext(pitRaw, ctx); errIf != nil || (err != nil && strings.Contains(err.Error(), "illegal_argument_exception")) { LogWarn(ctx, fmt.Sprintf("PointInTime.CheckErr err=%v errIf=%v pitRaw=%s", err, errIf, pitRaw.String())) + if pitRaw.StatusCode == http.StatusTooManyRequests && retry < 10 { + time.Sleep(time.Duration(retry+1) * time.Second) + return p.CreatePitWithRetry(ctx, retry+1) + } // try elasticsearch api instead req := esapi.OpenPointInTimeRequest{