
data type mismatch exception from withRangeStats function #286

Open
padamshrestha opened this issue Dec 31, 2022 · 2 comments
Labels: bug (Something isn't working)


padamshrestha commented Dec 31, 2022

I'm getting a data type mismatch exception from withRangeStats while using the sample dataset.

Python version: 3.10.6
Spark/PySpark version: 3.3.1
Delta Lake version: 2.2.0
tempo version: commit 0350dffb5d6d2a6d7d138036738e07ed3411a2a2 (from 10/24/2022)

Usage:
moving_avg = trades_tsdf.withRangeStats("trade_pr", rangeBackWindowSecs=600).df

Error:
{
"name": "AnalysisException",
"message": "cannot resolve '(PARTITION BY spark_catalog.tempo.trades.date, spark_catalog.tempo.trades.symbol ORDER BY spark_catalog.tempo.trades.event_ts ASC NULLS FIRST RANGE BETWEEN -600L FOLLOWING AND CURRENT ROW)' due to data type mismatch: The data type 'timestamp' used in the order specification does not match the data type 'bigint' which is used in the range frame.;\n'Project [symbol#70, event_ts#71, trade_dt#72, trade_pr#73, trade_qt#74, date#75, avg(trade_pr#73) windowspecdefinition(date#75, symbol#70, event_ts#71 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, -600, currentrow$())) AS mean_trade_pr#4778, count(trade_pr#73) windowspecdefinition(date#75, symbol#70, event_ts#71 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, -600, currentrow$())) AS count_trade_pr#4780, min(trade_pr#73) windowspecdefinition(date#75, symbol#70, event_ts#71 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, -600, currentrow$())) AS min_trade_pr#4782, max(trade_pr#73) windowspecdefinition(date#75, symbol#70, event_ts#71 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, -600, currentrow$())) AS max_trade_pr#4784, sum(trade_pr#73) windowspecdefinition(date#75, symbol#70, event_ts#71 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, -600, currentrow$())) AS sum_trade_pr#4786, stddev_samp(trade_pr#73) windowspecdefinition(date#75, symbol#70, event_ts#71 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, -600, currentrow$())) AS stddev_trade_pr#4796]\n+- SubqueryAlias spark_catalog.tempo.trades\n +- Relation tempo.trades[symbol#70,event_ts#71,trade_dt#72,trade_pr#73,trade_qt#74,date#75] parquet\n",
"stack": "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m\n\u001b[0;31mAnalysisException\u001b[0m Traceback (most recent call last)\nCell \u001b[0;32mIn[25], line 2\u001b[0m\n\u001b[1;32m 1\u001b[0m \u001b[39m# DBTITLE 1,Simple Moving Average with Tempo\u001b[39;00m\n\u001b[0;32m----> 2\u001b[0m moving_avg \u001b[39m=\u001b[39m trades_tsdf\u001b[39m.\u001b[39;49mwithRangeStats(colsToSummarize\u001b[39m=\u001b[39;49m[\u001b[39m"\u001b[39;49m\u001b[39mtrade_pr\u001b[39;49m\u001b[39m"\u001b[39;49m],rangeBackWindowSecs\u001b[39m=\u001b[39;49m\u001b[39m600\u001b[39;49m)\u001b[39m.\u001b[39mdf\n\u001b[1;32m 3\u001b[0m \u001b[39m# output = moving_avg.select('symbol', 'event_ts', 'trade_pr', 'mean_trade_pr', 'stddev_trade_pr', 'sum_trade_pr', 'min_trade_pr')\u001b[39;00m\n\nFile \u001b[0;32m/opt/workspace/notebooks/src/dbl-tempo/python/tempo/tsdf.py:1070\u001b[0m, in \u001b[0;36mTSDF.withRangeStats\u001b[0;34m(self, type, colsToSummarize, rangeBackWindowSecs)\u001b[0m\n\u001b[1;32m 1063\u001b[0m selectedCols\u001b[39m.\u001b[39mappend(f\u001b[39m.\u001b[39mstddev(metric)\u001b[39m.\u001b[39mover(w)\u001b[39m.\u001b[39malias(\u001b[39m"\u001b[39m\u001b[39mstddev_\u001b[39m\u001b[39m"\u001b[39m \u001b[39m+\u001b[39m metric))\n\u001b[1;32m 1064\u001b[0m derivedCols\u001b[39m.\u001b[39mappend(\n\u001b[1;32m 1065\u001b[0m (\n\u001b[1;32m 1066\u001b[0m (f\u001b[39m.\u001b[39mcol(metric) \u001b[39m-\u001b[39m f\u001b[39m.\u001b[39mcol(\u001b[39m"\u001b[39m\u001b[39mmean_\u001b[39m\u001b[39m"\u001b[39m \u001b[39m+\u001b[39m metric))\n\u001b[1;32m 1067\u001b[0m \u001b[39m/\u001b[39m f\u001b[39m.\u001b[39mcol(\u001b[39m"\u001b[39m\u001b[39mstddev_\u001b[39m\u001b[39m"\u001b[39m \u001b[39m+\u001b[39m metric)\n\u001b[1;32m 1068\u001b[0m )\u001b[39m.\u001b[39malias(\u001b[39m"\u001b[39m\u001b[39mzscore_\u001b[39m\u001b[39m"\u001b[39m \u001b[39m+\u001b[39m metric)\n\u001b[1;32m 1069\u001b[0m )\n\u001b[0;32m-> 1070\u001b[0m selected_df \u001b[39m=\u001b[39m \u001b[39mself\u001b[39;49m\u001b[39m.\u001b[39;49mdf\u001b[39m.\u001b[39;49mselect(\u001b[39m*\u001b[39;49mselectedCols)\n\u001b[1;32m 1071\u001b[0m summary_df \u001b[39m=\u001b[39m selected_df\u001b[39m.\u001b[39mselect(\u001b[39m*\u001b[39mselected_df\u001b[39m.\u001b[39mcolumns, \u001b[39m*\u001b[39mderivedCols)\u001b[39m.\u001b[39mdrop(\n\u001b[1;32m 1072\u001b[0m \u001b[39m"\u001b[39m\u001b[39mdouble_ts\u001b[39m\u001b[39m"\u001b[39m\n\u001b[1;32m 1073\u001b[0m )\n\u001b[1;32m 1075\u001b[0m \u001b[39mreturn\u001b[39;00m TSDF(summary_df, \u001b[39mself\u001b[39m\u001b[39m.\u001b[39mts_col, \u001b[39mself\u001b[39m\u001b[39m.\u001b[39mpartitionCols)\n\nFile \u001b[0;32m/usr/local/lib/python3.10/dist-packages/pyspark/sql/dataframe.py:2023\u001b[0m, in \u001b[0;36mDataFrame.select\u001b[0;34m(self, cols)\u001b[0m\n\u001b[1;32m 2002\u001b[0m \u001b[39mdef\u001b[39;00m \u001b[39mselect\u001b[39m(\u001b[39mself\u001b[39m, \u001b[39m\u001b[39mcols: \u001b[39m"\u001b[39m\u001b[39mColumnOrName\u001b[39m\u001b[39m"\u001b[39m) \u001b[39m-\u001b[39m\u001b[39m>\u001b[39m \u001b[39m"\u001b[39m\u001b[39mDataFrame\u001b[39m\u001b[39m"\u001b[39m: \u001b[39m# type: ignore[misc]\u001b[39;00m\n\u001b[1;32m 2003\u001b[0m \u001b[39m"""Projects a set of expressions and returns a new :class:DataFrame.\u001b[39;00m\n\u001b[1;32m 2004\u001b[0m \n\u001b[1;32m 2005\u001b[0m \u001b[39m .. 
versionadded:: 1.3.0\u001b[39;00m\n\u001b[0;32m (...)\u001b[0m\n\u001b[1;32m 2021\u001b[0m \u001b[39m [Row(name='Alice', age=12), Row(name='Bob', age=15)]\u001b[39;00m\n\u001b[1;32m 2022\u001b[0m \u001b[39m """\u001b[39;00m\n\u001b[0;32m-> 2023\u001b[0m jdf \u001b[39m=\u001b[39m \u001b[39mself\u001b[39;49m\u001b[39m.\u001b[39;49m_jdf\u001b[39m.\u001b[39;49mselect(\u001b[39mself\u001b[39;49m\u001b[39m.\u001b[39;49m_jcols(\u001b[39m*\u001b[39;49mcols))\n\u001b[1;32m 2024\u001b[0m \u001b[39mreturn\u001b[39;00m DataFrame(jdf, \u001b[39mself\u001b[39m\u001b[39m.\u001b[39msparkSession)\n\nFile \u001b[0;32m/usr/local/lib/python3.10/dist-packages/py4j/java_gateway.py:1321\u001b[0m, in \u001b[0;36mJavaMember.call\u001b[0;34m(self, *args)\u001b[0m\n\u001b[1;32m 1315\u001b[0m command \u001b[39m=\u001b[39m proto\u001b[39m.\u001b[39mCALL_COMMAND_NAME \u001b[39m+\u001b[39m\\n\u001b[1;32m 1316\u001b[0m \u001b[39mself\u001b[39m\u001b[39m.\u001b[39mcommand_header \u001b[39m+\u001b[39m\\n\u001b[1;32m 1317\u001b[0m args_command \u001b[39m+\u001b[39m\\n\u001b[1;32m 1318\u001b[0m proto\u001b[39m.\u001b[39mEND_COMMAND_PART\n\u001b[1;32m 1320\u001b[0m answer \u001b[39m=\u001b[39m \u001b[39mself\u001b[39m\u001b[39m.\u001b[39mgateway_client\u001b[39m.\u001b[39msend_command(command)\n\u001b[0;32m-> 1321\u001b[0m return_value \u001b[39m=\u001b[39m get_return_value(\n\u001b[1;32m 1322\u001b[0m answer, \u001b[39mself\u001b[39;49m\u001b[39m.\u001b[39;49mgateway_client, \u001b[39mself\u001b[39;49m\u001b[39m.\u001b[39;49mtarget_id, \u001b[39mself\u001b[39;49m\u001b[39m.\u001b[39;49mname)\n\u001b[1;32m 1324\u001b[0m \u001b[39mfor\u001b[39;00m temp_arg \u001b[39min\u001b[39;00m temp_args:\n\u001b[1;32m 1325\u001b[0m temp_arg\u001b[39m.\u001b[39m_detach()\n\nFile \u001b[0;32m/usr/local/lib/python3.10/dist-packages/pyspark/sql/utils.py:196\u001b[0m, in \u001b[0;36mcapture_sql_exception..deco\u001b[0;34m(*a, **kw)\u001b[0m\n\u001b[1;32m 192\u001b[0m converted \u001b[39m=\u001b[39m convert_exception(e\u001b[39m.\u001b[39mjava_exception)\n\u001b[1;32m 193\u001b[0m \u001b[39mif\u001b[39;00m \u001b[39mnot\u001b[39;00m \u001b[39misinstance\u001b[39m(converted, UnknownException):\n\u001b[1;32m 194\u001b[0m \u001b[39m# Hide where the exception came from that shows a non-Pythonic\u001b[39;00m\n\u001b[1;32m 195\u001b[0m \u001b[39m# JVM exception message.\u001b[39;00m\n\u001b[0;32m--> 196\u001b[0m \u001b[39mraise\u001b[39;00m converted \u001b[39mfrom\u001b[39;00m \u001b[39mNone\u001b[39m\n\u001b[1;32m 197\u001b[0m \u001b[39melse\u001b[39;00m:\n\u001b[1;32m 198\u001b[0m \u001b[39mraise\u001b[39;00m\n\n\u001b[0;31mAnalysisException\u001b[0m: cannot resolve '(PARTITION BY spark_catalog.tempo.trades.date, spark_catalog.tempo.trades.symbol ORDER BY spark_catalog.tempo.trades.event_ts ASC NULLS FIRST RANGE BETWEEN -600L FOLLOWING AND CURRENT ROW)' due to data type mismatch: The data type 'timestamp' used in the order specification does not match the data type 'bigint' which is used in the range frame.;\n'Project [symbol#70, event_ts#71, trade_dt#72, trade_pr#73, trade_qt#74, date#75, avg(trade_pr#73) windowspecdefinition(date#75, symbol#70, event_ts#71 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, -600, currentrow$())) AS mean_trade_pr#4778, count(trade_pr#73) windowspecdefinition(date#75, symbol#70, event_ts#71 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, -600, currentrow$())) AS count_trade_pr#4780, min(trade_pr#73) windowspecdefinition(date#75, symbol#70, event_ts#71 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, -600, 
currentrow$())) AS min_trade_pr#4782, max(trade_pr#73) windowspecdefinition(date#75, symbol#70, event_ts#71 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, -600, currentrow$())) AS max_trade_pr#4784, sum(trade_pr#73) windowspecdefinition(date#75, symbol#70, event_ts#71 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, -600, currentrow$())) AS sum_trade_pr#4786, stddev_samp(trade_pr#73) windowspecdefinition(date#75, symbol#70, event_ts#71 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, -600, currentrow$())) AS stddev_trade_pr#4796]\n+- SubqueryAlias spark_catalog.tempo.trades\n +- Relation tempo.trades[symbol#70,event_ts#71,trade_dt#72,trade_pr#73,trade_qt#74,date#75] parquet\n"
}
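For context, a RANGE frame with numeric bounds only resolves when the ORDER BY column is itself numeric, which is what the analyzer is complaining about above. A minimal sketch of the rule in plain PySpark (a generic illustration, not tempo's internal code; it assumes a trades_df with the columns shown in the plan):

from pyspark.sql import functions as f
from pyspark.sql.window import Window

# Ordering by the raw timestamp with integer range bounds reproduces the
# "timestamp vs bigint" mismatch at analysis time.
bad_w = (Window.partitionBy("date", "symbol")
         .orderBy("event_ts")
         .rangeBetween(-600, Window.currentRow))

# Casting the ordering column to epoch seconds (long) makes the frame resolve.
good_w = (Window.partitionBy("date", "symbol")
          .orderBy(f.col("event_ts").cast("long"))
          .rangeBetween(-600, Window.currentRow))

# trades_df.select(f.avg("trade_pr").over(bad_w)).collect()   # raises AnalysisException
# trades_df.select(f.avg("trade_pr").over(good_w)).collect()  # works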

padamshrestha changed the title from "data type mismatch from withRangeStats" to "data type mismatch exception from withRangeStats function" on Dec 31, 2022
rportilla-databricks (Contributor) commented

Hi @padamshrestha, thanks for the question. I've tested this on Spark 3.3.0 with the latest version of tempo, and the commands below work fine (note that trades_df and quotes_df are created by enforcing the timestamp data type for event_ts).

Can you paste the exact code you run before calling withRangeStats? Also, which Spark platform are you running this on?
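One quick way to confirm the types line up (generic PySpark, just a suggestion) is to inspect the schema before building the TSDF:

# Confirm event_ts is a timestamp (and date a date) before constructing the TSDF;
# a string column here would change how the range window resolves.
trades_df.printSchema()
trades_df.select("event_ts", "date").dtypes   # e.g. [('event_ts', 'timestamp'), ('date', 'date')]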


padamshrestha commented Jan 2, 2023

Hi @rportilla-databricks, thanks for the quick response. Here is the code; it's actually from the example at the link you provided:

from tempo import *
from pyspark.sql.types import *
from pyspark.sql.functions import *

trade_schema = StructType([
    StructField("symbol", StringType()),
    StructField("event_ts", TimestampType()),
    StructField("trade_dt", StringType()),
    StructField("trade_pr", DoubleType()),
    StructField("trade_qt", IntegerType()),
    StructField("date", TimestampType())
])

(spark.read.format("csv")
    .schema(trade_schema)
    .option("header", "true")
    .option("delimiter", ",")
    .load("ASOF_Trades.csv")
    .withColumn("trade_qt", lit(100))
    .withColumn("date", col("event_ts").cast("date"))
    .write.mode('overwrite')
    .option("overwriteSchema", "true")
    .saveAsTable('tempo.trades'))
trades_df = spark.table("tempo.trades")

trades_tsdf = TSDF(trades_df, partition_cols = ['date', 'symbol'], ts_col = 'event_ts')

moving_avg = trades_tsdf.withRangeStats("trade_pr", rangeBackWindowSecs=600).df

I have spark-3.3.1-bin-hadoop3.tgz in an Ubuntu container, running as a local cluster with the driver in client mode. The host is macOS Ventura 13.0.1 (22A400).
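In case it helps while this is being looked at, a possible interim workaround (my own sketch, not a confirmed tempo fix) is to compute the range stats directly in PySpark, ordering the window by event_ts cast to epoch seconds so the 600-second frame type-checks:

from pyspark.sql import functions as f
from pyspark.sql.window import Window

# Hypothetical workaround: order by epoch seconds so the -600..current-row
# range frame matches the ordering column's type.
w = (Window.partitionBy("date", "symbol")
     .orderBy(f.col("event_ts").cast("long"))
     .rangeBetween(-600, Window.currentRow))

moving_avg = (trades_df
              .withColumn("mean_trade_pr", f.avg("trade_pr").over(w))
              .withColumn("stddev_trade_pr", f.stddev("trade_pr").over(w)))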

@tnixon tnixon added the bug Something isn't working label Apr 24, 2023
@tnixon tnixon self-assigned this Apr 24, 2023