From f24bdb3ee57f0f143967c53615f4d5b91a779176 Mon Sep 17 00:00:00 2001 From: Ayan George Date: Fri, 6 Mar 2020 10:03:25 -0500 Subject: [PATCH] feat(handler): Add 2.0 compatible write endpoint (#16908) This commit adds a /api/v2/write endpoint that maps the supplied bucket and org to a v1 database and retention policy. * Add AllowedOrgs to httpd Config type. * Add /api/v2/write handler --- services/httpd/handler.go | 63 ++++++++++++++++++++-- services/httpd/v2_write_test.go | 95 +++++++++++++++++++++++++++++++++ 2 files changed, 153 insertions(+), 5 deletions(-) create mode 100644 services/httpd/v2_write_test.go diff --git a/services/httpd/handler.go b/services/httpd/handler.go index 6d4eaf21069..14dd199eedb 100644 --- a/services/httpd/handler.go +++ b/services/httpd/handler.go @@ -188,7 +188,11 @@ func NewHandler(c Config) *Handler { }, Route{ "write", // Data-ingest route. - "POST", "/write", true, writeLogEnabled, h.serveWrite, + "POST", "/write", true, writeLogEnabled, h.serveWriteV1, + }, + Route{ + "write", // Data-ingest route. + "POST", "/api/v2/write", true, writeLogEnabled, h.serveWriteV2, }, Route{ "prometheus-write", // Prometheus remote write @@ -772,8 +776,58 @@ func (h *Handler) async(q *influxql.Query, results <-chan *query.Result) { } } -// serveWrite receives incoming series data in line protocol format and writes it to the database. -func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user meta.User) { +// bucket2drbp extracts a bucket and retention policy from a properly formatted +// string. +// +// The 2.x compatible endpoints encode the databse and retention policy names +// in the database URL query value. It is encoded using a forward slash like +// "database/retentionpolicy" and we should be able to simply split that string +// on the forward slash. +// +func bucket2dbrp(bucket string) (string, string, error) { + // test for a slash in our bucket name. + switch idx := strings.IndexByte(bucket, '/'); idx { + case -1: + // if there is no slash, we're mapping bucket to the databse. + switch db := bucket; db { + case "": + // if our "database" is an empty string, this is an error. + return "", "", fmt.Errorf(`bucket name %q is missing a slash; not in "database/retention-policy" format`, bucket) + default: + return db, "", nil + } + default: + // there is a slash + switch db, rp := bucket[:idx], bucket[idx+1:]; { + case db == "": + // empty database is unrecoverable + return "", "", fmt.Errorf(`bucket name %q is in db/rp form but has an empty database`, bucket) + default: + return db, rp, nil + } + } +} + +// serveWriteV2 maps v2 write parameters to a v1 style handler. the concepts +// of an "org" and "bucket" are mapped to v1 "database" and "retention +// policies". +func (h *Handler) serveWriteV2(w http.ResponseWriter, r *http.Request, user meta.User) { + db, rp, err := bucket2dbrp(r.URL.Query().Get("bucket")) + if err != nil { + h.httpError(w, err.Error(), http.StatusNotFound) + return + } + h.serveWrite(db, rp, w, r, user) +} + +// serveWriteV1 handles v1 style writes. +func (h *Handler) serveWriteV1(w http.ResponseWriter, r *http.Request, user meta.User) { + h.serveWrite(r.URL.Query().Get("db"), r.URL.Query().Get("rp"), w, r, user) +} + +// serveWrite receives incoming series data in line protocol format and writes +// it to the database. +func (h *Handler) serveWrite(database string, retentionPolicy string, w http.ResponseWriter, r *http.Request, user meta.User) { atomic.AddInt64(&h.stats.WriteRequests, 1) atomic.AddInt64(&h.stats.ActiveWriteRequests, 1) defer func(start time.Time) { @@ -782,7 +836,6 @@ func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user meta.U }(time.Now()) h.requestTracker.Add(r, user) - database := r.URL.Query().Get("db") if database == "" { h.httpError(w, "database is required", http.StatusBadRequest) return @@ -877,7 +930,7 @@ func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user meta.U } // Write points. - if err := h.PointsWriter.WritePoints(database, r.URL.Query().Get("rp"), consistency, user, points); influxdb.IsClientError(err) { + if err := h.PointsWriter.WritePoints(database, retentionPolicy, consistency, user, points); influxdb.IsClientError(err) { atomic.AddInt64(&h.stats.PointsWrittenFail, int64(len(points))) h.httpError(w, err.Error(), http.StatusBadRequest) return diff --git a/services/httpd/v2_write_test.go b/services/httpd/v2_write_test.go new file mode 100644 index 00000000000..66cc819e613 --- /dev/null +++ b/services/httpd/v2_write_test.go @@ -0,0 +1,95 @@ +package httpd + +import "testing" + +// test of how we extract the database and retention policy from the bucket in +// our v2 api enpoint. +// +func TestV2DatabaseRetentionPolicyMapper(t *testing.T) { + tests := map[string]struct { + input string + db string + rp string + shoulderr bool + }{ + "Properly Encoded": { + input: "database/retention", + db: "database", + rp: "retention", + shoulderr: false, + }, + "Empty Database": { + input: "/retention", + db: "", + rp: "", + shoulderr: true, + }, + "Empty Retention Policy": { + input: "database/", + db: "database", + rp: "", + shoulderr: false, + }, + "No Slash, Empty Retention Policy": { + input: "database", + db: "database", + rp: "", + shoulderr: false, + }, + "Empty String": { + input: "", + db: "", + rp: "", + shoulderr: true, + }, + "Space Before DB": { + input: " database/retention", + db: " database", + rp: "retention", + shoulderr: false, + }, + "Space After DB": { + input: "database /retention", + db: "database ", + rp: "retention", + shoulderr: false, + }, + "Space Before RP": { + input: "database/ retention", + db: "database", + rp: " retention", + shoulderr: false, + }, + "Space After RP": { + input: "database/retention ", + db: "database", + rp: "retention ", + shoulderr: false, + }, + } + + t.Parallel() + for name, test := range tests { + test := test + t.Run(name, func(t *testing.T) { + db, rp, err := bucket2dbrp(test.input) + switch goterr, shoulderr := err != nil, test.shoulderr; { + case goterr != shoulderr: + switch shoulderr { + case true: + t.Fatalf("bucket2dbrp(%q) did not return an error; expected to return an error", test.input) + default: + t.Fatalf("bucket2dbrp(%q) return an error %v; expected to return a nil error", test.input, err) + } + } + + if got, expected := db, test.db; got != expected { + t.Fatalf("bucket2dbrp(%q) returned a database of %q; epected %q", test.input, got, expected) + } + + if got, expected := rp, test.rp; got != expected { + t.Fatalf("bucket2dbrp(%q) returned a retention policy of %q; epected %q", test.input, got, expected) + } + }) + } +}