Skip to content

Commit

Permalink
chore(grpc): print agent logs in addition to sending over grpc
Browse files Browse the repository at this point in the history
  • Loading branch information
efiop committed May 21, 2024
1 parent ed379aa commit 4f7e914
Show file tree
Hide file tree
Showing 9 changed files with 89 additions and 69 deletions.
7 changes: 5 additions & 2 deletions src/isolate/backends/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,23 +171,26 @@ def _unblocked_pipe() -> tuple[int, int]:
def logged_io(
stdout_hook: HookT,
stderr_hook: HookT | None = None,
) -> Iterator[tuple[int, int]]:
log_hook: HookT | None = None,
) -> Iterator[tuple[int, int, int]]:
"""Open two new streams (for stdout and stderr, respectively) and start relaying all
the output from them to the given hooks."""

stdout_reader_fd, stdout_writer_fd = _unblocked_pipe()
stderr_reader_fd, stderr_writer_fd = _unblocked_pipe()
log_reader_fd, log_writer_fd = _unblocked_pipe()

termination_event = threading.Event()
io_observer = _io_observer(
hooks={
stdout_reader_fd: stdout_hook,
stderr_reader_fd: stderr_hook or stdout_hook,
log_reader_fd: log_hook or stdout_hook,
},
termination_event=termination_event,
)
try:
yield stdout_writer_fd, stderr_writer_fd
yield stdout_writer_fd, stderr_writer_fd, log_writer_fd
finally:
termination_event.set()
try:
Expand Down
2 changes: 1 addition & 1 deletion src/isolate/backends/conda.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ def _run_destroy(self, connection_key: str) -> None:

