#991 Support Relative Date Times (#1006)
* Unrelated typos

Signed-off-by: currantw <[email protected]>

* Initial implementation of relative date time logic and unit tests

Signed-off-by: currantw <[email protected]>

* Minor cleanup

Signed-off-by: currantw <[email protected]>

* Update to make relative time case-insensitive, and add corresponding unit tests.

Signed-off-by: currantw <[email protected]>

* Add a few more unit test cases.

Signed-off-by: currantw <[email protected]>

* Split pattern into smaller strings.

Signed-off-by: currantw <[email protected]>

* Add `relativeDateTimeFunction` to `SerializableUdf`, along with corresponding unit tests in `SerializableTimeUdfTest`. Add `mockito-inline` to dependencies for `ppl-spark-integration` to allow mocking of current datetime.

Signed-off-by: currantw <[email protected]>

* Fix dangling Javadoc

Signed-off-by: currantw <[email protected]>

* Initial implementation of `relative_timestamp` UDF.

Signed-off-by: currantw <[email protected]>

* Add integration tests, refactor to use Timestamp

Signed-off-by: currantw <[email protected]>

* Add documentation

Signed-off-by: currantw <[email protected]>

* Review comments: add more documentation, add ignored tests for earliest and latest.

Signed-off-by: currantw <[email protected]>

* Minor clean up

Signed-off-by: currantw <[email protected]>

* Update to use Instant and ZoneId. For some reason, timestamp can be returned, but output from `$CurrentTimestamp` is an `Instant`

Signed-off-by: currantw <[email protected]>

* Remove unused import

Signed-off-by: currantw <[email protected]>

* Address review comments

Signed-off-by: currantw <[email protected]>

* Address review comments

Signed-off-by: currantw <[email protected]>

* Update unit tests as per review comments

Signed-off-by: currantw <[email protected]>

* Remove duplicate documentation.

Signed-off-by: currantw <[email protected]>

* Fix failing IT

Signed-off-by: currantw <[email protected]>

---------

Signed-off-by: currantw <[email protected]>
currantw authored Jan 15, 2025
1 parent f054022 commit 4a40676
Showing 14 changed files with 683 additions and 11 deletions.
5 changes: 5 additions & 0 deletions docs/ppl-lang/PPL-Example-Commands.md
@@ -499,4 +499,9 @@ _- **Limitation: another command usage of (relation) subquery is in `appendcols`
- `source = table | eval cdate = CAST('2012-08-07' as date), ctime = cast('2012-08-07T08:07:06' as timestamp) | fields cdate, ctime`
- `source = table | eval chained_cast = cast(cast("true" as boolean) as integer) | fields chained_cast`

#### **relative_timestamp**
[See additional function details](functions/ppl-datetime#RELATIVE_TIMESTAMP)
- `source = table | eval one_hour_ago = relative_timestamp("-1h") | where timestamp < one_hour_ago`
- `source = table | eval start_of_today = relative_timestamp("@d") | where timestamp > start_of_today`
- `source = table | eval last_saturday = relative_timestamp("-1d@w6") | where timestamp >= last_saturday`
---
87 changes: 87 additions & 0 deletions docs/ppl-lang/functions/ppl-datetime.md
@@ -733,6 +733,93 @@ Example:
| 3 |
+-------------------------------+

### `RELATIVE_TIMESTAMP`

**Description:**

**Usage:** relative_timestamp(str) returns a relative timestamp corresponding to the given relative string and the
current timestamp at the time of query execution.

The relative timestamp string has syntax `[+|-]<offset_time_integer><offset_time_unit>@<snap_time_unit>`, and is
made up of two optional components.
* An offset from the current timestamp, which is composed of a sign (`+` or `-`), optional `offset_time_integer`, and
`offset_time_unit`. If the offset time integer is not specified, it defaults to `1`. For example, `+2hr` is two
hours after the current timestamp, while `-mon` is one month ago.
* A snap-to time using the `@` symbol followed by `snap_time_unit`. The snap-to time is applied after the offset (if
specified), and rounds the time <i>down</i> to the start of the specified time unit. For example, `@wk` is the start
of the current week (Sunday is considered to be the first day of the week).

The special relative timestamp string `now`, corresponding to the current timestamp, is also supported. The current
timestamp is determined once at the start of query execution, and is used for all relative timestamp calculations for
that query.

The relative timestamp string is case-insensitive.
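The two optional components above can be sketched with a single regular expression. This is a hypothetical pattern for illustration only; the project's actual parsing lives in `TimeUtils` and may differ (the special string `now` is handled separately, not by this pattern):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RelativeStringParse {
    // Hypothetical pattern for [+|-]<offset_time_integer><offset_time_unit>@<snap_time_unit>.
    // (?i) makes it case-insensitive, matching the documented behavior.
    static final Pattern RELATIVE = Pattern.compile(
            "(?i)(?:(?<sign>[+-])(?<num>\\d*)(?<unit>[a-z]+))?(?:@(?<snap>\\w+))?");

    public static void main(String[] args) {
        Matcher m = RELATIVE.matcher("-1d@w6");
        if (m.matches()) {
            String sign = m.group("sign");  // "-"
            // An omitted offset integer defaults to 1 (e.g. "-mon" means one month ago).
            int n = m.group("num").isEmpty() ? 1 : Integer.parseInt(m.group("num"));
            System.out.println(sign + n + m.group("unit") + " snap=" + m.group("snap"));
        }
    }
}
```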

The following values are supported for `offset_time_unit`:

| Time Unit | Supported Keywords |
|-----------|-------------------------------------------|
| Seconds | `s`, `sec`, `secs`, `second`, `seconds` |
| Minutes | `m`, `min`, `mins`, `minute`, `minutes` |
| Hours | `h`, `hr`, `hrs`, `hour`, `hours` |
| Days | `d`, `day`, `days` |
| Weeks | `w`, `wk`, `wks`, `week`, `weeks` |
| Months    | `mon`, `month`, `months`                  |
| Quarters | `q`, `qtr`, `qtrs`, `quarter`, `quarters` |
| Years | `y`, `yr`, `yrs`, `year`, `years` |

All the time units above are supported for `snap_time_unit`, as well as the following day-of-the-week time units:

| Time Unit | Supported Keywords |
|-----------|--------------------|
| Sunday | `w0`, `w7` |
| Monday | `w1` |
| Tuesday | `w2` |
| Wednesday | `w3` |
| Thursday | `w4` |
| Friday | `w5` |
| Saturday | `w6` |

For example, if the current timestamp is Monday, January 03, 2000 at 01:01:01 am:

| Relative String | Description | Resulting Relative Time |
|-----------------|--------------------------------------------------------------|---------------------------------------------|
| `-60m` | Sixty minutes ago | Monday, January 03, 2000 at 00:01:01 am |
| `-1H` | One hour ago | Monday, January 03, 2000 at 00:01:01 am |
| `+2wk`          | Two weeks from now                                           | Monday, January 17, 2000 at 01:01:01 am     |
| `-1h@W3` | One hour ago, rounded to the start of the previous Wednesday | Wednesday, December 29, 1999 at 00:00:00 am |
| `@d` | Start of the current day | Monday, January 03, 2000 at 00:00:00 am |
| `now` | Now | Monday, January 03, 2000 at 01:01:01 am |
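The `-1h@W3` and `@d` rows above can be reproduced with plain `java.time` arithmetic. This is a minimal sketch of the offset-then-snap order of operations, not the project's `TimeUtils` implementation (UTC is chosen arbitrarily for the demo):

```java
import java.time.DayOfWeek;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalAdjusters;

public class RelativeSnapDemo {
    public static void main(String[] args) {
        // Monday, January 03, 2000 at 01:01:01
        ZonedDateTime now = ZonedDateTime.of(2000, 1, 3, 1, 1, 1, 0, ZoneOffset.UTC);

        // "-1h@w3": apply the offset first, then snap down to the previous Wednesday.
        ZonedDateTime snapped = now.minusHours(1)                         // Monday 00:01:01
                .with(TemporalAdjusters.previousOrSame(DayOfWeek.WEDNESDAY))
                .truncatedTo(ChronoUnit.DAYS);
        System.out.println(snapped);                      // 1999-12-29T00:00Z

        // "@d": snap to the start of the current day.
        System.out.println(now.truncatedTo(ChronoUnit.DAYS)); // 2000-01-03T00:00Z
    }
}
```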

Argument type: STRING

Return type: TIMESTAMP

Example:

os> source=people | eval seconds_diff = timestampdiff(SECOND, now(), relative_timestamp("now")) | fields seconds_diff | head 1
fetched rows / total rows = 1/1
+--------------+
| seconds_diff |
|--------------|
| 0            |
+--------------+

os> source=people | eval hours_diff = timestampdiff(HOUR, now(), relative_timestamp("+1h")) | fields hours_diff | head 1
fetched rows / total rows = 1/1
+------------+
| hours_diff |
|------------|
| 1          |
+------------+

os> source=people | eval day = day_of_week(relative_timestamp("@w0")) | fields day | head 1
fetched rows / total rows = 1/1
+-----+
| day |
|-----|
| 1 |
+-----+

### `SECOND`

1 change: 1 addition & 0 deletions docs/ppl-lang/ppl-where-command.md
@@ -61,3 +61,4 @@ PPL query:
| eval factor = case(a > 15, a - 14, isnull(b), a - 7, a < 3, a + 1 else 1)
| where case(factor = 2, 'even', factor = 4, 'even', factor = 6, 'even', factor = 8, 'even' else 'odd') = 'even'
| stats count() by factor`
- `source = table | where timestamp >= relative_timestamp("-1d@w6")`
@@ -368,6 +368,70 @@ class FlintSparkPPLBuiltInDateTimeFunctionITSuite
assertSameRows(Seq(Row(3)), frame)
}

test("test RELATIVE_TIMESTAMP") {
var frame = sql(s"""
| source=$testTable
| | eval seconds_diff = timestampdiff(SECOND, now(), relative_timestamp("now"))
| | fields seconds_diff
| | head 1
| """.stripMargin)
assertSameRows(Seq(Row(0)), frame)

frame = sql(s"""
| source=$testTable
| | eval hours_diff = timestampdiff(HOUR, relative_timestamp("+1h"), relative_timestamp("+1d"))
| | fields hours_diff
| | head 1
| """.stripMargin)
assertSameRows(Seq(Row(23)), frame)

frame = sql(s"""
| source=$testTable
| | eval day = day_of_week(relative_timestamp("@w0"))
| | fields day
| | head 1
| """.stripMargin)
assertSameRows(Seq(Row(1)), frame)

frame = sql(s"""
| source=$testTable
| | eval last_wednesday = relative_timestamp("-1d@w3")
| | eval actual_days_ago = timestampdiff(DAY, last_wednesday, now())
| | eval day_of_week = day_of_week(now())
| | eval expected_days_ago = case(day_of_week > 4, day_of_week - 4 else day_of_week + 3)
| | eval test_result = (expected_days_ago = actual_days_ago)
| | fields test_result
| | head 1
| """.stripMargin)
assertSameRows(Seq(Row(true)), frame)
}

// TODO #957: Support earliest
ignore("test EARLIEST") {
var frame = sql(s"""
| source=$testTable
| | eval earliest_hour_before = earliest(now(), "-1h")
| | eval earliest_now = earliest(now(), "now")
| | eval earliest_hour_after = earliest(now(), "+1h")
| | fields earliest_hour_before, earliest_now, earliest_hour_after
| | head 1
| """.stripMargin)
assertSameRows(Seq(Row(true), Row(true), Row(false)), frame)
}

// TODO #957: Support latest
ignore("test LATEST") {
var frame = sql(s"""
| source=$testTable
| | eval latest_hour_before = latest(now(), "-1h")
| | eval latest_now = latest(now(), "now")
| | eval latest_hour_after = latest(now(), "+1h")
| | fields latest_hour_before, latest_now, latest_hour_after
| | head 1
| """.stripMargin)
assertSameRows(Seq(Row(false), Row(true), Row(true)), frame)
}

test("test CURRENT_TIME is not supported") {
val ex = intercept[UnsupportedOperationException](sql(s"""
| source = $testTable
4 changes: 2 additions & 2 deletions ppl-spark-integration/README.md
@@ -6,7 +6,7 @@ translation between PPL's logical plan to Spark's Catalyst logical plan.
### Context
The following concepts are the main motivation for introducing this functionality:
- Transforming PPL to become OpenSearch default query language (specifically for logs/traces/metrics signals)
- Promoting PPL as a viable candidate for the proposed CNCF Observability universal query language.
- Seamlessly interact with different data sources, such as S3 / Prometheus / data lakes, leveraging Spark execution.
- Using Spark's federation capabilities as a general-purpose query engine to facilitate complex queries, including joins
- Improve and promote PPL to become an extensible, general-purpose query language adopted by the community
@@ -37,7 +37,7 @@ In Apache Spark, the DataFrame API serves as a programmatic interface for data m

For instance, if you have a PPL query and a translator, you can convert it into DataFrame operations to generate an optimized execution plan. Spark's underlying Catalyst optimizer will convert these DataFrame transformations and actions into an optimized physical plan executed over RDDs or Datasets.

The following section describes the two main options for translating the PPL query (using the logical plan) into the corresponding Spark component (either the DataFrame API or the Spark logical plan)


### Translation Process
@@ -338,6 +338,7 @@ MONTHNAME: 'MONTHNAME';
NOW: 'NOW';
PERIOD_ADD: 'PERIOD_ADD';
PERIOD_DIFF: 'PERIOD_DIFF';
RELATIVE_TIMESTAMP: 'RELATIVE_TIMESTAMP';
SEC_TO_TIME: 'SEC_TO_TIME';
STR_TO_DATE: 'STR_TO_DATE';
SUBDATE: 'SUBDATE';
@@ -755,6 +755,7 @@ dateTimeFunctionName
| NOW
| PERIOD_ADD
| PERIOD_DIFF
| RELATIVE_TIMESTAMP
| QUARTER
| SECOND
| SECOND_OF_MINUTE
@@ -9,7 +9,7 @@
import lombok.RequiredArgsConstructor;
import org.opensearch.sql.data.type.ExprCoreType;

/** The DataType definition in AST. Question, could we use {@link ExprCoreType} directly in AST? */

@RequiredArgsConstructor
public enum DataType {
@@ -133,6 +133,9 @@ public enum BuiltinFunctionName {
LOCALTIMESTAMP(FunctionName.of("localtimestamp")),
SYSDATE(FunctionName.of("sysdate")),

// Relative timestamp functions
RELATIVE_TIMESTAMP(FunctionName.of("relative_timestamp")),

/** Text Functions. */
TOSTRING(FunctionName.of("tostring")),

@@ -13,16 +13,23 @@
import org.apache.spark.sql.types.DataTypes;
import scala.Function1;
import scala.Function2;
import scala.Function3;
import scala.Option;
import scala.Serializable;
import scala.runtime.AbstractFunction1;
import scala.runtime.AbstractFunction2;
import scala.runtime.AbstractFunction3;
import scala.collection.JavaConverters;
import scala.collection.mutable.WrappedArray;

import java.lang.Boolean;
import java.math.BigInteger;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -35,11 +42,18 @@

public interface SerializableUdf {

abstract class SerializableAbstractFunction1<T1, R> extends AbstractFunction1<T1, R>
implements Serializable {
}

abstract class SerializableAbstractFunction2<T1, T2, R> extends AbstractFunction2<T1, T2, R>
implements Serializable {
}

abstract class SerializableAbstractFunction3<T1, T2, T3, R> extends AbstractFunction3<T1, T2, T3, R>
implements Serializable {
}

/**
* Remove specified keys from a JSON string.
*
@@ -109,7 +123,7 @@ public String apply(String jsonStr, WrappedArray<String> elements) {
}
}
};

Function2<String, String, Boolean> cidrFunction = new SerializableAbstractFunction2<>() {

IPAddressStringParameters valOptions = new IPAddressStringParameters.Builder()
@@ -197,14 +211,40 @@ public BigInteger apply(String ipAddress) {
};
}

/**
* Returns the {@link Instant} corresponding to the given relative string, current timestamp, and current time zone ID.
* Throws {@link RuntimeException} if the relative string is not supported.
*/
Function3<String, Object, String, Instant> relativeTimestampFunction = new SerializableAbstractFunction3<String, Object, String, Instant>() {

@Override
public Instant apply(String relativeString, Object currentTimestamp, String zoneIdString) {

/// If `spark.sql.datetime.java8API.enabled` is set to `true`, [org.apache.spark.sql.types.TimestampType]
/// is converted to [Instant] by Catalyst; otherwise, [Timestamp] is used instead.
Instant currentInstant =
currentTimestamp instanceof Timestamp
? ((Timestamp) currentTimestamp).toInstant()
: (Instant) currentTimestamp;

/// The Spark session time zone (`spark.sql.session.timeZone`)
/// is used, which may be different from the system time zone.
ZoneId zoneId = ZoneId.of(zoneIdString);

/// Relative time calculations are performed using [ZonedDateTime] because offsets (e.g. one hour ago)
/// need to account for changes in the time zone offset (e.g. daylight savings time), while snaps (e.g.
/// start of previous Wednesday) need to account for the local date time.
ZonedDateTime currentDateTime = ZonedDateTime.ofInstant(currentInstant, zoneId);
ZonedDateTime relativeDateTime = TimeUtils.getRelativeZonedDateTime(relativeString, currentDateTime);

return relativeDateTime.toInstant();
}
};
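The `instanceof` branch at the top of `relativeTimestampFunction` exists because Catalyst passes the UDF a `java.time.Instant` when `spark.sql.datetime.java8API.enabled` is `true` and a `java.sql.Timestamp` otherwise. A standalone sketch of that normalization (mirroring, not replacing, the code above):

```java
import java.sql.Timestamp;
import java.time.Instant;

public class InstantNormalize {
    // Mirrors the instanceof branch in relativeTimestampFunction: accept either
    // representation of the current timestamp and normalize to an Instant.
    static Instant toInstant(Object currentTimestamp) {
        return currentTimestamp instanceof Timestamp
                ? ((Timestamp) currentTimestamp).toInstant()
                : (Instant) currentTimestamp;
    }

    public static void main(String[] args) {
        Instant instant = Instant.parse("2000-01-03T01:01:01Z");
        // Both representations normalize to the same Instant.
        System.out.println(toInstant(Timestamp.from(instant)).equals(instant)); // true
        System.out.println(toInstant(instant).equals(instant));                 // true
    }
}
```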

/**
* Get the function reference according to its name
*
* @param funcName string representing function to retrieve.
* @return relevant ScalaUDF for given function name.
*/
static ScalaUDF visit(String funcName, List<Expression> expressions) {
@@ -247,13 +287,22 @@ static ScalaUDF visit(String funcName, List<Expression> expressions) {
true);
case "ip_to_int":
return new ScalaUDF(geoIpUtils.ipToInt,
DataTypes.createDecimalType(38, 0),
seq(expressions),
seq(),
Option.empty(),
Option.apply("ip_to_int"),
false,
true);
case "relative_timestamp":
return new ScalaUDF(relativeTimestampFunction,
DataTypes.TimestampType,
seq(expressions),
seq(),
Option.empty(),
Option.apply("relative_timestamp"),
false,
true);
default:
return null;
}