Skip to content

Commit

Permalink
Add DATETIMECONVERTWINDOWHOP function (apache#11773)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexch2000 authored Nov 4, 2023
1 parent 5904df3 commit baea4a2
Show file tree
Hide file tree
Showing 13 changed files with 981 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,10 @@ public enum TransformFunctionType {
OperandTypes.family(ImmutableList.of(SqlTypeFamily.ANY, SqlTypeFamily.CHARACTER, SqlTypeFamily.CHARACTER,
SqlTypeFamily.CHARACTER)), "date_time_convert"),

DATE_TIME_CONVERT_WINDOW_HOP("dateTimeConvertWindowHop", ReturnTypes.TO_ARRAY, OperandTypes.family(
ImmutableList.of(SqlTypeFamily.ANY, SqlTypeFamily.CHARACTER, SqlTypeFamily.CHARACTER, SqlTypeFamily.CHARACTER,
SqlTypeFamily.CHARACTER)), "date_time_convert_window_hop"),

DATE_TRUNC("dateTrunc",
ReturnTypes.BIGINT_FORCE_NULLABLE,
OperandTypes.family(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public class FunctionDefinitionRegistryTest {
// Functions without scalar function counterpart as of now
"arraylength", "arrayaverage", "arraymin", "arraymax", "arraysum", "clpdecode", "groovy", "inidset",
"jsonextractscalar", "jsonextractindex", "jsonextractkey", "lookup", "mapvalue", "timeconvert", "valuein",
"datetimeconvertwindowhop",
// functions not needed for register b/c they are in std sql table or they will not be composed directly.
"in", "not_in", "and", "or", "range", "extract", "is_true", "is_not_true", "is_false", "is_not_false"
);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.core.operator.transform.function;

import java.util.List;
import java.util.Map;
import org.apache.pinot.core.operator.ColumnContext;
import org.apache.pinot.core.operator.blocks.ValueBlock;
import org.apache.pinot.core.operator.transform.TransformResultMetadata;
import org.apache.pinot.core.operator.transform.transformer.datetimehop.BaseDateTimeWindowHopTransformer;
import org.apache.pinot.core.operator.transform.transformer.datetimehop.DateTimeWindowHopTransformerFactory;
import org.apache.pinot.core.operator.transform.transformer.datetimehop.EpochToEpochWindowHopTransformer;
import org.apache.pinot.core.operator.transform.transformer.datetimehop.EpochToSDFHopWindowTransformer;
import org.apache.pinot.core.operator.transform.transformer.datetimehop.SDFToEpochWindowHopTransformer;
import org.apache.pinot.core.operator.transform.transformer.datetimehop.SDFToSDFWindowHopTransformer;
import org.roaringbitmap.RoaringBitmap;


/**
* The <code>DateTimeConversionHopTransformFunction</code> class implements the date time conversion
* with hop transform function.
* <ul>
* <li>
* This transform function should be invoked with arguments:
* <ul>
* <li>Column name to convert. E.g. Date</li>
* <li>Input format of the column. E.g. EPOCH|MILLISECONDS (See Pipe Format in DateTimeFormatSpec)</li>
* <li>Output format. E.g. EPOCH|MILLISECONDS/|10</li>
* <li>Output granularity. E.g. MINUTES|15</li>
* <li>Hop window size. E.g. HOURS</li>
* </ul>
* </li>
* <li>
* Outputs:
* <ul>
* <li>Time values converted to the desired format and bucketed to desired granularity with hop windows</li>
* <li>Below is an example for one hour window with 15min hop for 12:10</li>
* |-----------------| 11:15 - 12:15
* |-----------------| 11:30 - 12:30
* |-----------------| 11:45 - 12:45
* |-----------------| 12:00 - 13:00
* <li>The beginning of the windows returned</>
* <li>The end of the window can be fetched by adding window size</>
* </ul>
* </li>
* </ul>
*/
public class DateTimeConversionHopTransformFunction extends BaseTransformFunction {
public static final String FUNCTION_NAME = "dateTimeConvertWindowHop";

private TransformFunction _mainTransformFunction;
private TransformResultMetadata _resultMetadata;
private BaseDateTimeWindowHopTransformer<?, ?> _dateTimeTransformer;

@Override
public void init(List<TransformFunction> arguments, Map<String, ColumnContext> columnContextMap) {
super.init(arguments, columnContextMap);
// Check that there are exactly 4 arguments
if (arguments.size() != 5) {
throw new IllegalArgumentException("Exactly 5 arguments are required for DATE_TIME_CONVERT_HOP function");
}
TransformFunction firstArgument = arguments.get(0);
if (firstArgument instanceof LiteralTransformFunction || !firstArgument.getResultMetadata().isSingleValue()) {
throw new IllegalArgumentException(
"The first argument of DATE_TIME_CONVERT_HOP transform function must be a single-valued column or "
+ "a transform function");
}
_mainTransformFunction = firstArgument;

_dateTimeTransformer = DateTimeWindowHopTransformerFactory.getDateTimeTransformer(
((LiteralTransformFunction) arguments.get(1)).getStringLiteral(),
((LiteralTransformFunction) arguments.get(2)).getStringLiteral(),
((LiteralTransformFunction) arguments.get(3)).getStringLiteral(),
((LiteralTransformFunction) arguments.get(4)).getStringLiteral());
if (_dateTimeTransformer instanceof EpochToEpochWindowHopTransformer
|| _dateTimeTransformer instanceof SDFToEpochWindowHopTransformer) {
_resultMetadata = LONG_MV_NO_DICTIONARY_METADATA;
} else {
_resultMetadata = STRING_MV_NO_DICTIONARY_METADATA;
}
}

@Override
public String getName() {
return FUNCTION_NAME;
}

@Override
public TransformResultMetadata getResultMetadata() {
return _resultMetadata;
}

@Override
public long[][] transformToLongValuesMV(ValueBlock valueBlock) {
if (_resultMetadata != LONG_MV_NO_DICTIONARY_METADATA) {
return super.transformToLongValuesMV(valueBlock);
}

int length = valueBlock.getNumDocs();
initLongValuesMV(length);
if (_dateTimeTransformer instanceof EpochToEpochWindowHopTransformer) {
EpochToEpochWindowHopTransformer dateTimeTransformer = (EpochToEpochWindowHopTransformer) _dateTimeTransformer;
dateTimeTransformer.transform(_mainTransformFunction.transformToLongValuesSV(valueBlock), _longValuesMV, length);
} else if (_dateTimeTransformer instanceof SDFToEpochWindowHopTransformer) {
SDFToEpochWindowHopTransformer dateTimeTransformer = (SDFToEpochWindowHopTransformer) _dateTimeTransformer;
dateTimeTransformer.transform(_mainTransformFunction.transformToStringValuesSV(valueBlock), _longValuesMV,
length);
}
return _longValuesMV;
}

public String[][] transformToStringValuesMV(ValueBlock valueBlock) {
if (_resultMetadata != STRING_MV_NO_DICTIONARY_METADATA) {
return super.transformToStringValuesMV(valueBlock);
}

int length = valueBlock.getNumDocs();
initStringValuesMV(length);
if (_dateTimeTransformer instanceof EpochToSDFHopWindowTransformer) {
EpochToSDFHopWindowTransformer dateTimeTransformer = (EpochToSDFHopWindowTransformer) _dateTimeTransformer;
dateTimeTransformer.transform(_mainTransformFunction.transformToLongValuesSV(valueBlock), _stringValuesMV,
length);
} else if (_dateTimeTransformer instanceof SDFToSDFWindowHopTransformer) {
SDFToSDFWindowHopTransformer dateTimeTransformer = (SDFToSDFWindowHopTransformer) _dateTimeTransformer;
dateTimeTransformer.transform(_mainTransformFunction.transformToStringValuesSV(valueBlock), _stringValuesMV,
length);
}
return _stringValuesMV;
}

@Override
public RoaringBitmap getNullBitmap(ValueBlock valueBlock) {
return _mainTransformFunction.getNullBitmap(valueBlock);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ private static Map<String, Class<? extends TransformFunction>> createRegistry()
typeToImplementation.put(TransformFunctionType.JSON_EXTRACT_KEY, JsonExtractKeyTransformFunction.class);
typeToImplementation.put(TransformFunctionType.TIME_CONVERT, TimeConversionTransformFunction.class);
typeToImplementation.put(TransformFunctionType.DATE_TIME_CONVERT, DateTimeConversionTransformFunction.class);
typeToImplementation.put(TransformFunctionType.DATE_TIME_CONVERT_WINDOW_HOP,
DateTimeConversionHopTransformFunction.class);
typeToImplementation.put(TransformFunctionType.DATE_TRUNC, DateTruncTransformFunction.class);
typeToImplementation.put(TransformFunctionType.JSON_EXTRACT_INDEX, JsonExtractIndexTransformFunction.class);
typeToImplementation.put(TransformFunctionType.YEAR, DateTimeTransformFunction.Year.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.pinot.core.operator.transform.transformer.datetimehop;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.core.operator.transform.transformer.DataTransformer;
import org.apache.pinot.spi.data.DateTimeFormatSpec;
import org.apache.pinot.spi.data.DateTimeFormatUnitSpec;
import org.apache.pinot.spi.data.DateTimeGranularitySpec;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormatter;


public abstract class BaseDateTimeWindowHopTransformer<I, O> implements DataTransformer<I, O> {
protected final long _hopWindowSizeMillis;
private final int _inputTimeSize;
private final TimeUnit _inputTimeUnit;
private final DateTimeFormatter _inputDateTimeFormatter;
private final int _outputTimeSize;
private final DateTimeFormatUnitSpec.DateTimeTransformUnit _outputTimeUnit;
private final DateTimeFormatter _outputDateTimeFormatter;
private final long _outputGranularityMillis;

public BaseDateTimeWindowHopTransformer(DateTimeFormatSpec inputFormat, DateTimeFormatSpec outputFormat,
DateTimeGranularitySpec outputGranularity, DateTimeGranularitySpec hopWindowSize) {
_inputTimeSize = inputFormat.getColumnSize();
_inputTimeUnit = inputFormat.getColumnUnit();
_inputDateTimeFormatter = inputFormat.getDateTimeFormatter();
_outputTimeSize = outputFormat.getColumnSize();
_outputTimeUnit = outputFormat.getColumnDateTimeTransformUnit();
_outputDateTimeFormatter = outputFormat.getDateTimeFormatter();
_outputGranularityMillis = outputGranularity.granularityToMillis();
_hopWindowSizeMillis = hopWindowSize.granularityToMillis();
}

protected long transformEpochToMillis(long epochTime) {
return _inputTimeUnit.toMillis(epochTime * _inputTimeSize);
}

protected long transformSDFToMillis(String sdfTime) {
return _inputDateTimeFormatter.parseMillis(sdfTime);
}

protected long transformMillisToEpoch(long millisSinceEpoch) {
return _outputTimeUnit.fromMillis(millisSinceEpoch) / _outputTimeSize;
}

protected String transformMillisToSDF(long millisSinceEpoch) {
return _outputDateTimeFormatter.print(new DateTime(millisSinceEpoch));
}

protected long transformToOutputGranularity(long millisSinceEpoch) {
return (millisSinceEpoch / _outputGranularityMillis) * _outputGranularityMillis;
}

protected List<Long> hopWindows(long millisSinceEpoch) {
List<Long> hops = new ArrayList<>();
long totalHopMillis = _hopWindowSizeMillis;
long granularityMillis = _outputGranularityMillis;

long adjustedMillis = (millisSinceEpoch / granularityMillis) * granularityMillis;

// Start from the adjusted timestamp and decrement by the hop until we've covered the entire window duration
for (long currentMillis = adjustedMillis; currentMillis > millisSinceEpoch - totalHopMillis;
currentMillis -= granularityMillis) {
hops.add(currentMillis);
}
return hops;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.pinot.core.operator.transform.transformer.datetimehop;

import org.apache.pinot.spi.data.DateTimeFieldSpec;
import org.apache.pinot.spi.data.DateTimeFieldSpec.TimeFormat;
import org.apache.pinot.spi.data.DateTimeFormatSpec;
import org.apache.pinot.spi.data.DateTimeGranularitySpec;


public class DateTimeWindowHopTransformerFactory {

private static final TimeFormat EPOCH = DateTimeFieldSpec.TimeFormat.EPOCH;
private static final TimeFormat TIMESTAMP = DateTimeFieldSpec.TimeFormat.TIMESTAMP;

private DateTimeWindowHopTransformerFactory() {
}

public static BaseDateTimeWindowHopTransformer getDateTimeTransformer(String inputFormatStr, String outputFormatStr,
String outputGranularityStr, String hopSizeStr) {
DateTimeFormatSpec inputFormatSpec = new DateTimeFormatSpec(inputFormatStr);
DateTimeFormatSpec outputFormatSpec = new DateTimeFormatSpec(outputFormatStr);
DateTimeGranularitySpec outputGranularity = new DateTimeGranularitySpec(outputGranularityStr);
DateTimeGranularitySpec hopSizeFormat = new DateTimeGranularitySpec(hopSizeStr);

TimeFormat inputFormat = inputFormatSpec.getTimeFormat();
TimeFormat outputFormat = outputFormatSpec.getTimeFormat();

if (isEpochOrTimestamp(inputFormat) && isEpochOrTimestamp(outputFormat)) {
return new EpochToEpochWindowHopTransformer(inputFormatSpec, outputFormatSpec, outputGranularity, hopSizeFormat);
} else if (isEpochOrTimestamp(inputFormat) && isStringFormat(outputFormat)) {
return new EpochToSDFHopWindowTransformer(inputFormatSpec, outputFormatSpec, outputGranularity, hopSizeFormat);
} else if (isStringFormat(inputFormat) && isEpochOrTimestamp(outputFormat)) {
return new SDFToEpochWindowHopTransformer(inputFormatSpec, outputFormatSpec, outputGranularity, hopSizeFormat);
} else if (isStringFormat(inputFormat) && isStringFormat(outputFormat)) {
return new SDFToSDFWindowHopTransformer(inputFormatSpec, outputFormatSpec, outputGranularity, hopSizeFormat);
}
throw new IllegalArgumentException("Wrong inputFormat: " + inputFormat + " outputFormat: " + outputFormat);
}

private static boolean isEpochOrTimestamp(TimeFormat format) {
return format == EPOCH || format == TIMESTAMP;
}

private static boolean isStringFormat(TimeFormat format) {
return format == TimeFormat.SIMPLE_DATE_FORMAT;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.pinot.core.operator.transform.transformer.datetimehop;

import java.util.List;
import org.apache.pinot.spi.data.DateTimeFormatSpec;
import org.apache.pinot.spi.data.DateTimeGranularitySpec;


public class EpochToEpochWindowHopTransformer extends BaseDateTimeWindowHopTransformer<long[], long[][]> {
public EpochToEpochWindowHopTransformer(DateTimeFormatSpec inputFormat, DateTimeFormatSpec outputFormat,
DateTimeGranularitySpec outputGranularity, DateTimeGranularitySpec hopSize) {
super(inputFormat, outputFormat, outputGranularity, hopSize);
}

@Override
public void transform(long[] input, long[][] output, int length) {
for (int i = 0; i < length; i++) {
long epochTime = input[i];
long millisSinceEpoch = transformEpochToMillis(epochTime);
List<Long> hopWindows = hopWindows(millisSinceEpoch);

long[] transformedArray = new long[hopWindows.size()];
for (int j = 0; j < hopWindows.size(); j++) {
long millis = hopWindows.get(j);
transformedArray[j] = transformMillisToEpoch(millis);
}
output[i] = transformedArray;
}
}
}
Loading

0 comments on commit baea4a2

Please sign in to comment.