From 592d770d9d0b356fcbf7f67bf1afaf56606c8b87 Mon Sep 17 00:00:00 2001 From: Alex Merose Date: Thu, 22 Feb 2024 12:04:12 +0700 Subject: [PATCH 01/10] A motivating demo for MARA to work toward. --- demo/mara_join.py | 39 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) create mode 100644 demo/mara_join.py diff --git a/demo/mara_join.py b/demo/mara_join.py new file mode 100644 index 0000000..39214c8 --- /dev/null +++ b/demo/mara_join.py @@ -0,0 +1,39 @@ +"""Demo of a spatial join using Qarray and dask_geopandas for MARA.""" +import dask.dataframe as dd +import dask_geopandas as gdd +import xarray as xr +import qarray as qr + + +mv_df = dd.read_csv( + 'https://raw.githubusercontent.com/wildlife-dynamics/ecoscope/master/tests/' + 'sample_data/vector/movbank_data.csv' +) + +mv_df['timestamp'] = dd.to_datetime(mv_df['timestamp'], utc=True) +mv_df['geometry'] = gdd.points_from_xy( + mv_df['location-long'], mv_df['location-lat'] +) +# What is the CRS? +mv_gdf = gdd.from_dask_dataframe(mv_df, 'geometry') + +era5_ds = xr.open_zarr( + 'gs://gcp-public-data-arco-era5/ar/' + '1959-2022-full_37-1h-0p25deg-chunk-1.zarr-v2', + chunks={'time': 240, 'level': 1} +) +era5_wind_ds = era5_ds[['u_component_of_wind', 'v_component_of_wind']].sel( + time=slice(mv_gdf['timestamp'].min(), mv_gdf['timestamp'].max()) +) +era5_wind_df = qr.to_dd(era5_wind_ds) +# What is the CRS? +era5_wind_df['geometry'] = gdd.points_from_xy( + era5_wind_df['longitude'], era5_wind_df['latitude'], era5_wind_df['level'] +) +era5_wind_gdf = gdd.from_dask_dataframe(era5_wind_df, 'geometry') + + +# Only an inner spatial join is supported right now (in dask_geopandas). +intersection = mv_gdf.sjoin(era5_wind_gdf) +print(intersection.head()) + From 02679aa3fe0eb078b41cb1358d2087c39e705b47 Mon Sep 17 00:00:00 2001 From: Alex Merose Date: Sun, 25 Feb 2024 20:15:49 +0700 Subject: [PATCH 02/10] Debugged demo join; now focused on tuning performance. --- demo/mara_join.py | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/demo/mara_join.py b/demo/mara_join.py index 39214c8..b71838e 100644 --- a/demo/mara_join.py +++ b/demo/mara_join.py @@ -7,28 +7,34 @@ mv_df = dd.read_csv( 'https://raw.githubusercontent.com/wildlife-dynamics/ecoscope/master/tests/' - 'sample_data/vector/movbank_data.csv' + 'sample_data/vector/movbank_data.csv', ) -mv_df['timestamp'] = dd.to_datetime(mv_df['timestamp'], utc=True) +mv_df['timestamp'] = dd.to_datetime(mv_df['timestamp']) +mv_df.set_index('timestamp', drop=False, sort=True) mv_df['geometry'] = gdd.points_from_xy( - mv_df['location-long'], mv_df['location-lat'] + mv_df, 'location-long', 'location-lat', crs=4326 +) +timerange = slice( + mv_df.timestamp.min().compute(), + mv_df.timestamp.max().compute(), ) -# What is the CRS? mv_gdf = gdd.from_dask_dataframe(mv_df, 'geometry') +# For MARA, we'd replace this with an Xee call. era5_ds = xr.open_zarr( 'gs://gcp-public-data-arco-era5/ar/' '1959-2022-full_37-1h-0p25deg-chunk-1.zarr-v2', - chunks={'time': 240, 'level': 1} + chunks={'time': 48, 'level': 1} ) era5_wind_ds = era5_ds[['u_component_of_wind', 'v_component_of_wind']].sel( - time=slice(mv_gdf['timestamp'].min(), mv_gdf['timestamp'].max()) + time=timerange, + level=1000, # surface level only. ) era5_wind_df = qr.to_dd(era5_wind_ds) # What is the CRS? 
era5_wind_df['geometry'] = gdd.points_from_xy( - era5_wind_df['longitude'], era5_wind_df['latitude'], era5_wind_df['level'] + era5_wind_df, 'longitude', 'latitude', ) era5_wind_gdf = gdd.from_dask_dataframe(era5_wind_df, 'geometry') From 8e145997323a5324902c819faf8ea2a854f6ec4e Mon Sep 17 00:00:00 2001 From: Alex Merose Date: Sun, 25 Feb 2024 22:55:50 +0700 Subject: [PATCH 03/10] Using new chunks argument. --- demo/mara_join.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/demo/mara_join.py b/demo/mara_join.py index b71838e..3141513 100644 --- a/demo/mara_join.py +++ b/demo/mara_join.py @@ -25,13 +25,13 @@ era5_ds = xr.open_zarr( 'gs://gcp-public-data-arco-era5/ar/' '1959-2022-full_37-1h-0p25deg-chunk-1.zarr-v2', - chunks={'time': 48, 'level': 1} + chunks={'time': 240, 'level': 1} ) era5_wind_ds = era5_ds[['u_component_of_wind', 'v_component_of_wind']].sel( time=timerange, level=1000, # surface level only. ) -era5_wind_df = qr.to_dd(era5_wind_ds) +era5_wind_df = qr.to_dd(era5_wind_ds, chunks=dict(time=6)) # What is the CRS? era5_wind_df['geometry'] = gdd.points_from_xy( era5_wind_df, 'longitude', 'latitude', From 3b08521409a6082ac2b6779db9f707e6af265dd2 Mon Sep 17 00:00:00 2001 From: Alex Merose Date: Wed, 28 Feb 2024 19:11:51 +0700 Subject: [PATCH 04/10] Made the join into a script. Added print debug stmts. --- demo/mara_join.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) mode change 100644 => 100755 demo/mara_join.py diff --git a/demo/mara_join.py b/demo/mara_join.py old mode 100644 new mode 100755 index 3141513..3aea74a --- a/demo/mara_join.py +++ b/demo/mara_join.py @@ -1,3 +1,4 @@ +#!/usr/bin/env python3 """Demo of a spatial join using Qarray and dask_geopandas for MARA.""" import dask.dataframe as dd import dask_geopandas as gdd @@ -27,19 +28,20 @@ '1959-2022-full_37-1h-0p25deg-chunk-1.zarr-v2', chunks={'time': 240, 'level': 1} ) +print('era5 dataset opened.') era5_wind_ds = era5_ds[['u_component_of_wind', 'v_component_of_wind']].sel( time=timerange, level=1000, # surface level only. ) -era5_wind_df = qr.to_dd(era5_wind_ds, chunks=dict(time=6)) +era5_wind_df = qr.to_dd(era5_wind_ds) # What is the CRS? era5_wind_df['geometry'] = gdd.points_from_xy( era5_wind_df, 'longitude', 'latitude', ) era5_wind_gdf = gdd.from_dask_dataframe(era5_wind_df, 'geometry') - +print('beginning spatial join') # Only an inner spatial join is supported right now (in dask_geopandas). -intersection = mv_gdf.sjoin(era5_wind_gdf) -print(intersection.head()) +intersection = era5_wind_gdf.sjoin(mv_gdf).compute() +print(intersection.compute) From 5ac81537e76cda63440dd6c095b4822bb783fccd Mon Sep 17 00:00:00 2001 From: Alex Merose Date: Sun, 3 Mar 2024 15:33:01 +0530 Subject: [PATCH 05/10] Not totally working demo, but much progress made. --- demo/mara_join.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/demo/mara_join.py b/demo/mara_join.py index 3aea74a..f510440 100755 --- a/demo/mara_join.py +++ b/demo/mara_join.py @@ -33,7 +33,7 @@ time=timerange, level=1000, # surface level only. ) -era5_wind_df = qr.to_dd(era5_wind_ds) +era5_wind_df = qr.read_xarray(era5_wind_ds) # What is the CRS? era5_wind_df['geometry'] = gdd.points_from_xy( era5_wind_df, 'longitude', 'latitude', @@ -43,5 +43,5 @@ print('beginning spatial join') # Only an inner spatial join is supported right now (in dask_geopandas). 
intersection = era5_wind_gdf.sjoin(mv_gdf).compute() -print(intersection.compute) +print(intersection) From 24f8e33deee8c0ebf12f90fdc2fee02ac22c8ff2 Mon Sep 17 00:00:00 2001 From: Alex Merose Date: Sun, 3 Mar 2024 15:36:22 +0530 Subject: [PATCH 06/10] Swapped order; added login note. --- demo/mara_join.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/demo/mara_join.py b/demo/mara_join.py index f510440..6169fef 100755 --- a/demo/mara_join.py +++ b/demo/mara_join.py @@ -1,5 +1,11 @@ #!/usr/bin/env python3 -"""Demo of a spatial join using Qarray and dask_geopandas for MARA.""" +"""Demo of a spatial join using Qarray and dask_geopandas for MARA. + +Please run the following to access the ERA5 dataset: +``` +gcloud auth application-default login +``` +""" import dask.dataframe as dd import dask_geopandas as gdd import xarray as xr @@ -42,6 +48,6 @@ print('beginning spatial join') # Only an inner spatial join is supported right now (in dask_geopandas). -intersection = era5_wind_gdf.sjoin(mv_gdf).compute() +intersection = mv_gdf.sjoin(era5_wind_gdf).compute() print(intersection) From e001a165c18d89102a9f6b05adfad299cdcef889 Mon Sep 17 00:00:00 2001 From: Alex Merose Date: Sat, 23 Mar 2024 20:55:21 +0530 Subject: [PATCH 07/10] Simplest version of the SST Demo. --- demo/sst | 41 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) create mode 100644 demo/sst diff --git a/demo/sst b/demo/sst new file mode 100644 index 0000000..6e38b6e --- /dev/null +++ b/demo/sst @@ -0,0 +1,41 @@ +#!/usr/bin/env python3 +"""Demo of calculating global average sea surface temperature (SST) with SQL. + +Please run the following to access the ERA5 dataset: +``` +gcloud auth application-default login +``` +""" +import xarray as xr +import xarray_sql as qr + +# TODO(alxmrs): Add coiled or dask cluster code. + +era5_ds = xr.open_zarr( + 'gs://gcp-public-data-arco-era5/ar/' + '1959-2022-full_37-1h-0p25deg-chunk-1.zarr-v2', + chunks={'time': 240, 'level': 1} +) +print('dataset opened.') +# TODO(alxmrs): Slice to small time range based on script args. +era5_sst_ds = era5_ds[['sea_surface_temperature']].sel( + level=1000, # surface level only. +) + +# chunk sizes determined from VM memory limit of 16 GB. +c = qr.Context() +c.create_table('era5', era5_sst_ds, chunks=dict(time=24)) + +print('beginning query.') +df = c.sql(""" +SELECT + DATE("time") as date, + AVG("sea_surface_temperature") as daily_avg_sst +FROM + "era5" +GROUP BY + DATE("time") +""") + +# TODO(alxmrs): time slice should be in file name. +df.to_csv('global_avg_sst_*.cvs') \ No newline at end of file From ae45fe596a0bffaf5e2a582c2feb82c0745d7d6e Mon Sep 17 00:00:00 2001 From: Alex Merose Date: Sat, 23 Mar 2024 21:17:09 +0530 Subject: [PATCH 08/10] Added coiled cluster config. --- demo/sst | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/demo/sst b/demo/sst index 6e38b6e..178bfa4 100644 --- a/demo/sst +++ b/demo/sst @@ -9,7 +9,16 @@ gcloud auth application-default login import xarray as xr import xarray_sql as qr -# TODO(alxmrs): Add coiled or dask cluster code. +from coiled import Cluster + +cluster = Cluster( + region='us-central1', + worker_memory='16 GiB', + spot_policy='spot_with_fallback', + arm=True, +) +client = cluster.get_client() +cluster.adapt(minimum=1, maximum=100) era5_ds = xr.open_zarr( 'gs://gcp-public-data-arco-era5/ar/' @@ -22,7 +31,7 @@ era5_sst_ds = era5_ds[['sea_surface_temperature']].sel( level=1000, # surface level only. 
) -# chunk sizes determined from VM memory limit of 16 GB. +# chunk sizes determined from VM memory limit of 16 GiB. c = qr.Context() c.create_table('era5', era5_sst_ds, chunks=dict(time=24)) From b1bed45703cfdb9acb3d321f7a62df4ac6f46907 Mon Sep 17 00:00:00 2001 From: Alex Merose Date: Sat, 23 Mar 2024 22:50:39 +0530 Subject: [PATCH 09/10] Added script arguments. --- demo/sst | 50 --------------------------------------- demo/sst.py | 63 ++++++++++++++++++++++++++++++++++++++++++++++++++ pyproject.toml | 3 +++ 3 files changed, 66 insertions(+), 50 deletions(-) delete mode 100644 demo/sst create mode 100755 demo/sst.py diff --git a/demo/sst b/demo/sst deleted file mode 100644 index 178bfa4..0000000 --- a/demo/sst +++ /dev/null @@ -1,50 +0,0 @@ -#!/usr/bin/env python3 -"""Demo of calculating global average sea surface temperature (SST) with SQL. - -Please run the following to access the ERA5 dataset: -``` -gcloud auth application-default login -``` -""" -import xarray as xr -import xarray_sql as qr - -from coiled import Cluster - -cluster = Cluster( - region='us-central1', - worker_memory='16 GiB', - spot_policy='spot_with_fallback', - arm=True, -) -client = cluster.get_client() -cluster.adapt(minimum=1, maximum=100) - -era5_ds = xr.open_zarr( - 'gs://gcp-public-data-arco-era5/ar/' - '1959-2022-full_37-1h-0p25deg-chunk-1.zarr-v2', - chunks={'time': 240, 'level': 1} -) -print('dataset opened.') -# TODO(alxmrs): Slice to small time range based on script args. -era5_sst_ds = era5_ds[['sea_surface_temperature']].sel( - level=1000, # surface level only. -) - -# chunk sizes determined from VM memory limit of 16 GiB. -c = qr.Context() -c.create_table('era5', era5_sst_ds, chunks=dict(time=24)) - -print('beginning query.') -df = c.sql(""" -SELECT - DATE("time") as date, - AVG("sea_surface_temperature") as daily_avg_sst -FROM - "era5" -GROUP BY - DATE("time") -""") - -# TODO(alxmrs): time slice should be in file name. -df.to_csv('global_avg_sst_*.cvs') \ No newline at end of file diff --git a/demo/sst.py b/demo/sst.py new file mode 100755 index 0000000..1482ec5 --- /dev/null +++ b/demo/sst.py @@ -0,0 +1,63 @@ +#!/usr/bin/env python3 +"""Demo of calculating global average sea surface temperature (SST) with SQL. + +Please run the following to set up cloud resources: +``` +gcloud auth application-default login +coiled setup +``` +""" +import argparse +import xarray as xr +import xarray_sql as qr + +parser = argparse.ArgumentParser() +parser.add_argument('--start', type=str, default='2020-01-01', help='start time ISO string') +parser.add_argument('--end', type=str, default='2020-01-02', help='end time ISO string') +parser.add_argument('--cluster', action='store_true', help='deploy on coiled cluster') + +args = parser.parse_args() + +if args.cluster: + from coiled import Cluster + + cluster = Cluster( + region='us-central1', + worker_memory='16 GiB', + spot_policy='spot_with_fallback', + arm=True, + ) + client = cluster.get_client() + cluster.adapt(minimum=1, maximum=100) +else: + from dask.distributed import LocalCluster + cluster = LocalCluster(processes=False) + client = cluster.get_client() + +era5_ds = xr.open_zarr( + 'gs://gcp-public-data-arco-era5/ar/' + '1959-2022-full_37-1h-0p25deg-chunk-1.zarr-v2', + chunks={'time': 240, 'level': 1} +) +print('dataset opened.') +era5_sst_ds = era5_ds[['sea_surface_temperature']].sel( + time=slice(args.start, args.end), + level=1000, # surface level only. +) + +c = qr.Context() +# chunk sizes determined from VM memory limit of 16 GiB. 
+c.create_table('era5', era5_sst_ds, chunks=dict(time=24)) + +print('beginning query.') +df = c.sql(""" +SELECT + DATE("time") as date, + AVG("sea_surface_temperature") as daily_avg_sst +FROM + "era5" +GROUP BY + DATE("time") +""") + +df.to_csv(f'global_avg_sst_{args.start}-{args.end}_*.cvs') diff --git a/pyproject.toml b/pyproject.toml index 2d6eeb9..2242507 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -41,6 +41,9 @@ dev = [ "pyink", "py-spy" ] +demo = [ + "coiled" +] [project.urls] Homepage = "https://github.com/alxmrs/xarray-sql" From 88732bd581f9fb2a6577130005ec823200971ce5 Mon Sep 17 00:00:00 2001 From: Alex Merose Date: Sun, 24 Mar 2024 16:45:12 +0530 Subject: [PATCH 10/10] SST demo works with local fake data. --- demo/sst.py | 94 ++++++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 75 insertions(+), 19 deletions(-) diff --git a/demo/sst.py b/demo/sst.py index 1482ec5..6b83e75 100755 --- a/demo/sst.py +++ b/demo/sst.py @@ -11,10 +11,58 @@ import xarray as xr import xarray_sql as qr + +def local_data(start: str, end: str) -> xr.Dataset: + import numpy as np + import pandas as pd + + np.random.seed(42) + + lat = np.linspace(-90, 90, num=720) + lon = np.linspace(-180, 180, num=1440) + time = pd.date_range(start, end, freq='H') + level = np.array([1000, 500], dtype=np.int32) + reference_time = pd.Timestamp(start) + + temperature = 15 + 8 * np.random.randn(720, 1440, len(time), len(level)) + precipitation = 10 * np.random.rand(720, 1440, len(time), len(level)) + + return xr.Dataset( + data_vars=dict( + sea_surface_temperature=( + ['lat', 'lon', 'time', 'level'], + temperature, + ), + precipitation=(['lat', 'lon', 'time', 'level'], precipitation), + ), + coords=dict( + lat=lat, + lon=lon, + time=time, + level=level, + reference_time=reference_time, + ), + attrs=dict(description='Random weather.'), + ) + + parser = argparse.ArgumentParser() -parser.add_argument('--start', type=str, default='2020-01-01', help='start time ISO string') -parser.add_argument('--end', type=str, default='2020-01-02', help='end time ISO string') -parser.add_argument('--cluster', action='store_true', help='deploy on coiled cluster') +parser.add_argument( + '--start', type=str, default='2020-01-01', help='start time ISO string' +) +parser.add_argument( + '--end', type=str, default='2020-01-02', help='end time ISO string' +) +parser.add_argument( + '--cluster', + action='store_true', + help='deploy on coiled cluster, default: local cluster', +) +parser.add_argument( + '--fake', + action='store_true', + help='use local dummy data, default: ARCO-ERA5 data', +) args = parser.parse_args() @@ -22,27 +70,32 @@ from coiled import Cluster cluster = Cluster( - region='us-central1', - worker_memory='16 GiB', - spot_policy='spot_with_fallback', - arm=True, + region='us-central1', + worker_memory='16 GiB', + spot_policy='spot_with_fallback', + arm=True, ) client = cluster.get_client() cluster.adapt(minimum=1, maximum=100) else: from dask.distributed import LocalCluster + cluster = LocalCluster(processes=False) client = cluster.get_client() -era5_ds = xr.open_zarr( - 'gs://gcp-public-data-arco-era5/ar/' - '1959-2022-full_37-1h-0p25deg-chunk-1.zarr-v2', - chunks={'time': 240, 'level': 1} -) +if args.fake: + era5_ds = local_data(args.start, args.end).chunk({'time': 240, 'level': 1}) +else: + era5_ds = xr.open_zarr( + 'gs://gcp-public-data-arco-era5/ar/' + '1959-2022-full_37-1h-0p25deg-chunk-1.zarr-v2', + chunks={'time': 240, 'level': 1}, + ) + print('dataset opened.') era5_sst_ds = 
era5_ds[['sea_surface_temperature']].sel(
-    time=slice(args.start, args.end),
-    level=1000,  # surface level only.
+    time=slice(args.start, args.end),
+    level=1000,  # surface level only.
)

c = qr.Context()
@@ -50,14 +103,17 @@
c.create_table('era5', era5_sst_ds, chunks=dict(time=24))

print('beginning query.')
-df = c.sql("""
+# TODO(alxmrs): `DATE` function is not supported in Apache Calcite out-of-the-box.
+df = c.sql(
+    """
SELECT
-  DATE("time") as date,
+  "time",
  AVG("sea_surface_temperature") as daily_avg_sst
FROM
  "era5"
GROUP BY
-  DATE("time")
-""")
+  "time"
+"""
+)

-df.to_csv(f'global_avg_sst_{args.start}-{args.end}_*.cvs')
+df.to_csv(f'global_avg_sst_{args.start}_to_{args.end}_*.csv')
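
For reference, one way to exercise the finished demo/sst.py from this series — a sketch only, assuming the package is installed with the new `demo` extra from pyproject.toml, and that the `gcloud`/`coiled` setup from the script's docstring has been done for the non-fake path:

```
pip install -e ".[demo]"

# Local smoke test on the synthetic dataset (LocalCluster, no cloud access needed).
python demo/sst.py --fake --start 2020-01-01 --end 2020-01-02

# Full run against ARCO-ERA5 on an adaptive Coiled cluster.
python demo/sst.py --cluster --start 2020-01-01 --end 2020-01-02
```

Either invocation writes its result as partitioned CSV files named `global_avg_sst_<start>_to_<end>_*.csv`.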