Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: accept new SLO specs using the filesystem HTTP api #914

Open
wants to merge 16 commits into
base: main
Choose a base branch
from
Open
17 changes: 17 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,23 @@ picked up by a Prometheus instance. While running Pyrra on its own works, there
won't be any SLO configured, nor will there be any data from a Prometheus to
work with. It's designed to work alongside a Prometheus.

SLO specs may be published using the `/specs/create` endpoint. This will cause `pyrra` to
generate the corresponding recordingRules, write them out to disk and trigger a reload
of Prometheus.

Example:
```
% curl -i -X POST -H "content-type: multipart/form-data" -F "spec=@service-levels/some-slo.yaml" http://localhost:9444/ingest
HTTP/1.1 200 OK
Date: Fri, 15 Sep 2023 14:21:54 GMT
Content-Length: 2
Content-Type: text/plain; charset=utf-8

ok
```

Note: the specs endpoint does not require authentication and needs to be explicitly enabled using `--specs-api`.

## Tech Stack

**Client:** TypeScript with React, Bootstrap, and uPlot.
Expand Down
146 changes: 145 additions & 1 deletion filesystem.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import (
"context"
"fmt"
"io"
"net/http"
"os"
"path/filepath"
Expand Down Expand Up @@ -89,7 +90,140 @@
return objectives
}

func cmdFilesystem(logger log.Logger, reg *prometheus.Registry, promClient api.Client, configFiles, prometheusFolder string, genericRules bool) int {
func _listFolderContents(folderPath string, w http.ResponseWriter) error {
harmw marked this conversation as resolved.
Show resolved Hide resolved
err := filepath.Walk(folderPath, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if info.IsDir() {
return nil
}
fmt.Fprintf(w, "%v\n", filepath.Base(path))
harmw marked this conversation as resolved.
Show resolved Hide resolved
return nil
})
return err
}

func listSpecsHandler(logger log.Logger, specsDir string, prometheusDir string) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
level.Info(logger).Log("msg", "listing available specs", "dir", specsDir)
fmt.Fprintf(w, "Specs currently available:\n")

err := _listFolderContents(specsDir, w)
if err != nil {
level.Error(logger).Log("msg", "error listing available specs", "err", err)
}

level.Info(logger).Log("msg", "listing generated rules", "dir", prometheusDir)
fmt.Fprintf(w, "Rules currently generated:\n")

err = _listFolderContents(prometheusDir, w)
if err != nil {
level.Error(logger).Log("msg", "error listing generated rules", "err", err)
}
}
}

func createSpecHandler(logger log.Logger, dir string, prometheusFolder string, reload chan struct{}, genericRules bool) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}

r.ParseMultipartForm(32 << 20)
file, handler, err := r.FormFile("spec")
if err != nil {
http.Error(w, "Failure reading spec field in form upload: "+err.Error(), 400)
return
}

level.Info(logger).Log("msg", "processing SLO spec", "specFile", handler.Filename)

if err != nil {
level.Error(logger).Log("msg", "failed to process HTTP request", "err", err)
http.Error(w, "Failure: "+err.Error(), 400)
return
}

var ingestedSpec = dir + "/" + handler.Filename

level.Info(logger).Log("msg", "writing spec to disk", "location", ingestedSpec)
f, err := os.OpenFile(ingestedSpec, os.O_WRONLY|os.O_CREATE, 0666)
if err != nil {
level.Error(logger).Log("msg", "failed to write spec to disk", "err", err)
http.Error(w, "Failure: "+err.Error(), 500)
return
}
defer f.Close()

io.Copy(f, file)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is pretty pragmatic and should be ok for a proof-of-concept.
Quickly, this will become a foot gun where the API will accept any file and write the contents to disk.

We want to ensure the sent data is correct and has a JSON/YAML objective configuration.

A helper function to read from a file and then unmarshal and validate the content already exists with:

pyrra/filesystem.go

Lines 387 to 404 in 174b026

func objectiveFromFile(file string) (v1alpha1.ServiceLevelObjective, slo.Objective, error) {
bytes, err := os.ReadFile(file)
if err != nil {
return v1alpha1.ServiceLevelObjective{}, slo.Objective{}, fmt.Errorf("failed to read file %q: %w", file, err)
}
var config v1alpha1.ServiceLevelObjective
if err := yaml.UnmarshalStrict(bytes, &config); err != nil {
return v1alpha1.ServiceLevelObjective{}, slo.Objective{}, fmt.Errorf("failed to unmarshal objective %q: %w", file, err)
}
objective, err := config.Internal()
if err != nil {
return v1alpha1.ServiceLevelObjective{}, slo.Objective{}, fmt.Errorf("failed to get objective %q: %w", file, err)
}
return config, objective, nil
}

