Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MOD-2454: remove aiostream dependency: async_map #2349

Closed
Closed
Changes from 1 commit
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
5d829be
async_merge
kramstrom Oct 16, 2024
f67fff4
test
kramstrom Oct 16, 2024
9f99d5e
feedback
kramstrom Oct 16, 2024
6fefd8e
typing
kramstrom Oct 16, 2024
295da40
typing
kramstrom Oct 16, 2024
4bdd660
typing
kramstrom Oct 16, 2024
b644bbe
Merge branch 'main' into kramstrom/mod-2454-remove-aiostream-dependen…
kramstrom Oct 17, 2024
c6a41f0
Merge branch 'main' of github.com:modal-labs/modal-client into kramst…
kramstrom Oct 17, 2024
7870fe6
async_map
kramstrom Oct 17, 2024
7168310
Merge branch 'main' of github.com:modal-labs/modal-client into kramst…
kramstrom Oct 17, 2024
9103d5d
typing
kramstrom Oct 17, 2024
a745f57
remove
kramstrom Oct 17, 2024
a11769b
remove
kramstrom Oct 17, 2024
b12ba5f
fix
kramstrom Oct 17, 2024
80f2a65
wrapping
kramstrom Oct 17, 2024
f976685
fix
kramstrom Oct 17, 2024
31b1913
Merge branch 'main' into kramstrom/mod-2454-remove-aiostream-dependen…
kramstrom Oct 18, 2024
e50f61d
update from comments
kramstrom Oct 18, 2024
fa40bad
Merge branch 'kramstrom/mod-2454-remove-aiostream-dependency-async_ma…
kramstrom Oct 18, 2024
2d205e4
[draft PR for a draft PR] async_map fixes (#2363)
freider Oct 21, 2024
50a6464
Merge branch 'main' of github.com:modal-labs/modal-client into kramst…
kramstrom Oct 21, 2024
cb1202a
re-order
kramstrom Oct 21, 2024
07547c7
formatting
kramstrom Oct 21, 2024
b20c985
async
kramstrom Oct 21, 2024
3498aa3
tests
kramstrom Oct 21, 2024
fd194d8
typing
kramstrom Oct 21, 2024
4ee4bd1
typing
kramstrom Oct 21, 2024
20ebf90
typing
kramstrom Oct 21, 2024
f4e3efd
typing
kramstrom Oct 21, 2024
73acd60
fixes
kramstrom Oct 21, 2024
9082db2
type
kramstrom Oct 21, 2024
e7845d7
Merge branch 'main' of github.com:modal-labs/modal-client into kramst…
kramstrom Oct 21, 2024
ee5b26b
type
kramstrom Oct 21, 2024
96ab30d
fixes
kramstrom Oct 21, 2024
5f439b0
more tests
kramstrom Oct 21, 2024
f5dcfd7
more tests
kramstrom Oct 21, 2024
31e4ff3
raise different exception
kramstrom Oct 21, 2024
672d55d
fix
kramstrom Oct 21, 2024
1d72a47
tests
kramstrom Oct 22, 2024
3089dd8
Merge branch 'main' of github.com:modal-labs/modal-client into kramst…
kramstrom Oct 22, 2024
ae71793
Merge branch 'main' of github.com:modal-labs/modal-client into kramst…
kramstrom Oct 23, 2024
3b2cf99
cancellation errors
kramstrom Oct 23, 2024
62cf9f4
queue maxsize
kramstrom Oct 23, 2024
3dc129d
Merge branch 'main' of github.com:modal-labs/modal-client into kramst…
kramstrom Oct 24, 2024
566e7ca
Merge branch 'main' of github.com:modal-labs/modal-client into kramst…
kramstrom Oct 25, 2024
1bc20c7
Merge branch 'main' of github.com:modal-labs/modal-client into kramst…
kramstrom Oct 28, 2024
9827418
Merge branch 'main' of github.com:modal-labs/modal-client into kramst…
kramstrom Oct 29, 2024
e4abd87
adds
kramstrom Oct 29, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
tests
kramstrom committed Oct 22, 2024
commit 1d72a477fb3cacb1105b0d1cb2ee6e4a3441231c
73 changes: 41 additions & 32 deletions test/async_utils_test.py
Original file line number Diff line number Diff line change
@@ -655,7 +655,8 @@ async def mapper(x):


@pytest.mark.asyncio
async def test_async_map_input_cancellation_before_yield():
async def test_async_map_input_exception_async_producer():
# test exception async producer
result = []
states = []

@@ -666,9 +667,9 @@ async def mapper_func(x):
async def gen():
states.append("enter")
try:
raise SampleException("test")
for i in range(5):
await asyncio.sleep(0.1)
if i == 3:
raise SampleException("test")
yield i
finally:
states.append("exit")
@@ -678,89 +679,97 @@ async def gen():
result.append(item)

assert sorted(result) == []
assert states == ["enter", "exit"]
assert sorted(states) == ["enter", "exit"]


@pytest.mark.asyncio
async def test_async_map_input_cancellation_after_yield():
async def test_async_map_input_exception_sync_producer():
# test exception sync producer
result = []
states = []

async def mapper_func(x):
await asyncio.sleep(0.1)
return x * 2

async def gen():
def gen():
states.append("enter")
try:
for i in range(5):
await asyncio.sleep(0.1)
if i == 3:
raise SampleException("test")
yield i
raise SampleException("test")
finally:
states.append("exit")

with pytest.raises(SampleException):
async for item in async_map(gen(), mapper_func, concurrency=3):
result.append(item)

assert sorted(result) == [0, 2, 4, 6]
assert sorted(result) == []
assert sorted(states) == ["enter", "exit"]


@pytest.mark.asyncio
async def test_async_map_input_cancellation_between_yields():
async def test_async_map_output_exception_async_func():
# test cancelling async mapper function
result = []
states = []

async def mapper_func(x):
await asyncio.sleep(0.1)
return x * 2

async def gen():
def gen():
states.append("enter")
try:
for i in range(5):
if i == 3:
raise SampleException("test")
await asyncio.sleep(0.1)
yield i
finally:
states.append("exit")

async def mapper_func(x):
await asyncio.sleep(0.1)
if x == 3:
raise SampleException("test")
return x * 2

with pytest.raises(SampleException):
async for item in async_map(gen(), mapper_func, concurrency=3):
result.append(item)

assert sorted(result) == [0, 2]
assert sorted(states) == ["enter", "exit"]
assert sorted(result) == [0, 2, 4]
assert states == ["enter", "exit"]


@pytest.mark.asyncio
@pytest.mark.parametrize("cancelled_at_idx", [0, 1, 3])
async def test_async_map_output_cancellation(cancelled_at_idx):
async def test_async_map_streaming_input():
# ensure we can stream input
# and dont buffer all the items and return them after
result = []
states = []

def gen():
async def gen():
states.append("enter")
try:
for i in range(5):
yield i
yield 1
await asyncio.sleep(1)
yield 2
yield 3
finally:
states.append("exit")

async def mapper_func(x):
async def mapper(x):
await asyncio.sleep(0.1)
if x == cancelled_at_idx:
raise SampleException("test")
return x * 2

with pytest.raises(SampleException):
async for item in async_map(gen(), mapper_func, concurrency=3):
result.append(item)
import time

start = time.time()
async for item in async_map(gen(), mapper, concurrency=3):
if item == 2:
assert time.time() - start < 0.5
else:
assert time.time() - start > 0.5
result.append(item)

assert sorted(result) == [0, 2, 4][:cancelled_at_idx]
assert result == [2, 4, 6]
assert states == ["enter", "exit"]