-
Notifications
You must be signed in to change notification settings - Fork 43
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Zarr Tile Sink #1446
Zarr Tile Sink #1446
Changes from 15 commits
0f537bb
5c6990b
29d8b19
599503f
10a1f7e
12946a0
f8fb42e
d697920
b50ecd9
f36f1a8
6b6a6a4
d6dd5ad
b7ed234
df8819f
0a832fd
cd0c715
41ea6b5
eaaf29e
0414f71
3da02f6
d0d6129
07e744f
5118fbc
e1e9831
6706f69
b58fcda
756502b
58897b3
a0b3fbc
f0e7482
5317daa
32c9c58
728faee
14bee28
ce1b1b7
5fac3a8
23b70f7
414ba67
f35d06e
157c731
62e6572
71d1068
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,19 +1,23 @@ | ||
import math | ||
import os | ||
import shutil | ||
import tempfile | ||
import threading | ||
import uuid | ||
from importlib.metadata import PackageNotFoundError | ||
from importlib.metadata import version as _importlib_version | ||
from pathlib import Path | ||
|
||
import numpy as np | ||
import packaging.version | ||
import zarr | ||
|
||
import large_image | ||
from large_image.cache_util import LruCacheMetaclass, methodcache | ||
from large_image.constants import TILE_FORMAT_NUMPY, SourcePriority | ||
from large_image.constants import NEW_IMAGE_PATH_FLAG, TILE_FORMAT_NUMPY, SourcePriority | ||
from large_image.exceptions import TileSourceError, TileSourceFileNotFoundError | ||
from large_image.tilesource import FileTileSource | ||
from large_image.tilesource.utilities import nearPowerOfTwo | ||
from large_image.tilesource.utilities import _imageToNumpy, nearPowerOfTwo | ||
|
||
try: | ||
__version__ = _importlib_version(__name__) | ||
|
@@ -52,6 +56,13 @@ | |
""" | ||
super().__init__(path, **kwargs) | ||
|
||
if str(path).startswith(NEW_IMAGE_PATH_FLAG): | ||
self._initNew(path, **kwargs) | ||
else: | ||
self._initOpen(**kwargs) | ||
self._tileLock = threading.RLock() | ||
|
||
def _initOpen(self, **kwargs): | ||
self._largeImagePath = str(self._getLargeImagePath()) | ||
self._zarr = None | ||
if not os.path.isfile(self._largeImagePath) and '//:' not in self._largeImagePath: | ||
|
@@ -79,7 +90,47 @@ | |
except Exception: | ||
msg = 'File cannot be opened -- not an OME NGFF file or understandable zarr file.' | ||
raise TileSourceError(msg) | ||
self._tileLock = threading.RLock() | ||
|
||
def _initNew(self, path, **kwargs): | ||
""" | ||
Initialize the tile class for creating a new image. | ||
""" | ||
self._tempfile = tempfile.NamedTemporaryFile(suffix=path) | ||
self._zarr_store = zarr.SQLiteStore(self._tempfile.name) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It turns out that the SQLiteStore is vastly less performant than I had hoped. Specifically, there are some sort of locking semantics on this store that prevent efficient thread sharing. The DirectoryStore is the most efficient (but, of course, means that we have a temp directory to clean up rather than a file; I think this is okay). The ZipStore is less efficient than the DirectoryStore, but still much more efficient than the SQLiteStore. In my sweep algortihm test, the ZipStore throws some warnings; I don't know if they are a concern or not. We should probably switch to the DirectoryStore for new files. As a side effect of switching to the DirectoryStore, if we knew how big things were going to grow, we might by inter-process pickleabilty and ability (assuming each process writes different parts of the array). This would be in a future PR to ensure we have proved it works correctly. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I switched to using |
||
self._zarr = zarr.open(self._zarr_store, mode='w') | ||
# Make unpickleable | ||
self._unpickleable = True | ||
self._largeImagePath = None | ||
self._dims = {} | ||
self.sizeX = self.sizeY = self.levels = 0 | ||
self.tileWidth = self.tileHeight = self._tileSize | ||
self._cacheValue = str(uuid.uuid4()) | ||
self._output = None | ||
self._editable = True | ||
self._bandRanges = None | ||
self._addLock = threading.RLock() | ||
self._framecount = 0 | ||
self._mm_x = 0 | ||
self._mm_y = 0 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should probably add
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok, added in 157c731 |
||
|
||
def __del__(self): | ||
if not hasattr(self, '_derivedSource'): | ||
try: | ||
self._zarr.close() | ||
except Exception: | ||
pass | ||
try: | ||
self._tempfile.close() | ||
except Exception: | ||
pass | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We need to also delete the temporary file (or directory if we switch to it). If possible, it'd be nice to flag temp files/directories so they delete on close in general, so if the python process is killed or dies, the OS still cleans them up. |
||
|
||
def _checkEditable(self): | ||
""" | ||
Raise an exception if this is not an editable image. | ||
""" | ||
if not self._editable: | ||
msg = 'Not an editable image' | ||
raise TileSourceError(msg) | ||
|
||
def _getGeneralAxes(self, arr): | ||
""" | ||
|
@@ -396,6 +447,9 @@ | |
|
||
@methodcache() | ||
def getTile(self, x, y, z, pilImageAllowed=False, numpyAllowed=False, **kwargs): | ||
if self._levels is None: | ||
annehaley marked this conversation as resolved.
Show resolved
Hide resolved
|
||
self._validateZarr() | ||
|
||
frame = self._getFrame(**kwargs) | ||
self._xyzInRange(x, y, z, frame, self._framecount) | ||
x0, y0, x1, y1, step = self._xyzToCorners(x, y, z) | ||
|
@@ -438,6 +492,203 @@ | |
return self._outputTile(tile, TILE_FORMAT_NUMPY, x, y, z, | ||
pilImageAllowed, numpyAllowed, **kwargs) | ||
|
||
def addTile(self, tile, x=0, y=0, mask=None, axes=None, **kwargs): | ||
""" | ||
Add a numpy or image tile to the image, expanding the image as needed | ||
to accommodate it. Note that x and y can be negative. If so, the | ||
output image (and internal memory access of the image) will act as if | ||
the 0, 0 point is the most negative position. Cropping is applied | ||
after this offset. | ||
|
||
:param tile: a numpy array, PIL Image, or a binary string | ||
with an image. The numpy array can have 2 or 3 dimensions. | ||
:param x: location in destination for upper-left corner. | ||
:param y: location in destination for upper-left corner. | ||
:param mask: a 2-d numpy array (or 3-d if the last dimension is 1). | ||
If specified, areas where the mask is false will not be altered. | ||
:param axes: a string or list of strings specifying the names of axes | ||
in the same order as the tile dimensions | ||
:param kwargs: start locations for any additional axes | ||
""" | ||
# TODO: improve band bookkeeping | ||
|
||
self._checkEditable() | ||
placement = { | ||
'x': x, | ||
'y': y, | ||
**kwargs, | ||
} | ||
if axes is None: | ||
if len(tile.shape) == 2: | ||
axes = 'yx' | ||
elif len(tile.shape) == 3: | ||
axes = 'yxs' | ||
else: | ||
axes = '' | ||
if isinstance(axes, str): | ||
axes = axes.lower() | ||
elif isinstance(axes, list): | ||
axes = [x.lower() for x in axes] | ||
else: | ||
err = 'Invalid type for axes. Must be str or list[str].' | ||
raise ValueError(err) | ||
self._axes = {k: i for i, k in enumerate(axes)} | ||
|
||
tile, mode = _imageToNumpy(tile) | ||
while len(tile.shape) < len(axes): | ||
tile = np.expand_dims(tile, axis=0) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It occurred to me this could be done without an explicit while loop. I think the equivalent would be:
I'm not sure that is clearer, though. |
||
|
||
new_dims = { | ||
a: max( | ||
self._dims.get(a, 0), | ||
placement.get(a, 0) + tile.shape[i], | ||
) | ||
for a, i in self._axes.items() | ||
} | ||
self._dims = new_dims | ||
self._dtype = tile.dtype | ||
self._bandCount = new_dims.get(axes[-1]) # last axis is assumed to be bands | ||
self.sizeX = new_dims.get('x') | ||
self.sizeY = new_dims.get('y') | ||
self._framecount = np.prod([ | ||
length | ||
for axis, length in new_dims.items() | ||
if axis in axes[:-3] | ||
]) | ||
self._levels = None | ||
self.levels = int(max(1, math.ceil(math.log(max( | ||
self.sizeX / self.tileWidth, self.sizeY / self.tileHeight)) / math.log(2)) + 1)) | ||
|
||
if not mask: | ||
annehaley marked this conversation as resolved.
Show resolved
Hide resolved
|
||
mask = np.full(tile.shape, True) | ||
full_mask = np.full(tuple(new_dims.values()), False) | ||
annehaley marked this conversation as resolved.
Show resolved
Hide resolved
|
||
mask_placement_slices = tuple([ | ||
slice(placement.get(a, 0), placement.get(a, 0) + mask.shape[i], 1) | ||
for i, a in enumerate(axes) | ||
]) | ||
full_mask[mask_placement_slices] = mask | ||
|
||
current_arrays = dict(self._zarr.arrays()) | ||
with self._addLock: | ||
annehaley marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if 'root' not in current_arrays: | ||
chunking = tuple([ | ||
self._tileSize if a in ['x', 'y'] else | ||
32 if a == 's' else 1 | ||
for a in axes | ||
]) | ||
self._zarr.create_dataset('root', data=tile, chunks=chunking) | ||
else: | ||
root = current_arrays['root'] | ||
root.resize(*tuple(new_dims.values())) | ||
data = root[:] | ||
np.place(data, full_mask, tile) | ||
annehaley marked this conversation as resolved.
Show resolved
Hide resolved
|
||
root[:] = data | ||
|
||
# Edit OME metadata | ||
self._zarr.attrs.update({ | ||
'multiscales': [{ | ||
'version': '0.5-dev', | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I haven't checked ome zarr docs for version information. Are we more conformant to a dev version than the latest official version? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I was just following what I found here: https://ngff.openmicroscopy.org/latest/#multiscale-md, but I'm not entirely confident that this is all correct. |
||
'axes': [{ | ||
'name': a, | ||
'type': 'space' if a in ['x', 'y'] else 'other', | ||
} for a in axes], | ||
'datasets': [{'path': 0}], | ||
}], | ||
'omero': {'version': '0.5-dev'}, | ||
}) | ||
|
||
@property | ||
def crop(self): | ||
""" | ||
Crop only applies to the output file, not the internal data access. | ||
|
||
It consists of x, y, w, h in pixels. | ||
""" | ||
return getattr(self, '_crop', None) | ||
|
||
@crop.setter | ||
def crop(self, value): | ||
self._checkEditable() | ||
if value is None: | ||
self._crop = None | ||
return | ||
x, y, w, h = value | ||
x = int(x) | ||
y = int(y) | ||
w = int(w) | ||
h = int(h) | ||
if x < 0 or y < 0 or w <= 0 or h <= 0: | ||
msg = 'Crop must have non-negative x, y and positive w, h' | ||
raise TileSourceError(msg) | ||
self._crop = (x, y, w, h) | ||
|
||
def write( | ||
self, | ||
path, | ||
lossy=True, | ||
alpha=True, | ||
overwriteAllowed=True, | ||
): | ||
""" | ||
Output the current image to a file. | ||
|
||
:param path: output path. | ||
:param lossy: if false, emit a lossless file. | ||
:param alpha: True if an alpha channel is allowed. | ||
:param overwriteAllowed: if False, raise an exception if the output | ||
path exists. | ||
""" | ||
# TODO: compute half, quarter, etc. resolutions | ||
if not overwriteAllowed and os.path.exists(path): | ||
raise TileSourceError('Output path exists (%s).' % str(path)) | ||
annehaley marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
self._validateZarr() | ||
suffix = Path(path).suffix | ||
data_file = self._tempfile | ||
data_store = self._zarr_store | ||
|
||
if self.crop: | ||
x, y, w, h = self.crop | ||
current_arrays = dict(self._zarr.arrays()) | ||
# create new temp storage for cropped data | ||
data_file = tempfile.NamedTemporaryFile() | ||
data_store = zarr.SQLiteStore(data_file.name) | ||
cropped_zarr = zarr.open(data_store, mode='w') | ||
for arr_name in current_arrays: | ||
arr = np.array(current_arrays[arr_name]) | ||
cropped_arr = arr.take( | ||
indices=range(x, x + w), | ||
axis=self._axes.get('x'), | ||
).take( | ||
indices=range(y, y + h), | ||
axis=self._axes.get('y'), | ||
) | ||
cropped_zarr.create_dataset(arr_name, data=cropped_arr, overwrite=True) | ||
cropped_zarr.attrs.update(self._zarr.attrs) | ||
|
||
data_file.flush() | ||
|
||
if suffix in ['.db', '.sqlite']: | ||
shutil.copy2(data_file.name, path) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I was trying out using a ZipStore or DirectoryStore for the temp file. In this case, I replaced the shutil.copy2 with There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure. What does the error look like? I can search for if anyone has experienced it and found a solution. |
||
|
||
elif suffix == '.zip': | ||
zip_store = zarr.storage.ZipStore(path) | ||
annehaley marked this conversation as resolved.
Show resolved
Hide resolved
|
||
zarr.copy_store(data_store, zip_store) | ||
annehaley marked this conversation as resolved.
Show resolved
Hide resolved
|
||
zip_store.close() | ||
|
||
elif suffix == '.zarr': | ||
dir_store = zarr.storage.DirectoryStore(path) | ||
zarr.copy_store(data_store, dir_store) | ||
dir_store.close() | ||
|
||
else: | ||
from large_image_converter import convert | ||
|
||
convert(data_file.name, path, overwrite=overwriteAllowed) | ||
annehaley marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
if self.crop: | ||
data_file.close() | ||
|
||
|
||
def open(*args, **kwargs): | ||
""" | ||
|
@@ -451,3 +702,11 @@ | |
Check if an input can be read by the module class. | ||
""" | ||
return ZarrFileTileSource.canRead(*args, **kwargs) | ||
|
||
|
||
def new(*args, **kwargs): | ||
""" | ||
Create a new image, collecting the results from patches of numpy arrays or | ||
smaller images. | ||
""" | ||
return ZarrFileTileSource(NEW_IMAGE_PATH_FLAG + str(uuid.uuid4()), *args, **kwargs) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The name passed in
path
is guaranteed to be unique. I can see refactoring this (see other comments) to enable multi-process support, at which point we'd want to be able to consistently find the tempfile from the path. In that instance, if TMPDIR / path exists, we could open an existing zarr store and add to it rather than creating a new one. Obviously, doing so would require some logic changes (such as knowing the set of axes and maybe the final dimensions ahead of time and not recreating the dataset).There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added this to the "future work" section of this PR's description so we can come back to this idea when the core functionality is complete.