Skip to content

Commit

Permalink
fix: add sending one by one in case of too large
Browse files Browse the repository at this point in the history
  • Loading branch information
Mahanmmi committed Dec 27, 2023
1 parent bba4c07 commit 86aa88a
Showing 1 changed file with 64 additions and 0 deletions.
64 changes: 64 additions & 0 deletions pkg/pipeline/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,64 @@ import (
"time"
)

func sendToPipelineIndividually(ingestionPipelineEndpoint string, resourcesToSend []kafka.Doc) error {
httpClient := &http.Client{Timeout: 10 * time.Second}
if len(resourcesToSend) == 0 {
return nil
}

for _, resource := range resourcesToSend {
jsonResourcesToSend, err := json.Marshal(resource)
if err != nil {
return err
}

req, err := http.NewRequest(
http.MethodPost,
ingestionPipelineEndpoint,
strings.NewReader(string(jsonResourcesToSend)),
)
req.Header.Add("Content-Type", "application/json")

cfg, err := config.LoadDefaultConfig(context.TODO())
if err != nil {
return err
}

creds, err := cfg.Credentials.Retrieve(context.Background())
if err != nil {
return err
}

signer := v4.NewSigner()
err = signer.SignHTTP(context.TODO(), creds, req,
fmt.Sprintf("%x", sha256.Sum256(jsonResourcesToSend)),
"osis", "us-east-2", time.Now())
if err != nil {
return err
}

resp, err := httpClient.Do(req)
if err != nil {
return err
}

defer resp.Body.Close()
// check status
if resp.StatusCode != http.StatusOK {
bodyStr := ""
if resp.Body != nil {
bodyBytes, err := io.ReadAll(resp.Body)
if err == nil {
bodyStr = string(bodyBytes)
}
}
return fmt.Errorf("failed to send resources individually to OpenSearch, statusCode=%d, body=%s, requestSize=%d", resp.StatusCode, bodyStr, len(jsonResourcesToSend))
}
}
return nil
}

func SendToPipeline(ingestionPipelineEndpoint string, resourcesToSend []kafka.Doc) error {
httpClient := &http.Client{Timeout: 10 * time.Second}
if len(resourcesToSend) == 0 {
Expand Down Expand Up @@ -59,6 +117,12 @@ func SendToPipeline(ingestionPipelineEndpoint string, resourcesToSend []kafka.Do
defer resp.Body.Close()
// check status
if resp.StatusCode != http.StatusOK {
if resp.StatusCode == http.StatusRequestEntityTooLarge {
err = sendToPipelineIndividually(ingestionPipelineEndpoint, resourcesToSend[page:min(page+100, len(resourcesToSend))])
if err != nil {
return err
}
}
bodyStr := ""
if resp.Body != nil {
bodyBytes, err := io.ReadAll(resp.Body)
Expand Down

0 comments on commit 86aa88a

Please sign in to comment.