def _run_conda(self, *args: Any) -> None:
conda_executable = get_executable(self._exec_command, self._exec_home)
with logged_io(partial(self.log, level=LogLevel.INFO)) as (stdout, stderr):
with logged_io(partial(self.log, level=LogLevel.INFO)) as (stdout, stderr, _):
subprocess.check_call(
[conda_executable, *args],
stdout=stdout,
Expand Down
4 changes: 2 additions & 2 deletions src/isolate/backends/pyenv.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ def _try_get_prefix(self, pyenv: Path, root_path: Path) -> Path | None:
return Path(prefix.strip())

def _install_python(self, pyenv: Path, root_path: Path) -> None:
with logged_io(partial(self.log, level=LogLevel.INFO)) as (stdout, stderr):
with logged_io(partial(self.log, level=LogLevel.INFO)) as (stdout, stderr, _):
try:
subprocess.check_call(
[pyenv, "install", "--skip-existing", self.python_version],
Expand All @@ -102,7 +102,7 @@ def destroy(self, connection_key: Path) -> None:
return None

pyenv_root = connection_key.parent.parent
with logged_io(self.log) as (stdout, stderr):
with logged_io(self.log) as (stdout, stderr, _):
subprocess.check_call(
[pyenv, "uninstall", "-f", connection_key.name],
env={**os.environ, "PYENV_ROOT": str(pyenv_root)},
Expand Down
2 changes: 1 addition & 1 deletion src/isolate/backends/virtualenv.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ def install_requirements(self, path: Path) -> None:
for extra_index_url in self.extra_index_urls:
pip_cmd.extend(["--extra-index-url", extra_index_url])

with logged_io(partial(self.log, level=LogLevel.INFO)) as (stdout, stderr):
with logged_io(partial(self.log, level=LogLevel.INFO)) as (stdout, stderr, _):
try:
subprocess.check_call(
pip_cmd,
Expand Down
21 changes: 15 additions & 6 deletions src/isolate/connections/_local/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

from isolate.backends.common import get_executable_path, logged_io
from isolate.connections.common import AGENT_SIGNATURE
from isolate.logs import LogLevel
from isolate.logs import LogLevel, LogSource

if TYPE_CHECKING:
from isolate.backends import BaseEnvironment
Expand Down Expand Up @@ -113,14 +113,22 @@ def start_process(

python_executable = get_executable_path(self.environment_path, "python")
with logged_io(
partial(self.handle_agent_log, level=LogLevel.STDOUT),
partial(self.handle_agent_log, level=LogLevel.STDERR),
) as (stdout, stderr):
partial(
self.handle_agent_log, source=LogSource.USER, level=LogLevel.STDOUT
),
partial(
self.handle_agent_log, source=LogSource.USER, level=LogLevel.STDERR
),
partial(
self.handle_agent_log, source=LogSource.BRIDGE, level=LogLevel.TRACE
),
) as (stdout, stderr, log_fd):
yield subprocess.Popen(
self.get_python_cmd(python_executable, connection),
self.get_python_cmd(python_executable, connection, log_fd),
env=self.get_env_vars(),
stdout=stdout,
stderr=stderr,
pass_fds=(log_fd,),
text=True,
)

Expand Down Expand Up @@ -158,11 +166,12 @@ def get_python_cmd(
self,
executable: Path,
connection: ConnectionType,
log_fd: int,
) -> list[str | Path]:
"""Return the command to run the agent process with."""
raise NotImplementedError

def handle_agent_log(self, line: str, level: LogLevel) -> None:
def handle_agent_log(self, line: str, level: LogLevel, source: LogSource) -> None:
"""Handle a log line emitted by the agent process. The level will be either
STDOUT or STDERR."""
raise NotImplementedError
8 changes: 6 additions & 2 deletions src/isolate/connections/grpc/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,13 +137,17 @@ def get_python_cmd(
self,
executable: Path,
connection: str,
log_fd: int,
) -> List[Union[str, Path]]:
return [
executable,
agent_startup.__file__,
agent.__file__,
connection,
"--log-fd",
str(log_fd),
]

def handle_agent_log(self, line: str, level: LogLevel) -> None:
self.log(line, level=level, source=LogSource.USER)
def handle_agent_log(self, line: str, level: LogLevel, source: LogSource) -> None:
print(f"[{source}] [{level}] {line}")
self.log(line, level=level, source=source)
56 changes: 26 additions & 30 deletions src/isolate/connections/grpc/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,17 @@

from __future__ import annotations

import os
import sys
import traceback
from argparse import ArgumentParser
from concurrent import futures
from dataclasses import dataclass, field
from dataclasses import dataclass
from typing import (
Any,
Generator,
Iterable,
Iterator,
cast,
)

import grpc
Expand All @@ -21,26 +22,28 @@
from isolate.connections.common import SerializationError, serialize_object
from isolate.connections.grpc import definitions
from isolate.connections.grpc.configuration import get_default_options
from isolate.connections.grpc.interface import from_grpc, to_grpc
from isolate.connections.grpc.interface import from_grpc
from isolate.exceptions import IsolateException
from isolate.logs import Log, LogLevel, LogSource


@dataclass
class AbortException(IsolateException):
message: str


@dataclass
class AgentServicer(definitions.AgentServicer):
_run_cache: dict[str, Any] = field(default_factory=dict)
def __init__(self, log_fd: int | None = None):
super().__init__()

self._run_cache: dict[str, Any] = {}
self._log = sys.stdout if log_fd is None else os.fdopen(log_fd, "w")

def Run(
self,
request: definitions.FunctionCall,
context: ServicerContext,
) -> Iterator[definitions.PartialRunResult]:
yield from self.log(f"A connection has been established: {context.peer()}!")
self.log(f"A connection has been established: {context.peer()}!")

extra_args = []
if request.HasField("setup_func"):
Expand All @@ -54,13 +57,13 @@ def Run(
result,
was_it_raised,
stringized_tb,
) = yield from self.execute_function(
) = self.execute_function(
request.setup_func,
"setup",
)

if was_it_raised:
yield from self.log(
self.log(
"The setup function has thrown an error. Aborting the run."
)
yield from self.send_object(
Expand All @@ -79,7 +82,7 @@ def Run(
extra_args.append(self._run_cache[cache_key])

try:
result, was_it_raised, stringized_tb = yield from self.execute_function(
result, was_it_raised, stringized_tb = self.execute_function(
request.function,
"function",
extra_args=extra_args,
Expand All @@ -99,7 +102,7 @@ def execute_function(
function_kind: str,
*,
extra_args: Iterable[Any] = (),
) -> Generator[definitions.PartialRunResult, None, Any]:
) -> tuple[Any, bool, str | None]:
if function.was_it_raised:
raise AbortException(
f"The {function_kind} function must be callable, "
Expand All @@ -119,7 +122,7 @@ def execute_function(
f"not {type(function).__name__}."
)

yield from self.log(f"Starting the execution of the {function_kind} function.")
self.log(f"Starting the execution of the {function_kind} function.")

was_it_raised = False
stringized_tb = None
Expand All @@ -131,7 +134,7 @@ def execute_function(
num_frames = len(traceback.extract_stack()[:-5])
stringized_tb = "".join(traceback.format_exc(limit=-num_frames))

yield from self.log(f"Completed the execution of the {function_kind} function.")
self.log(f"Completed the execution of the {function_kind} function.")
return result, was_it_raised, stringized_tb

def send_object(
Expand All @@ -145,20 +148,18 @@ def send_object(
definition = serialize_object(serialization_method, result)
except SerializationError:
if stringized_tb:
yield from self.log(
stringized_tb, source=LogSource.USER, level=LogLevel.STDERR
)
print(stringized_tb, file=sys.stderr)
raise AbortException(
"Error while serializing the execution result "
f"(object of type {type(result)})."
)
except BaseException:
yield from self.log(traceback.format_exc(), level=LogLevel.ERROR)
self.log(traceback.format_exc())
raise AbortException(
"An unexpected error occurred while serializing the result."
)

yield from self.log("Sending the result.")
self.log("Sending the result.")
serialized_obj = definitions.SerializedObject(
method=serialization_method,
definition=definition,
Expand All @@ -171,15 +172,9 @@ def send_object(
logs=[],
)

def log(
self,
message: str,
level: LogLevel = LogLevel.TRACE,
source: LogSource = LogSource.BRIDGE,
) -> Iterator[definitions.PartialRunResult]:
log = to_grpc(Log(message, level=level, source=source))
log = cast(definitions.Log, log)
yield definitions.PartialRunResult(result=None, is_complete=False, logs=[log])
def log(self, message: str) -> None:
self._log.write(str(message))
self._log.flush()

def abort_with_msg(
self,
Expand Down Expand Up @@ -209,10 +204,10 @@ def create_server(address: str) -> grpc.Server:
return server


def run_agent(address: str) -> int:
def run_agent(address: str, log_fd: int | None = None) -> int:
"""Run the agent servicer on the given address."""
server = create_server(address)
servicer = AgentServicer()
servicer = AgentServicer(log_fd=log_fd)

# This function just calls some methods on the server
# and register a generic handler for the bridge. It does
Expand All @@ -227,9 +222,10 @@ def run_agent(address: str) -> int:
def main() -> int:
parser = ArgumentParser()
parser.add_argument("address", type=str)
parser.add_argument("--log-fd", type=int)

options = parser.parse_args()
return run_agent(options.address)
return run_agent(options.address, log_fd=options.log_fd)


if __name__ == "__main__":
Expand Down
19 changes: 4 additions & 15 deletions src/isolate/connections/ipc/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ def get_python_cmd(
self,
executable: Path,
connection: AgentListener,
log_fd: int,
) -> list[str | Path]:
assert isinstance(connection.address, tuple)
return [
Expand All @@ -214,21 +215,9 @@ def get_python_cmd(
# the connection with the bridge.
"--serialization-backend",
self.environment.settings.serialization_method,
"--log-fd",
str(log_fd),
]

def handle_agent_log(self, line: str, level: LogLevel) -> None:
# TODO: we probably should create a new fd and pass it as
# one of the the arguments to the child process. Then everything
# from that fd can be automatically logged as originating from the
# bridge.

# Agent can produce [trace] messages, so change the log
# level to it if this does not originate from the user.
if line.startswith("[trace]"):
line = line.replace("[trace]", "", 1)
level = LogLevel.TRACE
source = LogSource.BRIDGE
else:
source = LogSource.USER

def handle_agent_log(self, line: str, level: LogLevel, source: LogSource) -> None:
self.log(line, level=level, source=source)
Loading

0 comments on commit 4f7e914

Please sign in to comment.