Skip to content

Commit

Permalink
feat: move http client and server as well as ingest to utils
Browse files Browse the repository at this point in the history
  • Loading branch information
Mahanmmi committed Jul 24, 2024
1 parent 911cf82 commit 4bcbd19
Show file tree
Hide file tree
Showing 6 changed files with 841 additions and 0 deletions.
74 changes: 74 additions & 0 deletions pkg/es/ingest/client/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package esSinkClient

import (
"encoding/json"
"fmt"
"github.com/kaytu-io/kaytu-util/pkg/es"
"github.com/kaytu-io/kaytu-util/pkg/es/ingest/entity"
"github.com/kaytu-io/kaytu-util/pkg/httpclient"
"github.com/labstack/echo/v4"
"go.uber.org/zap"
"net/http"
)

type EsSinkServiceClient interface {
Ingest(ctx *httpclient.Context, docs []es.Doc) ([]entity.FailedDoc, error)
}

type esSinkServiceClient struct {
logger *zap.Logger
baseUrl string
}

func NewEsSinkServiceClient(logger *zap.Logger, baseUrl string) EsSinkServiceClient {
return &esSinkServiceClient{
logger: logger,
baseUrl: baseUrl,
}
}

func (c *esSinkServiceClient) Ingest(ctx *httpclient.Context, docs []es.Doc) ([]entity.FailedDoc, error) {
url := fmt.Sprintf("%s/api/v1/ingest", c.baseUrl)

jsonDocs, err := json.Marshal(docs)
if err != nil {
c.logger.Error("failed to marshal docs", zap.Error(err), zap.Any("docs", docs))
return nil, err
}
var baseDocs []es.DocBase
err = json.Unmarshal(jsonDocs, &baseDocs)
if err != nil {
c.logger.Error("failed to unmarshal docs", zap.Error(err), zap.Any("docs", docs), zap.String("jsonDocs", string(jsonDocs)))
return nil, err
}

req := entity.IngestRequest{
Docs: baseDocs,
}

reqJson, err := json.Marshal(req)
if err != nil {
c.logger.Error("failed to marshal request", zap.Error(err), zap.Any("request", req))
return nil, err
}

var res entity.IngestResponse
if statusCode, err := httpclient.DoRequest(ctx.Ctx, http.MethodPost, url, ctx.ToHeaders(), reqJson, &res); err != nil {
if 400 <= statusCode && statusCode < 500 {
return nil, echo.NewHTTPError(statusCode, err.Error())
}
c.logger.Error("failed to do request", zap.Error(err), zap.String("url", url), zap.String("reqJson", string(reqJson)))
return nil, err
}

for _, failedDoc := range res.FailedDocs {
c.logger.Error("failed to ingest doc", zap.Any("doc", failedDoc.Doc), zap.String("err", failedDoc.Err))
}

if len(res.FailedDocs) > len(docs)/2 {
c.logger.Error("failed to ingest more than half of the docs", zap.Any("failedDocs", res.FailedDocs))
return nil, fmt.Errorf("failed to ingest more than half of the docs")
}

return res.FailedDocs, nil
}
16 changes: 16 additions & 0 deletions pkg/es/ingest/entity/ingest.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package entity

import "github.com/kaytu-io/kaytu-util/pkg/es"

type IngestRequest struct {
Docs []es.DocBase `json:"doc"`
}

type FailedDoc struct {
Doc es.DocBase `json:"doc"`
Err string `json:"err"`
}

type IngestResponse struct {
FailedDocs []FailedDoc `json:"failed_docs"`
}
Loading

0 comments on commit 4bcbd19

Please sign in to comment.