Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lockless multi-threaded reads #224

Merged
merged 7 commits into from
Jan 28, 2021
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions AUTHORS.rst
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ Thanks goes to these wonderful people (`emoji key <https://allcontributors.org/d
<td align="center"><a href="https://github.com/davidbrochart"><img src="https://avatars2.githubusercontent.com/u/4711805?v=4" width="100px;" alt="David Brochart"/><br /><sub><b>David Brochart</b></sub></a><br /><a href="https://github.com/corteva/rioxarray/commits?author=davidbrochart" title="Code">💻</a> <a href="https://github.com/corteva/rioxarray/commits?author=davidbrochart" title="Tests">⚠️</a> <a href="#ideas-davidbrochart" title="Ideas, Planning, & Feedback">🤔</a> <a href="https://github.com/corteva/rioxarray/commits?author=davidbrochart" title="Documentation">📖</a></td>
<td align="center"><a href="https://github.com/cheginit"><img src="https://avatars2.githubusercontent.com/u/13016644?v=4" width="100px;" alt="Taher Chegini"/><br /><sub><b>Taher Chegini</b></sub></a><br /><a href="https://github.com/corteva/rioxarray/commits?author=cheginit" title="Code">💻</a> <a href="https://github.com/corteva/rioxarray/issues?q=author%3Acheginit" title="Bug reports">🐛</a></td>
<td align="center"><a href="http://joehamman.com"><img src="https://avatars3.githubusercontent.com/u/2443309?v=4" width="100px;" alt="Joe Hamman"/><br /><sub><b>Joe Hamman</b></sub></a><br /><a href="https://github.com/corteva/rioxarray/commits?author=jhamman" title="Code">💻</a> <a href="https://github.com/corteva/rioxarray/issues?q=author%3Ajhamman" title="Bug reports">🐛</a></td>
<td align="center"><a href="https://github.com/TomAugspurger"><img src="https://avatars.githubusercontent.com/u/1312546?v=4" width="100px;" alt="Tom Augspurger"/><br /><sub><b>Tom Augspurger</b></sub></a><br /><a href="https://github.com/corteva/rioxarray/commits?author=cheginit" title="Code">💻</a> <a href="https://github.com/corteva/rioxarray/issues?q=author%3Acheginit" title="Bug reports">🐛</a></td>
</tr>
</table>

Expand Down
1 change: 1 addition & 0 deletions docs/examples/examples.rst
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ This page contains links to a collection of examples of how to use rioxarray.
clip_geom.ipynb
clip_box.ipynb
pad_box.ipynb
read-locks.ipynb
reproject.ipynb
reproject_match.ipynb
merge.ipynb
Expand Down
227 changes: 227 additions & 0 deletions docs/examples/read-locks.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Reading COGs in Parallel\n",
"\n",
"COGs can be internally chunked, which makes it possible to read them in parallel from multiple threads. However, the libraries `rioxarray` builds on, `rasterio` and `GDAL`, require some care to be used safely from multiple threads within a single process. By default, `rio.open_rasterio` will acquire a per-process lock when reading a chunk of a COG.\n",
"\n",
"If you're using `rioxarray` with [Dask](http://docs.dask.org/) through the `chunks` keyword, you can also specify the `lock=False` keyword to ensure that reading *and* operating on your data happen in parallel.\n",
"\n",
"## Scheduler Choice\n",
"\n",
"Dask has [several schedulers](https://docs.dask.org/en/latest/scheduling.html) which run computations in parallel. Which scheduler is best depends on a variety of factors, including whether your computation holds Python's Global Interpreter Lock, whether how much data needs to be moved around, and whether you need more than one machine's computational power. This section about read-locks only applies if you have more than one thread in a process. This will happen with Dask's [local threaded scheduler](https://docs.dask.org/en/latest/scheduling.html#local-threads) and it's [distributed scheduler](https://distributed.dask.org/en/latest/) when configured to use more than one thread per worker.\n",
snowman2 marked this conversation as resolved.
Show resolved Hide resolved
"\n",
"By default, `xarray` objects will use the local `threaded` scheduler."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Reading without Locks\n",
"\n",
"To read a COG without any locks, you'd specify `lock=False`. This tells `rioxarray` to open a new `rasterio.DatasetReader` in each thread, rather than trying to share one amongst multiple threads."
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"import rioxarray\n",
"\n",
"url = (\n",
" \"https://naipeuwest.blob.core.windows.net/naip/v002/md/2013/md_100cm_2013/\"\n",
" \"39076/m_3907617_ne_18_1_20130924.tif\"\n",
")"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 2.4 s, sys: 361 ms, total: 2.76 s\n",
"Wall time: 3.32 s\n"
]
}
],
"source": [
"ds = rioxarray.open_rasterio(url, lock=False, chunks=(4, \"auto\", -1))\n",
"%time _ = ds.mean().compute()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Note: these timings are from a VM in the same Azure data center that's hosting the COG. Running this locally will give different times.\n",
"\n",
"## Chunking\n",
"\n",
"For maximum read performance, the chunking pattern you request should align with the internal chunking of the COG. Typically this means reading the data in a \"row major\" format: your chunks should be as wide as possible along the columns. We did that above with the chunks of `(4, \"auto\", -1)`. The `-1` says \"include all the columns\", and the `\"auto\"` will make the chunking along the rows as large as possible while staying in a reasonable limit (specified in `dask.config.get(\"array.chunk-size\")`).\n",
"\n",
"If we flipped that, and instead read as much of the rows as possible, we'll see slower performance."
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 8.58 s, sys: 1.08 s, total: 9.66 s\n",
"Wall time: 11.2 s\n"
]
}
],
"source": [
"ds = rioxarray.open_rasterio(url, lock=False, chunks=(1, -1, \"auto\"))\n",
"%time _ = ds.mean().compute()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"That said, reading is typically just the first step in a larger computation. You'd want to consider what chunking is best for your whole computation. See https://docs.dask.org/en/latest/array-chunks.html for more on choosing chunks."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Caching Considerations\n",
"\n",
"Specifying `lock=False` will disable some internal caching done by xarray or rasterio. For example, the first and second reads here are roughly the same, since nothing is cached."
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 2.49 s, sys: 392 ms, total: 2.88 s\n",
"Wall time: 3.25 s\n"
]
}
],
"source": [
"ds = rioxarray.open_rasterio(url, lock=False, chunks=(4, \"auto\", -1))\n",
"%time _ = ds.mean().compute()"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 2.48 s, sys: 292 ms, total: 2.78 s\n",
"Wall time: 2.97 s\n"
]
}
],
"source": [
"%time _ = ds.mean().compute()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"With `lock=True` the initial read is slower (since some threads are waiting around for a lock)."
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 2.15 s, sys: 284 ms, total: 2.44 s\n",
"Wall time: 5.03 s\n"
]
}
],
"source": [
"ds = rioxarray.open_rasterio(url, chunks=(4, \"auto\", -1)) # use the default locking\n",
"%time _ = ds.mean().compute()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"But thanks to caching, subsequent reads are much faster."
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 223 ms, sys: 64.9 ms, total: 288 ms\n",
"Wall time: 200 ms\n"
]
}
],
"source": [
"%time _ = ds.mean().compute()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"If you're reapeatedly reading subsets of the data, consider using `lock=True` or `lock=some_lock_object` to benefit from the caching."
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.6"
}
},
"nbformat": 4,
"nbformat_minor": 4
}
1 change: 1 addition & 0 deletions docs/history.rst
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ Latest
- ENH: Use the list version of `transform_geom` with rasterio 1.2+ (issue #180)
- ENH: Support driver autodetection with rasterio 1.2+ (issue #180)
- BUG: Allow `rio.write_crs` when spatial dimensions not found (pull #186)
- ENH: Allow multithreaded, lockless reads with `rio.read_xarary` (issue #214)
snowman2 marked this conversation as resolved.
Show resolved Hide resolved
- BUG: Update to support rasterio 1.2+ merge (issue #180)

0.1.1
Expand Down
75 changes: 66 additions & 9 deletions rioxarray/_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@
Source file: https://github.com/pydata/xarray/blob/1d7bcbdc75b6d556c04e2c7d7a042e4379e15303/xarray/backends/rasterio_.py # noqa
"""

import contextlib
import os
import re
import sys
import warnings
from distutils.version import LooseVersion

Expand All @@ -17,7 +19,7 @@
from rasterio.vrt import WarpedVRT
from xarray import Dataset, IndexVariable
from xarray.backends.common import BackendArray
from xarray.backends.file_manager import CachingFileManager
from xarray.backends.file_manager import CachingFileManager, FileManager
from xarray.backends.locks import SerializableLock
from xarray.coding import times, variables
from xarray.core import indexing
Expand All @@ -30,6 +32,42 @@
# TODO: should this be GDAL_LOCK instead?
RASTERIO_LOCK = SerializableLock()

if sys.version_info >= (3, 7):
NO_LOCK = contextlib.nullcontext()
else:

class nullcontext(contextlib.AbstractContextManager):
def __enter__(self):
pass

def __exit__(self, *excinfo):
pass

NO_LOCK = nullcontext()


class URIManager(FileManager):
def __init__(
self,
opener,
*args,
mode="r",
kwargs=None,
):
self._opener = opener
self._args = args
self._mode = mode
self._kwargs = {} if kwargs is None else dict(kwargs)

def acquire(self, needs_lock=True):
return self._opener(*self._args, mode=self._mode, kwargs=self._kwargs)

def acquire_context(self, needs_lock=True):
yield self.acquire(needs_lock=needs_lock)

def close(self, needs_lock=True):
pass


class RasterioArrayWrapper(BackendArray):
"""A wrapper around rasterio dataset objects"""
Expand Down Expand Up @@ -596,6 +634,7 @@ def open_rasterio(
variable=None,
group=None,
default_name=None,
use_files=False,
**open_kwargs,
):
# pylint: disable=too-many-statements,too-many-locals,too-many-branches
Expand Down Expand Up @@ -638,11 +677,20 @@ def open_rasterio(
NumPy arrays when accessed to avoid reading from the underlying data-
store multiple times. Defaults to True unless you specify the `chunks`
argument to use dask, in which case it defaults to False.
lock: False, True or threading.Lock, optional
If chunks is provided, this argument is passed on to
:py:func:`dask.array.from_array`. By default, a global lock is
used to avoid issues with concurrent access to the same file when using
dask's multithreaded backend.
lock: bool or dask.utils.SerializableLock, optional

If chunks is provided, this argument is used to ensure that only one
thread per process is reading from a rasterio file object at a time.

When ``lock=True`` or a lock instance is provided,
a :class:`xarray.backends.CachingFileManager` is used to cache File objects.
Since rasterio also caches some data, this will make repeated reads from the
same object fast.

When ``lock=False``, no lock is used, allowing for completely parallel reads
from multiple threads or processes. However, a new file handle is opened on
each request.

masked: bool, optional
If True, read the mask and set values to NaN. Defaults to False.
mask_and_scale: bool, optional
Expand Down Expand Up @@ -687,16 +735,25 @@ def open_rasterio(

if lock is None:
lock = RASTERIO_LOCK
elif lock is False:
lock = NO_LOCK

# ensure default for sharing is False
# ref https://github.com/mapbox/rasterio/issues/1504
open_kwargs["sharing"] = open_kwargs.get("sharing", False)

with warnings.catch_warnings(record=True) as rio_warnings:
manager = CachingFileManager(
rasterio.open, filename, lock=lock, mode="r", kwargs=open_kwargs
)
if lock is not NO_LOCK:
manager = CachingFileManager(
rasterio.open, filename, lock=lock, mode="r", kwargs=open_kwargs
)
else:
manager = URIManager(rasterio.open, filename, mode="r", kwargs=open_kwargs)
riods = manager.acquire()
captured_warnings = rio_warnings.copy()

riods = manager.acquire()
captured_warnings = rio_warnings.copy()
# raise the NotGeoreferencedWarning if applicable
for rio_warning in captured_warnings:
if not riods.subdatasets or not isinstance(
Expand Down
7 changes: 7 additions & 0 deletions test/integration/test_integration__io.py
Original file line number Diff line number Diff line change
Expand Up @@ -949,3 +949,10 @@ def test_nc_attr_loading():
}
assert str(rds.time.values[0]) == "2016-12-19 10:27:29.687763"
assert str(rds.time.values[1]) == "2016-12-29 12:52:42.347451"


def test_lockless():
with rioxarray.open_rasterio(
os.path.join(TEST_INPUT_DATA_DIR, "PLANET_SCOPE_3D.nc"), lock=False, chunk=True
) as rds:
rds.mean().compute()