Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Draft] MARA demo #33

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
53 changes: 53 additions & 0 deletions demo/mara_join.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
#!/usr/bin/env python3
"""Demo of a spatial join using Qarray and dask_geopandas for MARA.

Please run the following to access the ERA5 dataset:
```
gcloud auth application-default login
```
"""
import dask.dataframe as dd
import dask_geopandas as gdd
import xarray as xr
import qarray as qr


mv_df = dd.read_csv(
'https://raw.githubusercontent.com/wildlife-dynamics/ecoscope/master/tests/'
'sample_data/vector/movbank_data.csv',
)

mv_df['timestamp'] = dd.to_datetime(mv_df['timestamp'])
mv_df.set_index('timestamp', drop=False, sort=True)
mv_df['geometry'] = gdd.points_from_xy(
mv_df, 'location-long', 'location-lat', crs=4326
)
timerange = slice(
mv_df.timestamp.min().compute(),
mv_df.timestamp.max().compute(),
)
mv_gdf = gdd.from_dask_dataframe(mv_df, 'geometry')

# For MARA, we'd replace this with an Xee call.
era5_ds = xr.open_zarr(
'gs://gcp-public-data-arco-era5/ar/'
'1959-2022-full_37-1h-0p25deg-chunk-1.zarr-v2',
chunks={'time': 240, 'level': 1}
)
print('era5 dataset opened.')
era5_wind_ds = era5_ds[['u_component_of_wind', 'v_component_of_wind']].sel(
time=timerange,
level=1000, # surface level only.
)
era5_wind_df = qr.read_xarray(era5_wind_ds)
# What is the CRS?
era5_wind_df['geometry'] = gdd.points_from_xy(
era5_wind_df, 'longitude', 'latitude',
)
era5_wind_gdf = gdd.from_dask_dataframe(era5_wind_df, 'geometry')

print('beginning spatial join')
# Only an inner spatial join is supported right now (in dask_geopandas).
intersection = mv_gdf.sjoin(era5_wind_gdf).compute()
print(intersection)

119 changes: 119 additions & 0 deletions demo/sst.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
#!/usr/bin/env python3
"""Demo of calculating global average sea surface temperature (SST) with SQL.

Please run the following to set up cloud resources:
```
gcloud auth application-default login
coiled setup
```
"""
import argparse
import xarray as xr
import xarray_sql as qr


def local_data(start: str, end: str) -> xr.Dataset:
import numpy as np
import pandas as pd

np.random.seed(42)

lat = np.linspace(-90, 90, num=720)
lon = np.linspace(-180, 180, num=1440)
time = pd.date_range(start, end, freq='H')
level = np.array([1000, 500], dtype=np.int32)
reference_time = pd.Timestamp(start)

temperature = 15 + 8 * np.random.randn(720, 1440, len(time), len(level))
precipitation = 10 * np.random.rand(720, 1440, len(time), len(level))

return xr.Dataset(
data_vars=dict(
sea_surface_temperature=(
['lat', 'lon', 'time', 'level'],
temperature,
),
precipitation=(['lat', 'lon', 'time', 'level'], precipitation),
),
coords=dict(
lat=lat,
lon=lon,
time=time,
level=level,
reference_time=reference_time,
),
attrs=dict(description='Random weather.'),
)


parser = argparse.ArgumentParser()
parser.add_argument(
'--start', type=str, default='2020-01-01', help='start time ISO string'
)
parser.add_argument(
'--end', type=str, default='2020-01-02', help='end time ISO string'
)
parser.add_argument(
'--cluster',
action='store_true',
help='deploy on coiled cluster, default: local cluster',
)
parser.add_argument(
'--fake',
action='store_true',
help='use local dummy data, default: ARCO-ERA5 data',
)

args = parser.parse_args()

if args.cluster:
from coiled import Cluster

cluster = Cluster(
region='us-central1',
worker_memory='16 GiB',
spot_policy='spot_with_fallback',
arm=True,
)
client = cluster.get_client()
cluster.adapt(minimum=1, maximum=100)
else:
from dask.distributed import LocalCluster

cluster = LocalCluster(processes=False)
client = cluster.get_client()

if args.fake:
era5_ds = local_data(args.start, args.end).chunk({'time': 240, 'level': 1})
else:
era5_ds = xr.open_zarr(
'gs://gcp-public-data-arco-era5/ar/'
'1959-2022-full_37-1h-0p25deg-chunk-1.zarr-v2',
chunks={'time': 240, 'level': 1},
)

print('dataset opened.')
era5_sst_ds = era5_ds[['sea_surface_temperature']].sel(
time=slice(args.start, args.end),
level=1000, # surface level only.
)

c = qr.Context()
# chunk sizes determined from VM memory limit of 16 GiB.
c.create_table('era5', era5_sst_ds, chunks=dict(time=24))

print('beginning query.')
# TODO(alxmrs): `DATE` function is not supported in Apache Calcite out-of-the-box.
df = c.sql(
"""
SELECT
"time",
AVG("sea_surface_temperature") as daily_avg_sst
FROM
"era5"
GROUP BY
"time"
"""
)

df.to_csv(f'global_avg_sst_{args.start}_to_{args.end}_*.cvs')
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ dev = [
"pyink",
"py-spy"
]
demo = [
"coiled"
]

[project.urls]
Homepage = "https://github.com/alxmrs/xarray-sql"
Expand Down
Loading