Skip to content

Commit

Permalink
More docs and types for io_streams
Browse files Browse the repository at this point in the history
  • Loading branch information
ekzhang committed Nov 25, 2024
1 parent 56fd7e0 commit 60b4fa0
Showing 1 changed file with 41 additions and 32 deletions.
73 changes: 41 additions & 32 deletions modal/io_streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,10 @@ async def _container_process_logs_iterator(


class _StreamReader(Generic[T]):
"""Provides an interface to buffer and fetch logs from a stream (`stdout` or `stderr`).
"""Retrieve logs from a stream (`stdout` or `stderr`).
As an asynchronous iterable, the object supports the async for statement.
As an asynchronous iterable, the object supports the `for` and `async for`
statements. Just loop over the object to read in chunks.
**Usage**
Expand Down Expand Up @@ -140,12 +141,14 @@ def __init__(
self._consume_container_process_task = asyncio.create_task(self._consume_container_process_stream())

@property
def file_descriptor(self):
def file_descriptor(self) -> int:
"""Possible values are `1` for stdout and `2` for stderr."""
return self._file_descriptor

async def read(self) -> T:
"""Fetch and return contents of the entire stream. If EOF was received,
return an empty string.
"""Fetch and return contents of the entire stream.
If EOF was received, return an empty string.
**Usage**
Expand All @@ -157,7 +160,6 @@ async def read(self) -> T:
print(sandbox.stdout.read())
```
"""
data_str = ""
data_bytes = b""
Expand All @@ -175,9 +177,7 @@ async def read(self) -> T:
return cast(T, data_bytes)

async def _consume_container_process_stream(self):
"""
Consumes the container process stream and stores the messages in the buffer.
"""
"""Consume the container process stream and store messages in the buffer."""
if self._stream_type == StreamType.DEVNULL:
return

Expand Down Expand Up @@ -211,9 +211,7 @@ async def _consume_container_process_stream(self):
raise exc

async def _stream_container_process(self) -> AsyncGenerator[Tuple[Optional[bytes], str], None]:
"""mdmd:hidden
Streams the container process buffer to the reader.
"""
"""Stream the container process buffer to the reader."""
entry_id = 0
if self._last_entry_id:
entry_id = int(self._last_entry_id) + 1
Expand All @@ -232,8 +230,7 @@ async def _stream_container_process(self) -> AsyncGenerator[Tuple[Optional[bytes
entry_id += 1

async def _get_logs(self) -> AsyncGenerator[Optional[bytes], None]:
"""mdmd:hidden
Streams sandbox or process logs from the server to the reader.
"""Stream sandbox or process logs from the server to the reader.
Logs returned by this method may contain partial or multiple lines at a time.
Expand Down Expand Up @@ -278,9 +275,7 @@ async def _get_logs(self) -> AsyncGenerator[Optional[bytes], None]:
raise

async def _get_logs_by_line(self) -> AsyncGenerator[Optional[bytes], None]:
"""mdmd:hidden
Processes logs from the server and yields complete lines only.
"""
"""Process logs from the server and yield complete lines only."""
async for message in self._get_logs():
if message is None:
if self._line_buffer:
Expand Down Expand Up @@ -325,25 +320,25 @@ async def __anext__(self) -> T:
class _StreamWriter:
"""Provides an interface to buffer and write logs to a sandbox or container process stream (`stdin`)."""

def __init__(self, object_id: str, object_type: Literal["sandbox", "container_process"], client: _Client):
def __init__(self, object_id: str, object_type: Literal["sandbox", "container_process"], client: _Client) -> None:
"""mdmd:hidden"""
self._index = 1
self._object_id = object_id
self._object_type = object_type
self._client = client
self._is_closed = False
self._buffer = bytearray()

def get_next_index(self):
"""mdmd:hidden"""
def _get_next_index(self) -> int:
index = self._index
self._index += 1
return index

def write(self, data: Union[bytes, bytearray, memoryview, str]):
"""
Writes data to stream's internal buffer, but does not drain/flush the write.
def write(self, data: Union[bytes, bytearray, memoryview, str]) -> None:
"""Write data to the stream without sending it immediately.
This method needs to be used along with the `drain()` method which flushes the buffer.
This is non-blocking and queues the data to an internal buffer. Must be
used along with the `drain()` method, which flushes the buffer.
**Usage**
Expand Down Expand Up @@ -376,21 +371,35 @@ def write(self, data: Union[bytes, bytearray, memoryview, str]):
raise TypeError(f"data argument must be a bytes-like object, not {type(data).__name__}")

def write_eof(self):
"""
Closes the write end of the stream after the buffered write data is drained.
If the process was blocked on input, it will become unblocked after `write_eof()`.
"""Close the write end of the stream after the buffered data is drained.
This method needs to be used along with the `drain()` method which flushes the EOF to the process.
If the process was blocked on input, it will become unblocked after
`write_eof()`. This method needs to be used along with the `drain()`
method, which flushes the EOF to the process.
"""
self._is_closed = True

async def drain(self):
"""
Flushes the write buffer to the running process. Flushes the EOF if the writer is closed.
async def drain(self) -> None:
"""Flush the write buffer and send data to the running process.
This is a flow control method that blocks until data is sent. It returns
when it is appropriate to continue writing data to the stream.
**Usage**
```python
# Synchronous
writer.write(data)
writer.drain()
# Async
writer.write(data)
await writer.drain.aio()
```
"""
data = bytes(self._buffer)
self._buffer.clear()
index = self.get_next_index()
index = self._get_next_index()

try:
if self._object_type == "sandbox":
Expand Down

0 comments on commit 60b4fa0

Please sign in to comment.