Skip to content

Commit

Permalink
feat: add aggregateWindow operator (#111)
Browse files Browse the repository at this point in the history
  • Loading branch information
bednar authored May 13, 2020
1 parent 364fec8 commit 1f93fd8
Show file tree
Hide file tree
Showing 5 changed files with 362 additions and 11 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
### Features

1. [#110](https://github.com/influxdata/influxdb-client-java/pull/110): Added support "inf" in Duration
1. [#111](https://github.com/influxdata/influxdb-client-java/pull/111): Add aggregateWindow operator to FluxDSL

### Bug Fixes

Expand Down
36 changes: 35 additions & 1 deletion flux-dsl/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,42 @@ Flux flux = Flux
Flux flux = Flux.from("telegraf")
.expression("map(fn: (r) => r._value * r._value)")
.expression("sum()");
```
```

### aggregateWindow
Applies an aggregate or selector function (any function with a column parameter) to fixed windows of time [[doc](http://bit.ly/flux-spec#aggregateWindow)].
- `every` - The duration of windows. [duration]
- `fn` - The aggregate function used in the operation. [function]
- `column` - The column on which to operate. Defaults to `_value`. [string]
- `timeSrc` - The time column from which time is copied for the aggregate record. Defaults to `_stop`. [string]
- `timeDst` - The “time destination” column to which time is copied for the aggregate record. Defaults to `_time`. [string]
- `createEmpty` - For windows without data, this will create an empty window and fill it with a `null` aggregate value. Defaults to `true`. [boolean]

```java
Flux flux = Flux
.from("telegraf")
.aggregateWindow(10L, ChronoUnit.SECONDS, "mean");
```

```java
Flux flux = Flux
.from("telegraf")
.aggregateWindow()
.withEvery("10s")
.withAggregateFunction("sum")
.withColumn("_value")
.withTimeSrc("_stop")
.withTimeDst("_time")
.withCreateEmpty(true);
```

```java
Flux flux = Flux
.from("telegraf")
.aggregateWindow()
.withEvery(5L, ChronoUnit.MINUTES)
.withFunction("tables |> quantile(q: 0.99, column:column)");
```
### count
Counts the number of results [[doc](http://bit.ly/flux-spec#count)].
- `useStartTime` - Use the start time as the timestamp of the resulting aggregate. [boolean]
Expand Down
64 changes: 54 additions & 10 deletions flux-dsl/src/main/java/com/influxdb/query/dsl/Flux.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

import com.influxdb.Arguments;
import com.influxdb.query.dsl.functions.AbstractParametrizedFlux;
import com.influxdb.query.dsl.functions.AggregateWindow;
import com.influxdb.query.dsl.functions.CountFlux;
import com.influxdb.query.dsl.functions.CovarianceFlux;
import com.influxdb.query.dsl.functions.CumulativeSumFlux;
Expand Down Expand Up @@ -84,12 +85,11 @@
* <br>
* <a href="http://bit.ly/flux-spec">Flux Specification</a>
* <p>
* TODO integration tests.
*
* <h3>The functions:</h3>
* <ul>
* <li>{@link AggregateWindow}</li>
* <li>{@link FromFlux}</li>
* <li>!TODO Buckets</li>
* <li>{@link CountFlux}</li>
* <li>{@link CovarianceFlux}</li>
* <li>{@link CumulativeSumFlux}</li>
Expand All @@ -101,15 +101,11 @@
* <li>{@link FilterFlux}</li>
* <li>{@link FirstFlux}</li>
* <li>{@link GroupFlux}</li>
* <li>!TODO - histogram</li>
* <li>!TODO - histogramQuantile</li>
* <li>{@link IntegralFlux}</li>
* <li>{@link JoinFlux}</li>
* <li>{@link KeepFlux}</li>
* <li>{@link LastFlux}</li>
* <li>{@link LimitFlux}</li>
* <li>!TODO - LinearBuckets</li>
* <li>TODO - LogrithmicBuckets</li>
* <li>{@link MapFlux}</li>
* <li>{@link MaxFlux}</li>
* <li>{@link MeanFlux}</li>
Expand All @@ -124,8 +120,6 @@
* <li>{@link SkewFlux}</li>
* <li>{@link SortFlux}</li>
* <li>{@link SpreadFlux}</li>
* <li>!TODO stateTracking - Not defined in documentation or SPEC</li>
* <li>!TODO InfluxFieldsAsColsC</li>
* <li>{@link StddevFlux}</li>
* <li>{@link SumFlux}</li>
* <li>{@link ToFlux}</li>
Expand All @@ -139,8 +133,6 @@
* <li>{@link WindowFlux}</li>
* <li>{@link YieldFlux}</li>
* <li>{@link ExpressionFlux}</li>
* <li>TODO - toKafka, toHttp</li>
* <li>TODO - SHOW DATABASES</li>
* </ul>
*
* @author Jakub Bednar (bednar@github) (22/06/2018 10:16)
Expand Down Expand Up @@ -197,6 +189,58 @@ public static Flux from(@Nonnull final String bucket, @Nonnull final String[] ho
.withPropertyValue("hosts", hosts);
}

/**
* Applies an aggregate or selector function to fixed windows of time.
*
* <h3>The parameters had to be defined by:</h3>
* <ul>
* <li>{@link AggregateWindow#withEvery(Long, ChronoUnit)}</li>
* <li>{@link AggregateWindow#withEvery(String)}</li>
* <li>{@link AggregateWindow#withFunction(String, Object)}</li>
* <li>{@link AggregateWindow#withAggregateFunction(String)}</li>
* <li>{@link AggregateWindow#withColumn(String)}</li>
* <li>{@link AggregateWindow#withTimeSrc(String)}</li>
* <li>{@link AggregateWindow#withTimeDst(String)}</li>
* <li>{@link AggregateWindow#withCreateEmpty(boolean)}</li>
* </ul>
*
* @return {@link AggregateWindow}
*/
public final AggregateWindow aggregateWindow() {
return new AggregateWindow(this);
}

/**
* Applies an aggregate or selector function to fixed windows of time.
*
* <h3>The parameters had to be defined by:</h3>
* <ul>
* <li>{@link AggregateWindow#withEvery(Long, ChronoUnit)}</li>
* <li>{@link AggregateWindow#withEvery(String)}</li>
* <li>{@link AggregateWindow#withFunction(String, Object)}</li>
* <li>{@link AggregateWindow#withAggregateFunction(String)}</li>
* <li>{@link AggregateWindow#withColumn(String)}</li>
* <li>{@link AggregateWindow#withTimeSrc(String)}</li>
* <li>{@link AggregateWindow#withTimeDst(String)}</li>
* <li>{@link AggregateWindow#withCreateEmpty(boolean)}</li>
* </ul>
*
* @param every The duration of windows.
* @param everyUnit a {@code ChronoUnit} determining how to interpret the {@code every}.
* @param namedFunction specifies the named aggregate operation to perform.
* @return {@link AggregateWindow}
*/
@Nonnull
public final AggregateWindow aggregateWindow(@Nonnull final Long every,
@Nonnull final ChronoUnit everyUnit,
@Nonnull final String namedFunction) {

Arguments.checkNotNull(every, "Every is required");
Arguments.checkNotNull(everyUnit, "Every ChronoUnit is required");

return new AggregateWindow(this).withEvery(every, everyUnit).withAggregateFunction(namedFunction);
}

/**
* Join two time series together on time and the list of tags.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
/*
* The MIT License
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
package com.influxdb.query.dsl.functions;

import java.time.temporal.ChronoUnit;
import javax.annotation.Nonnull;

import com.influxdb.Arguments;
import com.influxdb.query.dsl.Flux;

/**
* Applies an aggregate or selector function (any function with a column parameter) to fixed windows of time.
* <a href="http://bit.ly/flux-spec#aggregateWindow">See SPEC</a>.
*
* <h3>Options</h3>
* <ul>
* <li><b>every</b> - The duration of windows. [duration]</li>
* <li><b>fn</b> - The aggregate function used in the operation. [function]</li>
* <li><b>column</b> - The column on which to operate. Defaults to <i>"_value"</i>. [string]</li>
* <li><b>timeSrc</b> -
* The time column from which time is copied for the aggregate record. Defaults to <i>"_stop"</i>. [string]</li>
* <li><b>timeDst</b> -
* The “time destination” column to which time is copied for the aggregate record.
* Defaults to <i>"_time"</i>. [string]</li>
* <li><b>createEmpty</b> -
* For windows without data, this will create an empty window and fill it with a <i>null</i> aggregate value.
* Defaults to <i>true</i>. [boolean]</li>
* </ul>
*
* <h3>Example</h3>
* <pre>
* Flux flux = Flux
* .from("telegraf")
* .aggregateWindow(10L, ChronoUnit.SECONDS, "mean");
*
* Flux flux = Flux
* .from("telegraf")
* .aggregateWindow()
* .withEvery("10s")
* .withAggregateFunction("sum")
* .withColumn("_value")
* .withTimeSrc("_stop")
* .withTimeDst("_time")
* .withCreateEmpty(true);
*
* Flux flux = Flux
* .from("telegraf")
* .aggregateWindow()
* .withEvery(5L, ChronoUnit.MINUTES)
* .withFunction("tables |> quantile(q: 0.99, column:column)");
* </pre>
*
* @author Jakub Bednar (13/05/2020 08:41)
*/
public final class AggregateWindow extends AbstractParametrizedFlux {

public AggregateWindow(@Nonnull final Flux source) {
super(source);
}

@Nonnull
@Override
protected String operatorName() {
return "aggregateWindow";
}

/**
* @param every The duration of windows.
* @param everyUnit a {@code ChronoUnit} determining how to interpret the {@code every}.
* @return this
*/
@Nonnull
public AggregateWindow withEvery(@Nonnull final Long every, @Nonnull final ChronoUnit everyUnit) {

Arguments.checkNotNull(every, "Every is required");
Arguments.checkNotNull(everyUnit, "Every ChronoUnit is required");

this.withPropertyValue("every", every, everyUnit);

return this;
}

/**
* @param every The duration of windows.
* @return this
*/
@Nonnull
public AggregateWindow withEvery(@Nonnull final String every) {

Arguments.checkDuration(every, "Every");

this.withPropertyValue("every", every);

return this;
}

/**
* @param function specifies the aggregate operation to perform.
* @return this
*/
@Nonnull
public AggregateWindow withFunction(@Nonnull final String function) {

Arguments.checkNonEmpty(function, "Function");

this.withFunction("fn: (column, tables=<-)", function);

return this;
}

/**
* @param namedFunction specifies the named aggregate operation to perform.
* @return this
*/
@Nonnull
public AggregateWindow withAggregateFunction(@Nonnull final String namedFunction) {

Arguments.checkNonEmpty(namedFunction, "Function");

this.withPropertyValue("fn", namedFunction);

return this;
}

/**
* @param column The column on which to operate.
* @return this
*/
@Nonnull
public AggregateWindow withColumn(@Nonnull final String column) {

Arguments.checkNonEmpty(column, "Column");

this.withPropertyValueEscaped("column", column);

return this;
}

/**
* @param timeSrc The time column from which time is copied for the aggregate record.
* @return this
*/
@Nonnull
public AggregateWindow withTimeSrc(@Nonnull final String timeSrc) {

Arguments.checkNonEmpty(timeSrc, "timeSrc");

this.withPropertyValueEscaped("timeSrc", timeSrc);

return this;
}

/**
* @param timeDst The “time destination” column to which time is copied for the aggregate record.
* @return this
*/
@Nonnull
public AggregateWindow withTimeDst(@Nonnull final String timeDst) {

Arguments.checkNonEmpty(timeDst, "timeDst");

this.withPropertyValueEscaped("timeDst", timeDst);

return this;
}

/**
* @param createEmpty For windows without data,
* this will create an empty window and fill it with a <i>null</i> aggregate value.
* @return this
*/
@Nonnull
public AggregateWindow withCreateEmpty(final boolean createEmpty) {

this.withPropertyValue("createEmpty", createEmpty);

return this;
}
}
Loading

0 comments on commit 1f93fd8

Please sign in to comment.