If we change that function to take a io.Reader we can pass both a os.File (for the existing use case) or a bytes.Buffer which we would write the HTTP payload content into to that helper function.

-func objectiveFromFile(file string) (v1alpha1.ServiceLevelObjective, slo.Objective, error) {
+func objectiveFromReader(content io.Reader) (v1alpha1.ServiceLevelObjective, slo.Objective, error) { 

It's probably easiest to then marshal the parsed config struct back into YAML and write that to disk.

Let me know if you need any more pointers or anything else. I'm happy to help!


level.Info(logger).Log("msg", "attempting to build rules from spec", "location", ingestedSpec)
err = writeRuleFile(logger, ingestedSpec, prometheusFolder, genericRules, false)
if err != nil {
level.Error(logger).Log("msg", "error building rules from spec", "file", ingestedSpec, "err", err)
http.Error(w, "Failure: "+err.Error(), 400)

err := os.Remove(ingestedSpec)
if err != nil {
level.Error(logger).Log("msg", "failed to remove spec", "file", ingestedSpec, "err", err)
}
return
}

level.Info(logger).Log("msg", "signaling Prometheus reload")
reload <- struct{}{}

fmt.Fprintf(w, "ok")
}
}

func removeSpecHandler(logger log.Logger, dir string, prometheusFolder string, reload chan struct{}) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodDelete {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}

filePath := r.URL.Query().Get("f")
if filePath == "" {
http.Error(w, "Missing 'f' parameter in the query", http.StatusBadRequest)
return
}

cleanPath := filepath.Clean(filePath)

level.Info(logger).Log("msg", "removing for", "file", filePath)

level.Debug(logger).Log("msg", "removing spec", "file", filePath, "dir", dir)
err := os.Remove(dir + "/" + cleanPath)
if err != nil {
level.Error(logger).Log("msg", "failed to remove file", "file", filePath, "clean", cleanPath, "err", err)
http.Error(w, fmt.Sprintf("Error deleting spec: %s", err), http.StatusInternalServerError)
return
}

level.Debug(logger).Log("msg", "removing generated rules", "file", filePath, "dir", prometheusFolder)
err = os.Remove(prometheusFolder + "/" + cleanPath)
if err != nil {
level.Error(logger).Log("msg", "failed to remove Prometheus rules file", "file", filePath, "clean", cleanPath, "err", err)
http.Error(w, fmt.Sprintf("Error deleting rules: %s", err), http.StatusInternalServerError)
return
}

level.Info(logger).Log("msg", "signaling Prometheus reload")
reload <- struct{}{}

fmt.Fprintf(w, "ok")
}
}

func cmdFilesystem(logger log.Logger, reg *prometheus.Registry, promClient api.Client, configFiles, prometheusFolder string, specsApi bool, genericRules bool) int {
reconcilesTotal := prometheus.NewCounter(prometheus.CounterOpts{
Name: "pyrra_filesystem_reconciles_total",
Help: "The total amount of reconciles.",
Expand Down Expand Up @@ -122,7 +256,7 @@
}
<-ctx.Done()
return nil
}, func(err error) {

Check warning on line 259 in filesystem.go

View workflow job for this annotation

GitHub Actions / lint

unused-parameter: parameter 'err' seems to be unused, consider removing or renaming it as _ (revive)
cancel()
})
}
Expand Down Expand Up @@ -242,6 +376,7 @@
}
{
prometheusInterceptor := connectprometheus.NewInterceptor(reg)
dir := filepath.Dir(configFiles)

router := http.NewServeMux()
router.Handle(objectivesv1alpha1connect.NewObjectiveBackendServiceHandler(
Expand All @@ -252,6 +387,15 @@
))
router.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))

if specsApi {
level.Info(logger).Log("msg", "specs API endpoints are enabled")
router.HandleFunc("/specs/remove", removeSpecHandler(logger, dir, prometheusFolder, reload))
router.HandleFunc("/specs/create", createSpecHandler(logger, dir, prometheusFolder, reload, genericRules))
router.HandleFunc("/specs/list", listSpecsHandler(logger, dir, prometheusFolder))
} else {
level.Info(logger).Log("msg", "specs API endpoints are disabled")
}

server := http.Server{
Addr: ":9444",
Handler: h2c.NewHandler(router, &http2.Server{}),
Expand Down
Loading
Loading