diff --git a/demo/mara_join.py b/demo/mara_join.py
new file mode 100755
index 0000000..6169fef
--- /dev/null
+++ b/demo/mara_join.py
@@ -0,0 +1,53 @@
+#!/usr/bin/env python3
+"""Demo of a spatial join using Qarray and dask_geopandas for MARA.
+
+Please run the following to access the ERA5 dataset:
+```
+gcloud auth application-default login
+```
+"""
+import dask.dataframe as dd
+import dask_geopandas as gdd
+import xarray as xr
+import qarray as qr
+
+
+mv_df = dd.read_csv(
+    'https://raw.githubusercontent.com/wildlife-dynamics/ecoscope/master/tests/'
+    'sample_data/vector/movbank_data.csv',
+)
+
+mv_df['timestamp'] = dd.to_datetime(mv_df['timestamp'])
+# NOTE: dask's set_index returns a new frame (it is not in-place); keep it.
+mv_df = mv_df.set_index('timestamp', drop=False, sort=True)
+mv_df['geometry'] = gdd.points_from_xy(
+    mv_df, 'location-long', 'location-lat', crs=4326
+)
+timerange = slice(
+    mv_df.timestamp.min().compute(),
+    mv_df.timestamp.max().compute(),
+)
+mv_gdf = gdd.from_dask_dataframe(mv_df, 'geometry')
+
+# For MARA, we'd replace this with an Xee call.
+era5_ds = xr.open_zarr(
+    'gs://gcp-public-data-arco-era5/ar/'
+    '1959-2022-full_37-1h-0p25deg-chunk-1.zarr-v2',
+    chunks={'time': 240, 'level': 1}
+)
+print('era5 dataset opened.')
+era5_wind_ds = era5_ds[['u_component_of_wind', 'v_component_of_wind']].sel(
+    time=timerange,
+    level=1000,  # surface level only.
+)
+era5_wind_df = qr.read_xarray(era5_wind_ds)
+# What is the CRS?
+era5_wind_df['geometry'] = gdd.points_from_xy(
+    era5_wind_df, 'longitude', 'latitude',
+)
+era5_wind_gdf = gdd.from_dask_dataframe(era5_wind_df, 'geometry')
+
+print('beginning spatial join')
+# Only an inner spatial join is supported right now (in dask_geopandas).
+intersection = mv_gdf.sjoin(era5_wind_gdf).compute()
+print(intersection)
diff --git a/demo/sst.py b/demo/sst.py
new file mode 100755
index 0000000..6b83e75
--- /dev/null
+++ b/demo/sst.py
@@ -0,0 +1,119 @@
+#!/usr/bin/env python3
+"""Demo of calculating global average sea surface temperature (SST) with SQL.
+
+Please run the following to set up cloud resources:
+```
+gcloud auth application-default login
+coiled setup
+```
+"""
+import argparse
+import xarray as xr
+import xarray_sql as qr
+
+
+def local_data(start: str, end: str) -> xr.Dataset:
+    import numpy as np
+    import pandas as pd
+
+    np.random.seed(42)
+
+    lat = np.linspace(-90, 90, num=720)
+    lon = np.linspace(-180, 180, num=1440)
+    time = pd.date_range(start, end, freq='H')
+    level = np.array([1000, 500], dtype=np.int32)
+    reference_time = pd.Timestamp(start)
+
+    temperature = 15 + 8 * np.random.randn(720, 1440, len(time), len(level))
+    precipitation = 10 * np.random.rand(720, 1440, len(time), len(level))
+
+    return xr.Dataset(
+        data_vars=dict(
+            sea_surface_temperature=(
+                ['lat', 'lon', 'time', 'level'],
+                temperature,
+            ),
+            precipitation=(['lat', 'lon', 'time', 'level'], precipitation),
+        ),
+        coords=dict(
+            lat=lat,
+            lon=lon,
+            time=time,
+            level=level,
+            reference_time=reference_time,
+        ),
+        attrs=dict(description='Random weather.'),
+    )
+
+
+parser = argparse.ArgumentParser()
+parser.add_argument(
+    '--start', type=str, default='2020-01-01', help='start time ISO string'
+)
+parser.add_argument(
+    '--end', type=str, default='2020-01-02', help='end time ISO string'
+)
+parser.add_argument(
+    '--cluster',
+    action='store_true',
+    help='deploy on coiled cluster, default: local cluster',
+)
+parser.add_argument(
+    '--fake',
+    action='store_true',
+    help='use local dummy data, default: ARCO-ERA5 data',
+)
+
+args = parser.parse_args()
+
+if args.cluster:
+    from coiled import Cluster
+
+    cluster = Cluster(
+        region='us-central1',
+        worker_memory='16 GiB',
+        spot_policy='spot_with_fallback',
+        arm=True,
+    )
+    client = cluster.get_client()
+    cluster.adapt(minimum=1, maximum=100)
+else:
+    from dask.distributed import LocalCluster
+
+    cluster = LocalCluster(processes=False)
+    client = cluster.get_client()
+
+if args.fake:
+    era5_ds = local_data(args.start, args.end).chunk({'time': 240, 'level': 1})
+else:
+    era5_ds = xr.open_zarr(
+        'gs://gcp-public-data-arco-era5/ar/'
+        '1959-2022-full_37-1h-0p25deg-chunk-1.zarr-v2',
+        chunks={'time': 240, 'level': 1},
+    )
+
+print('dataset opened.')
+era5_sst_ds = era5_ds[['sea_surface_temperature']].sel(
+    time=slice(args.start, args.end),
+    level=1000,  # surface level only.
+)
+
+c = qr.Context()
+# chunk sizes determined from VM memory limit of 16 GiB.
+c.create_table('era5', era5_sst_ds, chunks=dict(time=24))
+
+print('beginning query.')
+# TODO(alxmrs): `DATE` function is not supported in Apache Calcite out-of-the-box.
+df = c.sql(
+    """
+SELECT
+  "time",
+  AVG("sea_surface_temperature") as daily_avg_sst
+FROM
+  "era5"
+GROUP BY
+  "time"
+"""
+)
+
+df.to_csv(f'global_avg_sst_{args.start}_to_{args.end}_*.csv')
diff --git a/pyproject.toml b/pyproject.toml
index 2d6eeb9..2242507 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -41,6 +41,9 @@ dev = [
     "pyink",
     "py-spy"
 ]
+demo = [
+    "coiled"
+]
 
 [project.urls]
 Homepage = "https://github.com/alxmrs/xarray-sql"