Skip to content

Commit

Permalink
Problem: error Too many open files (#720)
Browse files Browse the repository at this point in the history
Jira ticket: ALEPH-298

some CRN failed on any action with error OSError: [Errno 24] Too many open files:

Solution:
Properly close stream to journald when the VM is stopped
  • Loading branch information
olethanh authored Nov 8, 2024
1 parent 662c0c0 commit 7461a49
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 18 deletions.
41 changes: 28 additions & 13 deletions src/aleph/vm/hypervisors/firecracker/microvm.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from pathlib import Path
from pwd import getpwnam
from tempfile import NamedTemporaryFile
from typing import Any
from typing import Any, BinaryIO

import msgpack
from aleph_message.models import ItemHash
Expand Down Expand Up @@ -93,6 +93,8 @@ class MicroVM:
mounted_rootfs: Path | None = None
_unix_socket: Server | None = None
enable_log: bool
journal_stdout: BinaryIO | int | None = None
journal_stderr: BinaryIO | int | None = None

def __repr__(self):
return f"<MicroVM {self.vm_id}>"
Expand Down Expand Up @@ -219,19 +221,19 @@ async def start_firecracker(self, config_path: Path) -> asyncio.subprocess.Proce
str(config_path),
)
if self.enable_log:
journal_stdout = journal.stream(self._journal_stdout_name)
journal_stderr = journal.stream(self._journal_stderr_name)
self.journal_stdout = journal.stream(self._journal_stdout_name)
self.journal_stderr = journal.stream(self._journal_stderr_name)
else:
journal_stdout = asyncio.subprocess.DEVNULL
journal_stderr = asyncio.subprocess.DEVNULL
self.journal_stdout = asyncio.subprocess.DEVNULL
self.journal_stderr = asyncio.subprocess.DEVNULL

logger.debug(" ".join(options))

self.proc = await asyncio.create_subprocess_exec(
*options,
stdin=asyncio.subprocess.PIPE,
stdout=journal_stdout,
stderr=journal_stderr,
stdout=self.journal_stdout,
stderr=self.journal_stderr,
)
return self.proc

Expand All @@ -252,11 +254,11 @@ async def start_jailed_firecracker(self, config_path: Path) -> asyncio.subproces

self.config_file_path = config_path
if self.enable_log:
journal_stdout = journal.stream(self._journal_stdout_name)
journal_stderr = journal.stream(self._journal_stderr_name)
self.journal_stdout = journal.stream(self._journal_stdout_name)
self.journal_stderr = journal.stream(self._journal_stderr_name)
else:
journal_stdout = asyncio.subprocess.DEVNULL
journal_stderr = asyncio.subprocess.DEVNULL
self.journal_stdout = asyncio.subprocess.DEVNULL
self.journal_stderr = asyncio.subprocess.DEVNULL

options = (
str(self.jailer_bin_path),
Expand All @@ -280,8 +282,8 @@ async def start_jailed_firecracker(self, config_path: Path) -> asyncio.subproces
self.proc = await asyncio.create_subprocess_exec(
*options,
stdin=asyncio.subprocess.PIPE,
stdout=journal_stdout,
stderr=journal_stderr,
stdout=self.journal_stdout,
stderr=self.journal_stderr,
)
return self.proc

Expand Down Expand Up @@ -480,6 +482,19 @@ async def teardown(self):
if self.stderr_task:
self.stderr_task.cancel()

if (
self.journal_stdout
and self.journal_stdout != asyncio.subprocess.DEVNULL
and hasattr(self.journal_stdout, "close")
):
self.journal_stdout.close()
if (
self.journal_stderr
and self.journal_stderr != asyncio.subprocess.DEVNULL
and hasattr(self.journal_stderr, "close")
):
self.journal_stderr.close()

# Clean mounted block devices
if self.mounted_rootfs:
logger.debug("Waiting for one second for the VM to shutdown")
Expand Down
17 changes: 12 additions & 5 deletions src/aleph/vm/hypervisors/qemu/qemuvm.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from asyncio.subprocess import Process
from dataclasses import dataclass
from pathlib import Path
from typing import TextIO
from typing import BinaryIO, TextIO

import qmp
from systemd import journal
Expand All @@ -28,6 +28,8 @@ class QemuVM:
interface_name: str
qemu_process: Process | None = None
host_volumes: list[HostVolume]
journal_stdout: TextIO | None
journal_stderr: TextIO | None

def __repr__(self) -> str:
if self.qemu_process:
Expand Down Expand Up @@ -72,8 +74,8 @@ async def start(
# qemu-system-x86_64 -enable-kvm -m 2048 -net nic,model=virtio
# -net tap,ifname=tap0,script=no,downscript=no -drive file=alpine.qcow2,media=disk,if=virtio -nographic

journal_stdout: TextIO = journal.stream(self._journal_stdout_name)
journal_stderr: TextIO = journal.stream(self._journal_stderr_name)
self.journal_stdout: BinaryIO = journal.stream(self._journal_stdout_name)
self.journal_stderr: BinaryIO = journal.stream(self._journal_stderr_name)
# hardware_resources.published ports -> not implemented at the moment
# hardware_resources.seconds -> only for microvm
args = [
Expand Down Expand Up @@ -120,8 +122,8 @@ async def start(
self.qemu_process = proc = await asyncio.create_subprocess_exec(
*args,
stdin=asyncio.subprocess.DEVNULL,
stdout=journal_stdout,
stderr=journal_stderr,
stdout=self.journal_stdout,
stderr=self.journal_stderr,
)

print(
Expand Down Expand Up @@ -149,3 +151,8 @@ def send_shutdown_message(self):
async def stop(self):
"""Stop the VM."""
self.send_shutdown_message()

if self.journal_stdout and self.journal_stdout != asyncio.subprocess.DEVNULL:
self.journal_stdout.close()
if self.journal_stderr and self.journal_stderr != asyncio.subprocess.DEVNULL:
self.journal_stderr.close()

0 comments on commit 7461a49

Please sign in to comment.