From 04264c498caf1d517da785367c5132ef53057bed Mon Sep 17 00:00:00 2001 From: liyuan Date: Mon, 20 Jan 2025 18:40:55 +0800 Subject: [PATCH] buildings Signed-off-by: liyuan --- .../open-street-map-benchmarks.ipynb | 140 +++++++++--------- 1 file changed, 72 insertions(+), 68 deletions(-) diff --git a/examples/SQL+DF-Examples/open-street-map/notebooks/open-street-map-benchmarks.ipynb b/examples/SQL+DF-Examples/open-street-map/notebooks/open-street-map-benchmarks.ipynb index a4f05941..892b67ae 100644 --- a/examples/SQL+DF-Examples/open-street-map/notebooks/open-street-map-benchmarks.ipynb +++ b/examples/SQL+DF-Examples/open-street-map/notebooks/open-street-map-benchmarks.ipynb @@ -11,7 +11,7 @@ }, { "cell_type": "code", - "execution_count": 3, + "execution_count": 1, "id": "1c3a15d7", "metadata": {}, "outputs": [], @@ -24,7 +24,7 @@ }, { "cell_type": "code", - "execution_count": 4, + "execution_count": 2, "id": "0c3536ad", "metadata": {}, "outputs": [], @@ -43,7 +43,7 @@ }, { "cell_type": "code", - "execution_count": 5, + "execution_count": 3, "id": "975717da", "metadata": {}, "outputs": [ @@ -53,14 +53,14 @@ "text": [ "Setting default log level to \"WARN\".\n", "To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).\n", - "25/01/20 09:23:47 INFO SparkEnv: Registering MapOutputTracker\n", - "25/01/20 09:23:47 INFO SparkEnv: Registering BlockManagerMaster\n", - "25/01/20 09:23:47 INFO SparkEnv: Registering BlockManagerMasterHeartbeat\n", - "25/01/20 09:23:47 INFO SparkEnv: Registering OutputCommitCoordinator\n", - "25/01/20 09:23:48 WARN RapidsPluginUtils: RAPIDS Accelerator 24.10.0 using cudf 24.10.0, private revision bd4e99e18e20234ee0c54f95f4b0bfce18a6255e\n", - "25/01/20 09:23:48 WARN RapidsPluginUtils: spark.rapids.sql.multiThreadedRead.numThreads is set to 20.\n", - "25/01/20 09:23:48 WARN RapidsPluginUtils: RAPIDS Accelerator is enabled, to disable GPU support set `spark.rapids.sql.enabled` to false.\n", - "25/01/20 09:23:48 WARN RapidsPluginUtils: spark.rapids.sql.explain is set to `NOT_ON_GPU`. Set it to 'NONE' to suppress the diagnostics logging about the query placement on the GPU.\n", + "25/01/20 10:36:07 INFO SparkEnv: Registering MapOutputTracker\n", + "25/01/20 10:36:07 INFO SparkEnv: Registering BlockManagerMaster\n", + "25/01/20 10:36:07 INFO SparkEnv: Registering BlockManagerMasterHeartbeat\n", + "25/01/20 10:36:07 INFO SparkEnv: Registering OutputCommitCoordinator\n", + "25/01/20 10:36:08 WARN RapidsPluginUtils: RAPIDS Accelerator 24.10.0 using cudf 24.10.0, private revision bd4e99e18e20234ee0c54f95f4b0bfce18a6255e\n", + "25/01/20 10:36:08 WARN RapidsPluginUtils: spark.rapids.sql.multiThreadedRead.numThreads is set to 20.\n", + "25/01/20 10:36:08 WARN RapidsPluginUtils: RAPIDS Accelerator is enabled, to disable GPU support set `spark.rapids.sql.enabled` to false.\n", + "25/01/20 10:36:08 WARN RapidsPluginUtils: spark.rapids.sql.explain is set to `NOT_ON_GPU`. Set it to 'NONE' to suppress the diagnostics logging about the query placement on the GPU.\n", " \r" ] }, @@ -81,6 +81,7 @@ "conf.set(\"spark.executor.cores\", \"16\") \n", "conf.set(\"spark.driver.memory\", \"10g\")\n", "conf.set(\"spark.driver.cores\", \"1\")\n", + "conf.set(\"spark.sql.parquet.binaryAsString\",\"true\")\n", "## The tasks will run on GPU memory, so there is no need to set a high host memory\n", "conf.set(\"spark.executor.memory\", \"16g\")\n", "\n", @@ -104,21 +105,21 @@ "# Create spark session\n", "spark = SparkSession.builder.config(conf=conf).getOrCreate()\n", "# Load dataframe and create tempView\n", - "spark.read.parquet(\"gs://bigquery_public_data_for_spark/geo_openstreetmap/history_changesets/\").createOrReplaceTempView(\"history_changesets\")\n", - "spark.read.parquet(\"gs://bigquery_public_data_for_spark/geo_openstreetmap/history_layers/\").createOrReplaceTempView(\"history_layers\")\n", - "spark.read.parquet(\"gs://bigquery_public_data_for_spark/geo_openstreetmap/history_nodes/\").createOrReplaceTempView(\"history_nodes\")\n", - "spark.read.parquet(\"gs://bigquery_public_data_for_spark/geo_openstreetmap/history_relations/\").createOrReplaceTempView(\"history_relations\")\n", - "spark.read.parquet(\"gs://bigquery_public_data_for_spark/geo_openstreetmap/history_ways/\").createOrReplaceTempView(\"history_ways\")\n", - "spark.read.parquet(\"gs://bigquery_public_data_for_spark/geo_openstreetmap/planet_changesets/\").createOrReplaceTempView(\"planet_changesets\")\n", - "spark.read.parquet(\"gs://bigquery_public_data_for_spark/geo_openstreetmap/planet_features/\").createOrReplaceTempView(\"planet_features\")\n", - "spark.read.parquet(\"gs://bigquery_public_data_for_spark/geo_openstreetmap/planet_features_lines/\").createOrReplaceTempView(\"planet_features_lines\")\n", - "spark.read.parquet(\"gs://bigquery_public_data_for_spark/geo_openstreetmap/planet_features_multilinestrings/\").createOrReplaceTempView(\"planet_features_multilinestrings\")\n", - "spark.read.parquet(\"gs://bigquery_public_data_for_spark/geo_openstreetmap/planet_features_multipolygons/\").createOrReplaceTempView(\"planet_features_multipolygons\")\n", - "spark.read.parquet(\"gs://bigquery_public_data_for_spark/geo_openstreetmap/planet_features_other_relations/\").createOrReplaceTempView(\"planet_features_other_relations\")\n", - "spark.read.parquet(\"gs://bigquery_public_data_for_spark/geo_openstreetmap/planet_features_points/\").createOrReplaceTempView(\"planet_features_points\")\n", - "spark.read.parquet(\"gs://bigquery_public_data_for_spark/geo_openstreetmap/planet_layers/\").createOrReplaceTempView(\"planet_layers\")\n", - "spark.read.parquet(\"gs://bigquery_public_data_for_spark/geo_openstreetmap/planet_nodes/\").createOrReplaceTempView(\"planet_nodes\")\n", - "spark.read.parquet(\"gs://bigquery_public_data_for_spark/geo_openstreetmap/planet_relations/\").createOrReplaceTempView(\"planet_relations\")\n", + "# spark.read.parquet(\"gs://bigquery_public_data_for_spark/geo_openstreetmap/history_changesets/\").createOrReplaceTempView(\"history_changesets\")\n", + "# spark.read.parquet(\"gs://bigquery_public_data_for_spark/geo_openstreetmap/history_layers/\").createOrReplaceTempView(\"history_layers\")\n", + "# spark.read.parquet(\"gs://bigquery_public_data_for_spark/geo_openstreetmap/history_nodes/\").createOrReplaceTempView(\"history_nodes\")\n", + "# spark.read.parquet(\"gs://bigquery_public_data_for_spark/geo_openstreetmap/history_relations/\").createOrReplaceTempView(\"history_relations\")\n", + "# spark.read.parquet(\"gs://bigquery_public_data_for_spark/geo_openstreetmap/history_ways/\").createOrReplaceTempView(\"history_ways\")\n", + "# spark.read.parquet(\"gs://bigquery_public_data_for_spark/geo_openstreetmap/planet_changesets/\").createOrReplaceTempView(\"planet_changesets\")\n", + "# spark.read.parquet(\"gs://bigquery_public_data_for_spark/geo_openstreetmap/planet_features/\").createOrReplaceTempView(\"planet_features\")\n", + "# spark.read.parquet(\"gs://bigquery_public_data_for_spark/geo_openstreetmap/planet_features_lines/\").createOrReplaceTempView(\"planet_features_lines\")\n", + "# spark.read.parquet(\"gs://bigquery_public_data_for_spark/geo_openstreetmap/planet_features_multilinestrings/\").createOrReplaceTempView(\"planet_features_multilinestrings\")\n", + "# spark.read.parquet(\"gs://bigquery_public_data_for_spark/geo_openstreetmap/planet_features_multipolygons/\").createOrReplaceTempView(\"planet_features_multipolygons\")\n", + "# spark.read.parquet(\"gs://bigquery_public_data_for_spark/geo_openstreetmap/planet_features_other_relations/\").createOrReplaceTempView(\"planet_features_other_relations\")\n", + "# spark.read.parquet(\"gs://bigquery_public_data_for_spark/geo_openstreetmap/planet_features_points/\").createOrReplaceTempView(\"planet_features_points\")\n", + "# spark.read.parquet(\"gs://bigquery_public_data_for_spark/geo_openstreetmap/planet_layers/\").createOrReplaceTempView(\"planet_layers\")\n", + "# spark.read.parquet(\"gs://bigquery_public_data_for_spark/geo_openstreetmap/planet_nodes/\").createOrReplaceTempView(\"planet_nodes\")\n", + "# spark.read.parquet(\"gs://bigquery_public_data_for_spark/geo_openstreetmap/planet_relations/\").createOrReplaceTempView(\"planet_relations\")\n", "spark.read.parquet(\"gs://bigquery_public_data_for_spark/geo_openstreetmap/planet_ways/\").createOrReplaceTempView(\"planet_ways\")\n", "print(\"-\"*50)" ] @@ -128,89 +129,92 @@ "id": "7136eb63", "metadata": {}, "source": [ - "### Calculate length of ways with 'highway' tag in Japan\n" + "### Calculate the building numbers over 5 levels" ] }, { "cell_type": "code", - "execution_count": 48, + "execution_count": 4, "id": "dd12d749", "metadata": {}, - "outputs": [], - "source": [ - "query = '''\n", - "select count(osm_id),feature_type from planet_features group by feature_type\n", - "\n", - "'''" - ] - }, - { - "cell_type": "code", - "execution_count": 49, - "id": "33368682", - "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ - "25/01/20 09:50:03 WARN GpuOverrides: \n", + "25/01/20 10:37:35 WARN GpuOverrides: \n", + "!Exec cannot run on GPU because the Exec CollectLimitExec has been disabled, and is disabled by default because Collect Limit replacement can be slower on the GPU, if huge number of rows in a batch it could help by limiting the number of rows transferred from GPU to CPU. Set spark.rapids.sql.exec.CollectLimitExec to true if you wish to enable it\n", + " @Partitioning could run on GPU\n", + "\n", + "25/01/20 10:37:35 WARN GpuOverrides: \n", "!Exec cannot run on GPU because the Exec CollectLimitExec has been disabled, and is disabled by default because Collect Limit replacement can be slower on the GPU, if huge number of rows in a batch it could help by limiting the number of rows transferred from GPU to CPU. Set spark.rapids.sql.exec.CollectLimitExec to true if you wish to enable it\n", " @Partitioning could run on GPU\n", "\n", - "25/01/20 09:50:03 WARN GpuOverrides: \n", + "25/01/20 10:37:35 WARN GpuOverrides: \n", "!Exec cannot run on GPU because the Exec CollectLimitExec has been disabled, and is disabled by default because Collect Limit replacement can be slower on the GPU, if huge number of rows in a batch it could help by limiting the number of rows transferred from GPU to CPU. Set spark.rapids.sql.exec.CollectLimitExec to true if you wish to enable it\n", " @Partitioning could run on GPU\n", "\n", - "25/01/20 09:50:03 WARN GpuOverrides: \n", + "25/01/20 10:38:12 WARN GpuOverrides: 80][Stage 2:==============>(279 + 1) / 280]\n", "!Exec cannot run on GPU because the Exec CollectLimitExec has been disabled, and is disabled by default because Collect Limit replacement can be slower on the GPU, if huge number of rows in a batch it could help by limiting the number of rows transferred from GPU to CPU. Set spark.rapids.sql.exec.CollectLimitExec to true if you wish to enable it\n", " @Partitioning could run on GPU\n", "\n", - "25/01/20 09:50:29 WARN GpuOverrides: ==========================>(311 + 1) / 312]\n", + "25/01/20 10:40:11 WARN GpuOverrides: ==========================>(279 + 1) / 280]\n", "!Exec cannot run on GPU because the Exec CollectLimitExec has been disabled, and is disabled by default because Collect Limit replacement can be slower on the GPU, if huge number of rows in a batch it could help by limiting the number of rows transferred from GPU to CPU. Set spark.rapids.sql.exec.CollectLimitExec to true if you wish to enable it\n", " @Partitioning could run on GPU\n", "\n", - "25/01/20 09:50:29 WARN GpuOverrides: \n", + "25/01/20 10:40:11 WARN GpuOverrides: \n", "!Exec cannot run on GPU because the Exec CollectLimitExec has been disabled, and is disabled by default because Collect Limit replacement can be slower on the GPU, if huge number of rows in a batch it could help by limiting the number of rows transferred from GPU to CPU. Set spark.rapids.sql.exec.CollectLimitExec to true if you wish to enable it\n", " @Partitioning could run on GPU\n", - "\n" + "\n", + " \r" ] }, { "name": "stdout", "output_type": "stream", "text": [ - "+-------------+----------------+\n", - "|count(osm_id)| feature_type|\n", - "+-------------+----------------+\n", - "| 5628557| multipolygons|\n", - "| 2782002| other_relations|\n", - "| 247031768| lines|\n", - "| 184798630| points|\n", - "| 804708|multilinestrings|\n", - "+-------------+----------------+\n", + "+---------+-------+-------------------+---------+-------------------+\n", + "| id|version| username|changeset| osm_timestamp|\n", + "+---------+-------+-------------------+---------+-------------------+\n", + "|109568528| 10| Stalker61| 91879126|2020-10-02 18:25:07|\n", + "|236002978| 3| MaksimTo| 54555625|2017-12-12 03:43:01|\n", + "| 85820500| 4| Achoin| 50043141|2017-07-04 19:46:51|\n", + "|505004057| 5| mbouzada| 58562021|2018-04-30 17:31:20|\n", + "|252765823| 1|catastro_pontevedra| 19552118|2013-12-20 16:01:44|\n", + "+---------+-------+-------------------+---------+-------------------+\n", + "only showing top 5 rows\n", "\n", - "Retry times : 1, Japan highway benchmark takes 26.28 seconds\n" - ] - }, - { - "name": "stderr", - "output_type": "stream", - "text": [ - "\r", - " \r" + "Retry times : 1, 5 levels building numbers benchmark takes 159.22 seconds\n" ] } ], "source": [ - "# Run microbenchmark with n retry time\n", - "runQuery(spark,\"Japan highway\",query,1)" + "query = \"\"\"\n", + "WITH exploded_tags AS (\n", + " SELECT id, version, username, changeset, osm_timestamp, tag.key AS tag_key, tag.value AS tag_value\n", + " FROM planet_ways\n", + " LATERAL VIEW explode(all_tags) AS tag\n", + ")\n", + "\n", + "SELECT id, version, username, changeset, osm_timestamp\n", + "FROM exploded_tags\n", + "WHERE tag_key = 'building' \n", + " AND EXISTS (\n", + " SELECT 1\n", + " FROM exploded_tags AS inner_tags\n", + " WHERE inner_tags.id = exploded_tags.id \n", + " AND inner_tags.tag_key = 'building:levels' \n", + " AND CAST(inner_tags.tag_value AS INT) > 5\n", + " )\n", + "\n", + "\"\"\"\n", + "runQuery(spark,\"5 levels building numbers\",query,1)" ] }, { "cell_type": "code", "execution_count": null, - "id": "9d47060d", + "id": "d9b3d5db", "metadata": {}, "outputs": [], "source": []