diff --git a/docs/ppl-lang/PPL-Example-Commands.md b/docs/ppl-lang/PPL-Example-Commands.md index a2a5884bf..6e3fc9786 100644 --- a/docs/ppl-lang/PPL-Example-Commands.md +++ b/docs/ppl-lang/PPL-Example-Commands.md @@ -499,4 +499,9 @@ _- **Limitation: another command usage of (relation) subquery is in `appendcols` - `source = table | eval cdate = CAST('2012-08-07' as date), ctime = cast('2012-08-07T08:07:06' as timestamp) | fields cdate, ctime` - `source = table | eval chained_cast = cast(cast("true" as boolean) as integer) | fields chained_cast` +#### **relative_timestamp** +[See additional function details](functions/ppl-datetime#RELATIVE_TIMESTAMP) +- `source = table | eval one_hour_ago = relative_timestamp("-1h") | where timestamp < one_hour_ago` +- `source = table | eval start_of_today = relative_timestamp("@d") | where timestamp > start_of_today` +- `source = table | eval last_saturday = relative_timestamp("-1d@w6") | where timestamp >= last_saturday` --- diff --git a/docs/ppl-lang/functions/ppl-datetime.md b/docs/ppl-lang/functions/ppl-datetime.md index e479176a4..de345b581 100644 --- a/docs/ppl-lang/functions/ppl-datetime.md +++ b/docs/ppl-lang/functions/ppl-datetime.md @@ -733,6 +733,93 @@ Example: | 3 | +-------------------------------+ +### `RELATIVE_TIMESTAMP` + +**Description:** + + +**Usage:** relative_timestamp(str) returns a relative timestamp corresponding to the given relative string and the +current timestamp at the time of query execution. + +The relative timestamp string has syntax `[+|-]<offset_time_integer><offset_time_unit>@<snap_time_unit>`, and is +made up of two optional components. +* An offset from the current timestamp, which is composed of a sign (`+` or `-`), optional `offset_time_integer`, and + `offset_time_unit`. If the offset time integer is not specified, it defaults to `1`. For example, `+2hr` is two + hours after the current timestamp, while `-mon` is one month ago. +* A snap-to time using the `@` symbol followed by `snap_time_unit`. 
The snap-to time is applied after the offset (if + specified), and rounds the time down to the start of the specified time unit. For example, `@wk` is the start + of the current week (Sunday is considered to be the first day of the week). + +The special relative timestamp string `now`, corresponding to the current timestamp, is also supported. The current +timestamp is determined once at the start of query execution, and is used for all relative timestamp calculations for +that query. + +The relative timestamp string is case-insensitive. + +The following values are supported for `offset_time_unit`: + +| Time Unit | Supported Keywords | +|-----------|-------------------------------------------| +| Seconds | `s`, `sec`, `secs`, `second`, `seconds` | +| Minutes | `m`, `min`, `mins`, `minute`, `minutes` | +| Hours | `h`, `hr`, `hrs`, `hour`, `hours` | +| Days | `d`, `day`, `days` | +| Weeks | `w`, `wk`, `wks`, `week`, `weeks` | +| Quarters | `q`, `qtr`, `qtrs`, `quarter`, `quarters` | +| Years | `y`, `yr`, `yrs`, `year`, `years` | + +All the time units above are supported for `snap_time_unit`, as well as the following day-of-the-week time units: + +| Time Unit | Supported Keywords | +|-----------|--------------------| +| Sunday | `w0`, `w7` | +| Monday | `w1` | +| Tuesday | `w2` | +| Wednesday | `w3` | +| Thursday | `w4` | +| Friday | `w5` | +| Saturday | `w6` | + +For example, if the current timestamp is Monday, January 03, 2000 at 01:01:01 am: + +| Relative String | Description | Resulting Relative Time | +|-----------------|--------------------------------------------------------------|---------------------------------------------| +| `-60m` | Sixty minutes ago | Monday, January 03, 2000 at 00:01:01 am | +| `-1H` | One hour ago | Monday, January 03, 2000 at 00:01:01 am | +| `+2wk` | Two weeks from now | Monday, January 17, 2000 at 00:01:01 am | +| `-1h@W3` | One hour ago, rounded to the start of the previous Wednesday | Wednesday, December 29, 1999 at 00:00:00 am | 
+| `@d` | Start of the current day | Monday, January 03, 2000 at 00:00:00 am | +| `now` | Now | Monday, January 03, 2000 at 01:01:01 am | + +Argument type: STRING + +Return type: TIMESTAMP + +Example: + + os> source=people | eval seconds_diff = timestampdiff(SECOND, now(), relative_timestamp("now")) | fields seconds_diff | head 1 + fetched rows / total rows = 1/1 + +--------------+ + | seconds_diff | + |--------------+ + | 0 | + +--------------+ + + os> source=people | eval hours_diff = timestampdiff(HOUR, now(), relative_timestamp("+1h")) | fields hours_diff | head 1 + fetched rows / total rows = 1/1 + +------------+ + | hours_diff | + |------------+ + | 1 | + +------------+ + + os> source=people | eval day = day_of_week(relative_timestamp("@w0")) | fields day | head 1 + fetched rows / total rows = 1/1 + +-----+ + | day | + |-----| + | 1 | + +-----+ ### `SECOND` diff --git a/docs/ppl-lang/ppl-where-command.md b/docs/ppl-lang/ppl-where-command.md index ec676ab62..d6c68cdaf 100644 --- a/docs/ppl-lang/ppl-where-command.md +++ b/docs/ppl-lang/ppl-where-command.md @@ -61,3 +61,4 @@ PPL query: | eval factor = case(a > 15, a - 14, isnull(b), a - 7, a < 3, a + 1 else 1) | where case(factor = 2, 'even', factor = 4, 'even', factor = 6, 'even', factor = 8, 'even' else 'odd') = 'even' | stats count() by factor` +- `source = table | where timestamp >= relative_timestamp("-1d@w6")` diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLBuiltInDateTimeFunctionITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLBuiltInDateTimeFunctionITSuite.scala index 8001a690d..d9d5099cd 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLBuiltInDateTimeFunctionITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLBuiltInDateTimeFunctionITSuite.scala @@ -368,6 +368,70 @@ class FlintSparkPPLBuiltInDateTimeFunctionITSuite 
assertSameRows(Seq(Row(3)), frame) } + test("test RELATIVE_TIMESTAMP") { + var frame = sql(s""" + | source=$testTable + | | eval seconds_diff = timestampdiff(SECOND, now(), relative_timestamp("now")) + | | fields seconds_diff + | | head 1 + | """.stripMargin) + assertSameRows(Seq(Row(0)), frame) + + frame = sql(s""" + | source=$testTable + | | eval hours_diff = timestampdiff(HOUR, relative_timestamp("+1h"), relative_timestamp("+1d")) + | | fields hours_diff + | | head 1 + | """.stripMargin) + assertSameRows(Seq(Row(23)), frame) + + frame = sql(s""" + | source =$testTable + | | eval day = day_of_week(relative_timestamp("@w0")) + | | fields day + | | head 1 + | """.stripMargin) + assertSameRows(Seq(Row(1)), frame) + + frame = sql(s""" + | source=$testTable + | | eval last_wednesday = relative_timestamp("-1d@w3") + | | eval actual_days_ago = timestampdiff(DAY, last_wednesday, now()) + | | eval day_of_week = day_of_week(now()) + | | eval expected_days_ago = case(day_of_week > 4, day_of_week - 4 else day_of_week + 3) + | | eval test_result = (expected_days_ago = actual_days_ago) + | | fields test_result + | | head 1 + | """.stripMargin) + assertSameRows(Seq(Row(true)), frame) + } + + // TODO #957: Support earliest + ignore("test EARLIEST") { + var frame = sql(s""" + | source=$testTable + | | eval earliest_hour_before = earliest(now(), "-1h") + | | eval earliest_now = earliest(now(), "now") + | | eval earliest_hour_after = earliest(now(), "+1h") + | | fields earliest_hour_before, earliest_now, earliest_hour_after + | | head 1 + | """.stripMargin) + assertSameRows(Seq(Row(true), Row(true), Row(false)), frame) + } + + // TODO #957: Support latest + ignore("test LATEST") { + var frame = sql(s""" + | source=$testTable + | | eval latest_hour_before = latest(now(), "-1h") + | | eval latest_now = latest(now(), "now") + | | eval latest_hour_after = latest(now(), "+1h") + | | fields latest_hour_before, latest_now, latest_hour_after + | | head 1 + | """.stripMargin) + 
assertSameRows(Seq(Row(false), Row(true), Row(true)), frame) + } + test("test CURRENT_TIME is not supported") { val ex = intercept[UnsupportedOperationException](sql(s""" | source = $testTable diff --git a/ppl-spark-integration/README.md b/ppl-spark-integration/README.md index 73c526868..8265f19d8 100644 --- a/ppl-spark-integration/README.md +++ b/ppl-spark-integration/README.md @@ -6,7 +6,7 @@ translation between PPL's logical plan to Spark's Catalyst logical plan. ### Context The next concepts are the main purpose of introduction this functionality: - Transforming PPL to become OpenSearch default query language (specifically for logs/traces/metrics signals) -- Promoting PPL as a viable candidate for the proposed CNCF Observability universal query language. +- Promoting PPL as a viable candidate for the proposed CNCF Observability universal query language. - Seamlessly Interact with different datasources such as S3 / Prometheus / data-lake leveraging spark execution. - Using spark's federative capabilities as a general purpose query engine to facilitate complex queries including joins - Improve and promote PPL to become extensible and general purpose query language to be adopted by the community @@ -37,7 +37,7 @@ In Apache Spark, the DataFrame API serves as a programmatic interface for data m For instance, if you have a PPL query and a translator, you can convert it into DataFrame operations to generate an optimized execution plan. Spark's underlying Catalyst optimizer will convert these DataFrame transformations and actions into an optimized physical plan executed over RDDs or Datasets. 
-The following section describes the two main options for translating the PPL query (using the logical plan) into the spark corespondent component (either dataframe API or spark logical plan) +The following section describes the two main options for translating the PPL query (using the logical plan) into the spark correspondent component (either dataframe API or spark logical plan) ### Translation Process diff --git a/ppl-spark-integration/src/main/antlr4/OpenSearchPPLLexer.g4 b/ppl-spark-integration/src/main/antlr4/OpenSearchPPLLexer.g4 index e2c1c7b59..d80f988d3 100644 --- a/ppl-spark-integration/src/main/antlr4/OpenSearchPPLLexer.g4 +++ b/ppl-spark-integration/src/main/antlr4/OpenSearchPPLLexer.g4 @@ -338,6 +338,7 @@ MONTHNAME: 'MONTHNAME'; NOW: 'NOW'; PERIOD_ADD: 'PERIOD_ADD'; PERIOD_DIFF: 'PERIOD_DIFF'; +RELATIVE_TIMESTAMP: 'RELATIVE_TIMESTAMP'; SEC_TO_TIME: 'SEC_TO_TIME'; STR_TO_DATE: 'STR_TO_DATE'; SUBDATE: 'SUBDATE'; diff --git a/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4 b/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4 index e8d40f803..10ffe1977 100644 --- a/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4 +++ b/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4 @@ -755,6 +755,7 @@ dateTimeFunctionName | NOW | PERIOD_ADD | PERIOD_DIFF + | RELATIVE_TIMESTAMP | QUARTER | SECOND | SECOND_OF_MINUTE diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/expression/DataType.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/expression/DataType.java index 6f0de02f5..59ba4e289 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/expression/DataType.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/expression/DataType.java @@ -9,7 +9,7 @@ import lombok.RequiredArgsConstructor; import org.opensearch.sql.data.type.ExprCoreType; -/** The DataType defintion in AST. Question, could we use {@link ExprCoreType} directly in AST? 
*/ +/** The DataType definition in AST. Question, could we use {@link ExprCoreType} directly in AST? */ @RequiredArgsConstructor public enum DataType { diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/BuiltinFunctionName.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/BuiltinFunctionName.java index 86970cefb..411a9c5ea 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/BuiltinFunctionName.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/BuiltinFunctionName.java @@ -133,6 +133,9 @@ public enum BuiltinFunctionName { LOCALTIMESTAMP(FunctionName.of("localtimestamp")), SYSDATE(FunctionName.of("sysdate")), + // Relative timestamp functions + RELATIVE_TIMESTAMP(FunctionName.of("relative_timestamp")), + /** Text Functions. */ TOSTRING(FunctionName.of("tostring")), diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/SerializableUdf.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/SerializableUdf.java index e931175ff..c559fb7ad 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/SerializableUdf.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/SerializableUdf.java @@ -13,16 +13,23 @@ import org.apache.spark.sql.types.DataTypes; import scala.Function1; import scala.Function2; +import scala.Function3; import scala.Option; import scala.Serializable; import scala.runtime.AbstractFunction1; import scala.runtime.AbstractFunction2; +import scala.runtime.AbstractFunction3; import scala.collection.JavaConverters; import scala.collection.mutable.WrappedArray; +import java.lang.Boolean; import java.math.BigInteger; import java.net.InetAddress; import java.net.UnknownHostException; +import java.sql.Timestamp; +import java.time.Instant; +import java.time.ZoneId; +import java.time.ZonedDateTime; import 
java.util.Collection; import java.util.List; import java.util.Map; @@ -35,11 +42,18 @@ public interface SerializableUdf { + abstract class SerializableAbstractFunction1<T1, R> extends AbstractFunction1<T1, R> + implements Serializable { + } abstract class SerializableAbstractFunction2<T1, T2, R> extends AbstractFunction2<T1, T2, R> implements Serializable { } + abstract class SerializableAbstractFunction3<T1, T2, T3, R> extends AbstractFunction3<T1, T2, T3, R> + implements Serializable { + } + /** * Remove specified keys from a JSON string. * @@ -109,7 +123,7 @@ public String apply(String jsonStr, WrappedArray<String> elements) { } } }; - + Function2<String, String, Boolean> cidrFunction = new SerializableAbstractFunction2<>() { IPAddressStringParameters valOptions = new IPAddressStringParameters.Builder() @@ -197,14 +211,40 @@ public BigInteger apply(String ipAddress) { }; } - abstract class SerializableAbstractFunction1<T1, R> extends AbstractFunction1<T1, R> - implements Serializable { - } + /** + * Returns the {@link Instant} corresponding to the given relative string, current timestamp, and current time zone ID. + * Throws {@link RuntimeException} if the relative string is not supported. + */ + Function3<String, Object, String, Instant> relativeTimestampFunction = new SerializableAbstractFunction3<String, Object, String, Instant>() { + + @Override + public Instant apply(String relativeString, Object currentTimestamp, String zoneIdString) { + + /// If `spark.sql.datetime.java8API.enabled` is set to `true`, [org.apache.spark.sql.types.TimestampType] + /// is converted to [Instant] by Catalyst; otherwise, [Timestamp] is used instead. + Instant currentInstant = + currentTimestamp instanceof Timestamp + ? ((Timestamp) currentTimestamp).toInstant() + : (Instant) currentTimestamp; + + /// The Spark session time zone (`spark.sql.session.timeZone`) + /// is used, which may be different from the system time zone. + ZoneId zoneId = ZoneId.of(zoneIdString); + + /// Relative time calculations are performed using [ZonedDateTime] because offsets (e.g. one hour ago) + /// need to account for changes in the time zone offset (e.g. 
daylight savings time), while snaps (e.g. + /// start of previous Wednesday) need to account for the local date time. + ZonedDateTime currentDateTime = ZonedDateTime.ofInstant(currentInstant, zoneId); + ZonedDateTime relativeDateTime = TimeUtils.getRelativeZonedDateTime(relativeString, currentDateTime); + + return relativeDateTime.toInstant(); + } + }; /** * Get the function reference according to its name * - * @param funcName string representing function to retrieve. + * @param funcName string representing function to retrieve. * @return relevant ScalaUDF for given function name. */ static ScalaUDF visit(String funcName, List expressions) { @@ -247,13 +287,22 @@ static ScalaUDF visit(String funcName, List expressions) { true); case "ip_to_int": return new ScalaUDF(geoIpUtils.ipToInt, - DataTypes.createDecimalType(38,0), + DataTypes.createDecimalType(38, 0), seq(expressions), seq(), Option.empty(), Option.apply("ip_to_int"), false, true); + case "relative_timestamp": + return new ScalaUDF(relativeTimestampFunction, + DataTypes.TimestampType, + seq(expressions), + seq(), + Option.empty(), + Option.apply("relative_timestamp"), + false, + true); default: return null; } diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/TimeUtils.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/TimeUtils.java new file mode 100644 index 000000000..fff3ee78c --- /dev/null +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/TimeUtils.java @@ -0,0 +1,197 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.expression.function; + +import com.google.common.collect.ImmutableMap; +import lombok.experimental.UtilityClass; + +import java.time.DayOfWeek; +import java.time.Duration; +import java.time.Month; +import java.time.Period; +import java.time.ZonedDateTime; +import java.time.temporal.ChronoUnit; +import java.util.HashMap; 
+import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +@UtilityClass +public class TimeUtils { + + private static final String NOW = "now"; + private static final String NEGATIVE_SIGN = "-"; + + // Pattern for relative string. + private static final String OFFSET_PATTERN_STRING = "(?<offsetSign>[+-])(?<offsetValue>\\d+)?(?<offsetUnit>\\w+)"; + private static final String SNAP_PATTERN_STRING = "[@](?<snapUnit>\\w+)"; + + private static final Pattern RELATIVE_PATTERN = Pattern.compile(String.format( + "(?<offset>%s)?(?<snap>%s)?", OFFSET_PATTERN_STRING, SNAP_PATTERN_STRING), + Pattern.CASE_INSENSITIVE); + + // Supported time units. + private static final Set<String> SECOND_UNITS_SET = Set.of("s", "sec", "secs", "second", "seconds"); + private static final Set<String> MINUTE_UNITS_SET = Set.of("m", "min", "mins", "minute", "minutes"); + private static final Set<String> HOUR_UNITS_SET = Set.of("h", "hr", "hrs", "hour", "hours"); + private static final Set<String> DAY_UNITS_SET = Set.of("d", "day", "days"); + private static final Set<String> WEEK_UNITS_SET = Set.of("w", "wk", "wks", "week", "weeks"); + private static final Set<String> MONTH_UNITS_SET = Set.of("mon", "month", "months"); + private static final Set<String> QUARTER_UNITS_SET = Set.of("q", "qtr", "qtrs", "quarter", "quarters"); + private static final Set<String> YEAR_UNITS_SET = Set.of("y", "yr", "yrs", "year", "years"); + + // Map from time unit to the corresponding duration. + private static final Map<String, Duration> DURATION_FOR_TIME_UNIT_MAP; + + static { + Map<String, Duration> durationMap = new HashMap<>(); + SECOND_UNITS_SET.forEach(u -> durationMap.put(u, Duration.ofSeconds(1))); + MINUTE_UNITS_SET.forEach(u -> durationMap.put(u, Duration.ofMinutes(1))); + HOUR_UNITS_SET.forEach(u -> durationMap.put(u, Duration.ofHours(1))); + DURATION_FOR_TIME_UNIT_MAP = ImmutableMap.copyOf(durationMap); + } + + // Map from time unit to the corresponding period. 
+ private static final Map<String, Period> PERIOD_FOR_TIME_UNIT_MAP; + + static { + Map<String, Period> periodMap = new HashMap<>(); + DAY_UNITS_SET.forEach(u -> periodMap.put(u, Period.ofDays(1))); + WEEK_UNITS_SET.forEach(u -> periodMap.put(u, Period.ofWeeks(1))); + MONTH_UNITS_SET.forEach(u -> periodMap.put(u, Period.ofMonths(1))); + QUARTER_UNITS_SET.forEach(u -> periodMap.put(u, Period.ofMonths(3))); + YEAR_UNITS_SET.forEach(u -> periodMap.put(u, Period.ofYears(1))); + PERIOD_FOR_TIME_UNIT_MAP = ImmutableMap.copyOf(periodMap); + } + + // Map from snap unit to the corresponding day of the week. + private static final Map<String, DayOfWeek> DAY_OF_THE_WEEK_FOR_SNAP_UNIT_MAP = Map.ofEntries( + Map.entry("w0", DayOfWeek.SUNDAY), + Map.entry("w7", DayOfWeek.SUNDAY), + Map.entry("w1", DayOfWeek.MONDAY), + Map.entry("w2", DayOfWeek.TUESDAY), + Map.entry("w3", DayOfWeek.WEDNESDAY), + Map.entry("w4", DayOfWeek.THURSDAY), + Map.entry("w5", DayOfWeek.FRIDAY), + Map.entry("w6", DayOfWeek.SATURDAY)); + + /** + * Returns the relative {@link ZonedDateTime} corresponding to the given relative string and zoned date time. + * @see RELATIVE_TIMESTAMP for more details. 
+ */ + public static ZonedDateTime getRelativeZonedDateTime(String relativeString, ZonedDateTime zonedDateTime) { + + ZonedDateTime relativeZonedDateTime = zonedDateTime; + + if (relativeString.equalsIgnoreCase(NOW)) { + return zonedDateTime; + } + + Matcher matcher = RELATIVE_PATTERN.matcher(relativeString); + if (!matcher.matches()) { + String message = String.format("The relative date time '%s' is not supported.", relativeString); + throw new IllegalArgumentException(message); + } + + + if (matcher.group("offset") != null) { + relativeZonedDateTime = applyOffset( + relativeZonedDateTime, + matcher.group("offsetSign"), + matcher.group("offsetValue"), + matcher.group("offsetUnit")); + } + + if (matcher.group("snap") != null) { + relativeZonedDateTime = applySnap( + relativeZonedDateTime, + matcher.group("snapUnit")); + } + + return relativeZonedDateTime; + } + + /** + * Applies the offset specified by the offset sign, value, + * and unit to the given zoned date time, and returns the result. + */ + private ZonedDateTime applyOffset(ZonedDateTime zonedDateTime, String offsetSign, String offsetValue, String offsetUnit) { + + int offsetValueInt = Optional.ofNullable(offsetValue).map(Integer::parseInt).orElse(1); + if (offsetSign.equals(NEGATIVE_SIGN)) { + offsetValueInt *= -1; + } + + /* {@link Duration} and {@link Period} must be handled separately because, even + though they both inherit from {@link java.time.temporal.TemporalAmount}, they + define separate 'multipliedBy' methods. */ + + // Convert to lower case to make case-insensitive. 
+ String offsetUnitLowerCase = offsetUnit.toLowerCase(); + + if (DURATION_FOR_TIME_UNIT_MAP.containsKey(offsetUnitLowerCase)) { + Duration offsetDuration = DURATION_FOR_TIME_UNIT_MAP.get(offsetUnitLowerCase).multipliedBy(offsetValueInt); + return zonedDateTime.plus(offsetDuration); + } + + if (PERIOD_FOR_TIME_UNIT_MAP.containsKey(offsetUnitLowerCase)) { + Period offsetPeriod = PERIOD_FOR_TIME_UNIT_MAP.get(offsetUnitLowerCase).multipliedBy(offsetValueInt); + return zonedDateTime.plus(offsetPeriod); + } + + String message = String.format("The relative date time unit '%s' is not supported.", offsetUnit); + throw new IllegalArgumentException(message); + } + + /** + * Snaps the given zoned date time to the start of the previous time + * period specified by the given snap unit, and returns the result. + */ + private ZonedDateTime applySnap(ZonedDateTime zonedDateTime, String snapUnit) { + + // Convert to lower case to make case-insensitive. + String snapUnitLowerCase = snapUnit.toLowerCase(); + + if (SECOND_UNITS_SET.contains(snapUnitLowerCase)) { + return zonedDateTime.truncatedTo(ChronoUnit.SECONDS); + } else if (MINUTE_UNITS_SET.contains(snapUnitLowerCase)) { + return zonedDateTime.truncatedTo(ChronoUnit.MINUTES); + } else if (HOUR_UNITS_SET.contains(snapUnitLowerCase)) { + return zonedDateTime.truncatedTo(ChronoUnit.HOURS); + } else if (DAY_UNITS_SET.contains(snapUnitLowerCase)) { + return zonedDateTime.truncatedTo(ChronoUnit.DAYS); + } else if (WEEK_UNITS_SET.contains(snapUnitLowerCase)) { + return applySnapToDayOfWeek(zonedDateTime, DayOfWeek.SUNDAY); + } else if (MONTH_UNITS_SET.contains(snapUnitLowerCase)) { + return zonedDateTime.truncatedTo(ChronoUnit.DAYS).withDayOfMonth(1); + } else if (QUARTER_UNITS_SET.contains(snapUnitLowerCase)) { + Month snapMonth = zonedDateTime.getMonth().firstMonthOfQuarter(); + return zonedDateTime.truncatedTo(ChronoUnit.DAYS).withDayOfMonth(1).withMonth(snapMonth.getValue()); + } else if (YEAR_UNITS_SET.contains(snapUnitLowerCase)) 
{ + return zonedDateTime.truncatedTo(ChronoUnit.DAYS).withDayOfYear(1); + } else if (DAY_OF_THE_WEEK_FOR_SNAP_UNIT_MAP.containsKey(snapUnitLowerCase)) { + return applySnapToDayOfWeek(zonedDateTime, DAY_OF_THE_WEEK_FOR_SNAP_UNIT_MAP.get(snapUnitLowerCase)); + } + + String message = String.format("The relative date time unit '%s' is not supported.", snapUnit); + throw new IllegalArgumentException(message); + } + + /** + * Snaps the given date time to the start of the previous + * specified day of the week, and returns the result. + */ + private ZonedDateTime applySnapToDayOfWeek(ZonedDateTime zonedDateTime, DayOfWeek snapDayOfWeek) { + ZonedDateTime snappedDateTime = zonedDateTime.truncatedTo(ChronoUnit.DAYS); + + int daysToSnap = zonedDateTime.getDayOfWeek().getValue() - snapDayOfWeek.getValue(); + if (daysToSnap < 0) daysToSnap += DayOfWeek.values().length; + + return snappedDateTime.minusDays(daysToSnap); + } +} diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/BuiltinFunctionTransformer.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/BuiltinFunctionTransformer.java index f73a1c491..01987757f 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/BuiltinFunctionTransformer.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/BuiltinFunctionTransformer.java @@ -21,7 +21,6 @@ import org.opensearch.sql.ast.expression.IntervalUnit; import org.opensearch.sql.expression.function.BuiltinFunctionName; import org.opensearch.sql.expression.function.SerializableUdf; -import org.opensearch.sql.ppl.CatalystPlanContext; import scala.Option; import java.util.Arrays; @@ -58,6 +57,7 @@ import static org.opensearch.sql.expression.function.BuiltinFunctionName.LOCALTIME; import static org.opensearch.sql.expression.function.BuiltinFunctionName.MINUTE_OF_HOUR; import static org.opensearch.sql.expression.function.BuiltinFunctionName.MONTH_OF_YEAR; +import static 
org.opensearch.sql.expression.function.BuiltinFunctionName.RELATIVE_TIMESTAMP; import static org.opensearch.sql.expression.function.BuiltinFunctionName.SECOND_OF_MINUTE; import static org.opensearch.sql.expression.function.BuiltinFunctionName.SUBDATE; import static org.opensearch.sql.expression.function.BuiltinFunctionName.SYSDATE; @@ -174,6 +174,9 @@ public interface BuiltinFunctionTransformer { args -> { return ToUTCTimestamp$.MODULE$.apply(CurrentTimestamp$.MODULE$.apply(), CurrentTimeZone$.MODULE$.apply()); }) + .put( + RELATIVE_TIMESTAMP, + args -> SerializableUdf.visit("relative_timestamp", List.of(args.get(0), CurrentTimestamp$.MODULE$.apply(), CurrentTimeZone$.MODULE$.apply()))) .build(); static Expression builtinFunction(org.opensearch.sql.ast.expression.Function function, List args) { @@ -182,7 +185,7 @@ static Expression builtinFunction(org.opensearch.sql.ast.expression.Function fun if(udf == null) { throw new UnsupportedOperationException(function.getFuncName() + " is not a builtin function of PPL"); } - return udf; + return udf; } else { BuiltinFunctionName builtin = BuiltinFunctionName.of(function.getFuncName()).get(); String name = SPARK_BUILTIN_FUNCTION_NAME_MAPPING.get(builtin); diff --git a/ppl-spark-integration/src/test/java/org/opensearch/sql/expression/function/SerializableTimeUdfTest.java b/ppl-spark-integration/src/test/java/org/opensearch/sql/expression/function/SerializableTimeUdfTest.java new file mode 100644 index 000000000..3611a080b --- /dev/null +++ b/ppl-spark-integration/src/test/java/org/opensearch/sql/expression/function/SerializableTimeUdfTest.java @@ -0,0 +1,74 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.expression.function; + +import org.junit.Test; + +import java.sql.Timestamp; +import java.time.Instant; +import java.time.ZoneId; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; + +public class 
SerializableTimeUdfTest { + + // Monday, Jan 03, 2000 @ 01:01:01.100 UTC + private final ZoneId MOCK_ZONE_ID = ZoneId.of("UTC"); + private final Instant MOCK_INSTANT = Instant.parse("2000-01-03T01:01:01.100Z"); + private final Timestamp MOCK_TIMESTAMP = Timestamp.from(MOCK_INSTANT); + + @Test + public void relativeTimestampTest() { + + /// These are only basic tests of the relative time functionality. + /// For more comprehensive tests, see [TimeUtilsTest]. + + testValidInstant("-60m", "2000-01-03T00:01:01.100Z"); + testValidInstant("-H", "2000-01-03T00:01:01.100Z"); + testValidInstant("+2wk", "2000-01-17T01:01:01.100Z"); + testValidInstant("-1h@W3", "1999-12-29T00:00:00Z"); + testValidInstant("@d", "2000-01-03T00:00:00Z"); + testValidInstant("now", "2000-01-03T01:01:01.100Z"); + + testValidTimestamp("-60m", "2000-01-03T00:01:01.100Z"); + testValidTimestamp("-H", "2000-01-03T00:01:01.100Z"); + testValidTimestamp("+2wk", "2000-01-17T01:01:01.100Z"); + testValidTimestamp("-1h@W3", "1999-12-29T00:00:00Z"); + testValidTimestamp("@d", "2000-01-03T00:00:00Z"); + testValidTimestamp("now", "2000-01-03T01:01:01.100Z"); + + testInvalidString("invalid", "The relative date time 'invalid' is not supported."); + testInvalidString("INVALID", "The relative date time 'INVALID' is not supported."); + testInvalidString("~h", "The relative date time '~h' is not supported."); + testInvalidString("+1.1h", "The relative date time '+1.1h' is not supported."); + testInvalidString("+ms", "The relative date time unit 'ms' is not supported."); + testInvalidString("+1INVALID", "The relative date time unit 'INVALID' is not supported."); + testInvalidString("@INVALID", "The relative date time unit 'INVALID' is not supported."); + testInvalidString("@ms", "The relative date time unit 'ms' is not supported."); + testInvalidString("@w8", "The relative date time unit 'w8' is not supported."); + } + + private void testValidInstant(String relativeString, String expectedInstantString) { + String 
testMessage = String.format("\"%s\"", relativeString); + Instant expectedInstant = Instant.parse(expectedInstantString); + Instant actualTimestamp = SerializableUdf.relativeTimestampFunction.apply(relativeString, MOCK_INSTANT, MOCK_ZONE_ID.toString()); + assertEquals(testMessage, expectedInstant, actualTimestamp); + } + + private void testValidTimestamp(String relativeString, String expectedInstantString) { + String testMessage = String.format("\"%s\"", relativeString); + Instant expectedInstant = Instant.parse(expectedInstantString); + Instant actualTimestamp = SerializableUdf.relativeTimestampFunction.apply(relativeString, MOCK_TIMESTAMP, MOCK_ZONE_ID.toString()); + assertEquals(testMessage, expectedInstant, actualTimestamp); + } + + private void testInvalidString(String relativeString, String expectedExceptionMessage) { + String testMessage = String.format("\"%s\"", relativeString); + String actualExceptionMessage = assertThrows(testMessage, RuntimeException.class, () -> SerializableUdf.relativeTimestampFunction.apply(relativeString, MOCK_INSTANT, MOCK_ZONE_ID.toString())).getMessage(); + assertEquals(expectedExceptionMessage, actualExceptionMessage); + } +} diff --git a/ppl-spark-integration/src/test/java/org/opensearch/sql/expression/function/TimeUtilsTest.java b/ppl-spark-integration/src/test/java/org/opensearch/sql/expression/function/TimeUtilsTest.java new file mode 100644 index 000000000..dd75e8a6d --- /dev/null +++ b/ppl-spark-integration/src/test/java/org/opensearch/sql/expression/function/TimeUtilsTest.java @@ -0,0 +1,187 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.expression.function; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; + +import java.time.ZonedDateTime; + +import org.junit.Test; + +public class TimeUtilsTest { + + // Monday, Jan 03, 2000 @ 01:01:01.100 UTC + private final ZonedDateTime MOCK_ZONED_DATE_TIME = 
ZonedDateTime.parse("2000-01-03T01:01:01.100Z"); + + @Test + public void testRelative() { + testValid("-60m", "2000-01-03T00:01:01.100Z"); + testValid("-H", "2000-01-03T00:01:01.100Z"); + testValid("+2wk", "2000-01-17T01:01:01.100Z"); + testValid("-1h@W3", "1999-12-29T00:00:00Z"); + testValid("@d", "2000-01-03T00:00Z"); + testValid("now", "2000-01-03T01:01:01.100Z"); + + testInvalid("invalid", "The relative date time 'invalid' is not supported."); + } + + @Test + public void testRelativeCaseInsensitive() { + testValid("NOW", "2000-01-03T01:01:01.100Z"); + testValid("-60M", "2000-01-03T00:01:01.100Z"); + testValid("-H", "2000-01-03T00:01:01.100Z"); + testValid("+2WK", "2000-01-17T01:01:01.100Z"); + testValid("-1H@H", "2000-01-03T00:00Z"); + testValid("@D", "2000-01-03T00:00Z"); + + testInvalid("INVALID", "The relative date time 'INVALID' is not supported."); + } + + @Test + public void testRelativeOffsetSign() { + testValid("+1h", "2000-01-03T02:01:01.100Z"); + testValid("-h", "2000-01-03T00:01:01.100Z"); + + testInvalid("~h", "The relative date time '~h' is not supported."); + } + + @Test + public void testRelativeOffsetValue() { + testValid("+h", "2000-01-03T02:01:01.100Z"); + testValid("+0h", "2000-01-03T01:01:01.100Z"); + testValid("+12h", "2000-01-03T13:01:01.100Z"); + testValid("-3d", "1999-12-31T01:01:01.100Z"); + + testInvalid("+1.1h", "The relative date time '+1.1h' is not supported."); + } + + @Test + public void testRelativeOffsetUnit() { + testValid("+s", "2000-01-03T01:01:02.1Z"); + testValid("+sec", "2000-01-03T01:01:02.1Z"); + testValid("+secs", "2000-01-03T01:01:02.1Z"); + testValid("+second", "2000-01-03T01:01:02.1Z"); + testValid("+seconds", "2000-01-03T01:01:02.1Z"); + + testValid("+m", "2000-01-03T01:02:01.100Z"); + testValid("+min", "2000-01-03T01:02:01.100Z"); + testValid("+mins", "2000-01-03T01:02:01.100Z"); + testValid("+minute", "2000-01-03T01:02:01.100Z"); + testValid("+minutes", "2000-01-03T01:02:01.100Z"); + + testValid("+h", 
"2000-01-03T02:01:01.100Z"); + testValid("+hr", "2000-01-03T02:01:01.100Z"); + testValid("+hrs", "2000-01-03T02:01:01.100Z"); + testValid("+hour", "2000-01-03T02:01:01.100Z"); + testValid("+hours", "2000-01-03T02:01:01.100Z"); + + testValid("+d", "2000-01-04T01:01:01.100Z"); + testValid("+day", "2000-01-04T01:01:01.100Z"); + testValid("+days", "2000-01-04T01:01:01.100Z"); + + testValid("+w", "2000-01-10T01:01:01.100Z"); + testValid("+wk", "2000-01-10T01:01:01.100Z"); + testValid("+wks", "2000-01-10T01:01:01.100Z"); + testValid("+week", "2000-01-10T01:01:01.100Z"); + testValid("+weeks", "2000-01-10T01:01:01.100Z"); + + testValid("+mon", "2000-02-03T01:01:01.100Z"); + testValid("+month", "2000-02-03T01:01:01.100Z"); + testValid("+months", "2000-02-03T01:01:01.100Z"); + + testValid("+q", "2000-04-03T01:01:01.100Z"); + testValid("+qtr", "2000-04-03T01:01:01.100Z"); + testValid("+qtrs", "2000-04-03T01:01:01.100Z"); + testValid("+quarter", "2000-04-03T01:01:01.100Z"); + testValid("+quarters", "2000-04-03T01:01:01.100Z"); + + testValid("+y", "2001-01-03T01:01:01.100Z"); + testValid("+yr", "2001-01-03T01:01:01.100Z"); + testValid("+yrs", "2001-01-03T01:01:01.100Z"); + testValid("+year", "2001-01-03T01:01:01.100Z"); + testValid("+years", "2001-01-03T01:01:01.100Z"); + + testInvalid("+ms", "The relative date time unit 'ms' is not supported."); + testInvalid("+1INVALID", "The relative date time unit 'INVALID' is not supported."); + testInvalid("+now", "The relative date time unit 'now' is not supported."); + } + + @Test + public void testRelativeSnap() { + testValid("@s", "2000-01-03T01:01:01Z"); + testValid("@sec", "2000-01-03T01:01:01Z"); + testValid("@secs", "2000-01-03T01:01:01Z"); + testValid("@second", "2000-01-03T01:01:01Z"); + testValid("@seconds", "2000-01-03T01:01:01Z"); + + testValid("@m", "2000-01-03T01:01Z"); + testValid("@min", "2000-01-03T01:01Z"); + testValid("@mins", "2000-01-03T01:01Z"); + testValid("@minute", "2000-01-03T01:01Z"); + testValid("@minutes", 
"2000-01-03T01:01Z"); + + testValid("@h", "2000-01-03T01:00Z"); + testValid("@hr", "2000-01-03T01:00Z"); + testValid("@hrs", "2000-01-03T01:00Z"); + testValid("@hour", "2000-01-03T01:00Z"); + testValid("@hours", "2000-01-03T01:00Z"); + + testValid("@d", "2000-01-03T00:00Z"); + testValid("@day", "2000-01-03T00:00Z"); + testValid("@days", "2000-01-03T00:00Z"); + + testValid("@w", "2000-01-02T00:00Z"); + testValid("@wk", "2000-01-02T00:00Z"); + testValid("@wks", "2000-01-02T00:00Z"); + testValid("@week", "2000-01-02T00:00Z"); + testValid("@weeks", "2000-01-02T00:00Z"); + + testValid("@mon", "2000-01-01T00:00Z"); + testValid("@month", "2000-01-01T00:00Z"); + testValid("@months", "2000-01-01T00:00Z"); + + testValid("@q", "2000-01-01T00:00Z"); + testValid("@qtr", "2000-01-01T00:00Z"); + testValid("@qtrs", "2000-01-01T00:00Z"); + testValid("@quarter", "2000-01-01T00:00Z"); + testValid("@quarters", "2000-01-01T00:00Z"); + + testValid("@y", "2000-01-01T00:00Z"); + testValid("@yr", "2000-01-01T00:00Z"); + testValid("@yrs", "2000-01-01T00:00Z"); + testValid("@year", "2000-01-01T00:00Z"); + testValid("@years", "2000-01-01T00:00Z"); + + testValid("@w0", "2000-01-02T00:00Z"); + testValid("@w1", "2000-01-03T00:00Z"); + testValid("@w2", "1999-12-28T00:00Z"); + testValid("@w3", "1999-12-29T00:00Z"); + testValid("@w4", "1999-12-30T00:00Z"); + testValid("@w5", "1999-12-31T00:00Z"); + testValid("@w6", "2000-01-01T00:00Z"); + testValid("@w7", "2000-01-02T00:00Z"); + + testInvalid("@INVALID", "The relative date time unit 'INVALID' is not supported."); + testInvalid("@ms", "The relative date time unit 'ms' is not supported."); + testInvalid("@w8", "The relative date time unit 'w8' is not supported."); + testInvalid("@now", "The relative date time unit 'now' is not supported."); + } + + private void testValid(String relativeDateTimeString, String expectedDateTimeString) { + String testMessage = String.format("\"%s\"", relativeDateTimeString); + ZonedDateTime expectedDateTime = 
ZonedDateTime.parse(expectedDateTimeString); + ZonedDateTime actualDateTime = TimeUtils.getRelativeZonedDateTime(relativeDateTimeString, MOCK_ZONED_DATE_TIME); + assertEquals(testMessage, expectedDateTime, actualDateTime); + } + + private void testInvalid(String relativeDateTimeString, String expectedExceptionMessage) { + String testMessage = String.format("\"%s\"", relativeDateTimeString); + String actualExceptionMessage = assertThrows(testMessage, RuntimeException.class, + () -> TimeUtils.getRelativeZonedDateTime(relativeDateTimeString, MOCK_ZONED_DATE_TIME)).getMessage(); + assertEquals(expectedExceptionMessage, actualExceptionMessage); + } +}