Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add application dashboard #16

Merged
merged 3 commits into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 14 additions & 5 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,13 @@ jobs:

# Upload test coverage
- name: Upload coverage report
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
with:
name: coverage-report
path: coverage.out

docker-build-test:
name: Build and Test Docker Image
name: Build and test container Image
runs-on: ubuntu-latest
needs: build

Expand All @@ -63,7 +63,7 @@ jobs:
# # exit 1 # Fail the script if the container is not healthy
# fi

# Step 4: Verify metrics are exposed
# Verify metrics are exposed
- name: Verify metrics endpoint
run: |
RESPONSE=$(curl -s -o /dev/null -w "%{http_code}" http://127.0.0.1:8080/api/v1/metrics)
Expand All @@ -73,7 +73,7 @@ jobs:
fi
echo "Metrics endpoint is reachable and returned HTTP $RESPONSE"

# Step 5: Optional - Inspect metrics content
# Optional - Inspect metrics content
- name: Fetch and log metrics
run: |
RESPONSE=$(curl -s http://127.0.0.1:8080/api/v1/metrics)
Expand Down Expand Up @@ -101,6 +101,7 @@ jobs:
echo "metricly_network_rx_bytes not found!" && exit 1
fi

# Verify /query api
- name: Test /api/v1/query
run: |
time=$(($(date +%s) - 10))
Expand All @@ -111,6 +112,7 @@ jobs:
exit 1
fi

# Verify /query_range api
- name: Test /api/v1/query_range
run: |
start=$(($(date +%s) - 100))
Expand All @@ -122,6 +124,13 @@ jobs:
exit 1
fi

RESPONSE=$(curl -s "http://127.0.0.1:8080/api/v1/query_range?metric=metricly_cpu_total&last=2m&step=15s")
if [[ $(echo "$RESPONSE" | jq '.data.result[0].values|length') -le 1 ]]; then
echo "Didn't multiple data points"
echo $RESPONSE
exit 1
fi
# Verify /aggregate api
- name: Test /api/v1/aggregate
run: |
RESPONSE=$(curl -s "http://127.0.0.1:8080/api/v1/aggregate?metric=metricly_cpu_total&operation=avg&window=1m")
Expand All @@ -137,7 +146,7 @@ jobs:
run: make run_compose_down

docker-build-push:
name: Build and Test Docker Image
name: Push Metricly to Quay.io
runs-on: ubuntu-latest
needs: docker-build-test
if: github.event_name == 'push'
Expand Down
53 changes: 31 additions & 22 deletions api/v1/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,54 +22,63 @@ func PrometheusAggregateHandler(conf *config.Config) http.HandlerFunc {

requestParams := r.URL.Query()

metricName := requestParams.Get("metric")
operation := requestParams.Get("operation")
window := requestParams.Get("window")
supportedParams := map[string]bool{
"metric": true,
"operation": true,
"window": true,
}
requiredParams := []string{"metric", "operation", "window"}

if err := validateAggregateParams(metricName, operation, window); err != nil {
http.Error(w, fmt.Sprintf("bad request: %s", err), http.StatusInternalServerError)
queryParams, err := processQueryParams(requestParams, supportedParams, requiredParams)
if err != nil {
sendErrorResponse(w, http.StatusBadRequest, err.Error())
return
}

baseQuery, _ := prometheus.NewQuery(conf, queryEndpoint)

aggregateQuery := fmt.Sprintf("%s(%s[%s])", supportedOperations[operation], metricName, window)
queryParams := map[string]string{
"query": aggregateQuery,
// needed to replace "avg" with "avg_over_time" to make it compatible with Prom
queryParams, err = processAggregateParams(queryParams)
if err != nil {
sendErrorResponse(w, http.StatusInternalServerError, fmt.Sprintf("bad request: %s", err))
return
}

promURL, err := baseQuery.BuildPrometheusURL(queryParams)
baseQuery, err := prometheus.NewQuery(conf, queryEndpoint)
if err != nil {
http.Error(w, fmt.Sprintf("failed to build prometheus query %v", err), http.StatusBadRequest)
sendErrorResponse(w, http.StatusBadRequest, err.Error())
return
}

var response PrometheusQueryResponse
promURL := baseQuery.BuildPrometheusURL(queryParams)

var response PrometheusResponse
err = QueryPrometheus(promURL, &response)
if err != nil {
http.Error(w, fmt.Sprintf("failed to query Prometheus: %v %s", err, promURL), http.StatusInternalServerError)
sendErrorResponse(w, http.StatusInternalServerError, fmt.Sprintf("failed to query Prometheus: %v %s", err, promURL))
return
}
w.WriteHeader(http.StatusOK)
w.Header().Set("Content-Type", "application/json")
err = json.NewEncoder(w).Encode(response)
if err != nil {
http.Error(w, fmt.Sprintf("failed to encode response: %s", err), http.StatusInternalServerError)
sendErrorResponse(w, http.StatusInternalServerError, fmt.Sprintf("failed to encode response: %s", err))
return
}
}
}

// validates input params for aggregate query
func validateAggregateParams(metricName, operation, window string) error {
if metricName == "" || operation == "" || window == "" {
return fmt.Errorf("metric, operation and window, all required to aggregate metrics")
}
func processAggregateParams(queryParams map[string]string) (map[string]string, error) {

_, valid := supportedOperations[operation]
_, valid := supportedOperations[queryParams["operation"]]
if !valid {
return fmt.Errorf("unsupported operation: %s", operation)
return nil, fmt.Errorf("unsupported operation: %s", queryParams["operation"])
}

return nil
queryParams["operation"] = supportedOperations[queryParams["operation"]]

aggregateQuery := fmt.Sprintf("%s(%s[%s])", queryParams["operation"], queryParams["query"], queryParams["window"])

return map[string]string{
"query": aggregateQuery,
}, nil
}
59 changes: 59 additions & 0 deletions api/v1/aggregate_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package v1

import (
"encoding/json"
"io"
"net/http"
"net/http/httptest"
"strings"

"testing"
)

func TestPrometheusAggregateHandler(t *testing.T) {

mockBody, _ := json.Marshal(mockPrometheusResponse)

mockServer, _ := newMockServer(mockBody)
defer mockServer.Close()

conf.Prometheus.Address = strings.Split(mockServer.URL, ":")[1][2:]
conf.Prometheus.Port = strings.Split(mockServer.URL, ":")[2]

handler := PrometheusAggregateHandler(conf)

req := httptest.NewRequest(http.MethodGet, "/aggregate?metric=metricly_cpu_total&operation=avg&window=1h", nil)
w := httptest.NewRecorder()

handler.ServeHTTP(w, req)

// Assertions
resp := w.Result()
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
t.Errorf("expected status %d; got %d", http.StatusOK, resp.StatusCode)
}

body, _ := io.ReadAll(resp.Body)
var parsedResponse PrometheusResponse
err := json.Unmarshal(body, &parsedResponse)
if err != nil {
t.Fatalf("failed to parse response: %v", err)
}

// Check the mock response data
if parsedResponse.Status != "success" {
t.Errorf("expected status 'success'; got %s", parsedResponse.Status)
}
if len(parsedResponse.Data.Result) != 1 {
t.Errorf("expected 1 result; got %d", len(parsedResponse.Data.Result))
}
if parsedResponse.Data.Result[0].Metric["__name__"] != "test_metric" {
t.Errorf("expected metric name 'mock_metric'; got %s", parsedResponse.Data.Result[0].Metric["__name__"])
}
if parsedResponse.Data.Result[0].Value[1] != "100" {
t.Errorf("expected value '100'; got %v", parsedResponse.Data.Result[0].Value[1])
}

}
57 changes: 57 additions & 0 deletions api/v1/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package v1

import (
"metricly/config"
"net/http"
"net/http/httptest"
)

var (
conf = &config.Config{
Prometheus: struct {
Address string "yaml:\"address\""
Port string "yaml:\"port\""
}{
Address: "localhost",
Port: "9090",
},
}

mockPrometheusResponse = PrometheusResponse{
Status: "success",
Data: struct {
ResultType string `json:"resultType"`
Result []struct {
Metric map[string]string `json:"metric"`
Value [2]interface{} `json:"value"`
} `json:"result"`
}{
ResultType: "vector",
Result: []struct {
Metric map[string]string `json:"metric"`
Value [2]interface{} `json:"value"`
}{
{
Metric: map[string]string{"__name__": "test_metric"},
Value: [2]interface{}{"1689636523", "100"},
},
},
},
}
)

func newMockServer(response []byte) (*httptest.Server, error) {

// emptyServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// w.WriteHeader(http.StatusNoContent) // 204 No Content
// }))

mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
if _, err := w.Write(response); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}))
return mockServer, nil
}
86 changes: 86 additions & 0 deletions api/v1/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,44 @@ import (
"io"
"log/slog"
"net/http"
"net/url"
)

