Add client retry support to .map #2571

Closed
rohansingh wants to merge 8 commits from rohan/retry-map

@rohansingh rohansingh commented Nov 25, 2024

Implements client retry logic for `.map`, similar to what #2403 did for `.remote`.

To do this, we create a `TimedPriorityQueue`, which holds all pending inputs and pending retries, along with the timestamp at which each should be sent to the server.

When inputs are first read from the generator, they are added to the queue with a timestamp of "now". If a failed output is received from the server, the corresponding input is added back to the queue with a timestamp pushed out by the configured retry delay.
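
The queueing behavior described above can be sketched roughly as follows. This is a minimal illustration, not the code from this PR: the class name `TimedQueueSketch`, its methods `put` and `pop_ready`, the `RETRY_DELAY` value, and the use of an explicit `now` argument instead of a real clock are all assumptions made to keep the example deterministic.

```python
import heapq
import itertools
from typing import Any, List, Optional, Tuple


class TimedQueueSketch:
    """Hypothetical stand-in for the TimedPriorityQueue described above."""

    def __init__(self) -> None:
        # Heap entries are (send_at, insertion_index, item).
        self._heap: List[Tuple[float, int, Any]] = []
        self._counter = itertools.count()

    def put(self, send_at: float, item: Any) -> None:
        """Queue an item to be sent no earlier than `send_at`."""
        heapq.heappush(self._heap, (send_at, next(self._counter), item))

    def pop_ready(self, now: float) -> Optional[Any]:
        """Return the earliest item whose timestamp has passed, or None."""
        if self._heap and self._heap[0][0] <= now:
            return heapq.heappop(self._heap)[2]
        return None


RETRY_DELAY = 2.0  # assumed value, for illustration only

queue = TimedQueueSketch()
now = 100.0

# New inputs are queued with a timestamp of "now".
queue.put(now, "input-a")
queue.put(now, "input-b")

first = queue.pop_ready(now)
# Pretend the server reported a failure for the first input:
# it goes back on the queue, due after the retry delay.
queue.put(now + RETRY_DELAY, first)

second = queue.pop_ready(now)
not_yet = queue.pop_ready(now)                # the retry is not due yet
retried = queue.pop_ready(now + RETRY_DELAY)  # now it is
```

A real client would presumably await the next due timestamp rather than poll with an explicit `now`; that part is left out here.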

@rohansingh rohansingh changed the title Rohan/retry map Add client retry support to .map Nov 25, 2024
@rohansingh rohansingh force-pushed the rohan/retry-map branch 6 times, most recently from 00957f4 to 55349c4 Compare December 2, 2024 15:48
@rohansingh rohansingh marked this pull request as ready for review December 2, 2024 15:48
@rohansingh rohansingh force-pushed the rohan/retry-map branch 8 times, most recently from 85f41bb to bd1ee57 Compare December 2, 2024 21:08
When two items had the same timestamp, we would try to sort by the
actual item value, which breaks for types that don't support comparison.

Instead use a nonce when inserting an item, to ensure that we never have
to compare the item value itself.
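
A small sketch of the failure and the fix, using `heapq` directly. Using a monotonically increasing counter as the nonce is an assumption; the commit message only says that a nonce is used.

```python
import heapq
import itertools

# Without a tie-breaker, equal timestamps force Python to compare the payloads.
heap = []
heapq.heappush(heap, (1.0, {"x": 1}))
try:
    heapq.heappush(heap, (1.0, {"x": 2}))  # dicts do not support "<"
    compared_ok = True
except TypeError:
    compared_ok = False

# With a unique nonce in the second slot, tuple comparison is always decided
# before it reaches the payload.
nonce = itertools.count()
safe_heap = []
for payload in ({"x": 1}, {"x": 2}, {"x": 3}):
    heapq.heappush(safe_heap, (1.0, next(nonce), payload))

order = [heapq.heappop(safe_heap)[2]["x"] for _ in range(3)]
```

With a counter as the nonce, items that share a timestamp also come out in insertion order.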

Though very unlikely outside of unit tests, it's possible for an output to be returned before the corresponding retry context has been put into the `pending_outputs` dict.

Once the input queue filled up, there was no room left for pending retries. With nowhere to put retries, we stopped fetching new outputs, and once we stopped fetching new outputs, the server stopped accepting new inputs.

As a result, the input queue would never burn down.

Instead, use a semaphore to ensure we never have more than 1000 items outstanding.
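
One way to bound outstanding items with a semaphore is sketched below. The names `run_demo` and `process` are hypothetical, the limit is shrunk from 1000 to 5 so the effect is visible, and `asyncio.sleep` stands in for the server round trip. It is an assumption of this sketch that a permit is held from the moment an input is read until its final output arrives, so a retry would reuse the permit its input already holds and never needs extra room.

```python
import asyncio

MAX_OUTSTANDING = 5  # the commit message uses 1000; kept small for illustration


async def run_demo(n_inputs: int = 20) -> int:
    sem = asyncio.Semaphore(MAX_OUTSTANDING)
    outstanding = 0
    peak = 0

    async def process(i: int) -> None:
        nonlocal outstanding, peak
        outstanding += 1
        peak = max(peak, outstanding)
        try:
            await asyncio.sleep(0.001)  # stands in for the server round trip
        finally:
            outstanding -= 1
            sem.release()  # free the slot only once the item is complete

    tasks = []
    for i in range(n_inputs):
        # The producer blocks here when the limit is reached, instead of
        # filling a bounded queue that retries would also need.
        await sem.acquire()
        tasks.append(asyncio.create_task(process(i)))
    await asyncio.gather(*tasks)
    return peak


peak = asyncio.run(run_demo())
```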

Instead of using a priority queue, just use the event loop to schedule retries in the future. This significantly simplifies the implementation and makes it much more like the original.

Note that we still have a semaphore that ensures no more than 1000 inputs are in flight (i.e., sent to the server but not completed).

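
A rough sketch of scheduling retries through the event loop, here with `loop.call_later`. The commit message does not say which asyncio primitive is used, so that choice is an assumption, as are the names `map_with_retries` and `flaky_double` and the `RETRY_DELAY` and `MAX_RETRIES` values. The in-flight semaphore is omitted to keep the example short.

```python
import asyncio

RETRY_DELAY = 0.01  # seconds; assumed value
MAX_RETRIES = 3     # assumed limit


async def map_with_retries(inputs, call):
    loop = asyncio.get_running_loop()
    results = {}
    tasks = set()  # strong references so pending tasks are not garbage collected
    finished = asyncio.Event()
    if not inputs:
        finished.set()

    def send(idx, item, attempt):
        task = loop.create_task(call(item, attempt))
        tasks.add(task)
        task.add_done_callback(lambda t: on_done(t, idx, item, attempt))

    def on_done(task, idx, item, attempt):
        tasks.discard(task)
        error = task.exception()
        if error is not None and attempt < MAX_RETRIES:
            # No queue needed: the event loop holds the retry until it is due.
            loop.call_later(RETRY_DELAY, send, idx, item, attempt + 1)
            return
        results[idx] = error if error is not None else task.result()
        if len(results) == len(inputs):
            finished.set()

    for idx, item in enumerate(inputs):
        send(idx, item, 0)
    await finished.wait()
    return [results[i] for i in range(len(inputs))]


calls = []


async def flaky_double(x, attempt):
    """Fails on the first attempt for odd inputs, then succeeds."""
    calls.append((x, attempt))
    await asyncio.sleep(0)
    if x % 2 == 1 and attempt == 0:
        raise RuntimeError("transient failure")
    return x * 2


outputs = asyncio.run(map_with_retries([1, 2, 3], flaky_double))
```

Here the odd inputs are each sent twice and the even input once, and results are still returned in input order.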
@rohansingh

Continued in #2734.

@rohansingh rohansingh closed this Jan 8, 2025