
Parquet Dataset cache not reliable #800

Closed

fjetter opened this issue Jan 24, 2024 · 7 comments · Fixed by #798
@fjetter
Member

fjetter commented Jan 24, 2024

The parquet dataset cache (see https://github.com/dask-contrib/dask-expr/blob/13af21d1be9e0a3393f4971a0d95382188f6f248/dask_expr/io/parquet.py#L54-L57) is currently keyed by a token that is deterministic given the user's input arguments but is not sensitive to any external state. In particular, this means there is no user-accessible way to invalidate the cache. Any mutation of the dataset, whether to the schema, the metadata, or just an append operation, would go unnoticed.

This problem is amplified by the fact that we additionally use a cached property for everything derived from this information, which makes it impossible to correctly reuse an existing instance once the dataset is mutated (this can cause errors, but also data loss).

Both of these caching layers would need to be removed to allow Expr objects to become singletons as proposed in #798.

This scenario is actually covered by our unit tests, see test_to_parquet. That test writes a dataset, reads it, and overwrites it again. The only reason it currently passes is that the cache is invalidated when the overwrite keyword is used, see here. However, this kind of cache invalidation will not work reliably in any multi-interpreter environment, let alone in a complex system where we are just ingesting a dataset.

The global cache should use a modified-at timestamp or something similar that allows invalidation. If that is not possible, we need at the very least a mechanism that allows us to ignore or invalidate the cache.
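
For illustration, here is a minimal sketch (not the actual dask-expr code) of how such a modified-at/etag fingerprint could be folded into the cache key. The names _dataset_info_cache, _dataset_fingerprint, and cached_dataset_info are hypothetical; only fsspec and dask.base.tokenize are real APIs:

```python
# Minimal sketch, not the actual dask-expr implementation: fold a
# "last modified" fingerprint of the dataset into the cache key so that
# schema changes, appends, or overwrites invalidate stale entries.
import fsspec
from dask.base import tokenize

_dataset_info_cache = {}  # hypothetical stand-in for the global parquet cache


def _dataset_fingerprint(path, storage_options=None):
    """Hash the paths plus modification times / etags of all files under ``path``."""
    fs, _, paths = fsspec.get_fs_token_paths(path, storage_options=storage_options)
    infos = fs.find(paths[0], detail=True)  # single listing: {path: info dict}
    # Which fields exist depends on the backend (local -> mtime, S3 -> ETag /
    # LastModified), so fall back to the file size if nothing better is there.
    stamps = [
        (p, i.get("mtime") or i.get("ETag") or i.get("LastModified") or i.get("size"))
        for p, i in sorted(infos.items())
    ]
    return tokenize(stamps)


def cached_dataset_info(path, collect, storage_options=None, **kwargs):
    """Return cached dataset info only while the on-disk fingerprint matches."""
    key = tokenize(path, kwargs, _dataset_fingerprint(path, storage_options))
    if key not in _dataset_info_cache:
        _dataset_info_cache[key] = collect(path, storage_options=storage_options, **kwargs)
    return _dataset_info_cache[key]
```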

@mrocklin
Member

mrocklin commented Jan 24, 2024 via email

@fjetter
Member Author

fjetter commented Jan 24, 2024

The tricky part will be the ordinary user API where a user just points to a directory. Without any metadata files, we'll need to walk over all files and collect the hashes/etags for every one of them. This is (kind of) what _collect_dataset_info already does, but it is also relatively expensive. If we have to do this to verify the cache, we could just as well not cache it at all.

@rjzamora
Member

I think it probably makes sense to add and document a mechanism to clear all file-system and data-source caching.

I think most dask users do not frequently mutate their datasets between read operations, so we probably don't want to add a lot of complexity to inspect the dataset all the time.
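
As a hedged sketch of what such a documented escape hatch could look like: clear_io_caches and _dataset_info_cache are made-up names, while fs.invalidate_cache() is the real fsspec call for dropping cached directory listings:

```python
# Illustrative sketch only, not an existing dask-expr API.
import fsspec

_dataset_info_cache = {}  # stand-in for dask-expr's global parquet dataset cache


def clear_io_caches(path, storage_options=None):
    """Forget cached information about the dataset at ``path``."""
    # Drop cached parquet dataset metadata (hypothetical global dict).
    _dataset_info_cache.clear()
    # Drop fsspec's cached directory listings for the relevant filesystem so
    # the next read_parquet call re-lists the files.
    fs, _, _ = fsspec.get_fs_token_paths(path, storage_options=storage_options)
    fs.invalidate_cache()
```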

@fjetter
Member Author

fjetter commented Jan 29, 2024

I'm currently looking a little closer into the S3FS implementation, and as it turns out, it is all but guaranteed to walk over the entire bucket even before we try to read any data. That should be fixed... but it also tells me that collecting etags/last-modified timestamps for all/most/some of the files wouldn't be catastrophic since we're already doing it...

@rjzamora
Member

rjzamora commented Jan 29, 2024

> but it also tells me that collecting etags/last-modified timestamps for all/most/some of the files wouldn't be catastrophic since we're already doing it...

Thanks for looking into this. It makes sense that this wouldn't be catastrophic as long as we don't need to gather new information at every optimization step. However, I'm worried that we would need to do exactly that to be completely robust.

I wonder how reasonable it would be to simply add an optional reset_cache kwarg to read_parquet (and other IO APIs) for the special case where someone is reading from and writing to the same dataset within the same workflow. We could then ignore/replace a matching key in Expr._instances (and other caches).
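
As a rough, purely illustrative sketch of that idea (the reset_cache kwarg does not exist, and the dicts below are stand-ins for Expr._instances and the dataset-info cache):

```python
# Rough sketch of the hypothetical ``reset_cache`` kwarg; the dicts below are
# stand-ins for Expr._instances and the global parquet dataset-info cache.
_instances = {}            # stand-in for Expr._instances
_dataset_info_cache = {}   # stand-in for the dataset-info cache


def read_parquet(path, reset_cache=False, **kwargs):
    if reset_cache:
        # Drop any cached expression/dataset state for this path so the new
        # expression re-inspects the files on disk.
        _instances.pop(path, None)
        _dataset_info_cache.pop(path, None)
    # ... construct and return the ReadParquet-backed collection as usual ...
```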

@fjetter
Member Author

fjetter commented Jan 30, 2024

If we implemented any checksum/hash/etag/"modified since" cache-invalidation logic, it would only be relevant on a user-facing read_parquet call. We would still cache that kind of information so that the optimizer can utilize the cache. We would create a unique token in the user-facing read_parquet call and just pass it as an argument to the expression.

I'm rather strongly against a reset_cache kwarg or any other manual cache invalidation, since I think this violates the principle of least surprise for user APIs. If I perform a read_parquet call, I expect it to represent the dataset at the time I called that API. From that perspective, a simple solution to get started would be a UUID generated in read_parquet and passed as an argument to ReadParquet(Expr), where it is used for the name/token. Eventually, this could be replaced with a checksum/etag token.
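
Roughly, as a minimal sketch (the class below is a toy stand-in, not the real dask-expr ReadParquet):

```python
# Minimal sketch of the UUID-per-call idea; toy stand-in for dask-expr's
# ReadParquet expression, not the real implementation.
import uuid
from dask.base import tokenize


class ReadParquet:
    def __init__(self, path, dataset_token, **kwargs):
        self.path = path
        self.kwargs = kwargs
        # Fixed at construction time, so the expression represents the dataset
        # as it was when the user called read_parquet.
        self.dataset_token = dataset_token

    @property
    def _name(self):
        # The dataset token participates in the name, so two read_parquet
        # calls on the same path never collapse onto a stale cached instance.
        return "readparquet-" + tokenize(self.path, self.kwargs, self.dataset_token)


def read_parquet(path, **kwargs):
    # Fresh UUID per user-facing call; could later be replaced with a
    # checksum/etag-based fingerprint of the dataset.
    return ReadParquet(path, dataset_token=uuid.uuid4().hex, **kwargs)
```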

@rjzamora
Member

> We would create a unique token in the user-facing read_parquet call and just pass it as an argument to the expression.

Okay, that makes sense to me. I was originally going to recommend the same thing. I started to get worried that scanning the file system outside the expression would be too inconsistent with the current programming model. However, I’m starting to come back around.
