From 86aa88a224d6ad50b736febab0836e0a958556b5 Mon Sep 17 00:00:00 2001 From: Mahan Zendedel DH Date: Wed, 27 Dec 2023 16:30:08 +0400 Subject: [PATCH] fix: add sending one by one in case of too large --- pkg/pipeline/sender.go | 64 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 64 insertions(+) diff --git a/pkg/pipeline/sender.go b/pkg/pipeline/sender.go index 186e117..0cdf432 100644 --- a/pkg/pipeline/sender.go +++ b/pkg/pipeline/sender.go @@ -14,6 +14,64 @@ import ( "time" ) +func sendToPipelineIndividually(ingestionPipelineEndpoint string, resourcesToSend []kafka.Doc) error { + httpClient := &http.Client{Timeout: 10 * time.Second} + if len(resourcesToSend) == 0 { + return nil + } + + for _, resource := range resourcesToSend { + jsonResourcesToSend, err := json.Marshal(resource) + if err != nil { + return err + } + + req, err := http.NewRequest( + http.MethodPost, + ingestionPipelineEndpoint, + strings.NewReader(string(jsonResourcesToSend)), + ) + req.Header.Add("Content-Type", "application/json") + + cfg, err := config.LoadDefaultConfig(context.TODO()) + if err != nil { + return err + } + + creds, err := cfg.Credentials.Retrieve(context.Background()) + if err != nil { + return err + } + + signer := v4.NewSigner() + err = signer.SignHTTP(context.TODO(), creds, req, + fmt.Sprintf("%x", sha256.Sum256(jsonResourcesToSend)), + "osis", "us-east-2", time.Now()) + if err != nil { + return err + } + + resp, err := httpClient.Do(req) + if err != nil { + return err + } + + defer resp.Body.Close() + // check status + if resp.StatusCode != http.StatusOK { + bodyStr := "" + if resp.Body != nil { + bodyBytes, err := io.ReadAll(resp.Body) + if err == nil { + bodyStr = string(bodyBytes) + } + } + return fmt.Errorf("failed to send resources individually to OpenSearch, statusCode=%d, body=%s, requestSize=%d", resp.StatusCode, bodyStr, len(jsonResourcesToSend)) + } + } + return nil +} + func SendToPipeline(ingestionPipelineEndpoint string, resourcesToSend []kafka.Doc) error { httpClient := &http.Client{Timeout: 10 * time.Second} if len(resourcesToSend) == 0 { @@ -59,6 +117,12 @@ func SendToPipeline(ingestionPipelineEndpoint string, resourcesToSend []kafka.Do defer resp.Body.Close() // check status if resp.StatusCode != http.StatusOK { + if resp.StatusCode == http.StatusRequestEntityTooLarge { + err = sendToPipelineIndividually(ingestionPipelineEndpoint, resourcesToSend[page:min(page+100, len(resourcesToSend))]) + if err != nil { + return err + } + } bodyStr := "" if resp.Body != nil { bodyBytes, err := io.ReadAll(resp.Body)