diff --git a/CHANGELOG.md b/CHANGELOG.md index 400857915ec..8bf17458f00 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -78,6 +78,7 @@ This release adds an embedded SQLite database for storing metadata required by t 1. [21950](https://github.com/influxdata/influxdb/pull/21950): Invalid requests to /api/v2 subroutes now return 404 instead of a list of links. 1. [21962](https://github.com/influxdata/influxdb/pull/21962): Flux metaqueries for `_field` take fast path if `_measurement` is the only predicate. 1. [22059](https://github.com/influxdata/influxdb/pull/22059): Copy names from mmapped memory before closing iterator +1. [22186](https://github.com/influxdata/influxdb/pull/22186): Preserve comments in flux queries when saving task definitions ## v2.0.7 [2021-06-04] diff --git a/checks/service_external_test.go b/checks/service_external_test.go index 0e9857e2a72..8535e0902b6 100644 --- a/checks/service_external_test.go +++ b/checks/service_external_test.go @@ -317,7 +317,7 @@ func CreateCheck( Organization: "theorg", OwnerID: MustIDBase16("020f755c3c082001"), Status: "active", - Flux: "package main\nimport \"influxdata/influxdb/monitor\"\nimport \"experimental\"\nimport \"influxdata/influxdb/v1\"\n\ndata = from(bucket: \"telegraf\")\n\t|> range(start: -1h)\n\t|> filter(fn: (r) =>\n\t\t(r._field == \"usage_user\"))\n\noption task = {name: \"name1\", every: 1m}\n\ncheck = {\n\t_check_id: \"020f755c3c082000\",\n\t_check_name: \"name1\",\n\t_type: \"deadman\",\n\ttags: {k1: \"v1\", k2: \"v2\"},\n}\ncrit = (r) =>\n\t(r[\"dead\"])\nmessageFn = (r) =>\n\t(\"msg1\")\n\ndata\n\t|> v1[\"fieldsAsCols\"]()\n\t|> monitor[\"deadman\"](t: experimental[\"subDuration\"](from: now(), d: 21s))\n\t|> monitor[\"check\"](data: check, messageFn: messageFn, crit: crit)", + Flux: "import \"influxdata/influxdb/monitor\"\nimport \"experimental\"\nimport \"influxdata/influxdb/v1\"\n\ndata = from(bucket: \"telegraf\") |> range(start: -1h) |> filter(fn: (r) => r._field == \"usage_user\")\n\noption task = {name: \"name1\", every: 1m}\n\ncheck = {_check_id: \"020f755c3c082000\", _check_name: \"name1\", _type: \"deadman\", tags: {k1: \"v1\", k2: \"v2\"}}\ncrit = (r) => r[\"dead\"]\nmessageFn = (r) => \"msg1\"\n\ndata |> v1[\"fieldsAsCols\"]() |> monitor[\"deadman\"](t: experimental[\"subDuration\"](from: now(), d: 21s))\n |> monitor[\"check\"](data: check, messageFn: messageFn, crit: crit)", Every: "1m", }, }, @@ -447,7 +447,7 @@ func CreateCheck( OwnerID: MustIDBase16("020f755c3c082005"), Status: "active", Every: "1m", - Flux: "package main\nimport \"influxdata/influxdb/monitor\"\nimport \"influxdata/influxdb/v1\"\n\ndata = from(bucket: \"telegraf\")\n\t|> range(start: -1m)\n\t|> filter(fn: (r) =>\n\t\t(r._field == \"usage_user\"))\n\noption task = {name: \"name2\", every: 1m}\n\ncheck = {\n\t_check_id: \"020f755c3c082001\",\n\t_check_name: \"name2\",\n\t_type: \"threshold\",\n\ttags: {k11: \"v11\"},\n}\nok = (r) =>\n\t(r[\"usage_user\"] < 1000.0)\nwarn = (r) =>\n\t(r[\"usage_user\"] > 2000.0)\ninfo = (r) =>\n\t(r[\"usage_user\"] < 1900.0 and r[\"usage_user\"] > 1500.0)\nmessageFn = (r) =>\n\t(\"msg2\")\n\ndata\n\t|> v1[\"fieldsAsCols\"]()\n\t|> monitor[\"check\"](\n\t\tdata: check,\n\t\tmessageFn: messageFn,\n\t\tok: ok,\n\t\twarn: warn,\n\t\tinfo: info,\n\t)", + Flux: "import \"influxdata/influxdb/monitor\"\nimport \"influxdata/influxdb/v1\"\n\ndata = from(bucket: \"telegraf\") |> range(start: -1m) |> filter(fn: (r) => r._field == \"usage_user\")\n\noption task = {name: \"name2\", every: 1m}\n\ncheck = {_check_id: \"020f755c3c082001\", _check_name: \"name2\", _type: \"threshold\", tags: {k11: \"v11\"}}\nok = (r) => r[\"usage_user\"] < 1000.0\nwarn = (r) => r[\"usage_user\"] > 2000.0\ninfo = (r) => r[\"usage_user\"] < 1900.0 and r[\"usage_user\"] > 1500.0\nmessageFn = (r) => \"msg2\"\n\ndata |> v1[\"fieldsAsCols\"]() |> monitor[\"check\"](\n data: check,\n messageFn: messageFn,\n ok: ok,\n warn: warn,\n info: info,\n)", }, }, }, @@ -584,7 +584,7 @@ func CreateCheck( OwnerID: MustIDBase16("020f755c3c082001"), Status: "active", Every: "1m", - Flux: "package main\nimport \"influxdata/influxdb/monitor\"\nimport \"influxdata/influxdb/v1\"\n\ndata = from(bucket: \"telegraf\")\n\t|> range(start: -1m)\n\t|> filter(fn: (r) =>\n\t\t(r._field == \"usage_user\"))\n\noption task = {name: \"name1\", every: 1m}\n\ncheck = {\n\t_check_id: \"020f755c3c082001\",\n\t_check_name: \"name1\",\n\t_type: \"threshold\",\n\ttags: {k11: \"v11\", k22: \"v22\"},\n}\nmessageFn = (r) =>\n\t(\"msg2\")\n\ndata\n\t|> v1[\"fieldsAsCols\"]()\n\t|> monitor[\"check\"](data: check, messageFn: messageFn)", + Flux: "import \"influxdata/influxdb/monitor\"\nimport \"influxdata/influxdb/v1\"\n\ndata = from(bucket: \"telegraf\") |> range(start: -1m) |> filter(fn: (r) => r._field == \"usage_user\")\n\noption task = {name: \"name1\", every: 1m}\n\ncheck = {_check_id: \"020f755c3c082001\", _check_name: \"name1\", _type: \"threshold\", tags: {k11: \"v11\", k22: \"v22\"}}\nmessageFn = (r) => \"msg2\"\n\ndata |> v1[\"fieldsAsCols\"]() |> monitor[\"check\"](data: check, messageFn: messageFn)", }, }, checks: []influxdb.Check{ diff --git a/cmd/influxd/launcher/pkger_test.go b/cmd/influxd/launcher/pkger_test.go index 3991639bfa1..dad640cd307 100644 --- a/cmd/influxd/launcher/pkger_test.go +++ b/cmd/influxd/launcher/pkger_test.go @@ -3832,15 +3832,12 @@ spec: expectedQuery := expectedParams + ` from(bucket: params.bucket) - |> range(start: params.start, end: params.stop) - |> filter(fn: (r) => - (r._measurement == "processes")) - |> filter(fn: (r) => - (r.floater == params.floatVal)) - |> filter(fn: (r) => - (r._value > params.minVal)) - |> aggregateWindow(every: v.windowPeriod, fn: max) - |> yield(name: params.name)` + |> range(start: params.start, end: params.stop) + |> filter(fn: (r) => r._measurement == "processes") + |> filter(fn: (r) => r.floater == params.floatVal) + |> filter(fn: (r) => r._value > params.minVal) + |> aggregateWindow(every: v.windowPeriod, fn: max) + |> yield(name: params.name)` assert.Equal(t, expectedQuery, props.Queries[0].Text) assert.Equal(t, "advanced", props.Queries[0].EditMode) @@ -3869,12 +3866,12 @@ from(bucket: params.bucket) actual := impact.Summary.Dashboards[0] expectedParams := `option params = { - bucket: "bar", - start: -24h0m0s, - stop: now(), - name: "max", - floatVal: 37.2, - minVal: 10, + bucket: "bar", + start: -24h0m0s, + stop: now(), + name: "max", + floatVal: 37.2, + minVal: 10, }` isExpectedQuery(t, actual, expectedParams) @@ -3955,12 +3952,12 @@ from(bucket: params.bucket) actual := impact.Summary.Dashboards[0] expectedParams := `option params = { - bucket: "foobar", - start: -5d, - stop: now(), - name: "min", - floatVal: 33.3, - minVal: 3, + bucket: "foobar", + start: -5d, + stop: now(), + name: "min", + floatVal: 33.3, + minVal: 3, }` isExpectedQuery(t, actual, expectedParams) @@ -4084,15 +4081,12 @@ spec: expectedQuery := expectedParams + ` from(bucket: params.bucket) - |> range(start: params.start, stop: params.stop) - |> filter(fn: (r) => - (r._measurement == "processes")) - |> filter(fn: (r) => - (r.floater == params.floatVal)) - |> filter(fn: (r) => - (r._value > params.minVal)) - |> aggregateWindow(every: 1m, fn: max) - |> yield(name: params.name)` + |> range(start: params.start, stop: params.stop) + |> filter(fn: (r) => r._measurement == "processes") + |> filter(fn: (r) => r.floater == params.floatVal) + |> filter(fn: (r) => r._value > params.minVal) + |> aggregateWindow(every: 1m, fn: max) + |> yield(name: params.name)` assert.Equal(t, expectedQuery, actual.Query) } @@ -4120,12 +4114,12 @@ from(bucket: params.bucket) actual := impact.Summary.Tasks[0] expectedParams := `option params = { - bucket: "bar", - start: -24h0m0s, - stop: now(), - name: "max", - floatVal: 37.2, - minVal: 10, + bucket: "bar", + start: -24h0m0s, + stop: now(), + name: "max", + floatVal: 37.2, + minVal: 10, }` isExpectedQuery(t, actual, expectedParams) @@ -4206,12 +4200,12 @@ from(bucket: params.bucket) actual := impact.Summary.Tasks[0] expectedParams := `option params = { - bucket: "foobar", - start: -5d, - stop: now(), - name: "min", - floatVal: 33.3, - minVal: 3, + bucket: "foobar", + start: -5d, + stop: now(), + name: "min", + floatVal: 33.3, + minVal: 3, }` isExpectedQuery(t, actual, expectedParams) diff --git a/http/check_test.go b/http/check_test.go index e6af29ebd5a..a63f0d8df08 100644 --- a/http/check_test.go +++ b/http/check_test.go @@ -418,7 +418,7 @@ func TestService_handleGetCheckQuery(t *testing.T) { wants: wants{ statusCode: http.StatusOK, contentType: "application/json; charset=utf-8", - body: "{\"flux\":\"package main\\nimport \\\"influxdata/influxdb/monitor\\\"\\nimport \\\"influxdata/influxdb/v1\\\"\\n\\ndata = from(bucket: \\\"foo\\\")\\n\\t|\\u003e range(start: -1h)\\n\\t|\\u003e filter(fn: (r) =\\u003e\\n\\t\\t(r._field == \\\"usage_idle\\\"))\\n\\t|\\u003e aggregateWindow(every: 1h, fn: mean, createEmpty: false)\\n\\noption task = {name: \\\"hello\\\", every: 1h}\\n\\ncheck = {\\n\\t_check_id: \\\"020f755c3c082000\\\",\\n\\t_check_name: \\\"hello\\\",\\n\\t_type: \\\"threshold\\\",\\n\\ttags: {aaa: \\\"vaaa\\\", bbb: \\\"vbbb\\\"},\\n}\\nok = (r) =\\u003e\\n\\t(r[\\\"usage_idle\\\"] \\u003e 10.0)\\ninfo = (r) =\\u003e\\n\\t(r[\\\"usage_idle\\\"] \\u003c 40.0)\\nwarn = (r) =\\u003e\\n\\t(r[\\\"usage_idle\\\"] \\u003c 40.0 and r[\\\"usage_idle\\\"] \\u003e 10.0)\\ncrit = (r) =\\u003e\\n\\t(r[\\\"usage_idle\\\"] \\u003c 40.0 and r[\\\"usage_idle\\\"] \\u003e 10.0)\\nmessageFn = (r) =\\u003e\\n\\t(\\\"whoa! {check.yeah}\\\")\\n\\ndata\\n\\t|\\u003e v1[\\\"fieldsAsCols\\\"]()\\n\\t|\\u003e monitor[\\\"check\\\"](\\n\\t\\tdata: check,\\n\\t\\tmessageFn: messageFn,\\n\\t\\tok: ok,\\n\\t\\tinfo: info,\\n\\t\\twarn: warn,\\n\\t\\tcrit: crit,\\n\\t)\"}\n", + body: "{\"flux\":\"import \\\"influxdata/influxdb/monitor\\\"\\nimport \\\"influxdata/influxdb/v1\\\"\\n\\ndata = from(bucket: \\\"foo\\\") |\\u003e range(start: -1h) |\\u003e filter(fn: (r) =\\u003e r._field == \\\"usage_idle\\\")\\n |\\u003e aggregateWindow(every: 1h, fn: mean, createEmpty: false)\\n\\noption task = {name: \\\"hello\\\", every: 1h}\\n\\ncheck = {_check_id: \\\"020f755c3c082000\\\", _check_name: \\\"hello\\\", _type: \\\"threshold\\\", tags: {aaa: \\\"vaaa\\\", bbb: \\\"vbbb\\\"}}\\nok = (r) =\\u003e r[\\\"usage_idle\\\"] \\u003e 10.0\\ninfo = (r) =\\u003e r[\\\"usage_idle\\\"] \\u003c 40.0\\nwarn = (r) =\\u003e r[\\\"usage_idle\\\"] \\u003c 40.0 and r[\\\"usage_idle\\\"] \\u003e 10.0\\ncrit = (r) =\\u003e r[\\\"usage_idle\\\"] \\u003c 40.0 and r[\\\"usage_idle\\\"] \\u003e 10.0\\nmessageFn = (r) =\\u003e \\\"whoa! {check.yeah}\\\"\\n\\ndata |\\u003e v1[\\\"fieldsAsCols\\\"]() |\\u003e monitor[\\\"check\\\"](\\n data: check,\\n messageFn: messageFn,\\n ok: ok,\\n info: info,\\n warn: warn,\\n crit: crit,\\n)\"}\n", }, }, } diff --git a/notification/check/check_test.go b/notification/check/check_test.go index 3c26ba0d43a..f70e286833a 100644 --- a/notification/check/check_test.go +++ b/notification/check/check_test.go @@ -2,10 +2,14 @@ package check_test import ( "encoding/json" - "github.com/influxdata/influxdb/v2/kit/platform/errors" "testing" "time" + "github.com/influxdata/flux/ast" + "github.com/influxdata/flux/ast/astutil" + "github.com/influxdata/influxdb/v2/kit/platform/errors" + "github.com/stretchr/testify/require" + "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" "github.com/influxdata/flux/parser" @@ -282,3 +286,12 @@ func TestJSON(t *testing.T) { t.Run(c.name, fn) } } + +func mustFormatPackage(t *testing.T, pkg *ast.Package) string { + if len(pkg.Files) == 0 { + t.Fatal("package expected to have at least one file") + } + v, err := astutil.Format(pkg.Files[0]) + require.NoError(t, err) + return v +} diff --git a/notification/check/custom.go b/notification/check/custom.go index 2980d7ba048..6c4511a6fb9 100644 --- a/notification/check/custom.go +++ b/notification/check/custom.go @@ -5,6 +5,7 @@ import ( "time" "github.com/influxdata/flux/ast" + "github.com/influxdata/flux/ast/astutil" "github.com/influxdata/influxdb/v2" "github.com/influxdata/influxdb/v2/kit/platform" "github.com/influxdata/influxdb/v2/kit/platform/errors" @@ -98,7 +99,7 @@ func (c Custom) sanitizeFlux(lang fluxlang.FluxLanguageService) (string, error) } }) - return ast.Format(p), nil + return astutil.Format(p.Files[0]) } func propertyHasValue(prop *ast.Property, key string, value string) bool { diff --git a/notification/check/custom_test.go b/notification/check/custom_test.go index f734e8cb010..dcd35905f6b 100644 --- a/notification/check/custom_test.go +++ b/notification/check/custom_test.go @@ -6,7 +6,6 @@ import ( "testing" "github.com/andreyvit/diff" - "github.com/influxdata/flux/ast" "github.com/influxdata/flux/parser" "github.com/influxdata/influxdb/v2" "github.com/influxdata/influxdb/v2/notification/check" @@ -28,10 +27,8 @@ import "influxdata/influxdb/v1" data = from(bucket: "_tasks") |> range(start: -1m) -|> filter(fn: (r) => - (r._measurement == "runs")) -|> filter(fn: (r) => - (r._field == "finishedAt")) +|> filter(fn: (r) => r._measurement == "runs") +|> filter(fn: (r) => r._field == "finishedAt") |> aggregateWindow(every: 1m, fn: mean, createEmpty: false) option task = {name: "moo", every: 1m, offset: 0s} @@ -110,13 +107,13 @@ data ID: 10, Name: "moo", Query: influxdb.DashboardQuery{ - Text: ast.Format(parser.ParseSource(fmt.Sprintf(validQuery, "000000000000000a"))), + Text: mustFormatPackage(t, parser.ParseSource(fmt.Sprintf(validQuery, "000000000000000a"))), }, }, }, wants: wants{ err: nil, - script: ast.Format(parser.ParseSource(fmt.Sprintf(validQuery, "000000000000000a"))), + script: mustFormatPackage(t, parser.ParseSource(fmt.Sprintf(validQuery, "000000000000000a"))), }, }, { @@ -126,13 +123,13 @@ data ID: 10, Name: "moo", Query: influxdb.DashboardQuery{ - Text: ast.Format(parser.ParseSource(fmt.Sprintf(validQuery, "000000000000000b"))), + Text: mustFormatPackage(t, parser.ParseSource(fmt.Sprintf(validQuery, "000000000000000b"))), }, }, }, wants: wants{ err: nil, - script: ast.Format(parser.ParseSource(fmt.Sprintf(validQuery, "000000000000000a"))), + script: mustFormatPackage(t, parser.ParseSource(fmt.Sprintf(validQuery, "000000000000000a"))), }, }, { @@ -157,7 +154,7 @@ data ID: 10, Name: "moo", Query: influxdb.DashboardQuery{ - Text: ast.Format(parser.ParseSource(fmt.Sprintf(invalidTaskQuery, "000000000000000b"))), + Text: mustFormatPackage(t, parser.ParseSource(fmt.Sprintf(invalidTaskQuery, "000000000000000b"))), }, }, }, diff --git a/notification/check/deadman.go b/notification/check/deadman.go index b96a366a15b..bf1bb93e1ad 100644 --- a/notification/check/deadman.go +++ b/notification/check/deadman.go @@ -6,6 +6,7 @@ import ( "strings" "github.com/influxdata/flux/ast" + "github.com/influxdata/flux/ast/astutil" "github.com/influxdata/influxdb/v2" "github.com/influxdata/influxdb/v2/notification" "github.com/influxdata/influxdb/v2/notification/flux" @@ -33,18 +34,18 @@ func (c Deadman) Type() string { // GenerateFlux returns a flux script for the Deadman provided. func (c Deadman) GenerateFlux(lang fluxlang.FluxLanguageService) (string, error) { - p, err := c.GenerateFluxAST(lang) + f, err := c.GenerateFluxAST(lang) if err != nil { return "", err } - return ast.Format(p), nil + return astutil.Format(f) } // GenerateFluxAST returns a flux AST for the deadman provided. If there // are any errors in the flux that the user provided the function will return // an error for each error found when the script is parsed. -func (c Deadman) GenerateFluxAST(lang fluxlang.FluxLanguageService) (*ast.Package, error) { +func (c Deadman) GenerateFluxAST(lang fluxlang.FluxLanguageService) (*ast.File, error) { p, err := query.Parse(lang, c.Query.Text) if p == nil { return nil, err @@ -69,7 +70,7 @@ func (c Deadman) GenerateFluxAST(lang fluxlang.FluxLanguageService) (*ast.Packag f.Imports = append(f.Imports, flux.Imports("influxdata/influxdb/monitor", "experimental", "influxdata/influxdb/v1")...) f.Body = append(f.Body, c.generateFluxASTBody()...) - return p, nil + return f, nil } func (c Deadman) generateFluxASTBody() []ast.Statement { diff --git a/notification/check/deadman_test.go b/notification/check/deadman_test.go index 2f6d574e031..5cd7f90fbcf 100644 --- a/notification/check/deadman_test.go +++ b/notification/check/deadman_test.go @@ -7,6 +7,8 @@ import ( "github.com/influxdata/influxdb/v2/notification" "github.com/influxdata/influxdb/v2/notification/check" "github.com/influxdata/influxdb/v2/query/fluxlang" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestDeadman_GenerateFlux(t *testing.T) { @@ -58,31 +60,20 @@ func TestDeadman_GenerateFlux(t *testing.T) { }, }, wants: wants{ - script: `package main -import "influxdata/influxdb/monitor" + script: `import "influxdata/influxdb/monitor" import "experimental" import "influxdata/influxdb/v1" -data = from(bucket: "foo") - |> range(start: -10m) +data = from(bucket: "foo") |> range(start: -10m) option task = {name: "moo", every: 1h} -check = { - _check_id: "000000000000000a", - _check_name: "moo", - _type: "deadman", - tags: {aaa: "vaaa", bbb: "vbbb"}, -} -info = (r) => - (r["dead"]) -messageFn = (r) => - ("whoa! {r[\"dead\"]}") - -data - |> v1["fieldsAsCols"]() - |> monitor["deadman"](t: experimental["subDuration"](from: now(), d: 60s)) - |> monitor["check"](data: check, messageFn: messageFn, info: info)`, +check = {_check_id: "000000000000000a", _check_name: "moo", _type: "deadman", tags: {aaa: "vaaa", bbb: "vbbb"}} +info = (r) => r["dead"] +messageFn = (r) => "whoa! {r[\"dead\"]}" + +data |> v1["fieldsAsCols"]() |> monitor["deadman"](t: experimental["subDuration"](from: now(), d: 60s)) + |> monitor["check"](data: check, messageFn: messageFn, info: info)`, }, }, { @@ -121,31 +112,20 @@ data }, }, wants: wants{ - script: `package main -import "influxdata/influxdb/monitor" + script: `import "influxdata/influxdb/monitor" import "experimental" import "influxdata/influxdb/v1" -data = from(bucket: "foo") - |> range(start: -10m) +data = from(bucket: "foo") |> range(start: -10m) option task = {name: "moo", every: 1h} -check = { - _check_id: "000000000000000a", - _check_name: "moo", - _type: "deadman", - tags: {aaa: "vaaa", bbb: "vbbb"}, -} -info = (r) => - (r["dead"]) -messageFn = (r) => - ("whoa! {r[\"dead\"]}") - -data - |> v1["fieldsAsCols"]() - |> monitor["deadman"](t: experimental["subDuration"](from: now(), d: 60s)) - |> monitor["check"](data: check, messageFn: messageFn, info: info)`, +check = {_check_id: "000000000000000a", _check_name: "moo", _type: "deadman", tags: {aaa: "vaaa", bbb: "vbbb"}} +info = (r) => r["dead"] +messageFn = (r) => "whoa! {r[\"dead\"]}" + +data |> v1["fieldsAsCols"]() |> monitor["deadman"](t: experimental["subDuration"](from: now(), d: 60s)) + |> monitor["check"](data: check, messageFn: messageFn, info: info)`, }, }, { @@ -184,33 +164,20 @@ data }, }, wants: wants{ - script: `package main -import "influxdata/influxdb/monitor" + script: `import "influxdata/influxdb/monitor" import "experimental" import "influxdata/influxdb/v1" -data = from(bucket: "foo") - |> range(start: -10m) - |> filter(fn: (r) => - (r._field == "usage user")) +data = from(bucket: "foo") |> range(start: -10m) |> filter(fn: (r) => r._field == "usage user") option task = {name: "moo", every: 1h} -check = { - _check_id: "000000000000000a", - _check_name: "moo", - _type: "deadman", - tags: {aaa: "vaaa", bbb: "vbbb"}, -} -info = (r) => - (r["dead"]) -messageFn = (r) => - ("whoa! {r[\"dead\"]}") - -data - |> v1["fieldsAsCols"]() - |> monitor["deadman"](t: experimental["subDuration"](from: now(), d: 60s)) - |> monitor["check"](data: check, messageFn: messageFn, info: info)`, +check = {_check_id: "000000000000000a", _check_name: "moo", _type: "deadman", tags: {aaa: "vaaa", bbb: "vbbb"}} +info = (r) => r["dead"] +messageFn = (r) => "whoa! {r[\"dead\"]}" + +data |> v1["fieldsAsCols"]() |> monitor["deadman"](t: experimental["subDuration"](from: now(), d: 60s)) + |> monitor["check"](data: check, messageFn: messageFn, info: info)`, }, }, } @@ -218,13 +185,8 @@ data for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { s, err := tt.args.deadman.GenerateFlux(fluxlang.DefaultService) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - - if exp, got := tt.wants.script, s; exp != got { - t.Errorf("expected:\n%v\n\ngot:\n%v\n", exp, got) - } + require.NoError(t, err) + assert.Equal(t, tt.wants.script, s) }) } diff --git a/notification/check/threshold.go b/notification/check/threshold.go index 78be942f49e..249a1a69cbd 100644 --- a/notification/check/threshold.go +++ b/notification/check/threshold.go @@ -6,6 +6,7 @@ import ( "strings" "github.com/influxdata/flux/ast" + "github.com/influxdata/flux/ast/astutil" "github.com/influxdata/influxdb/v2" "github.com/influxdata/influxdb/v2/kit/platform/errors" "github.com/influxdata/influxdb/v2/notification" @@ -107,18 +108,18 @@ func multiError(errs []error) error { // are any errors in the flux that the user provided the function will return // an error for each error found when the script is parsed. func (t Threshold) GenerateFlux(lang fluxlang.FluxLanguageService) (string, error) { - p, err := t.GenerateFluxAST(lang) + f, err := t.GenerateFluxAST(lang) if err != nil { return "", err } - return ast.Format(p), nil + return astutil.Format(f) } // GenerateFluxAST returns a flux AST for the threshold provided. If there // are any errors in the flux that the user provided the function will return // an error for each error found when the script is parsed. -func (t Threshold) GenerateFluxAST(lang fluxlang.FluxLanguageService) (*ast.Package, error) { +func (t Threshold) GenerateFluxAST(lang fluxlang.FluxLanguageService) (*ast.File, error) { p, err := query.Parse(lang, t.Query.Text) if p == nil { return nil, err @@ -148,7 +149,7 @@ func (t Threshold) GenerateFluxAST(lang fluxlang.FluxLanguageService) (*ast.Pack f.Imports = append(f.Imports, flux.Imports("influxdata/influxdb/monitor", "influxdata/influxdb/v1")...) f.Body = append(f.Body, t.generateFluxASTBody(fields[0])...) - return p, nil + return f, nil } // TODO(desa): we'll likely want something slightly more sophisitcated long term, but this should work for now. diff --git a/notification/check/threshold_test.go b/notification/check/threshold_test.go index aa4710df5d4..efe5008f271 100644 --- a/notification/check/threshold_test.go +++ b/notification/check/threshold_test.go @@ -3,12 +3,12 @@ package check_test import ( "testing" - "github.com/influxdata/flux/ast" "github.com/influxdata/influxdb/v2" "github.com/influxdata/influxdb/v2/notification" "github.com/influxdata/influxdb/v2/notification/check" "github.com/influxdata/influxdb/v2/query/fluxlang" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestThreshold_GenerateFlux(t *testing.T) { @@ -77,45 +77,29 @@ func TestThreshold_GenerateFlux(t *testing.T) { }, }, wants: wants{ - script: `package main -import "influxdata/influxdb/monitor" + script: `import "influxdata/influxdb/monitor" import "influxdata/influxdb/v1" -data = from(bucket: "foo") - |> range(start: -1h) - |> filter(fn: (r) => - (r._field == "usage_user")) - |> aggregateWindow(every: 1h, fn: mean, createEmpty: false) +data = from(bucket: "foo") |> range(start: -1h) |> filter(fn: (r) => r._field == "usage_user") + |> aggregateWindow(every: 1h, fn: mean, createEmpty: false) option task = {name: "moo", every: 1h} -check = { - _check_id: "000000000000000a", - _check_name: "moo", - _type: "threshold", - tags: {aaa: "vaaa", bbb: "vbbb"}, -} -ok = (r) => - (r["usage_user"] > 10.0) -info = (r) => - (r["usage_user"] < 40.0) -warn = (r) => - (r["usage_user"] < 40.0 and r["usage_user"] > 10.0) -crit = (r) => - (r["usage_user"] < 10.0 or r["usage_user"] > 40.0) -messageFn = (r) => - ("whoa! {r[\"usage_user\"]}") +check = {_check_id: "000000000000000a", _check_name: "moo", _type: "threshold", tags: {aaa: "vaaa", bbb: "vbbb"}} +ok = (r) => r["usage_user"] > 10.0 +info = (r) => r["usage_user"] < 40.0 +warn = (r) => r["usage_user"] < 40.0 and r["usage_user"] > 10.0 +crit = (r) => r["usage_user"] < 10.0 or r["usage_user"] > 40.0 +messageFn = (r) => "whoa! {r[\"usage_user\"]}" -data - |> v1["fieldsAsCols"]() - |> monitor["check"]( - data: check, - messageFn: messageFn, - ok: ok, - info: info, - warn: warn, - crit: crit, - )`, +data |> v1["fieldsAsCols"]() |> monitor["check"]( + data: check, + messageFn: messageFn, + ok: ok, + info: info, + warn: warn, + crit: crit, +)`, }, }, { @@ -168,45 +152,29 @@ data }, }, wants: wants{ - script: `package main -import "influxdata/influxdb/monitor" + script: `import "influxdata/influxdb/monitor" import "influxdata/influxdb/v1" -data = from(bucket: "foo") - |> range(start: -1h) - |> filter(fn: (r) => - (r._field == "usage_user")) - |> aggregateWindow(every: 1h, fn: mean, createEmpty: false) +data = from(bucket: "foo") |> range(start: -1h) |> filter(fn: (r) => r._field == "usage_user") + |> aggregateWindow(every: 1h, fn: mean, createEmpty: false) option task = {name: "moo", every: 1h} -check = { - _check_id: "000000000000000a", - _check_name: "moo", - _type: "threshold", - tags: {aaa: "vaaa", bbb: "vbbb"}, -} -ok = (r) => - (r["usage_user"] > 10.0) -info = (r) => - (r["usage_user"] < 40.0) -warn = (r) => - (r["usage_user"] < 40.0 and r["usage_user"] > 10.0) -crit = (r) => - (r["usage_user"] < 10.0 or r["usage_user"] > 40.0) -messageFn = (r) => - ("whoa! {r[\"usage_user\"]}") +check = {_check_id: "000000000000000a", _check_name: "moo", _type: "threshold", tags: {aaa: "vaaa", bbb: "vbbb"}} +ok = (r) => r["usage_user"] > 10.0 +info = (r) => r["usage_user"] < 40.0 +warn = (r) => r["usage_user"] < 40.0 and r["usage_user"] > 10.0 +crit = (r) => r["usage_user"] < 10.0 or r["usage_user"] > 40.0 +messageFn = (r) => "whoa! {r[\"usage_user\"]}" -data - |> v1["fieldsAsCols"]() - |> monitor["check"]( - data: check, - messageFn: messageFn, - ok: ok, - info: info, - warn: warn, - crit: crit, - )`, +data |> v1["fieldsAsCols"]() |> monitor["check"]( + data: check, + messageFn: messageFn, + ok: ok, + info: info, + warn: warn, + crit: crit, +)`, }, }, { @@ -259,45 +227,29 @@ data }, }, wants: wants{ - script: `package main -import "influxdata/influxdb/monitor" + script: `import "influxdata/influxdb/monitor" import "influxdata/influxdb/v1" -data = from(bucket: "foo") - |> range(start: -1h) - |> filter(fn: (r) => - (r._field == "usage user")) - |> aggregateWindow(every: 1h, fn: mean, createEmpty: false) +data = from(bucket: "foo") |> range(start: -1h) |> filter(fn: (r) => r._field == "usage user") + |> aggregateWindow(every: 1h, fn: mean, createEmpty: false) option task = {name: "moo", every: 1h} -check = { - _check_id: "000000000000000a", - _check_name: "moo", - _type: "threshold", - tags: {aaa: "vaaa", bbb: "vbbb"}, -} -ok = (r) => - (r["usage user"] > 10.0) -info = (r) => - (r["usage user"] < 40.0) -warn = (r) => - (r["usage user"] < 40.0 and r["usage user"] > 10.0) -crit = (r) => - (r["usage user"] < 10.0 or r["usage user"] > 40.0) -messageFn = (r) => - ("whoa! {r[\"usage user\"]}") +check = {_check_id: "000000000000000a", _check_name: "moo", _type: "threshold", tags: {aaa: "vaaa", bbb: "vbbb"}} +ok = (r) => r["usage user"] > 10.0 +info = (r) => r["usage user"] < 40.0 +warn = (r) => r["usage user"] < 40.0 and r["usage user"] > 10.0 +crit = (r) => r["usage user"] < 10.0 or r["usage user"] > 40.0 +messageFn = (r) => "whoa! {r[\"usage user\"]}" -data - |> v1["fieldsAsCols"]() - |> monitor["check"]( - data: check, - messageFn: messageFn, - ok: ok, - info: info, - warn: warn, - crit: crit, - )`, +data |> v1["fieldsAsCols"]() |> monitor["check"]( + data: check, + messageFn: messageFn, + ok: ok, + info: info, + warn: warn, + crit: crit, +)`, }, }, { @@ -350,59 +302,38 @@ data }, }, wants: wants{ - script: `package main -import "influxdata/influxdb/monitor" + script: `import "influxdata/influxdb/monitor" import "influxdata/influxdb/v1" -data = from(bucket: "foo") - |> range(start: -1h) - |> filter(fn: (r) => - (r._field == "usage_user")) - |> aggregateWindow(every: 1h, fn: mean, createEmpty: false) +data = from(bucket: "foo") |> range(start: -1h) |> filter(fn: (r) => r._field == "usage_user") + |> aggregateWindow(every: 1h, fn: mean, createEmpty: false) option task = {name: "moo", every: 1h} -check = { - _check_id: "000000000000000a", - _check_name: "moo", - _type: "threshold", - tags: {aaa: "vaaa", bbb: "vbbb"}, -} -ok = (r) => - (r["usage_user"] > 10.0) -info = (r) => - (r["usage_user"] < 40.0) -warn = (r) => - (r["usage_user"] < 40.0 and r["usage_user"] > 10.0) -crit = (r) => - (r["usage_user"] < 40.0 and r["usage_user"] > 10.0) -messageFn = (r) => - ("whoa! {r[\"usage_user\"]}") +check = {_check_id: "000000000000000a", _check_name: "moo", _type: "threshold", tags: {aaa: "vaaa", bbb: "vbbb"}} +ok = (r) => r["usage_user"] > 10.0 +info = (r) => r["usage_user"] < 40.0 +warn = (r) => r["usage_user"] < 40.0 and r["usage_user"] > 10.0 +crit = (r) => r["usage_user"] < 40.0 and r["usage_user"] > 10.0 +messageFn = (r) => "whoa! {r[\"usage_user\"]}" -data - |> v1["fieldsAsCols"]() - |> monitor["check"]( - data: check, - messageFn: messageFn, - ok: ok, - info: info, - warn: warn, - crit: crit, - )`, +data |> v1["fieldsAsCols"]() |> monitor["check"]( + data: check, + messageFn: messageFn, + ok: ok, + info: info, + warn: warn, + crit: crit, +)`, }, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - // TODO(desa): change this to GenerateFlux() when we don't need to code - // around the monitor package not being available. - p, err := tt.args.threshold.GenerateFluxAST(fluxlang.DefaultService) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - - assert.Equal(t, tt.wants.script, ast.Format(p)) + s, err := tt.args.threshold.GenerateFlux(fluxlang.DefaultService) + require.NoError(t, err) + assert.Equal(t, tt.wants.script, s) }) } diff --git a/notification/rule/http.go b/notification/rule/http.go index 8f43401da2b..0dfd2e8df88 100644 --- a/notification/rule/http.go +++ b/notification/rule/http.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/influxdata/flux/ast" + "github.com/influxdata/flux/ast/astutil" "github.com/influxdata/influxdb/v2" "github.com/influxdata/influxdb/v2/notification/endpoint" "github.com/influxdata/influxdb/v2/notification/flux" @@ -21,21 +22,12 @@ func (s *HTTP) GenerateFlux(e influxdb.NotificationEndpoint) (string, error) { if !ok { return "", fmt.Errorf("endpoint provided is a %s, not an HTTP endpoint", e.Type()) } - p, err := s.GenerateFluxAST(httpEndpoint) - if err != nil { - return "", err - } - return ast.Format(p), nil + return astutil.Format(s.GenerateFluxAST(httpEndpoint)) } // GenerateFluxAST generates a flux AST for the http notification rule. -func (s *HTTP) GenerateFluxAST(e *endpoint.HTTP) (*ast.Package, error) { - f := flux.File( - s.Name, - s.imports(e), - s.generateFluxASTBody(e), - ) - return &ast.Package{Package: "main", Files: []*ast.File{f}}, nil +func (s *HTTP) GenerateFluxAST(e *endpoint.HTTP) *ast.File { + return flux.File(s.Name, s.imports(e), s.generateFluxASTBody(e)) } func (s *HTTP) imports(e *endpoint.HTTP) []*ast.ImportDeclaration { diff --git a/notification/rule/http_test.go b/notification/rule/http_test.go index f4e86974998..62fee572f41 100644 --- a/notification/rule/http_test.go +++ b/notification/rule/http_test.go @@ -1,9 +1,12 @@ package rule_test import ( - "github.com/influxdata/influxdb/v2/kit/platform" "testing" + "github.com/influxdata/influxdb/v2/kit/platform" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/influxdata/influxdb/v2" "github.com/influxdata/influxdb/v2/notification" "github.com/influxdata/influxdb/v2/notification/endpoint" @@ -11,9 +14,7 @@ import ( ) func TestHTTP_GenerateFlux(t *testing.T) { - want := `package main -// foo -import "influxdata/influxdb/monitor" + want := `import "influxdata/influxdb/monitor" import "http" import "json" import "experimental" @@ -22,26 +23,16 @@ option task = {name: "foo", every: 1h, offset: 1s} headers = {"Content-Type": "application/json"} endpoint = http["endpoint"](url: "http://localhost:7777") -notification = { - _notification_rule_id: "0000000000000001", - _notification_rule_name: "foo", - _notification_endpoint_id: "0000000000000002", - _notification_endpoint_name: "foo", -} +notification = {_notification_rule_id: "0000000000000001", _notification_rule_name: "foo", _notification_endpoint_id: "0000000000000002", _notification_endpoint_name: "foo"} statuses = monitor["from"](start: -2h) -crit = statuses - |> filter(fn: (r) => - (r["_level"] == "crit")) -all_statuses = crit - |> filter(fn: (r) => - (r["_time"] >= experimental["subDuration"](from: now(), d: 1h))) +crit = statuses |> filter(fn: (r) => r["_level"] == "crit") +all_statuses = crit |> filter(fn: (r) => r["_time"] >= experimental["subDuration"](from: now(), d: 1h)) -all_statuses - |> monitor["notify"](data: notification, endpoint: endpoint(mapFn: (r) => { - body = {r with _version: 1} +all_statuses |> monitor["notify"](data: notification, endpoint: endpoint(mapFn: (r) => { + body = {r with _version: 1} - return {headers: headers, data: json["encode"](v: body)} - }))` + return {headers: headers, data: json["encode"](v: body)} +}))` s := &rule.HTTP{ Base: rule.Base{ @@ -79,9 +70,7 @@ all_statuses } func TestHTTP_GenerateFlux_basicAuth(t *testing.T) { - want := `package main -// foo -import "influxdata/influxdb/monitor" + want := `import "influxdata/influxdb/monitor" import "http" import "json" import "experimental" @@ -91,26 +80,16 @@ option task = {name: "foo", every: 1h, offset: 1s} headers = {"Content-Type": "application/json", "Authorization": http["basicAuth"](u: secrets["get"](key: "000000000000000e-username"), p: secrets["get"](key: "000000000000000e-password"))} endpoint = http["endpoint"](url: "http://localhost:7777") -notification = { - _notification_rule_id: "0000000000000001", - _notification_rule_name: "foo", - _notification_endpoint_id: "0000000000000002", - _notification_endpoint_name: "foo", -} +notification = {_notification_rule_id: "0000000000000001", _notification_rule_name: "foo", _notification_endpoint_id: "0000000000000002", _notification_endpoint_name: "foo"} statuses = monitor["from"](start: -2h) -crit = statuses - |> filter(fn: (r) => - (r["_level"] == "crit")) -all_statuses = crit - |> filter(fn: (r) => - (r["_time"] >= experimental["subDuration"](from: now(), d: 1h))) - -all_statuses - |> monitor["notify"](data: notification, endpoint: endpoint(mapFn: (r) => { - body = {r with _version: 1} - - return {headers: headers, data: json["encode"](v: body)} - }))` +crit = statuses |> filter(fn: (r) => r["_level"] == "crit") +all_statuses = crit |> filter(fn: (r) => r["_time"] >= experimental["subDuration"](from: now(), d: 1h)) + +all_statuses |> monitor["notify"](data: notification, endpoint: endpoint(mapFn: (r) => { + body = {r with _version: 1} + + return {headers: headers, data: json["encode"](v: body)} +}))` s := &rule.HTTP{ Base: rule.Base{ ID: 1, @@ -154,9 +133,7 @@ all_statuses } func TestHTTP_GenerateFlux_bearer(t *testing.T) { - want := `package main -// foo -import "influxdata/influxdb/monitor" + want := `import "influxdata/influxdb/monitor" import "http" import "json" import "experimental" @@ -166,26 +143,16 @@ option task = {name: "foo", every: 1h, offset: 1s} headers = {"Content-Type": "application/json", "Authorization": "Bearer " + secrets["get"](key: "000000000000000e-token")} endpoint = http["endpoint"](url: "http://localhost:7777") -notification = { - _notification_rule_id: "0000000000000001", - _notification_rule_name: "foo", - _notification_endpoint_id: "0000000000000002", - _notification_endpoint_name: "foo", -} +notification = {_notification_rule_id: "0000000000000001", _notification_rule_name: "foo", _notification_endpoint_id: "0000000000000002", _notification_endpoint_name: "foo"} statuses = monitor["from"](start: -2h) -crit = statuses - |> filter(fn: (r) => - (r["_level"] == "crit")) -all_statuses = crit - |> filter(fn: (r) => - (r["_time"] >= experimental["subDuration"](from: now(), d: 1h))) +crit = statuses |> filter(fn: (r) => r["_level"] == "crit") +all_statuses = crit |> filter(fn: (r) => r["_time"] >= experimental["subDuration"](from: now(), d: 1h)) -all_statuses - |> monitor["notify"](data: notification, endpoint: endpoint(mapFn: (r) => { - body = {r with _version: 1} +all_statuses |> monitor["notify"](data: notification, endpoint: endpoint(mapFn: (r) => { + body = {r with _version: 1} - return {headers: headers, data: json["encode"](v: body)} - }))` + return {headers: headers, data: json["encode"](v: body)} +}))` s := &rule.HTTP{ Base: rule.Base{ @@ -227,9 +194,7 @@ all_statuses } func TestHTTP_GenerateFlux_bearer_every_second(t *testing.T) { - want := `package main -// foo -import "influxdata/influxdb/monitor" + want := `import "influxdata/influxdb/monitor" import "http" import "json" import "experimental" @@ -239,26 +204,16 @@ option task = {name: "foo", every: 5s, offset: 1s} headers = {"Content-Type": "application/json", "Authorization": "Bearer " + secrets["get"](key: "000000000000000e-token")} endpoint = http["endpoint"](url: "http://localhost:7777") -notification = { - _notification_rule_id: "0000000000000001", - _notification_rule_name: "foo", - _notification_endpoint_id: "0000000000000002", - _notification_endpoint_name: "foo", -} +notification = {_notification_rule_id: "0000000000000001", _notification_rule_name: "foo", _notification_endpoint_id: "0000000000000002", _notification_endpoint_name: "foo"} statuses = monitor["from"](start: -10s) -crit = statuses - |> filter(fn: (r) => - (r["_level"] == "crit")) -all_statuses = crit - |> filter(fn: (r) => - (r["_time"] >= experimental["subDuration"](from: now(), d: 5s))) +crit = statuses |> filter(fn: (r) => r["_level"] == "crit") +all_statuses = crit |> filter(fn: (r) => r["_time"] >= experimental["subDuration"](from: now(), d: 5s)) -all_statuses - |> monitor["notify"](data: notification, endpoint: endpoint(mapFn: (r) => { - body = {r with _version: 1} +all_statuses |> monitor["notify"](data: notification, endpoint: endpoint(mapFn: (r) => { + body = {r with _version: 1} - return {headers: headers, data: json["encode"](v: body)} - }))` + return {headers: headers, data: json["encode"](v: body)} +}))` s := &rule.HTTP{ Base: rule.Base{ @@ -290,11 +245,6 @@ all_statuses } f, err := s.GenerateFlux(e) - if err != nil { - t.Fatal(err) - } - - if f != want { - t.Errorf("scripts did not match. want:\n%v\n\ngot:\n%v", want, f) - } + require.NoError(t, err) + assert.Equal(t, want, f) } diff --git a/notification/rule/pagerduty.go b/notification/rule/pagerduty.go index 166acf18064..18ef5e8df1d 100644 --- a/notification/rule/pagerduty.go +++ b/notification/rule/pagerduty.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" + "github.com/influxdata/flux/ast/astutil" "github.com/influxdata/influxdb/v2/kit/platform/errors" "github.com/influxdata/flux/ast" @@ -57,21 +58,16 @@ func (s *PagerDuty) GenerateFlux(e influxdb.NotificationEndpoint) (string, error if !ok { return "", fmt.Errorf("endpoint provided is a %s, not an PagerDuty endpoint", e.Type()) } - p, err := s.GenerateFluxAST(pagerdutyEndpoint) - if err != nil { - return "", err - } - return ast.Format(p), nil + return astutil.Format(s.GenerateFluxAST(pagerdutyEndpoint)) } // GenerateFluxAST generates a flux AST for the pagerduty notification rule. -func (s *PagerDuty) GenerateFluxAST(e *endpoint.PagerDuty) (*ast.Package, error) { - f := flux.File( +func (s *PagerDuty) GenerateFluxAST(e *endpoint.PagerDuty) *ast.File { + return flux.File( s.Name, flux.Imports("influxdata/influxdb/monitor", "pagerduty", "influxdata/influxdb/secrets", "experimental"), s.generateFluxASTBody(e), ) - return &ast.Package{Package: "main", Files: []*ast.File{f}}, nil } func (s *PagerDuty) generateFluxASTBody(e *endpoint.PagerDuty) []ast.Statement { diff --git a/notification/rule/pagerduty_test.go b/notification/rule/pagerduty_test.go index 97d1cef34ed..5b9e53560fb 100644 --- a/notification/rule/pagerduty_test.go +++ b/notification/rule/pagerduty_test.go @@ -59,9 +59,7 @@ func TestPagerDuty_GenerateFlux(t *testing.T) { }, }, }, - script: `package main -// foo -import "influxdata/influxdb/monitor" + script: `import "influxdata/influxdb/monitor" import "pagerduty" import "influxdata/influxdb/secrets" import "experimental" @@ -70,35 +68,23 @@ option task = {name: "foo", every: 1h} pagerduty_secret = secrets["get"](key: "pagerduty_token") pagerduty_endpoint = pagerduty["endpoint"]() -notification = { - _notification_rule_id: "0000000000000001", - _notification_rule_name: "foo", - _notification_endpoint_id: "0000000000000002", - _notification_endpoint_name: "foo", -} -statuses = monitor["from"](start: -2h, fn: (r) => - (r["foo"] == "bar" and r["baz"] == "bang")) -crit = statuses - |> filter(fn: (r) => - (r["_level"] == "crit")) -all_statuses = crit - |> filter(fn: (r) => - (r["_time"] >= experimental["subDuration"](from: now(), d: 1h))) - -all_statuses - |> monitor["notify"](data: notification, endpoint: pagerduty_endpoint(mapFn: (r) => - ({ - routingKey: pagerduty_secret, - client: "influxdata", - clientURL: "http://localhost:7777/host/${r.host}", - class: r._check_name, - group: r["_source_measurement"], - severity: pagerduty["severityFromLevel"](level: r["_level"]), - eventAction: pagerduty["actionFromLevel"](level: r["_level"]), - source: notification["_notification_rule_name"], - summary: r["_message"], - timestamp: time(v: r["_source_timestamp"]), - })))`, +notification = {_notification_rule_id: "0000000000000001", _notification_rule_name: "foo", _notification_endpoint_id: "0000000000000002", _notification_endpoint_name: "foo"} +statuses = monitor["from"](start: -2h, fn: (r) => r["foo"] == "bar" and r["baz"] == "bang") +crit = statuses |> filter(fn: (r) => r["_level"] == "crit") +all_statuses = crit |> filter(fn: (r) => r["_time"] >= experimental["subDuration"](from: now(), d: 1h)) + +all_statuses |> monitor["notify"](data: notification, endpoint: pagerduty_endpoint(mapFn: (r) => ({ + routingKey: pagerduty_secret, + client: "influxdata", + clientURL: "http://localhost:7777/host/${r.host}", + class: r._check_name, + group: r["_source_measurement"], + severity: pagerduty["severityFromLevel"](level: r["_level"]), + eventAction: pagerduty["actionFromLevel"](level: r["_level"]), + source: notification["_notification_rule_name"], + summary: r["_message"], + timestamp: time(v: r["_source_timestamp"]), +})))`, }, { name: "notify on info to crit", @@ -143,9 +129,7 @@ all_statuses }, }, }, - script: `package main -// foo -import "influxdata/influxdb/monitor" + script: `import "influxdata/influxdb/monitor" import "pagerduty" import "influxdata/influxdb/secrets" import "experimental" @@ -154,34 +138,23 @@ option task = {name: "foo", every: 1h} pagerduty_secret = secrets["get"](key: "pagerduty_token") pagerduty_endpoint = pagerduty["endpoint"]() -notification = { - _notification_rule_id: "0000000000000001", - _notification_rule_name: "foo", - _notification_endpoint_id: "0000000000000002", - _notification_endpoint_name: "foo", -} -statuses = monitor["from"](start: -2h, fn: (r) => - (r["foo"] == "bar" and r["baz"] == "bang")) -info_to_crit = statuses - |> monitor["stateChanges"](fromLevel: "info", toLevel: "crit") -all_statuses = info_to_crit - |> filter(fn: (r) => - (r["_time"] >= experimental["subDuration"](from: now(), d: 1h))) - -all_statuses - |> monitor["notify"](data: notification, endpoint: pagerduty_endpoint(mapFn: (r) => - ({ - routingKey: pagerduty_secret, - client: "influxdata", - clientURL: "http://localhost:7777/host/${r.host}", - class: r._check_name, - group: r["_source_measurement"], - severity: pagerduty["severityFromLevel"](level: r["_level"]), - eventAction: pagerduty["actionFromLevel"](level: r["_level"]), - source: notification["_notification_rule_name"], - summary: r["_message"], - timestamp: time(v: r["_source_timestamp"]), - })))`, +notification = {_notification_rule_id: "0000000000000001", _notification_rule_name: "foo", _notification_endpoint_id: "0000000000000002", _notification_endpoint_name: "foo"} +statuses = monitor["from"](start: -2h, fn: (r) => r["foo"] == "bar" and r["baz"] == "bang") +info_to_crit = statuses |> monitor["stateChanges"](fromLevel: "info", toLevel: "crit") +all_statuses = info_to_crit |> filter(fn: (r) => r["_time"] >= experimental["subDuration"](from: now(), d: 1h)) + +all_statuses |> monitor["notify"](data: notification, endpoint: pagerduty_endpoint(mapFn: (r) => ({ + routingKey: pagerduty_secret, + client: "influxdata", + clientURL: "http://localhost:7777/host/${r.host}", + class: r._check_name, + group: r["_source_measurement"], + severity: pagerduty["severityFromLevel"](level: r["_level"]), + eventAction: pagerduty["actionFromLevel"](level: r["_level"]), + source: notification["_notification_rule_name"], + summary: r["_message"], + timestamp: time(v: r["_source_timestamp"]), +})))`, }, { name: "notify on crit or ok to warn", @@ -229,9 +202,7 @@ all_statuses }, }, }, - script: `package main -// foo -import "influxdata/influxdb/monitor" + script: `import "influxdata/influxdb/monitor" import "pagerduty" import "influxdata/influxdb/secrets" import "experimental" @@ -240,38 +211,24 @@ option task = {name: "foo", every: 1h} pagerduty_secret = secrets["get"](key: "pagerduty_token") pagerduty_endpoint = pagerduty["endpoint"]() -notification = { - _notification_rule_id: "0000000000000001", - _notification_rule_name: "foo", - _notification_endpoint_id: "0000000000000002", - _notification_endpoint_name: "foo", -} -statuses = monitor["from"](start: -2h, fn: (r) => - (r["foo"] == "bar" and r["baz"] == "bang")) -crit = statuses - |> filter(fn: (r) => - (r["_level"] == "crit")) -ok_to_warn = statuses - |> monitor["stateChanges"](fromLevel: "ok", toLevel: "warn") -all_statuses = union(tables: [crit, ok_to_warn]) - |> sort(columns: ["_time"]) - |> filter(fn: (r) => - (r["_time"] >= experimental["subDuration"](from: now(), d: 1h))) - -all_statuses - |> monitor["notify"](data: notification, endpoint: pagerduty_endpoint(mapFn: (r) => - ({ - routingKey: pagerduty_secret, - client: "influxdata", - clientURL: "http://localhost:7777/host/${r.host}", - class: r._check_name, - group: r["_source_measurement"], - severity: pagerduty["severityFromLevel"](level: r["_level"]), - eventAction: pagerduty["actionFromLevel"](level: r["_level"]), - source: notification["_notification_rule_name"], - summary: r["_message"], - timestamp: time(v: r["_source_timestamp"]), - })))`, +notification = {_notification_rule_id: "0000000000000001", _notification_rule_name: "foo", _notification_endpoint_id: "0000000000000002", _notification_endpoint_name: "foo"} +statuses = monitor["from"](start: -2h, fn: (r) => r["foo"] == "bar" and r["baz"] == "bang") +crit = statuses |> filter(fn: (r) => r["_level"] == "crit") +ok_to_warn = statuses |> monitor["stateChanges"](fromLevel: "ok", toLevel: "warn") +all_statuses = union(tables: [crit, ok_to_warn]) |> sort(columns: ["_time"]) |> filter(fn: (r) => r["_time"] >= experimental["subDuration"](from: now(), d: 1h)) + +all_statuses |> monitor["notify"](data: notification, endpoint: pagerduty_endpoint(mapFn: (r) => ({ + routingKey: pagerduty_secret, + client: "influxdata", + clientURL: "http://localhost:7777/host/${r.host}", + class: r._check_name, + group: r["_source_measurement"], + severity: pagerduty["severityFromLevel"](level: r["_level"]), + eventAction: pagerduty["actionFromLevel"](level: r["_level"]), + source: notification["_notification_rule_name"], + summary: r["_message"], + timestamp: time(v: r["_source_timestamp"]), +})))`, }, } diff --git a/notification/rule/service/service_external_test.go b/notification/rule/service/service_external_test.go index 7ca93e67215..d009b64575b 100644 --- a/notification/rule/service/service_external_test.go +++ b/notification/rule/service/service_external_test.go @@ -11,7 +11,7 @@ import ( "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" "github.com/influxdata/flux/ast" - influxdb "github.com/influxdata/influxdb/v2" + "github.com/influxdata/influxdb/v2" "github.com/influxdata/influxdb/v2/kit/platform" "github.com/influxdata/influxdb/v2/kit/platform/errors" "github.com/influxdata/influxdb/v2/mock" @@ -295,7 +295,7 @@ func CreateNotificationRule( OwnerID: MustIDBase16("020f755c3c082005"), Name: "name2", Status: "active", - Flux: "package main\n// name2\nimport \"influxdata/influxdb/monitor\"\nimport \"slack\"\nimport \"influxdata/influxdb/secrets\"\nimport \"experimental\"\n\noption task = {name: \"name2\", every: 1h}\n\nslack_secret = secrets[\"get\"](key: \"020f755c3c082001-token\")\nslack_endpoint = slack[\"endpoint\"](token: slack_secret, url: \"http://localhost:7777\")\nnotification = {\n\t_notification_rule_id: \"020f755c3c082001\",\n\t_notification_rule_name: \"name2\",\n\t_notification_endpoint_id: \"020f755c3c082001\",\n\t_notification_endpoint_name: \"foo\",\n}\nstatuses = monitor[\"from\"](start: -2h, fn: (r) =>\n\t(r[\"k1\"] == \"v1\" and r[\"k2\"] == \"v2\"))\ncrit = statuses\n\t|> filter(fn: (r) =>\n\t\t(r[\"_level\"] == \"crit\"))\nall_statuses = crit\n\t|> filter(fn: (r) =>\n\t\t(r[\"_time\"] >= experimental[\"subDuration\"](from: now(), d: 1h)))\n\nall_statuses\n\t|> monitor[\"notify\"](data: notification, endpoint: slack_endpoint(mapFn: (r) =>\n\t\t({channel: \"\", text: \"msg1\", color: if r[\"_level\"] == \"crit\" then \"danger\" else if r[\"_level\"] == \"warn\" then \"warning\" else \"good\"})))", + Flux: "import \"influxdata/influxdb/monitor\"\nimport \"slack\"\nimport \"influxdata/influxdb/secrets\"\nimport \"experimental\"\n\noption task = {name: \"name2\", every: 1h}\n\nslack_secret = secrets[\"get\"](key: \"020f755c3c082001-token\")\nslack_endpoint = slack[\"endpoint\"](token: slack_secret, url: \"http://localhost:7777\")\nnotification = {_notification_rule_id: \"020f755c3c082001\", _notification_rule_name: \"name2\", _notification_endpoint_id: \"020f755c3c082001\", _notification_endpoint_name: \"foo\"}\nstatuses = monitor[\"from\"](start: -2h, fn: (r) => r[\"k1\"] == \"v1\" and r[\"k2\"] == \"v2\")\ncrit = statuses |> filter(fn: (r) => r[\"_level\"] == \"crit\")\nall_statuses = crit |> filter(fn: (r) => r[\"_time\"] >= experimental[\"subDuration\"](from: now(), d: 1h))\n\nall_statuses |> monitor[\"notify\"](data: notification, endpoint: slack_endpoint(mapFn: (r) => ({channel: \"\", text: \"msg1\", color: if r[\"_level\"] == \"crit\" then \"danger\" else if r[\"_level\"] == \"warn\" then \"warning\" else \"good\"})))", Every: "1h", }, }, diff --git a/notification/rule/slack.go b/notification/rule/slack.go index f962e166d6f..1b5c3949d41 100644 --- a/notification/rule/slack.go +++ b/notification/rule/slack.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" + "github.com/influxdata/flux/ast/astutil" "github.com/influxdata/influxdb/v2/kit/platform/errors" "github.com/influxdata/flux/ast" @@ -25,21 +26,16 @@ func (s *Slack) GenerateFlux(e influxdb.NotificationEndpoint) (string, error) { if !ok { return "", fmt.Errorf("endpoint provided is a %s, not an Slack endpoint", e.Type()) } - p, err := s.GenerateFluxAST(slackEndpoint) - if err != nil { - return "", err - } - return ast.Format(p), nil + return astutil.Format(s.GenerateFluxAST(slackEndpoint)) } // GenerateFluxAST generates a flux AST for the slack notification rule. -func (s *Slack) GenerateFluxAST(e *endpoint.Slack) (*ast.Package, error) { - f := flux.File( +func (s *Slack) GenerateFluxAST(e *endpoint.Slack) *ast.File { + return flux.File( s.Name, flux.Imports("influxdata/influxdb/monitor", "slack", "influxdata/influxdb/secrets", "experimental"), s.generateFluxASTBody(e), ) - return &ast.Package{Package: "main", Files: []*ast.File{f}}, nil } func (s *Slack) generateFluxASTBody(e *endpoint.Slack) []ast.Statement { diff --git a/notification/rule/slack_test.go b/notification/rule/slack_test.go index 529d0245a89..06fc866645b 100644 --- a/notification/rule/slack_test.go +++ b/notification/rule/slack_test.go @@ -1,9 +1,10 @@ package rule_test import ( - "github.com/influxdata/influxdb/v2/kit/platform" "testing" + "github.com/influxdata/influxdb/v2/kit/platform" + "github.com/influxdata/flux/ast" "github.com/influxdata/flux/parser" "github.com/influxdata/influxdb/v2" @@ -39,9 +40,7 @@ func TestSlack_GenerateFlux(t *testing.T) { }{ { name: "with any status", - want: `package main -// foo -import "influxdata/influxdb/monitor" + want: `import "influxdata/influxdb/monitor" import "slack" import "influxdata/influxdb/secrets" import "experimental" @@ -49,24 +48,12 @@ import "experimental" option task = {name: "foo", every: 1h} slack_endpoint = slack["endpoint"](url: "http://localhost:7777") -notification = { - _notification_rule_id: "0000000000000001", - _notification_rule_name: "foo", - _notification_endpoint_id: "0000000000000002", - _notification_endpoint_name: "foo", -} -statuses = monitor["from"](start: -2h, fn: (r) => - (r["foo"] == "bar" and r["baz"] == "bang")) -any = statuses - |> filter(fn: (r) => - (true)) -all_statuses = any - |> filter(fn: (r) => - (r["_time"] >= experimental["subDuration"](from: now(), d: 1h))) +notification = {_notification_rule_id: "0000000000000001", _notification_rule_name: "foo", _notification_endpoint_id: "0000000000000002", _notification_endpoint_name: "foo"} +statuses = monitor["from"](start: -2h, fn: (r) => r["foo"] == "bar" and r["baz"] == "bang") +any = statuses |> filter(fn: (r) => true) +all_statuses = any |> filter(fn: (r) => r["_time"] >= experimental["subDuration"](from: now(), d: 1h)) -all_statuses - |> monitor["notify"](data: notification, endpoint: slack_endpoint(mapFn: (r) => - ({channel: "bar", text: "blah", color: if r["_level"] == "crit" then "danger" else if r["_level"] == "warn" then "warning" else "good"})))`, +all_statuses |> monitor["notify"](data: notification, endpoint: slack_endpoint(mapFn: (r) => ({channel: "bar", text: "blah", color: if r["_level"] == "crit" then "danger" else if r["_level"] == "warn" then "warning" else "good"})))`, rule: &rule.Slack{ Channel: "bar", MessageTemplate: "blah", @@ -108,9 +95,7 @@ all_statuses }, { name: "with url", - want: `package main -// foo -import "influxdata/influxdb/monitor" + want: `import "influxdata/influxdb/monitor" import "slack" import "influxdata/influxdb/secrets" import "experimental" @@ -118,27 +103,13 @@ import "experimental" option task = {name: "foo", every: 1h} slack_endpoint = slack["endpoint"](url: "http://localhost:7777") -notification = { - _notification_rule_id: "0000000000000001", - _notification_rule_name: "foo", - _notification_endpoint_id: "0000000000000002", - _notification_endpoint_name: "foo", -} -statuses = monitor["from"](start: -2h, fn: (r) => - (r["foo"] == "bar" and r["baz"] == "bang")) -crit = statuses - |> filter(fn: (r) => - (r["_level"] == "crit")) -info_to_warn = statuses - |> monitor["stateChanges"](fromLevel: "info", toLevel: "warn") -all_statuses = union(tables: [crit, info_to_warn]) - |> sort(columns: ["_time"]) - |> filter(fn: (r) => - (r["_time"] >= experimental["subDuration"](from: now(), d: 1h))) +notification = {_notification_rule_id: "0000000000000001", _notification_rule_name: "foo", _notification_endpoint_id: "0000000000000002", _notification_endpoint_name: "foo"} +statuses = monitor["from"](start: -2h, fn: (r) => r["foo"] == "bar" and r["baz"] == "bang") +crit = statuses |> filter(fn: (r) => r["_level"] == "crit") +info_to_warn = statuses |> monitor["stateChanges"](fromLevel: "info", toLevel: "warn") +all_statuses = union(tables: [crit, info_to_warn]) |> sort(columns: ["_time"]) |> filter(fn: (r) => r["_time"] >= experimental["subDuration"](from: now(), d: 1h)) -all_statuses - |> monitor["notify"](data: notification, endpoint: slack_endpoint(mapFn: (r) => - ({channel: "bar", text: "blah", color: if r["_level"] == "crit" then "danger" else if r["_level"] == "warn" then "warning" else "good"})))`, +all_statuses |> monitor["notify"](data: notification, endpoint: slack_endpoint(mapFn: (r) => ({channel: "bar", text: "blah", color: if r["_level"] == "crit" then "danger" else if r["_level"] == "warn" then "warning" else "good"})))`, rule: &rule.Slack{ Channel: "bar", MessageTemplate: "blah", @@ -184,9 +155,7 @@ all_statuses }, { name: "with token", - want: `package main -// foo -import "influxdata/influxdb/monitor" + want: `import "influxdata/influxdb/monitor" import "slack" import "influxdata/influxdb/secrets" import "experimental" @@ -195,27 +164,13 @@ option task = {name: "foo", every: 1h} slack_secret = secrets["get"](key: "slack_token") slack_endpoint = slack["endpoint"](token: slack_secret) -notification = { - _notification_rule_id: "0000000000000001", - _notification_rule_name: "foo", - _notification_endpoint_id: "0000000000000002", - _notification_endpoint_name: "foo", -} -statuses = monitor["from"](start: -2h, fn: (r) => - (r["foo"] == "bar" and r["baz"] == "bang")) -crit = statuses - |> filter(fn: (r) => - (r["_level"] == "crit")) -info_to_warn = statuses - |> monitor["stateChanges"](fromLevel: "info", toLevel: "warn") -all_statuses = union(tables: [crit, info_to_warn]) - |> sort(columns: ["_time"]) - |> filter(fn: (r) => - (r["_time"] >= experimental["subDuration"](from: now(), d: 1h))) +notification = {_notification_rule_id: "0000000000000001", _notification_rule_name: "foo", _notification_endpoint_id: "0000000000000002", _notification_endpoint_name: "foo"} +statuses = monitor["from"](start: -2h, fn: (r) => r["foo"] == "bar" and r["baz"] == "bang") +crit = statuses |> filter(fn: (r) => r["_level"] == "crit") +info_to_warn = statuses |> monitor["stateChanges"](fromLevel: "info", toLevel: "warn") +all_statuses = union(tables: [crit, info_to_warn]) |> sort(columns: ["_time"]) |> filter(fn: (r) => r["_time"] >= experimental["subDuration"](from: now(), d: 1h)) -all_statuses - |> monitor["notify"](data: notification, endpoint: slack_endpoint(mapFn: (r) => - ({channel: "bar", text: "blah", color: if r["_level"] == "crit" then "danger" else if r["_level"] == "warn" then "warning" else "good"})))`, +all_statuses |> monitor["notify"](data: notification, endpoint: slack_endpoint(mapFn: (r) => ({channel: "bar", text: "blah", color: if r["_level"] == "crit" then "danger" else if r["_level"] == "warn" then "warning" else "good"})))`, rule: &rule.Slack{ Channel: "bar", MessageTemplate: "blah", @@ -263,9 +218,7 @@ all_statuses }, { name: "with token and url", - want: `package main -// foo -import "influxdata/influxdb/monitor" + want: `import "influxdata/influxdb/monitor" import "slack" import "influxdata/influxdb/secrets" import "experimental" @@ -274,27 +227,13 @@ option task = {name: "foo", every: 1h} slack_secret = secrets["get"](key: "slack_token") slack_endpoint = slack["endpoint"](token: slack_secret, url: "http://localhost:7777") -notification = { - _notification_rule_id: "0000000000000001", - _notification_rule_name: "foo", - _notification_endpoint_id: "0000000000000002", - _notification_endpoint_name: "foo", -} -statuses = monitor["from"](start: -2h, fn: (r) => - (r["foo"] == "bar" and r["baz"] == "bang")) -crit = statuses - |> filter(fn: (r) => - (r["_level"] == "crit")) -info_to_warn = statuses - |> monitor["stateChanges"](fromLevel: "info", toLevel: "warn") -all_statuses = union(tables: [crit, info_to_warn]) - |> sort(columns: ["_time"]) - |> filter(fn: (r) => - (r["_time"] >= experimental["subDuration"](from: now(), d: 1h))) +notification = {_notification_rule_id: "0000000000000001", _notification_rule_name: "foo", _notification_endpoint_id: "0000000000000002", _notification_endpoint_name: "foo"} +statuses = monitor["from"](start: -2h, fn: (r) => r["foo"] == "bar" and r["baz"] == "bang") +crit = statuses |> filter(fn: (r) => r["_level"] == "crit") +info_to_warn = statuses |> monitor["stateChanges"](fromLevel: "info", toLevel: "warn") +all_statuses = union(tables: [crit, info_to_warn]) |> sort(columns: ["_time"]) |> filter(fn: (r) => r["_time"] >= experimental["subDuration"](from: now(), d: 1h)) -all_statuses - |> monitor["notify"](data: notification, endpoint: slack_endpoint(mapFn: (r) => - ({channel: "bar", text: "blah", color: if r["_level"] == "crit" then "danger" else if r["_level"] == "warn" then "warning" else "good"})))`, +all_statuses |> monitor["notify"](data: notification, endpoint: slack_endpoint(mapFn: (r) => ({channel: "bar", text: "blah", color: if r["_level"] == "crit" then "danger" else if r["_level"] == "warn" then "warning" else "good"})))`, rule: &rule.Slack{ Channel: "bar", MessageTemplate: "blah", diff --git a/notification/rule/telegram.go b/notification/rule/telegram.go index e31c9b42342..ee30df75768 100644 --- a/notification/rule/telegram.go +++ b/notification/rule/telegram.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" + "github.com/influxdata/flux/ast/astutil" "github.com/influxdata/influxdb/v2/kit/platform/errors" "github.com/influxdata/flux/ast" @@ -26,21 +27,16 @@ func (s *Telegram) GenerateFlux(e influxdb.NotificationEndpoint) (string, error) if !ok { return "", fmt.Errorf("endpoint provided is a %s, not a Telegram endpoint", e.Type()) } - p, err := s.GenerateFluxAST(telegramEndpoint) - if err != nil { - return "", err - } - return ast.Format(p), nil + return astutil.Format(s.GenerateFluxAST(telegramEndpoint)) } // GenerateFluxAST generates a flux AST for the telegram notification rule. -func (s *Telegram) GenerateFluxAST(e *endpoint.Telegram) (*ast.Package, error) { - f := flux.File( +func (s *Telegram) GenerateFluxAST(e *endpoint.Telegram) *ast.File { + return flux.File( s.Name, flux.Imports("influxdata/influxdb/monitor", "contrib/sranka/telegram", "influxdata/influxdb/secrets", "experimental"), s.generateFluxASTBody(e), ) - return &ast.Package{Package: "main", Files: []*ast.File{f}}, nil } func (s *Telegram) generateFluxASTBody(e *endpoint.Telegram) []ast.Statement { diff --git a/notification/rule/telegram_test.go b/notification/rule/telegram_test.go index 67945481035..484a01f6e71 100644 --- a/notification/rule/telegram_test.go +++ b/notification/rule/telegram_test.go @@ -1,9 +1,10 @@ package rule_test import ( - "github.com/influxdata/influxdb/v2/kit/platform/errors" "testing" + "github.com/influxdata/influxdb/v2/kit/platform/errors" + "github.com/andreyvit/diff" "github.com/influxdata/influxdb/v2" "github.com/influxdata/influxdb/v2/notification" @@ -102,9 +103,7 @@ func TestTelegram_GenerateFlux(t *testing.T) { }, }, }, - script: `package main -// foo -import "influxdata/influxdb/monitor" + script: `import "influxdata/influxdb/monitor" import "contrib/sranka/telegram" import "influxdata/influxdb/secrets" import "experimental" @@ -113,24 +112,12 @@ option task = {name: "foo", every: 1h} telegram_secret = secrets["get"](key: "3-key") telegram_endpoint = telegram["endpoint"](token: telegram_secret, disableWebPagePreview: false) -notification = { - _notification_rule_id: "0000000000000001", - _notification_rule_name: "foo", - _notification_endpoint_id: "0000000000000003", - _notification_endpoint_name: "foo", -} -statuses = monitor["from"](start: -2h, fn: (r) => - (r["foo"] == "bar" and r["baz"] == "bang")) -crit = statuses - |> filter(fn: (r) => - (r["_level"] == "crit")) -all_statuses = crit - |> filter(fn: (r) => - (r["_time"] >= experimental["subDuration"](from: now(), d: 1h))) +notification = {_notification_rule_id: "0000000000000001", _notification_rule_name: "foo", _notification_endpoint_id: "0000000000000003", _notification_endpoint_name: "foo"} +statuses = monitor["from"](start: -2h, fn: (r) => r["foo"] == "bar" and r["baz"] == "bang") +crit = statuses |> filter(fn: (r) => r["_level"] == "crit") +all_statuses = crit |> filter(fn: (r) => r["_time"] >= experimental["subDuration"](from: now(), d: 1h)) -all_statuses - |> monitor["notify"](data: notification, endpoint: telegram_endpoint(mapFn: (r) => - ({channel: "-12345", text: "blah", silent: if r["_level"] == "crit" then true else if r["_level"] == "warn" then true else false})))`, +all_statuses |> monitor["notify"](data: notification, endpoint: telegram_endpoint(mapFn: (r) => ({channel: "-12345", text: "blah", silent: if r["_level"] == "crit" then true else if r["_level"] == "warn" then true else false})))`, }, { name: "with DisableWebPagePreview and ParseMode", @@ -174,9 +161,7 @@ all_statuses }, }, }, - script: `package main -// foo -import "influxdata/influxdb/monitor" + script: `import "influxdata/influxdb/monitor" import "contrib/sranka/telegram" import "influxdata/influxdb/secrets" import "experimental" @@ -185,24 +170,12 @@ option task = {name: "foo", every: 1h} telegram_secret = secrets["get"](key: "3-key") telegram_endpoint = telegram["endpoint"](token: telegram_secret, parseMode: "HTML", disableWebPagePreview: true) -notification = { - _notification_rule_id: "0000000000000001", - _notification_rule_name: "foo", - _notification_endpoint_id: "0000000000000003", - _notification_endpoint_name: "foo", -} -statuses = monitor["from"](start: -2h, fn: (r) => - (r["foo"] == "bar" and r["baz"] == "bang")) -any = statuses - |> filter(fn: (r) => - (true)) -all_statuses = any - |> filter(fn: (r) => - (r["_time"] >= experimental["subDuration"](from: now(), d: 1h))) +notification = {_notification_rule_id: "0000000000000001", _notification_rule_name: "foo", _notification_endpoint_id: "0000000000000003", _notification_endpoint_name: "foo"} +statuses = monitor["from"](start: -2h, fn: (r) => r["foo"] == "bar" and r["baz"] == "bang") +any = statuses |> filter(fn: (r) => true) +all_statuses = any |> filter(fn: (r) => r["_time"] >= experimental["subDuration"](from: now(), d: 1h)) -all_statuses - |> monitor["notify"](data: notification, endpoint: telegram_endpoint(mapFn: (r) => - ({channel: "-12345", text: "blah", silent: if r["_level"] == "crit" then true else if r["_level"] == "warn" then true else false})))`, +all_statuses |> monitor["notify"](data: notification, endpoint: telegram_endpoint(mapFn: (r) => ({channel: "-12345", text: "blah", silent: if r["_level"] == "crit" then true else if r["_level"] == "warn" then true else false})))`, }, } diff --git a/pkger/parser_models.go b/pkger/parser_models.go index 292ada656d8..a5e51067b2e 100644 --- a/pkger/parser_models.go +++ b/pkger/parser_models.go @@ -10,6 +10,7 @@ import ( "time" "github.com/influxdata/flux/ast" + "github.com/influxdata/flux/ast/astutil" "github.com/influxdata/flux/ast/edit" "github.com/influxdata/flux/parser" "github.com/influxdata/influxdb/v2" @@ -1304,7 +1305,11 @@ func (q query) DashboardQuery() string { edit.SetOption(files[0], "task", tobj) } } - return ast.Format(files[0]) + // TODO(danmoran): I'm not happy about ignoring this error, but pkger doesn't have adequate error return values + // in the callstack. In most cases errors are simply ignored and the desired output of the operation is skipped. + // If I were to change the contract here, a lot of other things would need to be changed. + s, _ := astutil.Format(files[0]) + return s } type queries []query diff --git a/pkger/parser_test.go b/pkger/parser_test.go index 5782806aa53..91b7a2e79c1 100644 --- a/pkger/parser_test.go +++ b/pkger/parser_test.go @@ -2599,24 +2599,21 @@ spec: // parmas queryText := `option params = { - bucket: "bar", - start: -24h0m0s, - stop: now(), - name: "max", - floatVal: 37.2, - minVal: 10, + bucket: "bar", + start: -24h0m0s, + stop: now(), + name: "max", + floatVal: 37.2, + minVal: 10, } from(bucket: params.bucket) - |> range(start: params.start, stop: params.stop) - |> filter(fn: (r) => - (r._measurement == "processes")) - |> filter(fn: (r) => - (r.floater == params.floatVal)) - |> filter(fn: (r) => - (r._value > params.minVal)) - |> aggregateWindow(every: v.windowPeriod, fn: max) - |> yield(name: params.name)` + |> range(start: params.start, stop: params.stop) + |> filter(fn: (r) => r._measurement == "processes") + |> filter(fn: (r) => r.floater == params.floatVal) + |> filter(fn: (r) => r._value > params.minVal) + |> aggregateWindow(every: v.windowPeriod, fn: max) + |> yield(name: params.name)` q := props.Queries[0] assert.Equal(t, queryText, q.Text) @@ -3599,24 +3596,21 @@ spec: assert.Equal(t, "task-uuid", actual.MetaName) queryText := `option params = { - bucket: "bar", - start: -24h0m0s, - stop: now(), - name: "max", - floatVal: 37.2, - minVal: 10, + bucket: "bar", + start: -24h0m0s, + stop: now(), + name: "max", + floatVal: 37.2, + minVal: 10, } from(bucket: params.bucket) - |> range(start: params.start, stop: params.stop) - |> filter(fn: (r) => - (r._measurement == "processes")) - |> filter(fn: (r) => - (r.floater == params.floatVal)) - |> filter(fn: (r) => - (r._value > params.minVal)) - |> aggregateWindow(every: v.windowPeriod, fn: max) - |> yield(name: params.name)` + |> range(start: params.start, stop: params.stop) + |> filter(fn: (r) => r._measurement == "processes") + |> filter(fn: (r) => r.floater == params.floatVal) + |> filter(fn: (r) => r._value > params.minVal) + |> aggregateWindow(every: v.windowPeriod, fn: max) + |> yield(name: params.name)` assert.Equal(t, queryText, actual.Query) @@ -3732,13 +3726,11 @@ from(bucket: params.bucket) queryText := `option task = {name: "foo", every: 1m0s, offset: 1m0s} from(bucket: "rucket_1") - |> range(start: -5d, stop: -1h) - |> filter(fn: (r) => - (r._measurement == "cpu")) - |> filter(fn: (r) => - (r._field == "usage_idle")) - |> aggregateWindow(every: 1m, fn: mean) - |> yield(name: "mean")` + |> range(start: -5d, stop: -1h) + |> filter(fn: (r) => r._measurement == "cpu") + |> filter(fn: (r) => r._field == "usage_idle") + |> aggregateWindow(every: 1m, fn: mean) + |> yield(name: "mean")` assert.Equal(t, queryText, actual[0].Query) @@ -3763,13 +3755,11 @@ from(bucket: "rucket_1") queryText := `option params = {this: "foo"} from(bucket: "rucket_1") - |> range(start: -5d, stop: -1h) - |> filter(fn: (r) => - (r._measurement == params.this)) - |> filter(fn: (r) => - (r._field == "usage_idle")) - |> aggregateWindow(every: 1m, fn: mean) - |> yield(name: "mean")` + |> range(start: -5d, stop: -1h) + |> filter(fn: (r) => r._measurement == params.this) + |> filter(fn: (r) => r._field == "usage_idle") + |> aggregateWindow(every: 1m, fn: mean) + |> yield(name: "mean")` assert.Equal(t, queryText, actual[0].Query) diff --git a/query/influxql/spectests/aggregates.go b/query/influxql/spectests/aggregates.go index 13a140b0b18..125f4436c99 100644 --- a/query/influxql/spectests/aggregates.go +++ b/query/influxql/spectests/aggregates.go @@ -32,9 +32,7 @@ func init() { return fmt.Sprintf(`SELECT %s(value) FROM db0..cpu`, name), `package main -` + fmt.Sprintf(`from(bucketID: "%s")`, bucketID.String()) + ` - |> range(start: 1677-09-21T00:12:43.145224194Z, stop: 2262-04-11T23:47:16.854775806Z) - |> filter(fn: (r) => r._measurement == "cpu" and r._field == "value") +` + fmt.Sprintf(`from(bucketID: "%s")`, bucketID.String()) + ` |> range(start: 1677-09-21T00:12:43.145224194Z, stop: 2262-04-11T23:47:16.854775806Z) |> filter(fn: (r) => r._measurement == "cpu" and r._field == "value") |> group(columns: ["_measurement", "_start", "_stop", "_field"], mode: "by") |> keep(columns: ["_measurement", "_start", "_stop", "_field", "_time", "_value"]) |> ` + name + `() diff --git a/query/influxql/spectests/aggregates_with_condition.go b/query/influxql/spectests/aggregates_with_condition.go index 5b8efeec013..19ecd2c5010 100644 --- a/query/influxql/spectests/aggregates_with_condition.go +++ b/query/influxql/spectests/aggregates_with_condition.go @@ -8,9 +8,7 @@ func init() { return fmt.Sprintf(`SELECT %s(value) FROM db0..cpu WHERE host = 'server01'`, name), `package main -` + fmt.Sprintf(`from(bucketID: "%s"`, bucketID.String()) + `) - |> range(start: 1677-09-21T00:12:43.145224194Z, stop: 2262-04-11T23:47:16.854775806Z) - |> filter(fn: (r) => r._measurement == "cpu" and r._field == "value") +` + fmt.Sprintf(`from(bucketID: "%s"`, bucketID.String()) + `) |> range(start: 1677-09-21T00:12:43.145224194Z, stop: 2262-04-11T23:47:16.854775806Z) |> filter(fn: (r) => r._measurement == "cpu" and r._field == "value") |> filter(fn: (r) => r["host"] == "server01") |> group(columns: ["_measurement", "_start", "_stop", "_field"], mode: "by") |> keep(columns: ["_measurement", "_start", "_stop", "_field", "_time", "_value"]) diff --git a/query/influxql/spectests/aggregates_with_groupby.go b/query/influxql/spectests/aggregates_with_groupby.go index 89110d1f192..c1b19e7c336 100644 --- a/query/influxql/spectests/aggregates_with_groupby.go +++ b/query/influxql/spectests/aggregates_with_groupby.go @@ -8,9 +8,7 @@ func init() { return fmt.Sprintf(`SELECT %s(value) FROM db0..cpu GROUP BY host`, name), `package main -` + fmt.Sprintf(`from(bucketID: "%s"`, bucketID.String()) + `) - |> range(start: 1677-09-21T00:12:43.145224194Z, stop: 2262-04-11T23:47:16.854775806Z) - |> filter(fn: (r) => r._measurement == "cpu" and r._field == "value") +` + fmt.Sprintf(`from(bucketID: "%s"`, bucketID.String()) + `) |> range(start: 1677-09-21T00:12:43.145224194Z, stop: 2262-04-11T23:47:16.854775806Z) |> filter(fn: (r) => r._measurement == "cpu" and r._field == "value") |> group(columns: ["_measurement", "_start", "_stop", "_field", "host"], mode: "by") |> keep(columns: ["_measurement", "_start", "_stop", "_field", "host", "_time", "_value"]) |> ` + name + `() diff --git a/query/influxql/spectests/aggregates_with_window.go b/query/influxql/spectests/aggregates_with_window.go index 80df211e86d..e96625dfc0f 100644 --- a/query/influxql/spectests/aggregates_with_window.go +++ b/query/influxql/spectests/aggregates_with_window.go @@ -8,9 +8,7 @@ func init() { return fmt.Sprintf(`SELECT %s(value) FROM db0..cpu WHERE time >= now() - 10m GROUP BY time(1m)`, name), `package main -` + fmt.Sprintf(`from(bucketID: "%s"`, bucketID.String()) + `) - |> range(start: 2010-09-15T08:50:00Z, stop: 2010-09-15T09:00:00Z) - |> filter(fn: (r) => r._measurement == "cpu" and r._field == "value") +` + fmt.Sprintf(`from(bucketID: "%s"`, bucketID.String()) + `) |> range(start: 2010-09-15T08:50:00Z, stop: 2010-09-15T09:00:00Z) |> filter(fn: (r) => r._measurement == "cpu" and r._field == "value") |> group(columns: ["_measurement", "_start", "_stop", "_field"], mode: "by") |> keep(columns: ["_measurement", "_start", "_stop", "_field", "_time", "_value"]) |> window(every: 1m) diff --git a/query/influxql/spectests/aggregates_with_window_offset.go b/query/influxql/spectests/aggregates_with_window_offset.go index e7a06b30f74..63978a7a1c7 100644 --- a/query/influxql/spectests/aggregates_with_window_offset.go +++ b/query/influxql/spectests/aggregates_with_window_offset.go @@ -8,9 +8,7 @@ func init() { return fmt.Sprintf(`SELECT %s(value) FROM db0..cpu WHERE time >= now() - 10m GROUP BY time(5m, 12m)`, name), `package main -` + fmt.Sprintf(`from(bucketID: "%s"`, bucketID.String()) + `) - |> range(start: 2010-09-15T08:50:00Z, stop: 2010-09-15T09:00:00Z) - |> filter(fn: (r) => r._measurement == "cpu" and r._field == "value") +` + fmt.Sprintf(`from(bucketID: "%s"`, bucketID.String()) + `) |> range(start: 2010-09-15T08:50:00Z, stop: 2010-09-15T09:00:00Z) |> filter(fn: (r) => r._measurement == "cpu" and r._field == "value") |> group(columns: ["_measurement", "_start", "_stop", "_field"], mode: "by") |> keep(columns: ["_measurement", "_start", "_stop", "_field", "_time", "_value"]) |> window(every: 5m, start: 1970-01-01T00:02:00Z) diff --git a/query/influxql/spectests/multiple_statements.go b/query/influxql/spectests/multiple_statements.go index bac20eb1f2f..79b7b28c48a 100644 --- a/query/influxql/spectests/multiple_statements.go +++ b/query/influxql/spectests/multiple_statements.go @@ -6,18 +6,14 @@ func init() { `SELECT mean(value) FROM db0..cpu; SELECT max(value) FROM db0..cpu`, `package main -from(bucketID: "") - |> range(start: 1677-09-21T00:12:43.145224194Z, stop: 2262-04-11T23:47:16.854775806Z) - |> filter(fn: (r) => r._measurement == "cpu" and r._field == "value") +from(bucketID: "") |> range(start: 1677-09-21T00:12:43.145224194Z, stop: 2262-04-11T23:47:16.854775806Z) |> filter(fn: (r) => r._measurement == "cpu" and r._field == "value") |> group(columns: ["_measurement", "_start", "_stop", "_field"], mode: "by") |> keep(columns: ["_measurement", "_start", "_stop", "_field", "_time", "_value"]) |> mean() |> map(fn: (r) => ({r with _time: 1970-01-01T00:00:00Z})) |> rename(columns: {_value: "mean"}) |> yield(name: "0") -from(bucketID: "") - |> range(start: 1677-09-21T00:12:43.145224194Z, stop: 2262-04-11T23:47:16.854775806Z) - |> filter(fn: (r) => r._measurement == "cpu" and r._field == "value") +from(bucketID: "") |> range(start: 1677-09-21T00:12:43.145224194Z, stop: 2262-04-11T23:47:16.854775806Z) |> filter(fn: (r) => r._measurement == "cpu" and r._field == "value") |> group(columns: ["_measurement", "_start", "_stop", "_field"], mode: "by") |> keep(columns: ["_measurement", "_start", "_stop", "_field", "_time", "_value"]) |> max() diff --git a/query/influxql/spectests/raw.go b/query/influxql/spectests/raw.go index d4cf6bd19a4..1f7534160e9 100644 --- a/query/influxql/spectests/raw.go +++ b/query/influxql/spectests/raw.go @@ -6,9 +6,7 @@ func init() { `SELECT value FROM db0..cpu`, `package main -from(bucketID: "") - |> range(start: 1677-09-21T00:12:43.145224194Z, stop: 2262-04-11T23:47:16.854775806Z) - |> filter(fn: (r) => r._measurement == "cpu" and r._field == "value") +from(bucketID: "") |> range(start: 1677-09-21T00:12:43.145224194Z, stop: 2262-04-11T23:47:16.854775806Z) |> filter(fn: (r) => r._measurement == "cpu" and r._field == "value") |> group(columns: ["_measurement", "_start", "_stop", "_field"], mode: "by") |> keep(columns: ["_measurement", "_start", "_stop", "_field", "_time", "_value"]) |> rename(columns: {_value: "value"}) diff --git a/query/influxql/spectests/raw_with_condition.go b/query/influxql/spectests/raw_with_condition.go index 6cac226266d..b6bab1e9dec 100644 --- a/query/influxql/spectests/raw_with_condition.go +++ b/query/influxql/spectests/raw_with_condition.go @@ -6,9 +6,7 @@ func init() { `SELECT value FROM db0..cpu WHERE host = 'server01'`, `package main -from(bucketID: "") - |> range(start: 1677-09-21T00:12:43.145224194Z, stop: 2262-04-11T23:47:16.854775806Z) - |> filter(fn: (r) => r._measurement == "cpu" and r._field == "value") +from(bucketID: "") |> range(start: 1677-09-21T00:12:43.145224194Z, stop: 2262-04-11T23:47:16.854775806Z) |> filter(fn: (r) => r._measurement == "cpu" and r._field == "value") |> filter(fn: (r) => r["host"] == "server01") |> group(columns: ["_measurement", "_start", "_stop", "_field"], mode: "by") |> keep(columns: ["_measurement", "_start", "_stop", "_field", "_time", "_value"]) diff --git a/query/influxql/spectests/raw_with_regex_condition.go b/query/influxql/spectests/raw_with_regex_condition.go index b2a4c25b4cc..b04cb6999b5 100644 --- a/query/influxql/spectests/raw_with_regex_condition.go +++ b/query/influxql/spectests/raw_with_regex_condition.go @@ -6,9 +6,7 @@ func init() { `SELECT value FROM db0..cpu WHERE host =~ /.*er01/`, `package main -from(bucketID: "") - |> range(start: 1677-09-21T00:12:43.145224194Z, stop: 2262-04-11T23:47:16.854775806Z) - |> filter(fn: (r) => r._measurement == "cpu" and r._field == "value") +from(bucketID: "") |> range(start: 1677-09-21T00:12:43.145224194Z, stop: 2262-04-11T23:47:16.854775806Z) |> filter(fn: (r) => r._measurement == "cpu" and r._field == "value") |> filter(fn: (r) => r["host"] =~ /.*er01/) |> group(columns: ["_measurement", "_start", "_stop", "_field"], mode: "by") |> keep(columns: ["_measurement", "_start", "_stop", "_field", "_time", "_value"]) diff --git a/query/influxql/spectests/retention_policy.go b/query/influxql/spectests/retention_policy.go index 3d05653f1d9..80ef8154b1b 100644 --- a/query/influxql/spectests/retention_policy.go +++ b/query/influxql/spectests/retention_policy.go @@ -8,9 +8,7 @@ func init() { `SELECT value FROM db0.alternate.cpu`, `package main -`+fmt.Sprintf(`from(bucketID: "%s")`, altBucketID.String())+` - |> range(start: 1677-09-21T00:12:43.145224194Z, stop: 2262-04-11T23:47:16.854775806Z) - |> filter(fn: (r) => r._measurement == "cpu" and r._field == "value") +`+fmt.Sprintf(`from(bucketID: "%s")`, altBucketID.String())+` |> range(start: 1677-09-21T00:12:43.145224194Z, stop: 2262-04-11T23:47:16.854775806Z) |> filter(fn: (r) => r._measurement == "cpu" and r._field == "value") |> group(columns: ["_measurement", "_start", "_stop", "_field"], mode: "by") |> keep(columns: ["_measurement", "_start", "_stop", "_field", "_time", "_value"]) |> rename(columns: {_value: "value"}) diff --git a/query/influxql/spectests/selectors.go b/query/influxql/spectests/selectors.go index 9877a88b56b..8cc232ccfe0 100644 --- a/query/influxql/spectests/selectors.go +++ b/query/influxql/spectests/selectors.go @@ -33,9 +33,7 @@ func init() { return fmt.Sprintf(`SELECT %s(value) FROM db0..cpu`, name), `package main -` + fmt.Sprintf(`from(bucketID: "%s")`, bucketID.String()) + ` - |> range(start: 1677-09-21T00:12:43.145224194Z, stop: 2262-04-11T23:47:16.854775806Z) - |> filter(fn: (r) => r._measurement == "cpu" and r._field == "value") +` + fmt.Sprintf(`from(bucketID: "%s")`, bucketID.String()) + ` |> range(start: 1677-09-21T00:12:43.145224194Z, stop: 2262-04-11T23:47:16.854775806Z) |> filter(fn: (r) => r._measurement == "cpu" and r._field == "value") |> group(columns: ["_measurement", "_start", "_stop", "_field"], mode: "by") |> keep(columns: ["_measurement", "_start", "_stop", "_field", "_time", "_value"]) |> ` + name + `() diff --git a/query/influxql/spectests/show_databases.go b/query/influxql/spectests/show_databases.go index 2bb1d9b7813..c4af2c4c4aa 100644 --- a/query/influxql/spectests/show_databases.go +++ b/query/influxql/spectests/show_databases.go @@ -8,9 +8,7 @@ func init() { import v1 "influxdata/influxdb/v1" -v1.databases() - |> rename(columns: {databaseName: "name"}) - |> keep(columns: ["name"]) +v1.databases() |> rename(columns: {databaseName: "name"}) |> keep(columns: ["name"]) |> yield(name: "0") `, ), diff --git a/query/influxql/spectests/show_retention_policies.go b/query/influxql/spectests/show_retention_policies.go index b5c455e145c..9d831b436ee 100644 --- a/query/influxql/spectests/show_retention_policies.go +++ b/query/influxql/spectests/show_retention_policies.go @@ -8,9 +8,7 @@ func init() { import v1 "influxdata/influxdb/v1" -v1.databases() - |> filter(fn: (r) => r.databaseName == "telegraf") - |> rename(columns: {retentionPolicy: "name", retentionPeriod: "duration"}) +v1.databases() |> filter(fn: (r) => r.databaseName == "telegraf") |> rename(columns: {retentionPolicy: "name", retentionPeriod: "duration"}) |> set(key: "shardGroupDuration", value: "0") |> set(key: "replicaN", value: "2") |> keep(columns: ["name", "duration", "shardGroupDuration", "replicaN", "default"]) diff --git a/query/influxql/spectests/show_tag_values.go b/query/influxql/spectests/show_tag_values.go index 24cbd4673ae..96c78957f1a 100644 --- a/query/influxql/spectests/show_tag_values.go +++ b/query/influxql/spectests/show_tag_values.go @@ -6,9 +6,7 @@ func init() { `SHOW TAG VALUES ON "db0" WITH KEY = "host"`, `package main -from(bucketID: "") - |> range(start: -1h) - |> keyValues(keyColumns: ["host"]) +from(bucketID: "") |> range(start: -1h) |> keyValues(keyColumns: ["host"]) |> group(columns: ["_measurement", "_key"], mode: "by") |> distinct() |> group(columns: ["_measurement"], mode: "by") diff --git a/query/influxql/spectests/show_tag_values_in_list.go b/query/influxql/spectests/show_tag_values_in_list.go index c49ef125772..44aee79ac7f 100644 --- a/query/influxql/spectests/show_tag_values_in_list.go +++ b/query/influxql/spectests/show_tag_values_in_list.go @@ -6,9 +6,7 @@ func init() { `SHOW TAG VALUES ON "db0" WITH KEY IN ("host", "region")`, `package main -from(bucketID: "") - |> range(start: -1h) - |> keyValues(keyColumns: ["host", "region"]) +from(bucketID: "") |> range(start: -1h) |> keyValues(keyColumns: ["host", "region"]) |> group(columns: ["_measurement", "_key"], mode: "by") |> distinct() |> group(columns: ["_measurement"], mode: "by") diff --git a/query/influxql/spectests/show_tag_values_multiple_measurements.go b/query/influxql/spectests/show_tag_values_multiple_measurements.go index 7db4bc96a8a..e9218710085 100644 --- a/query/influxql/spectests/show_tag_values_multiple_measurements.go +++ b/query/influxql/spectests/show_tag_values_multiple_measurements.go @@ -6,9 +6,7 @@ func init() { `SHOW TAG VALUES ON "db0" FROM "cpu", "mem", "gpu" WITH KEY = "host"`, `package main -from(bucketID: "") - |> range(start: -1h) - |> filter(fn: (r) => r._measurement == "cpu" or (r._measurement == "mem" or r._measurement == "gpu")) +from(bucketID: "") |> range(start: -1h) |> filter(fn: (r) => r._measurement == "cpu" or (r._measurement == "mem" or r._measurement == "gpu")) |> keyValues(keyColumns: ["host"]) |> group(columns: ["_measurement", "_key"], mode: "by") |> distinct() diff --git a/query/influxql/spectests/testing.go b/query/influxql/spectests/testing.go index deca31134cd..3e915b0bc08 100644 --- a/query/influxql/spectests/testing.go +++ b/query/influxql/spectests/testing.go @@ -7,7 +7,9 @@ import ( "testing" "time" + "github.com/influxdata/flux/ast/astutil" platform2 "github.com/influxdata/influxdb/v2/kit/platform" + "github.com/stretchr/testify/require" "github.com/andreyvit/diff" "github.com/influxdata/flux/ast" @@ -84,7 +86,8 @@ func (f *fixture) Run(t *testing.T) { err := ast.GetError(wantAST) t.Fatalf("found parser errors in the want text: %s", err.Error()) } - want := ast.Format(wantAST) + want, err := astutil.Format(wantAST.Files[0]) + require.NoError(t, err) transpiler := influxql.NewTranspilerWithConfig( dbrpMappingSvc, @@ -98,7 +101,8 @@ func (f *fixture) Run(t *testing.T) { if err != nil { t.Fatalf("%s:%d: unexpected error: %s", f.file, f.line, err) } - got := ast.Format(pkg) + got, err := astutil.Format(pkg.Files[0]) + require.NoError(t, err) // Encode both of these to JSON and compare the results. if want != got { diff --git a/task/backend/analytical_storage_test.go b/task/backend/analytical_storage_test.go index 316bb85349f..e510a9b9790 100644 --- a/task/backend/analytical_storage_test.go +++ b/task/backend/analytical_storage_test.go @@ -116,7 +116,7 @@ func TestDeduplicateRuns(t *testing.T) { }, FindRunsFn: func(context.Context, taskmodel.RunFilter) ([]*taskmodel.Run, int, error) { return []*taskmodel.Run{ - &taskmodel.Run{ID: 2, Status: "started"}, + {ID: 2, Status: "started"}, }, 1, nil }, } diff --git a/task/options/options.go b/task/options/options.go index b2f99310a0f..52568fbc85b 100644 --- a/task/options/options.go +++ b/task/options/options.go @@ -5,6 +5,7 @@ import ( "context" "errors" "fmt" + "strconv" "strings" "time" @@ -46,7 +47,18 @@ type Duration struct { } func (a Duration) String() string { - return ast.Format(&a.Node) + // NOTE: This is a copy of `formatDurationLiteral` from the flux codebase. + // We copy it here so we can break the dependency on the Go formatter in this method without a change in behavior. + // The Rust-based formatter doesn't expose an interface for formatting individual nodes. + builder := strings.Builder{} + formatDuration := func(d ast.Duration) { + builder.WriteString(strconv.FormatInt(d.Magnitude, 10)) + builder.WriteString(d.Unit) + } + for _, d := range a.Node.Values { + formatDuration(d) + } + return builder.String() } // Parse parses a string into a Duration. diff --git a/task/servicetest/servicetest.go b/task/servicetest/servicetest.go index ee068737e44..189abb1746e 100644 --- a/task/servicetest/servicetest.go +++ b/task/servicetest/servicetest.go @@ -415,7 +415,7 @@ func testTaskCRUD(t *testing.T, sys *System) { // Update task: just update an option. newStatus = string(taskmodel.TaskActive) - newFlux = "option task = {\n\tname: \"task-changed #98\",\n\tcron: \"* * * * *\",\n\toffset: 5s,\n\tconcurrency: 100,\n}\n\nfrom(bucket: \"b\")\n\t|> to(bucket: \"two\", orgID: \"000000000000000\")" + newFlux = "option task = {name: \"task-changed #98\", cron: \"* * * * *\", offset: 5s, concurrency: 100}\n\n// This comment should persist.\nfrom(bucket: \"b\")\n |> to(bucket: \"two\", orgID: \"000000000000000\")" f, err = sys.TaskService.UpdateTask(authorizedCtx, origID, taskmodel.TaskUpdate{Options: options.Options{Name: "task-changed #98"}}) if err != nil { t.Fatal(err) @@ -430,7 +430,7 @@ func testTaskCRUD(t *testing.T, sys *System) { // Update task: switch to every. newStatus = string(taskmodel.TaskActive) - newFlux = "option task = {\n\tname: \"task-changed #98\",\n\tevery: 30s,\n\toffset: 5s,\n\tconcurrency: 100,\n}\n\nfrom(bucket: \"b\")\n\t|> to(bucket: \"two\", orgID: \"000000000000000\")" + newFlux = "option task = {name: \"task-changed #98\", every: 30s, offset: 5s, concurrency: 100}\n\n// This comment should persist.\nfrom(bucket: \"b\")\n |> to(bucket: \"two\", orgID: \"000000000000000\")" f, err = sys.TaskService.UpdateTask(authorizedCtx, origID, taskmodel.TaskUpdate{Options: options.Options{Every: *(options.MustParseDuration("30s"))}}) if err != nil { t.Fatal(err) @@ -497,15 +497,10 @@ func testTaskCRUD(t *testing.T, sys *System) { } func testTaskFindTasksPaging(t *testing.T, sys *System) { - script := `option task = { - name: "Task %03d", - cron: "* * * * *", - concurrency: 100, - offset: 10s, -} + script := `option task = {name: "Task %03d", cron: "* * * * *", concurrency: 100, offset: 10s} -from(bucket: "b") - |> to(bucket: "two", orgID: "000000000000000")` +from(bucket: "b") + |> to(bucket: "two", orgID: "000000000000000")` cr := creds(t, sys) @@ -554,15 +549,10 @@ from(bucket: "b") func testTaskFindTasksAfterPaging(t *testing.T, sys *System) { var ( - script = `option task = { - name: "some-unique-task-name", - cron: "* * * * *", - concurrency: 100, - offset: 10s, -} + script = `option task = {name: "some-unique-task-name", cron: "* * * * *", concurrency: 100, offset: 10s} from(bucket: "b") - |> to(bucket: "two", orgID: "000000000000000")` + |> to(bucket: "two", orgID: "000000000000000")` cr = creds(t, sys) tc = taskmodel.TaskCreate{ OrganizationID: cr.OrgID, @@ -636,15 +626,10 @@ from(bucket: "b") //Retrieve the task again to ensure the options are now Every, without Cron or Offset func testTaskOptionsUpdateFull(t *testing.T, sys *System) { - script := `option task = { - name: "task-Options-Update", - cron: "* * * * *", - concurrency: 100, - offset: 10s, -} + script := `option task = {name: "task-Options-Update", cron: "* * * * *", concurrency: 100, offset: 10s} from(bucket: "b") - |> to(bucket: "two", orgID: "000000000000000")` + |> to(bucket: "two", orgID: "000000000000000")` cr := creds(t, sys) @@ -662,7 +647,7 @@ from(bucket: "b") expectedFlux := `option task = {name: "task-Options-Update", every: 10s, concurrency: 100} from(bucket: "b") - |> to(bucket: "two", orgID: "000000000000000")` + |> to(bucket: "two", orgID: "000000000000000")` f, err := sys.TaskService.UpdateTask(authorizedCtx, task.ID, taskmodel.TaskUpdate{Options: options.Options{Offset: &options.Duration{}, Every: *(options.MustParseDuration("10s"))}}) if err != nil { t.Fatal(err) @@ -677,15 +662,10 @@ from(bucket: "b") } }) t.Run("update task with different offset option", func(t *testing.T) { - expectedFlux := `option task = { - name: "task-Options-Update", - every: 10s, - concurrency: 100, - offset: 10s, -} + expectedFlux := `option task = {name: "task-Options-Update", every: 10s, concurrency: 100, offset: 10s} from(bucket: "b") - |> to(bucket: "two", orgID: "000000000000000")` + |> to(bucket: "two", orgID: "000000000000000")` f, err := sys.TaskService.UpdateTask(authorizedCtx, task.ID, taskmodel.TaskUpdate{Options: options.Options{Offset: options.MustParseDuration("10s")}}) if err != nil { t.Fatal(err) @@ -699,14 +679,10 @@ from(bucket: "b") t.Fatalf("flux unexpected updated: %s", diff) } - withoutOffset := `option task = { - name: "task-Options-Update", - every: 10s, - concurrency: 100, -} + withoutOffset := `option task = {name: "task-Options-Update", every: 10s, concurrency: 100} from(bucket: "b") - |> to(bucket: "two", orgID: "000000000000000")` + |> to(bucket: "two", orgID: "000000000000000")` fNoOffset, err := sys.TaskService.UpdateTask(authorizedCtx, task.ID, taskmodel.TaskUpdate{Flux: &withoutOffset}) if err != nil { t.Fatal(err) @@ -1804,25 +1780,17 @@ func creds(t *testing.T, s *System) TestCreds { } const ( - scriptFmt = `option task = { - name: "task #%d", - cron: "* * * * *", - offset: 5s, - concurrency: 100, -} + scriptFmt = `option task = {name: "task #%d", cron: "* * * * *", offset: 5s, concurrency: 100} +// This comment should persist. from(bucket: "b") - |> to(bucket: "two", orgID: "000000000000000")` + |> to(bucket: "two", orgID: "000000000000000")` - scriptDifferentName = `option task = { - name: "task-changed #%d", - cron: "* * * * *", - offset: 5s, - concurrency: 100, -} + scriptDifferentName = `option task = {name: "task-changed #%d", cron: "* * * * *", offset: 5s, concurrency: 100} +// This comment should persist. from(bucket: "b") - |> to(bucket: "two", orgID: "000000000000000")` + |> to(bucket: "two", orgID: "000000000000000")` ) func testTaskType(t *testing.T, sys *System) { diff --git a/task/taskmodel/task.go b/task/taskmodel/task.go index 80292c8a0f1..1f3d4fda064 100644 --- a/task/taskmodel/task.go +++ b/task/taskmodel/task.go @@ -9,6 +9,7 @@ import ( "time" "github.com/influxdata/flux/ast" + "github.com/influxdata/flux/ast/astutil" "github.com/influxdata/flux/ast/edit" "github.com/influxdata/influxdb/v2/kit/platform" errors2 "github.com/influxdata/influxdb/v2/kit/platform/errors" @@ -401,7 +402,10 @@ func (t *TaskUpdate) updateFlux(parser fluxlang.FluxLanguageService, oldFlux str } t.Options.Clear() - s := ast.Format(parsed) + s, err := astutil.Format(parsed) + if err != nil { + return err + } t.Flux = &s } return nil diff --git a/task/taskmodel/task_test.go b/task/taskmodel/task_test.go index 2200bd9be53..f01f6c7169e 100644 --- a/task/taskmodel/task_test.go +++ b/task/taskmodel/task_test.go @@ -67,8 +67,7 @@ func TestOptionsEditWithAST(t *testing.T) { t.Run("fmt string", func(t *testing.T) { expected := `option task = {every: 10s, name: "foo"} -from(bucket: "x") - |> range(start: -1h)` +from(bucket: "x") |> range(start: -1h)` if *tu.Flux != expected { t.Errorf("got the wrong task back, expected %s,\n got %s\n diff: %s", expected, *tu.Flux, cmp.Diff(expected, *tu.Flux)) } @@ -136,8 +135,7 @@ from(bucket: "x") tu.Options.Offset = &options.Duration{} expscript := `option task = {cron: "* * * * *", name: "foo"} -from(bucket: "x") - |> range(start: -1h)` +from(bucket: "x") |> range(start: -1h)` if err := tu.UpdateFlux(fluxlang.DefaultService, `option task = {cron: "* * * * *", name: "foo", offset: 10s} from(bucket:"x") |> range(start:-1h)`); err != nil { t.Fatal(err) }