Skip to content

Commit

Permalink
feat(handler): Add 2.0 compatible write endpoint (#16908)
Browse files Browse the repository at this point in the history
This commit adds a /api/v2/write endpoint that maps the supplied bucket
and org to a v1 database and retention policy.

* Add AllowedOrgs to httpd Config type.

* Add /api/v2/write handler
  • Loading branch information
ayang64 authored Mar 6, 2020
1 parent 5f47c38 commit f24bdb3
Show file tree
Hide file tree
Showing 2 changed files with 153 additions and 5 deletions.
63 changes: 58 additions & 5 deletions services/httpd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,11 @@ func NewHandler(c Config) *Handler {
},
Route{
"write", // Data-ingest route.
"POST", "/write", true, writeLogEnabled, h.serveWrite,
"POST", "/write", true, writeLogEnabled, h.serveWriteV1,
},
Route{
"write", // Data-ingest route.
"POST", "/api/v2/write", true, writeLogEnabled, h.serveWriteV2,
},
Route{
"prometheus-write", // Prometheus remote write
Expand Down Expand Up @@ -772,8 +776,58 @@ func (h *Handler) async(q *influxql.Query, results <-chan *query.Result) {
}
}

// serveWrite receives incoming series data in line protocol format and writes it to the database.
func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user meta.User) {
// bucket2drbp extracts a bucket and retention policy from a properly formatted
// string.
//
// The 2.x compatible endpoints encode the databse and retention policy names
// in the database URL query value. It is encoded using a forward slash like
// "database/retentionpolicy" and we should be able to simply split that string
// on the forward slash.
//
func bucket2dbrp(bucket string) (string, string, error) {
// test for a slash in our bucket name.
switch idx := strings.IndexByte(bucket, '/'); idx {
case -1:
// if there is no slash, we're mapping bucket to the databse.
switch db := bucket; db {
case "":
// if our "database" is an empty string, this is an error.
return "", "", fmt.Errorf(`bucket name %q is missing a slash; not in "database/retention-policy" format`, bucket)
default:
return db, "", nil
}
default:
// there is a slash
switch db, rp := bucket[:idx], bucket[idx+1:]; {
case db == "":
// empty database is unrecoverable
return "", "", fmt.Errorf(`bucket name %q is in db/rp form but has an empty database`, bucket)
default:
return db, rp, nil
}
}
}

// serveWriteV2 maps v2 write parameters to a v1 style handler. the concepts
// of an "org" and "bucket" are mapped to v1 "database" and "retention
// policies".
func (h *Handler) serveWriteV2(w http.ResponseWriter, r *http.Request, user meta.User) {
db, rp, err := bucket2dbrp(r.URL.Query().Get("bucket"))
if err != nil {
h.httpError(w, err.Error(), http.StatusNotFound)
return
}
h.serveWrite(db, rp, w, r, user)
}

// serveWriteV1 handles v1 style writes.
func (h *Handler) serveWriteV1(w http.ResponseWriter, r *http.Request, user meta.User) {
h.serveWrite(r.URL.Query().Get("db"), r.URL.Query().Get("rp"), w, r, user)
}

// serveWrite receives incoming series data in line protocol format and writes
// it to the database.
func (h *Handler) serveWrite(database string, retentionPolicy string, w http.ResponseWriter, r *http.Request, user meta.User) {
atomic.AddInt64(&h.stats.WriteRequests, 1)
atomic.AddInt64(&h.stats.ActiveWriteRequests, 1)
defer func(start time.Time) {
Expand All @@ -782,7 +836,6 @@ func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user meta.U
}(time.Now())
h.requestTracker.Add(r, user)

database := r.URL.Query().Get("db")
if database == "" {
h.httpError(w, "database is required", http.StatusBadRequest)
return
Expand Down Expand Up @@ -877,7 +930,7 @@ func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user meta.U
}

// Write points.
if err := h.PointsWriter.WritePoints(database, r.URL.Query().Get("rp"), consistency, user, points); influxdb.IsClientError(err) {
if err := h.PointsWriter.WritePoints(database, retentionPolicy, consistency, user, points); influxdb.IsClientError(err) {
atomic.AddInt64(&h.stats.PointsWrittenFail, int64(len(points)))
h.httpError(w, err.Error(), http.StatusBadRequest)
return
Expand Down
95 changes: 95 additions & 0 deletions services/httpd/v2_write_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package httpd

import "testing"

// test of how we extract the database and retention policy from the bucket in
// our v2 api enpoint.
//
func TestV2DatabaseRetentionPolicyMapper(t *testing.T) {
tests := map[string]struct {
input string
db string
rp string
shoulderr bool
}{
"Properly Encoded": {
input: "database/retention",
db: "database",
rp: "retention",
shoulderr: false,
},
"Empty Database": {
input: "/retention",
db: "",
rp: "",
shoulderr: true,
},
"Empty Retention Policy": {
input: "database/",
db: "database",
rp: "",
shoulderr: false,
},
"No Slash, Empty Retention Policy": {
input: "database",
db: "database",
rp: "",
shoulderr: false,
},
"Empty String": {
input: "",
db: "",
rp: "",
shoulderr: true,
},
"Space Before DB": {
input: " database/retention",
db: " database",
rp: "retention",
shoulderr: false,
},
"Space After DB": {
input: "database /retention",
db: "database ",
rp: "retention",
shoulderr: false,
},
"Space Before RP": {
input: "database/ retention",
db: "database",
rp: " retention",
shoulderr: false,
},
"Space After RP": {
input: "database/retention ",
db: "database",
rp: "retention ",
shoulderr: false,
},
}

t.Parallel()
for name, test := range tests {
test := test
t.Run(name, func(t *testing.T) {
db, rp, err := bucket2dbrp(test.input)
switch goterr, shoulderr := err != nil, test.shoulderr; {
case goterr != shoulderr:
switch shoulderr {
case true:
t.Fatalf("bucket2dbrp(%q) did not return an error; expected to return an error", test.input)
default:
t.Fatalf("bucket2dbrp(%q) return an error %v; expected to return a nil error", test.input, err)
}
}

if got, expected := db, test.db; got != expected {
t.Fatalf("bucket2dbrp(%q) returned a database of %q; epected %q", test.input, got, expected)
}

if got, expected := rp, test.rp; got != expected {
t.Fatalf("bucket2dbrp(%q) returned a retention policy of %q; epected %q", test.input, got, expected)
}
})
}
}

0 comments on commit f24bdb3

Please sign in to comment.