There are four ways to add properties to the functions.
Flux flux = Flux
.from("telegraf")
.window(15L, ChronoUnit.MINUTES, 20L, ChronoUnit.SECONDS)
.sum();
Flux.from("telegraf")
.window()
.withEvery(15L, ChronoUnit.MINUTES)
.withPeriod(20L, ChronoUnit.SECONDS)
.sum();
Flux.from("telegraf")
.window()
.withPropertyValue("every", 15L, ChronoUnit.MINUTES)
.withPropertyValue("period", 20L, ChronoUnit.SECONDS)
.sum();
Map<String, Object> properties = new HashMap<>();
properties.put("every", new TimeInterval(15L, ChronoUnit.MINUTES));
properties.put("period", new TimeInterval(20L, ChronoUnit.SECONDS));
Flux flux = Flux
.from("telegraf")
.window()
.withPropertyNamed("every")
.withPropertyNamed("period")
.sum();
String query = flux.toString(properties);
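The snippets above pass each duration as an amount plus a ChronoUnit. As a rough illustration of how such a pair could map onto a Flux duration literal (for example 15m or 20s), here is a minimal sketch. The class FluxDurationSketch and its method toFluxDuration are hypothetical names for this example only; they are not part of flux-dsl and do not claim to reproduce how the library renders durations.

```java
import java.time.temporal.ChronoUnit;

/** Hypothetical helper, not part of flux-dsl: formats an amount and unit as a Flux duration literal. */
final class FluxDurationSketch {

    static String toFluxDuration(long amount, ChronoUnit unit) {
        switch (unit) {
            case NANOS:   return amount + "ns";
            case MICROS:  return amount + "us";
            case MILLIS:  return amount + "ms";
            case SECONDS: return amount + "s";
            case MINUTES: return amount + "m";
            case HOURS:   return amount + "h";
            case DAYS:    return amount + "d";
            case WEEKS:   return amount + "w";
            default:
                // Units without an obvious Flux suffix are rejected in this sketch.
                throw new IllegalArgumentException("No Flux duration suffix assumed for " + unit);
        }
    }

    public static void main(String[] args) {
        // Builds the two window properties used in the examples above.
        System.out.println("every: " + toFluxDuration(15L, ChronoUnit.MINUTES));
        System.out.println("period: " + toFluxDuration(20L, ChronoUnit.SECONDS));
    }
}
```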
Starting point for all queries. Get data from the specified database [doc].
- bucket - The name of the bucket to query. [string]
- hosts - [array of strings]
Flux flux = Flux.from("telegraf");
Flux flux = Flux
.from("telegraf", new String[]{"192.168.1.200", "192.168.1.100"})
.last();
Flux flux = Flux.from("telegraf")
.expression("map(fn: (r) => r._value * r._value)")
.expression("sum()");
Applies an aggregate or selector function (any function with a column parameter) to fixed windows of time [doc].
- every - The duration of windows. [duration]
- fn - The aggregate function used in the operation. [function]
- column - The column on which to operate. Defaults to _value. [string]
- timeSrc - The time column from which time is copied for the aggregate record. Defaults to _stop. [string]
- timeDst - The “time destination” column to which time is copied for the aggregate record. Defaults to _time. [string]
- createEmpty - For windows without data, this will create an empty window and fill it with a null aggregate value. Defaults to true. [boolean]
Flux flux = Flux
.from("telegraf")
.aggregateWindow(10L, ChronoUnit.SECONDS, "mean");
Flux flux = Flux
.from("telegraf")
.aggregateWindow()
.withEvery("10s")
.withAggregateFunction("sum")
.withColumn("_value")
.withTimeSrc("_stop")
.withTimeDst("_time")
.withCreateEmpty(true);
Flux flux = Flux
.from("telegraf")
.aggregateWindow()
.withEvery(5L, ChronoUnit.MINUTES)
.withFunction("tables |> quantile(q: 0.99, column:column)");
The columns() function lists the column labels of input tables [doc].
- column - The name of the output column in which to store the column labels. Defaults to _value. [string]
Flux flux = Flux
.from("telegraf")
.range(-12L, ChronoUnit.HOURS)
.window(10L, ChronoUnit.MINUTES)
.columns();
Counts the number of results [doc].
- column - The column to aggregate. Defaults to _value. [string]
Flux flux = Flux
.from("telegraf")
.count();
Covariance is an aggregate operation. Covariance computes the covariance between two columns [doc].
- columns - List of columns on which to compute the covariance. Exactly two columns must be provided. [array of strings]
- pearsonr - Indicates whether the result should be normalized to be the Pearson R coefficient. [boolean]
- valueDst - The column into which the result will be placed. Defaults to _value. [string]
Flux flux = Flux
.from("telegraf")
.covariance(new String[]{"_value", "_valueSquare"});
Flux flux = Flux
.from("telegraf")
.covariance()
.withColumns(new String[]{"columnA", "columnB"})
.withPearsonr(true)
.withValueDst("_newColumn");
Cumulative sum computes a running sum for non null records in the table. The output table schema will be the same as the input table [doc].
- columns - A list of columns on which to operate. [array of strings]
Flux flux = Flux
.from("telegraf")
.cumulativeSum(new String[]{"_value"});
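To make the running-sum semantics concrete, here is a minimal plain-Java sketch of what a cumulative sum over one column computes. CumulativeSumSketch is a hypothetical class for illustration, not part of flux-dsl, and leaving null entries in place without adding them is an assumption about how non-null handling works.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical illustration of a running sum; not the flux-dsl or Flux implementation. */
final class CumulativeSumSketch {

    static List<Double> cumulativeSum(List<Double> values) {
        List<Double> out = new ArrayList<>(values.size());
        double running = 0.0;
        for (Double value : values) {
            if (value == null) {
                // Assumption: null records are passed through and do not change the sum.
                out.add(null);
                continue;
            }
            running += value;
            out.add(running);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(cumulativeSum(Arrays.asList(1.0, 2.0, null, 3.0)));
    }
}
```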
Computes the time based difference between subsequent non null records [doc].
- unit - The time duration to use for the result. [duration]
- nonNegative - Indicates if the derivative is allowed to be negative. [boolean]
- columns - List of columns on which to compute the derivative. [array of strings]
- timeColumn - The source column for the time values. Defaults to _time. [string]
Flux flux = Flux
.from("telegraf")
.derivative(1L, ChronoUnit.MINUTES);
Flux flux = Flux
.from("telegraf")
.derivative()
.withUnit(10L, ChronoUnit.DAYS)
.withNonNegative(true)
.withColumns(new String[]{"columnCompare_1", "columnCompare_2"})
.withTimeColumn("_timeColumn");
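The role of the unit parameter is easier to see with numbers. The following sketch computes a time-based rate of change between consecutive records, expressed per unit. DerivativeSketch is a hypothetical class, not part of flux-dsl; it assumes timestamps in epoch milliseconds and ignores the nonNegative option.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of a per-unit rate of change; not the Flux implementation. */
final class DerivativeSketch {

    /** Returns one rate per consecutive pair of records. Arrays must have equal length. */
    static List<Double> derivative(long[] timesMillis, double[] values, Duration unit) {
        List<Double> rates = new ArrayList<>();
        double unitMillis = unit.toMillis();
        for (int i = 1; i < values.length; i++) {
            // Elapsed time between the two records, measured in multiples of the unit.
            double elapsedUnits = (timesMillis[i] - timesMillis[i - 1]) / unitMillis;
            rates.add((values[i] - values[i - 1]) / elapsedUnits);
        }
        return rates;
    }

    public static void main(String[] args) {
        long[] times = {0L, 30_000L, 90_000L};
        double[] values = {10.0, 20.0, 50.0};
        System.out.println(derivative(times, values, Duration.ofMinutes(1)));
    }
}
```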
Difference computes the difference between subsequent non null records [doc].
- nonNegative - Indicates if the difference is allowed to be negative. If a value is encountered which is less than the previous value, then it is assumed the previous value should have been a zero. [boolean]
- columns - The list of columns on which to compute the difference. Defaults to ["_value"]. [array of strings]
Flux flux = Flux
.from("telegraf")
.groupBy("_measurement")
.difference();
Flux flux = Flux
.from("telegraf")
.range(-5L, ChronoUnit.MINUTES)
.difference(new String[]{"_value", "_time"}, false);
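The nonNegative rule described above can be sketched in plain Java. DifferenceSketch is a hypothetical class for illustration only, not part of flux-dsl: when nonNegative is set and a value drops below its predecessor, the predecessor is treated as zero, so the difference equals the current value.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of difference with the nonNegative rule; not the Flux implementation. */
final class DifferenceSketch {

    static List<Long> difference(long[] values, boolean nonNegative) {
        List<Long> out = new ArrayList<>();
        for (int i = 1; i < values.length; i++) {
            long previous = values[i - 1];
            long current = values[i];
            if (nonNegative && current < previous) {
                // The previous value is assumed to have been zero (e.g. a counter reset).
                previous = 0L;
            }
            out.add(current - previous);
        }
        return out;
    }

    public static void main(String[] args) {
        long[] counter = {10L, 15L, 3L, 8L};
        System.out.println(difference(counter, false));
        System.out.println(difference(counter, true));
    }
}
```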
Distinct produces the unique values for a given column [doc].
- column - The column on which to track unique values. [string]
Flux flux = Flux
.from("telegraf")
.groupBy("_measurement")
.distinct("_measurement");
Drop will exclude specified columns from a table. Columns to exclude can be specified either through a list, or a predicate function. When a dropped column is part of the group key it will also be dropped from the key [doc].
- columns - The list of columns which should be excluded from the resulting table. Cannot be used with fn. [array of strings]
- fn - The function which takes a column name as a parameter and returns a boolean indicating whether or not the column should be excluded from the resulting table. Cannot be used with columns. [function(column)]
Flux flux = Flux
.from("telegraf")
.drop(new String[]{"host", "_measurement"});
Flux flux = Flux
.from("telegraf")
.drop()
.withFunction("column =~ /free*/");
Duplicate will duplicate a specified column in a table [doc].
- column - The column to duplicate. [string]
- as - The name that should be assigned to the duplicate column. [string]
Flux flux = Flux
.from("telegraf")
.duplicate("host", "server");
Filters the results using an expression [doc].
- fn - Function to apply when filtering the records. The function must accept a single parameter, which will be the record, and return a boolean value. Records which evaluate to true will be included in the results. [function(record) bool]
Supported Record columns:
- _measurement
- _field
- _start
- _stop
- _time
- _value
- custom - the custom column value by Restrictions.column("_id").notEqual(5)
Supported Record restrictions:
- equal
- not
- notEqual
- less
- greater
- exists
- lessOrEqual
- greaterOrEqual
- contains
- custom - the custom restriction by Restrictions.value().custom(15L, "=~")
Restrictions restrictions = Restrictions.and(
Restrictions.measurement().equal("mem"),
Restrictions.field().equal("usage_system"),
Restrictions.value().exists(),
Restrictions.tag("service").equal("app-server")
);
Flux flux = Flux
.from("telegraf")
.filter(restrictions)
.range(-4L, ChronoUnit.HOURS)
.count();
Restrictions restriction = Restrictions.and(
Restrictions.tag("instance_type").equal(Pattern.compile("/prod/")),
Restrictions.field().greater(10.5D),
Restrictions.time().lessOrEqual(new TimeInterval(-15L, ChronoUnit.HOURS))
);
Flux flux = Flux
.from("telegraf")
.filter(restriction)
.range(-4L, 2L, ChronoUnit.HOURS)
.count();
Returns the first result of the query [doc].
Flux flux = Flux
.from("telegraf")
.first();
Groups results by a user-specified set of tags [doc].
- columns - List of columns used to calculate the new group key. Defaults to []. [array of strings]
- mode - The grouping mode, can be one of by or except. The default is by.
  - by - the specified columns are the new group key [string]
  - except - the new group key is the difference between the columns of the table under exam and columns. [string]
Flux.from("telegraf")
.range(-30L, ChronoUnit.MINUTES)
.groupBy(new String[]{"tag_a", "tag_b"});
Flux.from("telegraf")
.range(-30L, ChronoUnit.MINUTES)
.groupBy("tag_a");
// except mode
Flux.from("telegraf")
.range(-30L, ChronoUnit.MINUTES)
.groupExcept(new String[]{"tag_c"});
For each aggregate column, it outputs the area under the curve of non-null records. The curve is defined as a function where the domain is the record times and the range is the record values [doc].
- unit - Time duration to use when computing the integral. [duration]
Flux flux = Flux
.from("telegraf")
.integral(1L, ChronoUnit.MINUTES);
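As a worked illustration of "area under the curve", the sketch below sums trapezoids between consecutive records and expresses widths in multiples of the unit. IntegralSketch is a hypothetical class, not part of flux-dsl, and the use of the trapezoidal rule with epoch-millisecond timestamps is an assumption made for this example.

```java
import java.time.Duration;

/** Hypothetical illustration of an integral via the trapezoidal rule; an assumption, not the Flux code. */
final class IntegralSketch {

    /** Arrays must have equal length; times are epoch milliseconds in ascending order. */
    static double integral(long[] timesMillis, double[] values, Duration unit) {
        double unitMillis = unit.toMillis();
        double area = 0.0;
        for (int i = 1; i < values.length; i++) {
            // Width of the trapezoid in units, height is the mean of the two record values.
            double width = (timesMillis[i] - timesMillis[i - 1]) / unitMillis;
            area += width * (values[i] + values[i - 1]) / 2.0;
        }
        return area;
    }

    public static void main(String[] args) {
        long[] times = {0L, 60_000L, 120_000L};
        double[] values = {1.0, 3.0, 5.0};
        System.out.println(integral(times, values, Duration.ofMinutes(1)));
    }
}
```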
Join two time series together on time and the list of on keys [doc].
- tables - Map of tables to join. Currently only two tables are allowed. [map of tables]
- on - List of tag keys that when equal produces a result set. [array of strings]
- method - An optional parameter that specifies the type of join to be performed. When not specified, an inner join is performed. [string]
The method parameter may take on any one of the following values:
- inner - inner join
- cross - cross product
- left - left outer join
- right - right outer join
- outer - full outer join
The on parameter and the cross method are mutually exclusive.
Flux cpu = Flux.from("telegraf")
.filter(Restrictions.and(Restrictions.measurement().equal("cpu"), Restrictions.field().equal("usage_user")))
.range(-30L, ChronoUnit.MINUTES);
Flux mem = Flux.from("telegraf")
.filter(Restrictions.and(Restrictions.measurement().equal("mem"), Restrictions.field().equal("used_percent")))
.range(-30L, ChronoUnit.MINUTES);
Flux flux = Flux.join()
.withTable("cpu", cpu)
.withTable("mem", mem)
.withOn("host");
Keep is the inverse of drop. It will return a table containing only columns that are specified, ignoring all others.
Only columns in the group key that are also specified in keep will be kept in the resulting group key [doc].
- columns - The list of columns that should be included in the resulting table. Cannot be used with fn. [array of strings]
- fn - The function which takes a column name as a parameter and returns a boolean indicating whether or not the column should be included in the resulting table. Cannot be used with columns. [function(column)]
Flux flux = Flux
.from("telegraf")
.keep(new String[]{"_time", "_value"});
Flux flux = Flux
.from("telegraf")
.keep()
.withFunction("column =~ /inodes*/");
Returns the last result of the query [doc].
- column - The column used to verify the existence of a value. Defaults to _value. [string]
Flux flux = Flux
.from("telegraf")
.last();
Restricts the number of rows returned in the results [doc].
- n - The maximum number of records to output. [int]
- offset - The number of records to skip per table. Defaults to 0. [int]
Flux flux = Flux
.from("telegraf")
.limit(5);
Flux flux = Flux
.from("telegraf")
.limit(100, 10);
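How n and offset interact can be shown on an in-memory list: skip the first offset records, then keep at most n. LimitSketch is a hypothetical class for illustration, not part of flux-dsl, and it models a single table only.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical illustration of limit(n, offset) on one table; not the Flux implementation. */
final class LimitSketch {

    static <T> List<T> limit(List<T> records, int n, int offset) {
        return records.stream()
                .skip(offset)   // drop the first `offset` records
                .limit(n)       // then keep at most `n` records
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(limit(Arrays.asList(1, 2, 3, 4, 5, 6), 3, 2));
    }
}
```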
Applies a function to each row of the table [doc].
- fn - The function to apply to each row. The return value of the function may be a single value or an object.
// Square the value
Restrictions restriction = Restrictions.and(
Restrictions.measurement().equal("cpu"),
Restrictions.field().equal("usage_system"),
Restrictions.tag("service").equal("app-server")
);
Flux flux = Flux
.from("telegraf")
.filter(restriction)
.range(-12L, ChronoUnit.HOURS)
.map("r._value * r._value");
// Square the value and keep the original value
Restrictions restriction = Restrictions.and(
Restrictions.measurement().equal("cpu"),
Restrictions.field().equal("usage_system"),
Restrictions.tag("service").equal("app-server")
);
Flux flux = Flux
.from("telegraf")
.filter(restriction)
.range(-12L, ChronoUnit.HOURS)
.map("{value: r._value, value2:r._value * r._value}");
Returns the max value within the results [doc].
- column - The column to use to calculate the maximum value. Defaults to _value. [string]
Flux flux = Flux
.from("telegraf")
.range(-12L, ChronoUnit.HOURS)
.window(10L, ChronoUnit.MINUTES)
.max();
Returns the mean of the values within the results [doc].
- column - The column to use to compute the mean. Defaults to _value. [string]
Flux flux = Flux
.from("telegraf")
.range(-12L, ChronoUnit.HOURS)
.window(10L, ChronoUnit.MINUTES)
.mean();
Returns the min value within the results [doc].
- column - The column to use to calculate the minimum value. Defaults to _value. [string]
Flux flux = Flux
.from("telegraf")
.range(-12L, ChronoUnit.HOURS)
.window(10L, ChronoUnit.MINUTES)
.min();
Percentile is both an aggregate operation and a selector operation depending on selected options. In the aggregate methods, it outputs the value that represents the specified percentile of the non null record as a float [doc].
- columns - Specifies a list of columns to aggregate. Defaults to _value. [array of strings]
- percentile - Value between 0 and 1 indicating the desired percentile. [float]
- method - Percentile provides 3 methods for computation:
  - estimate_tdigest - an aggregate result that uses a tdigest data structure to compute an accurate percentile estimate on large data sources.
  - exact_mean - an aggregate result that takes the average of the two points closest to the percentile value.
  - exact_selector - Percentile
- compression - Compression indicates how many centroids to use when compressing the dataset. A larger number produces a more accurate result at the cost of increased memory requirements. Defaults to 1000. [float]
Flux flux = Flux
.from("telegraf")
.percentile(0.80F);
Flux flux = Flux
.from("telegraf")
.percentile()
.withColumns(new String[]{"value2"})
.withPercentile(0.75F)
.withMethod(MethodType.EXACT_MEAN)
.withCompression(2_000F);
Pivot collects values stored vertically (column-wise) in a table and aligns them horizontally (row-wise) into logical sets [doc].
- rowKey - List of columns used to uniquely identify a row for the output. [array of strings]
- columnKey - List of columns used to pivot values onto each row identified by the rowKey. [array of strings]
- valueColumn - Identifies the single column that contains the value to be moved around the pivot. [string]
Flux flux = Flux.from("telegraf")
.pivot()
.withRowKey(new String[]{"_time"})
.withColumnKey(new String[]{"_field"})
.withValueColumn("_value");
Filters the results by time boundaries [doc].
- start - Specifies the oldest time to be included in the results. [duration or timestamp]
- stop - Specifies the exclusive newest time to be included in the results. Defaults to "now". [duration or timestamp]
// by interval
Flux flux = Flux
.from("telegraf")
.range(-12L, -1L, ChronoUnit.HOURS);
// by Instant
Flux flux = Flux
.from("telegraf")
.range(Instant.now().minus(4, ChronoUnit.HOURS),
Instant.now().minus(15, ChronoUnit.MINUTES)
);
Reduce aggregates records in each table according to the reducer fn.
The output for each table will be the group key of the table, plus columns corresponding to each field in the reducer object [doc].
If the reducer record contains a column with the same name as a group key column, then the group key column's value is overwritten, and the outgoing group key is changed. However, if two reduced tables write to the same destination group key, then the function will error.
- fn - Function to apply to each record with a reducer object of type 'a. [(r: record, accumulator: 'a) -> 'a]
- identity - An initial value to use when creating a reducer. ['a]
Flux flux = Flux
.from("telegraf")
.range(-12L, ChronoUnit.HOURS)
.reduce("{ sum: r._value + accumulator.sum }", "{sum: 0.0}");
Flux flux = Flux
.from("telegraf")
.range(-12L, ChronoUnit.HOURS)
.reduce()
.withFunction("{sum: r._value + accumulator.sum,\ncount: accumulator.count + 1.0}")
.withIdentity("{sum: 0.0, count: 0.0}");
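The relationship between the reducer function and the identity is that of a left fold: the accumulator starts at the identity and is replaced by the function's result for each record. A minimal plain-Java sketch of that idea follows. ReduceSketch is a hypothetical class, not part of flux-dsl, and the two-element array standing in for the {sum, count} accumulator is a simplification for this example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

/** Hypothetical illustration of reduce as a left fold; not the Flux implementation. */
final class ReduceSketch {

    static <R, A> A reduce(List<R> records, A identity, BiFunction<R, A, A> fn) {
        A accumulator = identity;
        for (R record : records) {
            // Each call receives the current record and the accumulator built so far.
            accumulator = fn.apply(record, accumulator);
        }
        return accumulator;
    }

    public static void main(String[] args) {
        // Index 0 plays the role of "sum", index 1 the role of "count".
        double[] result = reduce(
                Arrays.asList(1.0, 2.0, 3.0),
                new double[]{0.0, 0.0},
                (r, acc) -> new double[]{acc[0] + r, acc[1] + 1.0});
        System.out.println("sum=" + result[0] + ", count=" + result[1]);
    }
}
```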
Rename will rename specified columns in a table. If a column is renamed and is part of the group key, the column name in the group key will be updated [doc].
- columns - The map of columns to rename and their corresponding new names. Cannot be used with fn. [map of columns]
- fn - The function which takes a single string parameter (the old column name) and returns a string representing the new column name. Cannot be used with columns. [function(column)]
Map<String, String> map = new HashMap<>();
map.put("host", "server");
map.put("_value", "val");
Flux flux = Flux
.from("telegraf")
.rename(map);
Flux flux = Flux
.from("telegraf")
.rename("\"{col}_new\"");
Sample values from a table [doc].
- n - Sample every Nth element. [int]
- pos - Position offset from start of results to begin sampling. pos must be less than n. If pos is less than 0, a random offset is used. Default is -1 (random offset). [int]
Flux flux = Flux.from("telegraf")
.filter(and(measurement().equal("cpu"), field().equal("usage_system")))
.range(-1L, ChronoUnit.DAYS)
.sample(10);
Flux flux = Flux.from("telegraf")
.filter(and(measurement().equal("cpu"), field().equal("usage_system")))
.range(-1L, ChronoUnit.DAYS)
.sample(5, 1);
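The rules for n and pos can be sketched on an in-memory list: start at pos (or at a random offset below n when pos is negative) and take every Nth element from there. SampleSketch is a hypothetical class for illustration only, not part of flux-dsl.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

/** Hypothetical illustration of sample(n, pos); not the Flux implementation. */
final class SampleSketch {

    static <T> List<T> sample(List<T> records, int n, int pos, Random random) {
        if (n <= 0) {
            throw new IllegalArgumentException("n must be positive");
        }
        if (pos >= n) {
            throw new IllegalArgumentException("pos must be less than n");
        }
        // A negative pos means "pick a random offset in [0, n)".
        int start = pos < 0 ? random.nextInt(n) : pos;
        List<T> out = new ArrayList<>();
        for (int i = start; i < records.size(); i += n) {
            out.add(records.get(i));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> records = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
        System.out.println(sample(records, 5, 1, new Random()));
    }
}
```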
Assigns a static value to each record [doc].
- key - Label for the column to set. [string]
- value - Value for the column to set. [string]
Flux flux = Flux
.from("telegraf")
.set("location", "Carolina");
Shift adds a fixed duration to time columns [doc].
- duration - The amount to add to each time value. [duration]
- columns - The list of all columns that should be shifted. Defaults to ["_start", "_stop", "_time"]. [array of strings]
Flux flux = Flux
.from("telegraf")
.timeShift(10L, ChronoUnit.HOURS);
Flux flux = Flux
.from("telegraf")
.timeShift(10L, ChronoUnit.HOURS, new String[]{"_time", "custom"});
Skew of the results [doc].
- column - The column on which to operate. Defaults to _value. [string]
Flux flux = Flux
.from("telegraf")
.range(-30L, -15L, ChronoUnit.MINUTES)
.skew();
Sorts the results by the specified columns. Default sort is ascending [doc].
- columns - List of columns used to sort. Precedence from left to right. Default is "value". [array of strings]
- desc - Sort results descending. Default false. [boolean]
Flux flux = Flux
.from("telegraf")
.sort(new String[]{"region", "value"});
Flux flux = Flux
.from("telegraf")
.sort(true);
Difference between min and max values [doc].
- column - The column on which to operate. Defaults to _value. [string]
Flux flux = Flux
.from("telegraf")
.spread();
Standard Deviation of the results [doc].
- column - The column on which to operate. Defaults to _value. [string]
Flux flux = Flux
.from("telegraf")
.stddev();
Sum of the results [doc].
- column - The column on which to operate. Defaults to _value. [string]
Flux flux = Flux
.from("telegraf")
.sum();
Tail caps the number of records in output tables to a fixed size [doc].
- n - The maximum number of records to output. [int]
- offset - The number of records to skip per table. Defaults to 0. [int]
Flux flux = Flux
.from("telegraf")
.tail(5);
Flux flux = Flux
.from("telegraf")
.tail(100, 10);
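For comparison with limit, the sketch below keeps the last n records of a list. TailSketch is a hypothetical class, not part of flux-dsl, and counting offset from the end of the table (skipping the newest records before taking n) is an assumption of this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical illustration of tail(n, offset) on one table; not the Flux implementation. */
final class TailSketch {

    static <T> List<T> tail(List<T> records, int n, int offset) {
        // Assumption: offset skips records at the end of the table before taking n.
        int end = Math.max(0, records.size() - offset);
        int start = Math.max(0, end - n);
        return new ArrayList<>(records.subList(start, end));
    }

    public static void main(String[] args) {
        System.out.println(tail(Arrays.asList(1, 2, 3, 4, 5, 6), 2, 1));
    }
}
```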
The To operation takes data from a stream and writes it to a bucket [doc].
- bucket - The bucket to which data will be written. [string]
- bucketID - The ID of the bucket to which data will be written. [string]
- org - The organization name of the above bucket. [string]
- orgID - The organization ID of the above bucket. [string]
- host - The remote host to write to. [string]
- token - The authorization token to use when writing to a remote host. [string]
- timeColumn - The time column of the output. [string]
- tagColumns - The tag columns of the output. Default: All columns of type string, excluding all value columns and the _field column if present. [array of strings]
- fieldFn - Function that takes a record from the input table and returns an object. For each record from the input table, fieldFn returns an object that maps output field key to output value. Default: (r) => ({ [r._field]: r._value }) [function(record) object]
Flux flux = Flux
.from("telegraf")
.to("my-bucket", "my-org");
Convert a value to a bool [doc].
Flux flux = Flux
.from("telegraf")
.filter(and(measurement().equal("mem"), field().equal("used")))
.toBool();
Convert a value to an int [doc].
Flux flux = Flux
.from("telegraf")
.filter(and(measurement().equal("mem"), field().equal("used")))
.toInt();
Convert a value to a float [doc].
Flux flux = Flux
.from("telegraf")
.filter(and(measurement().equal("mem"), field().equal("used")))
.toFloat();
Convert a value to a duration [doc].
Flux flux = Flux
.from("telegraf")
.filter(and(measurement().equal("mem"), field().equal("used")))
.toDuration();
Convert a value to a string [doc].
Flux flux = Flux
.from("telegraf")
.filter(and(measurement().equal("mem"), field().equal("used")))
.toString();
Convert a value to a time [doc].
Flux flux = Flux
.from("telegraf")
.filter(and(measurement().equal("mem"), field().equal("used")))
.toTime();
Convert a value to a uint [doc].
Flux flux = Flux
.from("telegraf")
.filter(and(measurement().equal("mem"), field().equal("used")))
.toUInt();
Groups the results by a given time range [doc].
- every - Duration of time between windows. Defaults to period's value. [duration]
- period - Duration of the windowed partition. Defaults to every's value. [duration]
- offset - The offset duration relative to the location offset. It can be negative, indicating that the offset goes backwards in time. The default aligns the window boundaries to line up with the now option time. [time]
- timeColumn - Name of the time column to use. Defaults to _time. [string]
- startColumn - Name of the column containing the window start time. Defaults to _start. [string]
- stopColumn - Name of the column containing the window stop time. Defaults to _stop. [string]
Flux flux = Flux
.from("telegraf")
.window(15L, ChronoUnit.MINUTES)
.max();
Flux flux = Flux
.from("telegraf")
.window(15L, ChronoUnit.MINUTES,
20L, ChronoUnit.SECONDS,
-50L, ChronoUnit.WEEKS,
1L, ChronoUnit.SECONDS)
.max();
Yields the query results under a given name [doc].
- name - The unique name to give to yielded results. [string]
Flux flux = Flux
.from("telegraf")
.yield("0");
Assume there is a custom function, measurement, that filters records by measurement name. The Flux implementation looks like this:
// Define measurement function which accepts table as the piped argument.
measurement = (m, table=<-) => table |> filter(fn: (r) => r._measurement == m)
The Java implementation:
public class FilterMeasurement extends AbstractParametrizedFlux {

    public FilterMeasurement(@Nonnull final Flux source) {
        super(source);
    }

    @Nonnull
    @Override
    protected String operatorName() {
        // name of the Flux function
        return "measurement";
    }

    /**
     * @param measurement the measurement name. Has to be defined.
     * @return this
     */
    @Nonnull
    public FilterMeasurement withName(@Nonnull final String measurement) {
        Arguments.checkNonEmpty(measurement, "Measurement name");
        // name of parameter from the Flux function
        withPropertyValueEscaped("m", measurement);
        return this;
    }
}
Using the measurement function:
Flux flux = Flux
.from("telegraf")
.operator(FilterMeasurement.class)
.withName("cpu")
.sum();
The Flux script:
from(bucket:"telegraf")
|> measurement(m: "cpu")
|> sum()
The Flux location option sets the default time zone of all times in the script. The default value is timezone.utc.
You can construct a timezone with a fixed offset:
Flux flux = Flux
.from("telegraf")
.withLocationFixed("-8h")
.count();
The Flux script:
import "timezone"
option location = timezone.fixed(offset: -8h)
from(bucket:"telegraf")
|> count()
Or you can construct a timezone based on a location name:
Flux flux = Flux
.from("telegraf")
.withLocationNamed("America/Los_Angeles")
.count();
The Flux script:
import "timezone"
option location = timezone.location(name: "America/Los_Angeles")
from(bucket:"telegraf")
|> count()
The latest version as a Maven dependency:
<dependency>
    <groupId>com.influxdb</groupId>
    <artifactId>flux-dsl</artifactId>
    <version>6.2.0</version>
</dependency>
Or when using Gradle:
dependencies {
    implementation "com.influxdb:flux-dsl:6.2.0"
}
Snapshots are deployed to the OSS Snapshot repository.
<repository>
    <id>ossrh</id>
    <name>OSS Snapshot repository</name>
    <url>https://oss.sonatype.org/content/repositories/snapshots/</url>
    <releases>
        <enabled>false</enabled>
    </releases>
    <snapshots>
        <enabled>true</enabled>
    </snapshots>
</repository>
repositories {
    maven { url "https://oss.sonatype.org/content/repositories/snapshots" }
}