diff --git a/pkg/es/ingest/client/client.go b/pkg/es/ingest/client/client.go new file mode 100644 index 0000000..56e0ff8 --- /dev/null +++ b/pkg/es/ingest/client/client.go @@ -0,0 +1,74 @@ +package esSinkClient + +import ( + "encoding/json" + "fmt" + "github.com/kaytu-io/kaytu-util/pkg/es" + "github.com/kaytu-io/kaytu-util/pkg/es/ingest/entity" + "github.com/kaytu-io/kaytu-util/pkg/httpclient" + "github.com/labstack/echo/v4" + "go.uber.org/zap" + "net/http" +) + +type EsSinkServiceClient interface { + Ingest(ctx *httpclient.Context, docs []es.Doc) ([]entity.FailedDoc, error) +} + +type esSinkServiceClient struct { + logger *zap.Logger + baseUrl string +} + +func NewEsSinkServiceClient(logger *zap.Logger, baseUrl string) EsSinkServiceClient { + return &esSinkServiceClient{ + logger: logger, + baseUrl: baseUrl, + } +} + +func (c *esSinkServiceClient) Ingest(ctx *httpclient.Context, docs []es.Doc) ([]entity.FailedDoc, error) { + url := fmt.Sprintf("%s/api/v1/ingest", c.baseUrl) + + jsonDocs, err := json.Marshal(docs) + if err != nil { + c.logger.Error("failed to marshal docs", zap.Error(err), zap.Any("docs", docs)) + return nil, err + } + var baseDocs []es.DocBase + err = json.Unmarshal(jsonDocs, &baseDocs) + if err != nil { + c.logger.Error("failed to unmarshal docs", zap.Error(err), zap.Any("docs", docs), zap.String("jsonDocs", string(jsonDocs))) + return nil, err + } + + req := entity.IngestRequest{ + Docs: baseDocs, + } + + reqJson, err := json.Marshal(req) + if err != nil { + c.logger.Error("failed to marshal request", zap.Error(err), zap.Any("request", req)) + return nil, err + } + + var res entity.IngestResponse + if statusCode, err := httpclient.DoRequest(ctx.Ctx, http.MethodPost, url, ctx.ToHeaders(), reqJson, &res); err != nil { + if 400 <= statusCode && statusCode < 500 { + return nil, echo.NewHTTPError(statusCode, err.Error()) + } + c.logger.Error("failed to do request", zap.Error(err), zap.String("url", url), zap.String("reqJson", string(reqJson))) + return nil, err + } + + for _, failedDoc := range res.FailedDocs { + c.logger.Error("failed to ingest doc", zap.Any("doc", failedDoc.Doc), zap.String("err", failedDoc.Err)) + } + + if len(res.FailedDocs) > len(docs)/2 { + c.logger.Error("failed to ingest more than half of the docs", zap.Any("failedDocs", res.FailedDocs)) + return nil, fmt.Errorf("failed to ingest more than half of the docs") + } + + return res.FailedDocs, nil +} diff --git a/pkg/es/ingest/entity/ingest.go b/pkg/es/ingest/entity/ingest.go new file mode 100644 index 0000000..a9ce6fe --- /dev/null +++ b/pkg/es/ingest/entity/ingest.go @@ -0,0 +1,16 @@ +package entity + +import "github.com/kaytu-io/kaytu-util/pkg/es" + +type IngestRequest struct { + Docs []es.DocBase `json:"doc"` +} + +type FailedDoc struct { + Doc es.DocBase `json:"doc"` + Err string `json:"err"` +} + +type IngestResponse struct { + FailedDocs []FailedDoc `json:"failed_docs"` +} diff --git a/pkg/httpclient/request.go b/pkg/httpclient/request.go new file mode 100644 index 0000000..2709348 --- /dev/null +++ b/pkg/httpclient/request.go @@ -0,0 +1,403 @@ +package httpclient + +import ( + "bytes" + "compress/gzip" + "context" + "encoding/json" + "fmt" + "github.com/kaytu-io/kaytu-engine/pkg/httpserver" + "io" + "mime/multipart" + "net/http" + url2 "net/url" + "time" + + "github.com/kaytu-io/kaytu-engine/pkg/auth/api" + "github.com/labstack/echo/v4" +) + +type EchoError struct { + Message string `json:"message"` +} +type Context struct { + Ctx context.Context + + UserRole api.Role + UserID string + WorkspaceName string + WorkspaceID string +} + +func (ctx *Context) Request() *http.Request { + //TODO implement me + panic("implement me") +} + +func (ctx *Context) SetRequest(r *http.Request) { + //TODO implement me + panic("implement me") +} + +func (ctx *Context) SetResponse(r *echo.Response) { + //TODO implement me + panic("implement me") +} + +func (ctx *Context) Response() *echo.Response { + //TODO implement me + panic("implement me") +} + +func (ctx *Context) IsTLS() bool { + //TODO implement me + panic("implement me") +} + +func (ctx *Context) IsWebSocket() bool { + //TODO implement me + panic("implement me") +} + +func (ctx *Context) Scheme() string { + //TODO implement me + panic("implement me") +} + +func (ctx *Context) RealIP() string { + //TODO implement me + panic("implement me") +} + +func (ctx *Context) Path() string { + //TODO implement me + panic("implement me") +} + +func (ctx *Context) SetPath(p string) { + //TODO implement me + panic("implement me") +} + +func (ctx *Context) Param(name string) string { + //TODO implement me + panic("implement me") +} + +func (ctx *Context) ParamNames() []string { + //TODO implement me + panic("implement me") +} + +func (ctx *Context) SetParamNames(names ...string) { + //TODO implement me + panic("implement me") +} + +func (ctx *Context) ParamValues() []string { + //TODO implement me + panic("implement me") +} + +func (ctx *Context) SetParamValues(values ...string) { + //TODO implement me + panic("implement me") +} + +func (ctx *Context) QueryParam(name string) string { + //TODO implement me + panic("implement me") +} + +func (ctx *Context) QueryParams() url2.Values { + //TODO implement me + panic("implement me") +} + +func (ctx *Context) QueryString() string { + //TODO implement me + panic("implement me") +} + +func (ctx *Context) FormValue(name string) string { + //TODO implement me + panic("implement me") +} + +func (ctx *Context) FormParams() (url2.Values, error) { + //TODO implement me + panic("implement me") +} + +func (ctx *Context) FormFile(name string) (*multipart.FileHeader, error) { + //TODO implement me + panic("implement me") +} + +func (ctx *Context) MultipartForm() (*multipart.Form, error) { + //TODO implement me + panic("implement me") +} + +func (ctx *Context) Cookie(name string) (*http.Cookie, error) { + //TODO implement me + panic("implement me") +} + +func (ctx *Context) SetCookie(cookie *http.Cookie) { + //TODO implement me + panic("implement me") +} + +func (ctx *Context) Cookies() []*http.Cookie { + //TODO implement me + panic("implement me") +} + +func (ctx *Context) Get(key string) interface{} { + //TODO implement me + panic("implement me") +} + +func (ctx *Context) Set(key string, val interface{}) { + //TODO implement me + panic("implement me") +} + +func (ctx *Context) Bind(i interface{}) error { + //TODO implement me + panic("implement me") +} + +func (ctx *Context) Validate(i interface{}) error { + //TODO implement me + panic("implement me") +} + +func (ctx *Context) Render(code int, name string, data interface{}) error { + //TODO implement me + panic("implement me") +} + +func (ctx *Context) HTML(code int, html string) error { + //TODO implement me + panic("implement me") +} + +func (ctx *Context) HTMLBlob(code int, b []byte) error { + //TODO implement me + panic("implement me") +} + +func (ctx *Context) String(code int, s string) error { + //TODO implement me + panic("implement me") +} + +func (ctx *Context) JSON(code int, i interface{}) error { + //TODO implement me + panic("implement me") +} + +func (ctx *Context) JSONPretty(code int, i interface{}, indent string) error { + //TODO implement me + panic("implement me") +} + +func (ctx *Context) JSONBlob(code int, b []byte) error { + //TODO implement me + panic("implement me") +} + +func (ctx *Context) JSONP(code int, callback string, i interface{}) error { + //TODO implement me + panic("implement me") +} + +func (ctx *Context) JSONPBlob(code int, callback string, b []byte) error { + //TODO implement me + panic("implement me") +} + +func (ctx *Context) XML(code int, i interface{}) error { + //TODO implement me + panic("implement me") +} + +func (ctx *Context) XMLPretty(code int, i interface{}, indent string) error { + //TODO implement me + panic("implement me") +} + +func (ctx *Context) XMLBlob(code int, b []byte) error { + //TODO implement me + panic("implement me") +} + +func (ctx *Context) Blob(code int, contentType string, b []byte) error { + //TODO implement me + panic("implement me") +} + +func (ctx *Context) Stream(code int, contentType string, r io.Reader) error { + //TODO implement me + panic("implement me") +} + +func (ctx *Context) File(file string) error { + //TODO implement me + panic("implement me") +} + +func (ctx *Context) Attachment(file string, name string) error { + //TODO implement me + panic("implement me") +} + +func (ctx *Context) Inline(file string, name string) error { + //TODO implement me + panic("implement me") +} + +func (ctx *Context) NoContent(code int) error { + //TODO implement me + panic("implement me") +} + +func (ctx *Context) Redirect(code int, url string) error { + //TODO implement me + panic("implement me") +} + +func (ctx *Context) Error(err error) { + //TODO implement me + panic("implement me") +} + +func (ctx *Context) Handler() echo.HandlerFunc { + //TODO implement me + panic("implement me") +} + +func (ctx *Context) SetHandler(h echo.HandlerFunc) { + //TODO implement me + panic("implement me") +} + +func (ctx *Context) Logger() echo.Logger { + //TODO implement me + panic("implement me") +} + +func (ctx *Context) SetLogger(l echo.Logger) { + //TODO implement me + panic("implement me") +} + +func (ctx *Context) Echo() *echo.Echo { + //TODO implement me + panic("implement me") +} + +func (ctx *Context) Reset(r *http.Request, w http.ResponseWriter) { + //TODO implement me + panic("implement me") +} + +func (ctx *Context) ToHeaders() map[string]string { + return map[string]string{ + httpserver.XKaytuUserIDHeader: ctx.UserID, + httpserver.XKaytuUserRoleHeader: string(ctx.UserRole), + httpserver.XKaytuWorkspaceIDHeader: ctx.WorkspaceID, + httpserver.XKaytuWorkspaceNameHeader: ctx.WorkspaceName, + } +} + +func (ctx *Context) GetWorkspaceName() string { + return ctx.WorkspaceName +} + +func (ctx *Context) GetWorkspaceID() string { + return ctx.WorkspaceID +} + +func FromEchoContext(c echo.Context) *Context { + wsID := c.Request().Header.Get(httpserver.XKaytuWorkspaceIDHeader) + name := c.Request().Header.Get(httpserver.XKaytuWorkspaceNameHeader) + role := c.Request().Header.Get(httpserver.XKaytuUserRoleHeader) + id := c.Request().Header.Get(httpserver.XKaytuUserIDHeader) + ctx := c.Request().Context() + return &Context{ + Ctx: ctx, + WorkspaceName: name, + WorkspaceID: wsID, + UserRole: api.Role(role), + UserID: id, + } +} + +func discardBody(res *http.Response) { + if res != nil && res.Body != nil { + io.Copy(io.Discard, res.Body) + res.Body.Close() + } +} + +func DoRequest(ctx context.Context, method string, url string, headers map[string]string, payload []byte, v interface{}) (statusCode int, err error) { + if ctx == nil { + ctx = context.Background() + } + req, err := http.NewRequestWithContext(ctx, method, url, bytes.NewReader(payload)) + if err != nil { + return statusCode, fmt.Errorf("new request: %w", err) + } + req.Header.Set(echo.HeaderContentType, "application/json") + req.Header.Set(echo.HeaderContentEncoding, "gzip") + req.Header.Add(echo.HeaderAcceptEncoding, "gzip") + + for k, v := range headers { + req.Header.Add(k, v) + } + t := http.DefaultTransport.(*http.Transport) + t.MaxIdleConns = 100 + t.MaxConnsPerHost = 100 + t.MaxIdleConnsPerHost = 100 + client := http.Client{ + Timeout: 3 * time.Minute, + Transport: t, + } + res, err := client.Do(req) + defer discardBody(res) + if err != nil { + return statusCode, fmt.Errorf("do request: %w", err) + } + + body := res.Body + if res.Header.Get("Content-Encoding") == "gzip" { + body, err = gzip.NewReader(res.Body) + if err != nil { + return statusCode, fmt.Errorf("gzip new reader: %w", err) + } + defer body.Close() + } + + statusCode = res.StatusCode + if res.StatusCode != http.StatusOK { + d, err := io.ReadAll(body) + if err != nil { + return statusCode, fmt.Errorf("read body: %w", err) + } + + var echoerr EchoError + if jserr := json.Unmarshal(d, &echoerr); jserr == nil { + return statusCode, fmt.Errorf(echoerr.Message) + } + + return statusCode, fmt.Errorf("http status: %d: %s", res.StatusCode, d) + } + if v == nil { + return statusCode, nil + } + + return statusCode, json.NewDecoder(body).Decode(v) +} diff --git a/pkg/httpserver/auth.go b/pkg/httpserver/auth.go new file mode 100644 index 0000000..49b9bd4 --- /dev/null +++ b/pkg/httpserver/auth.go @@ -0,0 +1,148 @@ +package httpserver + +import ( + "fmt" + "net/http" + "strings" + + "github.com/kaytu-io/kaytu-engine/pkg/auth/api" + "github.com/labstack/echo/v4" +) + +const ( + XKaytuWorkspaceIDHeader = "X-Kaytu-WorkspaceID" + XKaytuWorkspaceNameHeader = "X-Kaytu-WorkspaceName" + XKaytuUserIDHeader = "X-Kaytu-UserId" + XKaytuUserRoleHeader = "X-Kaytu-UserRole" + XKaytuUserConnectionsScope = "X-Kaytu-UserConnectionsScope" +) + +func AuthorizeHandler(h echo.HandlerFunc, minRole api.Role) echo.HandlerFunc { + return func(ctx echo.Context) error { + if err := RequireMinRole(ctx, minRole); err != nil { + return err + } + + return h(ctx) + } +} + +func RequireMinRole(ctx echo.Context, minRole api.Role) error { + if !hasAccess(GetUserRole(ctx), minRole) { + userRole := GetUserRole(ctx) + fmt.Println("required role = ", minRole, " user role = ", userRole) + return echo.NewHTTPError(http.StatusForbidden, "missing required permission") + } + + return nil +} + +func GetWorkspaceName(ctx echo.Context) string { + name := ctx.Request().Header.Get(XKaytuWorkspaceNameHeader) + if strings.TrimSpace(name) == "" { + panic(fmt.Errorf("header %s is missing", XKaytuWorkspaceNameHeader)) + } + + return name +} + +func GetWorkspaceID(ctx echo.Context) string { + id := ctx.Request().Header.Get(XKaytuWorkspaceIDHeader) + if strings.TrimSpace(id) == "" { + panic(fmt.Errorf("header %s is missing", XKaytuWorkspaceIDHeader)) + } + + return id +} + +func GetUserRole(ctx echo.Context) api.Role { + role := ctx.Request().Header.Get(XKaytuUserRoleHeader) + if strings.TrimSpace(role) == "" { + panic(fmt.Errorf("header %s is missing", XKaytuUserRoleHeader)) + } + + return api.GetRole(role) +} + +func GetUserID(ctx echo.Context) string { + id := ctx.Request().Header.Get(XKaytuUserIDHeader) + if strings.TrimSpace(id) == "" { + panic(fmt.Errorf("header %s is missing", XKaytuUserIDHeader)) + } + + return id +} + +func CheckAccessToConnectionID(ctx echo.Context, connectionID string) error { + connectionIDsStr := ctx.Request().Header.Get(XKaytuUserConnectionsScope) + if len(connectionIDsStr) == 0 { + return nil + } + + arr := strings.Split(connectionIDsStr, ",") + if len(arr) == 0 { + return nil + } + + for _, item := range arr { + if item == connectionID { + return nil + } + } + return echo.NewHTTPError(http.StatusForbidden, "Invalid connection ID") +} + +func ResolveConnectionIDs(ctx echo.Context, connectionIDs []string) ([]string, error) { + connectionIDsStr := ctx.Request().Header.Get(XKaytuUserConnectionsScope) + if len(connectionIDsStr) == 0 { + return connectionIDs, nil + } + + arr := strings.Split(connectionIDsStr, ",") + if len(arr) == 0 { + return connectionIDs, nil + } + + if len(connectionIDs) == 0 { + return arr, nil + } else { + var res []string + for _, connID := range connectionIDs { + allowed := false + for _, item := range arr { + if item == connID { + allowed = true + } + } + + if allowed { + res = append(res, connID) + } + } + if len(res) == 0 { + return nil, echo.NewHTTPError(http.StatusForbidden, "invalid connection ids") + } + return res, nil + } +} + +func roleToPriority(role api.Role) int { + switch role { + case api.ViewerRole: + return 0 + case api.EditorRole: + return 1 + case api.AdminRole: + return 2 + case api.KaytuAdminRole: + return 98 + case api.InternalRole: + return 99 + default: + panic("unsupported role: " + role) + } +} + +func hasAccess(currRole, minRole api.Role) bool { + return roleToPriority(currRole) >= roleToPriority(minRole) +} diff --git a/pkg/httpserver/echo.go b/pkg/httpserver/echo.go new file mode 100644 index 0000000..4ffd27c --- /dev/null +++ b/pkg/httpserver/echo.go @@ -0,0 +1,131 @@ +package httpserver + +import ( + "context" + "fmt" + "os" + "strconv" + "strings" + + "github.com/kaytu-io/kaytu-util/pkg/metrics" + "github.com/labstack/echo/v4" + "github.com/labstack/echo/v4/middleware" + "go.opentelemetry.io/contrib/instrumentation/github.com/labstack/echo/otelecho" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/exporters/jaeger" + "go.opentelemetry.io/otel/propagation" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + + "go.uber.org/zap" +) + +var ( + agentHost = os.Getenv("JAEGER_AGENT_HOST") + serviceName = os.Getenv("JAEGER_SERVICE_NAME") + sampleRate = os.Getenv("JAEGER_SAMPLE_RATE") +) + +type Routes interface { + Register(router *echo.Echo) +} + +type EmptyRoutes struct{} + +func (EmptyRoutes) Register(router *echo.Echo) {} + +func Register(logger *zap.Logger, routes Routes) (*echo.Echo, *sdktrace.TracerProvider) { + e := echo.New() + e.HideBanner = true + + e.Use(middleware.Recover()) + e.Use(Logger(logger)) + e.Use(middleware.GzipWithConfig(middleware.GzipConfig{ + Skipper: func(c echo.Context) bool { + // skip metric endpoints + if strings.HasPrefix(c.Path(), "/metrics") { + return true + } + // skip if client does not accept gzip + acceptEncodingHeader := c.Request().Header.Values(echo.HeaderAcceptEncoding) + for _, value := range acceptEncodingHeader { + if strings.TrimSpace(value) == "gzip" { + return false + } + } + return true + }, + Level: 5, + })) + + metrics.AddEchoMiddleware(e) + + e.Pre(middleware.RemoveTrailingSlash()) + + tp, err := initTracer() + if err != nil { + logger.Error(err.Error()) + return nil, nil + } + + e.Validator = customValidator{ + validate: validator.New(), + } + + routes.Register(e) + + return e, tp +} + +func RegisterAndStart(ctx context.Context, logger *zap.Logger, address string, routes Routes) error { + e, tp := Register(logger, routes) + + defer func() { + if err := tp.Shutdown(ctx); err != nil { + } + }() + e.Use(otelecho.Middleware(serviceName)) + + return e.Start(address) +} + +type customValidator struct { + validate *validator.Validate +} + +func (v customValidator) Validate(i interface{}) error { + return v.validate.Struct(i) +} + +func QueryArrayParam(ctx echo.Context, paramName string) []string { + var values []string + for k, v := range ctx.QueryParams() { + if k == paramName || k == paramName+"[]" { + values = append(values, v...) + } + } + return values +} + +func initTracer() (*sdktrace.TracerProvider, error) { + exporter, err := jaeger.New(jaeger.WithAgentEndpoint(jaeger.WithAgentHost(agentHost))) + if err != nil { + return nil, err + } + + sampleRateFloat := 1.0 + if sampleRate != "" { + sampleRateFloat, err = strconv.ParseFloat(sampleRate, 64) + if err != nil { + fmt.Println("Error parsing sample rate for Jaeger. Using default value of 1.0", err) + sampleRateFloat = 1 + } + } + + tp := sdktrace.NewTracerProvider( + sdktrace.WithSampler(sdktrace.TraceIDRatioBased(sampleRateFloat)), + sdktrace.WithBatcher(exporter), + ) + otel.SetTracerProvider(tp) + otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{})) + return tp, nil +} diff --git a/pkg/httpserver/logger.go b/pkg/httpserver/logger.go new file mode 100644 index 0000000..a33e53f --- /dev/null +++ b/pkg/httpserver/logger.go @@ -0,0 +1,69 @@ +package httpserver + +import ( + "fmt" + "strings" + "time" + + "github.com/labstack/echo/v4" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +// Logger is a middleware and zap to provide an "access log" like logging for each request. +func Logger(log *zap.Logger) echo.MiddlewareFunc { + return func(next echo.HandlerFunc) echo.HandlerFunc { + return func(c echo.Context) error { + start := time.Now() + + req := c.Request() + fields := []zapcore.Field{ + zap.String("remote_ip", c.RealIP()), + zap.String("latency", time.Since(start).String()), + zap.String("host", req.Host), + zap.String("request", fmt.Sprintf("%s %s", req.Method, req.RequestURI)), + zap.String("user_agent", req.UserAgent()), + } + id := req.Header.Get(echo.HeaderXRequestID) + if id != "" { + fields = append(fields, zap.String("request_id", id)) + } + log.Info("Request received", fields...) + err := next(c) + if err != nil { + c.Error(err) + } + + res := c.Response() + + // skip metric endpoints + if strings.HasPrefix(c.Path(), "/metrics") { + return nil + } + + fields = append(fields, + zap.Int("status", res.Status), + zap.Int64("size", res.Size), + ) + + if id == "" { + id = res.Header().Get(echo.HeaderXRequestID) + fields = append(fields, zap.String("request_id", id)) + } + + n := res.Status + switch { + case n >= 500: + log.With(zap.Error(err)).Error("Server error", fields...) + case n >= 400: + log.With(zap.Error(err)).Warn("Client error", fields...) + case n >= 300: + log.Info("Redirection", fields...) + default: + log.Info("Success", fields...) + } + + return nil + } + } +}