// represents the structure of the reponse Prometheus's API call
// used for /query & /aggregate apis
type PrometheusResponse struct {
Status string `json:"status"`
Data struct {
ResultType string `json:"resultType"`
Result []struct {
Metric map[string]string `json:"metric"`
Value [2]interface{} `json:"value"`
} `json:"result"`
} `json:"data"`
}

// represents the structure of the reponse Prometheus's API call
// used for /query_range api
type PrometheusQueryRangeResponse struct {
Status string `json:"status"`
Data struct {
ResultType string `json:"resultType"`
Result []struct {
Metric map[string]string `json:"metric"`
Values [][]interface{} `json:"values"`
} `json:"result"`
} `json:"data"`
}

// represents error reponse
type ErrorResponse struct {
Status string `json:"status"`
Data struct {
StatusCode int `json:"status_code"`
Message string `json:"message"`
} `json:"data"`
}

// make QueryPrometheus a generic function so that it can serve both reuests, query & query_range
func QueryPrometheus[T any](queryURL string, target *T) error {

Expand Down Expand Up @@ -43,3 +79,53 @@ func logAPIRequests(r *http.Request, duration int64, statusCode int) {
),
)
}

func sendErrorResponse(w http.ResponseWriter, statusCode int, message string) {

error := ErrorResponse{
Status: "failed",
Data: struct {
StatusCode int "json:\"status_code\""
Message string "json:\"message\""
}{
StatusCode: statusCode,
Message: message,
},
}

w.Header().Set("Content-Type", "application/json")
w.WriteHeader(statusCode)
if err := json.NewEncoder(w).Encode(error); err != nil {
slog.Error("failed to parse error into json")
}
}

func processQueryParams(reqParam url.Values, supportedParams map[string]bool, requiredParams []string) (map[string]string, error) {

result := map[string]string{}
for param, value := range reqParam {
if !supportedParams[param] {
return nil, fmt.Errorf("unknown parameter in request: %s", param)
}
if len(value) == 0 {
return nil, fmt.Errorf("parameter %s found empty", param)
}
for _, v := range value {
if v == "" {
return nil, fmt.Errorf("value for parameter %s found empty", param)
}
result[param] = v
}
}

for _, req := range requiredParams {
if _, exists := result[req]; !exists {
return nil, fmt.Errorf("parameter %s required", req)
}
}

tmp := result["metric"]
delete(result, "metric")
result["query"] = tmp
return result, nil
}
Loading
Loading