There are four possibilities how to add properties to the functions.
Flux flux = Flux
.from("telegraf")
.window(15L, ChronoUnit.MINUTES, 20L, ChronoUnit.SECONDS)
.sum();
Flux.from("telegraf")
.window()
.withEvery(15L, ChronoUnit.MINUTES)
.withPeriod(20L, ChronoUnit.SECONDS)
.sum();
Flux.from("telegraf")
.window()
.withPropertyValue("every", 15L, ChronoUnit.MINUTES)
.withPropertyValue("period", 20L, ChronoUnit.SECONDS)
.sum();
Map<String, Object> properties = new HashMap<>();
properties.put("every", new TimeInterval(15L, ChronoUnit.MINUTES));
properties.put("period", new TimeInterval(20L, ChronoUnit.SECONDS));
Flux flux = Flux
.from("telegraf")
.window()
.withPropertyNamed("every")
.withPropertyNamed("period")
.sum();
Starting point for all queries. Get data from the specified database [doc].
bucket
- The name of the bucket to query. [string]hosts
- [array of strings]
Flux flux = Flux.from("telegraf");
Flux flux = Flux
.from("telegraf", new String[]{"192.168.1.200", "192.168.1.100"})
.last();
Flux flux = Flux.from("telegraf")
.expression("map(fn: (r) => r._value * r._value)")
.expression("sum()");
Counts the number of results [doc].
useStartTime
- Use the start time as the timestamp of the resulting aggregate. [boolean]
Flux flux = Flux
.from("telegraf")
.count();
Covariance is an aggregate operation. Covariance computes the covariance between two columns [doc].
columns
- List of columns on which to compute the covariance. Exactly two columns must be provided. [array of strings]pearsonr
- Indicates whether the result should be normalized to be the Pearson R coefficient. [boolean]valueDst
- The column into which the result will be placed. Defaults to_value
. [string]
Flux flux = Flux
.from("telegraf")
.covariance(new String[]{"_value", "_valueSquare"});
Flux flux = Flux
.from("telegraf")
.covariance()
.withColumns(new String[]{"columnA", "columnB"})
.withPearsonr(true)
.withValueDst("_newColumn");
Computes the time based difference between subsequent non null records [doc].
unit
- The time duration to use for the result. [duration]nonNegative
- Indicates if the derivative is allowed to be negative. [boolean]columns
- List of columns on which to compute the derivative. [array of strings]timeSrc
- The source column for the time values. Defaults to_time
. [string]
Flux flux = Flux
.from("telegraf")
.derivative(1L, ChronoUnit.MINUTES);
Flux flux = Flux
.from("telegraf")
.derivative()
.withUnit(10L, ChronoUnit.DAYS)
.withNonNegative(true)
.withColumns(new String[]{"columnCompare_1", "columnCompare_2"})
.withTimeSrc("_timeColumn");
Difference computes the difference between subsequent non null records [doc].
nonNegative
- Indicates if the derivative is allowed to be negative. If a value is encountered which is less than the previous value then it is assumed the previous value should have been a zero. [boolean]columns
- The list of columns on which to compute the difference. Defaults["_value"]
. [array of strings]
Flux flux = Flux
.from("telegraf")
.groupBy("_measurement")
.difference();
Flux flux = Flux
.from("telegraf")
.range(-5L, ChronoUnit.MINUTES)
.difference(new String[]{"_value", "_time"}, false);
Distinct produces the unique values for a given column [doc].
column
- The column on which to track unique values. [string]
Flux flux = Flux
.from("telegraf")
.groupBy("_measurement")
.distinct("_measurement");
Drop will exclude specified columns from a table. Columns to exclude can be specified either through a list, or a predicate function. When a dropped column is part of the group key it will also be dropped from the key [doc].
columns
- The list of columns which should be excluded from the resulting table. Cannot be used withfn
. [array of strings]fn
- The function which takes a column name as a parameter and returns a boolean indicating whether or not the column should be excluded from the resulting table. Cannot be used withcolumns
. [function(column)]
Flux flux = Flux
.from("telegraf")
.drop(new String[]{"host", "_measurement"});
Flux flux = Flux
.from("telegraf")
.drop()
.withFunction("col =~ /free*/");
Duplicate will duplicate a specified column in a table [doc].
column
- The column to duplicate. [string]as
The name that should be assigned to the duplicate column. [string]
Flux flux = Flux
.from("telegraf")
.duplicate("host", "server");
Filters the results using an expression [doc].
fn
- Function to when filtering the records. The function must accept a single parameter which will be the records and return a boolean value. Records which evaluate to true, will be included in the results. [function(record) bool]
Supported Record columns:
_measurement
_field
_start
_stop
_time
_value
custom
- the custom column value byRestrictions.column("_id").notEqual(5)
Supported Record restrictions:
equal
notEqual
less
greater
greater
lessOrEqual
greaterOrEqual
custom
- the custom restriction byRestrictions.value().custom(15L, "=~")
Restrictions restrictions = Restrictions.and(
Restrictions.measurement().equal("mem"),
Restrictions.field().equal("usage_system"),
Restrictions.tag("service").equal("app-server")
);
Flux flux = Flux
.from("telegraf")
.filter(restrictions)
.range(-4L, ChronoUnit.HOURS)
.count();
Restrictions restriction = Restrictions.and(
Restrictions.tag("instance_type").equal(Pattern.compile("/prod/")),
Restrictions.field().greater(10.5D),
Restrictions.time().lessOrEqual(new TimeInterval(-15L, ChronoUnit.HOURS))
);
Flux flux = Flux
.from("telegraf")
.filter(restriction)
.range(-4L, 2L, ChronoUnit.HOURS)
.count();
Returns the first result of the query [doc].
useStartTime
- Use the start time as the timestamp of the resulting aggregate. [boolean]
Flux flux = Flux
.from("telegraf")
.first();
Groups results by a user-specified set of tags [doc].
by
- Group by these specific tag names. Cannot be used withexcept
option. [array of strings]keep
- Keep specific tag keys that were not inby
in the results. [array of strings]except
- Group by all but these tag keys. Cannot be used withby
option. [array of strings]
Flux.from("telegraf")
.range(-30L, ChronoUnit.MINUTES)
.groupBy(new String[]{"tag_a", "tag_b"});
// by + keep
Flux.from("telegraf")
.range(-30L, ChronoUnit.MINUTES)
.groupBy(new String[]{"tag_a", "tag_b"}, new String[]{"tag_c"});
// except + keep
Flux.from("telegraf")
.range(-30L, ChronoUnit.MINUTES)
.groupExcept(new String[]{"tag_a"}, new String[]{"tag_b", "tag_c"});
For each aggregate column, it outputs the area under the curve of non null records. The curve is defined as function where the domain is the record times and the range is the record values. [doc].
unit
- Time duration to use when computing the integral. [duration]
Flux flux = Flux
.from("telegraf")
.integral(1L, ChronoUnit.MINUTES);
Join two time series together on time and the list of on
keys [doc].
tables
- Map of tables to join. Currently only two tables are allowed. [map of tables]on
- List of tag keys that when equal produces a result set. [array of strings]method
- An optional parameter that specifies the type of join to be performed. When not specified, an inner join is performed. [string]
The method parameter may take on any one of the following values:
inner
- inner joincross
- cross productleft
- left outer joinright
- right outer joinouter
- full outer join
The on
parameter and the cross
method are mutually exclusive.
Flux cpu = Flux.from("telegraf")
.filter(Restrictions.and(Restrictions.measurement().equal("cpu"), Restrictions.field().equal("usage_user")))
.range(-30L, ChronoUnit.MINUTES);
Flux mem = Flux.from("telegraf")
.filter(Restrictions.and(Restrictions.measurement().equal("mem"), Restrictions.field().equal("used_percent")))
.range(-30L, ChronoUnit.MINUTES);
Flux flux = Flux.join()
.withTable("cpu", cpu)
.withTable("mem", mem)
.withOn("host");
Keep is the inverse of drop. It will return a table containing only columns that are specified, ignoring all others.
Only columns in the group key that are also specified in keep
will be kept in the resulting group key [doc].
columns
- The list of columns that should be included in the resulting table. Cannot be used withfn
. [array of strings]fn
- The function which takes a column name as a parameter and returns a boolean indicating whether or not the column should be included in the resulting table. Cannot be used withcolumns
. [function(column)]
Flux flux = Flux
.from("telegraf")
.keep(new String[]{"_time", "_value"});
Flux flux = Flux
.from("telegraf")
.keep()
.withFunction("col =~ /inodes*/");
Returns the last result of the query [doc].
useStartTime
- Use the start time as the timestamp of the resulting aggregate. [boolean]
Flux flux = Flux
.from("telegraf")
.last();
Restricts the number of rows returned in the results [doc].
n
- The maximum number of records to output. [int]
Flux flux = Flux
.from("telegraf")
.limit(5);
Applies a function to each row of the table [doc].
fn
- The function to apply to each row. The return value of the function may be a single value or an object.
// Square the value
Restrictions restriction = Restrictions.and(
Restrictions.measurement().equal("cpu"),
Restrictions.field().equal("usage_system"),
Restrictions.tag("service").equal("app-server")
);
Flux flux = Flux
.from("telegraf")
.filter(restriction)
.range(-12L, ChronoUnit.HOURS)
.map("r._value * r._value");
// Square the value and keep the original value
Restrictions restriction = Restrictions.and(
Restrictions.measurement().equal("cpu"),
Restrictions.field().equal("usage_system"),
Restrictions.tag("service").equal("app-server")
);
Flux flux = Flux
.from("telegraf")
.filter(restriction)
.range(-12L, ChronoUnit.HOURS)
.map("{value: r._value, value2:r._value * r._value}");
Returns the max value within the results [doc].
useStartTime
- Use the start time as the timestamp of the resulting aggregate. [boolean]
Flux flux = Flux
.from("telegraf")
.range(-12L, ChronoUnit.HOURS)
.window(10L, ChronoUnit.MINUTES)
.max();
Returns the mean of the values within the results [doc].
useStartTime
- Use the start time as the timestamp of the resulting aggregate. [boolean]
Flux flux = Flux
.from("telegraf")
.range(-12L, ChronoUnit.HOURS)
.window(10L, ChronoUnit.MINUTES)
.mean();
Returns the min value within the results [doc].
useStartTime
- Use the start time as the timestamp of the resulting aggregate. [boolean]
Flux flux = Flux
.from("telegraf")
.range(-12L, ChronoUnit.HOURS)
.window(10L, ChronoUnit.MINUTES)
.min();
Pivot collects values stored vertically (column-wise) in a table and aligns them horizontally (row-wise) into logical sets [doc].
rowKey
- List of columns used to uniquely identify a row for the output. [array of strings]colKey
- List of columns used to pivot values onto each row identified by the rowKey. [array of strings]valueCol
- Identifies the single column that contains the value to be moved around the pivot [string]
Flux flux = Flux.from("telegraf")
.pivot()
.withRowKey(new String[]{"_time"})
.withColKey(new String[]{"_field"})
.withValueCol("_value");
Filters the results by time boundaries [doc].
start
- Specifies the oldest time to be included in the results. [duration or timestamp]stop
- Specifies the exclusive newest time to be included in the results. Defaults to"now"
. [duration or timestamp]
// by interval
Flux flux = Flux
.from("telegraf")
.range(-12L, -1L, ChronoUnit.HOURS)
// by Instant
Flux flux = Flux
.from("telegraf")
.range(Instant.now().minus(4, ChronoUnit.HOURS),
Instant.now().minus(15, ChronoUnit.MINUTES)
);
Rename will rename specified columns in a table. If a column is renamed and is part of the group key, the column name in the group key will be updated [doc].
columns
- The map of columns to rename and their corresponding new names. Cannot be used withfn
. [map of columns]fn
- The function which takes a single string parameter (the old column name) and returns a string representing the new column name. Cannot be used withcolumns
. [function(column)]
Map<String, String> map = new HashMap<>();
map.put("host", "server");
map.put("_value", "val");
Flux flux = Flux
.from("telegraf")
.rename(map);
Flux flux = Flux
.from("telegraf")
.rename("{col}_new");
Sample values from a table [doc].
n
- Sample every Nth element. [int]pos
- Position offset from start of results to begin sampling.pos
must be less thann
. Ifpos
less than 0, a random offset is used. Default is -1 (random offset). [int]
Flux flux = Flux.from("telegraf")
.filter(and(measurement().equal("cpu"), field().equal("usage_system")))
.range(-1L, ChronoUnit.DAYS)
.sample(10);
Flux flux = Flux.from("telegraf")
.filter(and(measurement().equal("cpu"), field().equal("usage_system")))
.range(-1L, ChronoUnit.DAYS)
.sample(5, 1);
Assigns a static value to each record [doc].
key
- Label for the column to set. [string]value
- Value for the column to set. [string]
Flux flux = Flux
.from("telegraf")
.set("location", "Carolina");
Shift add a fixed duration to time columns [doc].
shift
- The amount to add to each time value. [duration]columns
- The list of all columns that should be shifted. Defaults["_start", "_stop", "_time"]
. [array of strings]
Flux flux = Flux
.from("telegraf")
.shift(10L, ChronoUnit.HOURS);
Flux flux = Flux
.from("telegraf")
.shift(10L, ChronoUnit.HOURS, new String[]{"_time", "custom"});
Skew of the results [doc].
useStartTime
- Use the start time as the timestamp of the resulting aggregate. [boolean]
Flux flux = Flux
.from("telegraf")
.range(-30L, -15L, ChronoUnit.MINUTES)
.skew();
Sorts the results by the specified columns. Default sort is ascending [doc].
cols
- List of columns used to sort. Precedence from left to right. Default is"value"
. [array of strings]desc
- Sort results descending. Default false. [boolean]
Flux flux = Flux
.from("telegraf")
.sort(new String[]{"region", "value"});
Flux flux = Flux
.from("telegraf")
.sort(true);
Difference between min and max values [doc].
useStartTime
- Use the start time as the timestamp of the resulting aggregate. [boolean]
Flux flux = Flux
.from("telegraf")
.spread();
Standard Deviation of the results [doc].
useStartTime
- Use the start time as the timestamp of the resulting aggregate. [boolean]
Flux flux = Flux
.from("telegraf")
.stddev();
Sum of the results [doc].
Flux flux = Flux
.from("telegraf")
.sum();
Convert a value to a bool [doc].
Flux flux = Flux
.from("telegraf")
.filter(and(measurement().equal("mem"), field().equal("used")))
.toBool();
Convert a value to a int [doc].
Flux flux = Flux
.from("telegraf")
.filter(and(measurement().equal("mem"), field().equal("used")))
.toInt();
Convert a value to a float [doc].
Flux flux = Flux
.from("telegraf")
.filter(and(measurement().equal("mem"), field().equal("used")))
.toFloat();
Convert a value to a duration [doc].
Flux flux = Flux
.from("telegraf")
.filter(and(measurement().equal("mem"), field().equal("used")))
.toDuration();
Convert a value to a string [doc].
Flux flux = Flux
.from("telegraf")
.filter(and(measurement().equal("mem"), field().equal("used")))
.toString();
Convert a value to a time [doc].
Flux flux = Flux
.from("telegraf")
.filter(and(measurement().equal("mem"), field().equal("used")))
.toTime();
Convert a value to a uint [doc].
Flux flux = Flux
.from("telegraf")
.filter(and(measurement().equal("mem"), field().equal("used")))
.toUInt();
Groups the results by a given time range [doc].
every
- Duration of time between windows. Defaults toperiod's
value. [duration]period
- Duration of the windowed partition. Defaults toperiod's
value. [duration]offset
- The offset duration relative to the location offset. It can be negative, indicating that the offset goes backwards in time. The default aligns the window boundaries to line up with thenow
option time. [time]column
- Name of the time column to use. Defaults to_time
. [string]startCol
- Name of the column containing the window start time. Defaults to_start
. [string]stopCol
- Name of the column containing the window stop time. Defaults to_stop
. [string]
Flux flux = Flux
.from("telegraf")
.window(15L, ChronoUnit.MINUTES)
.max();
Flux flux = Flux
.from("telegraf")
.window(15L, ChronoUnit.MINUTES,
20L, ChronoUnit.SECONDS,
-50L, ChronoUnit.WEEKS,
1L, ChronoUnit.SECONDS)
.max();
Yield a query results to yielded results [doc].
name
- The unique name to give to yielded results. [string]
Flux flux = Flux
.from("telegraf")
.yield("0");
We assume that exist custom function measurement that filter measurement by their name. The Flux
implementation looks like this:
// Define measurement function which accepts table as the piped argument.
measurement = (m, table=<-) => table |> filter(fn: (r) => r._measurement == m)
The Java implementation:
public class FilterMeasurement extends AbstractParametrizedFlux {
public FilterMeasurement(@Nonnull final Flux source) {
super(source);
}
@Nonnull
@Override
protected String operatorName() {
// name of the Flux function
return "measurement";
}
/**
* @param measurement the measurement name. Has to be defined.
* @return this
*/
@Nonnull
public FilterMeasurement withName(@Nonnull final String measurement) {
Arguments.checkNonEmpty(measurement, "Measurement name");
// name of parameter from the Flux function
withPropertyValueEscaped("m", measurement);
return this;
}
}
Using the measurement function:
Flux flux = Flux
.from("telegraf")
.operator(FilterMeasurement.class)
.withName("cpu")
.sum();
The Flux script:
from(bucket:"telegraf")
|> measurement(m: "cpu")
|> sum()