Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve logging, Add logging per VM #724

Merged
merged 11 commits into from
Nov 20, 2024
7 changes: 4 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -191,15 +191,16 @@ lint.ignore = [
# Allow the use of assert statements
"S101",
]
# Tests can use magic values, assertions, and relative imports
lint.per-file-ignores."tests/**/*" = [ "PLR2004", "S101", "TID252" ]
#[tool.ruff.flake8-tidy-imports]
#ban-relative-imports = "all"
#unfixable = [
# # Don't touch unused imports
# "F401",
#]
lint.isort = [ "aleph.vm" ]
#lint.isort = [ "aleph.vm" ]

# Tests can use magic values, assertions, and relative imports
lint.per-file-ignores."tests/**/*" = [ "PLR2004", "S101", "TID252" ]

[tool.pytest.ini_options]
pythonpath = [
Expand Down
3 changes: 1 addition & 2 deletions src/aleph/vm/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ class Settings(BaseSettings):
# System logs make boot ~2x slower
PRINT_SYSTEM_LOGS = False
IGNORE_TRACEBACK_FROM_DIAGNOSTICS = True
LOG_LEVEL = "WARNING"
DEBUG_ASYNCIO = False

# Networking does not work inside Docker/Podman
Expand Down Expand Up @@ -396,8 +397,6 @@ def setup(self):
STREAM_CHAINS[Chain.AVAX].rpc = str(self.RPC_AVAX)
STREAM_CHAINS[Chain.BASE].rpc = str(self.RPC_BASE)

logger.info(STREAM_CHAINS)

os.makedirs(self.MESSAGE_CACHE, exist_ok=True)
os.makedirs(self.CODE_CACHE, exist_ok=True)
os.makedirs(self.RUNTIME_CACHE, exist_ok=True)
Expand Down
14 changes: 11 additions & 3 deletions src/aleph/vm/orchestrator/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from aleph.vm.version import __version__, get_version_from_apt, get_version_from_git

from . import metrics, supervisor
from .custom_logs import setup_handlers

Check warning on line 26 in src/aleph/vm/orchestrator/cli.py

View check run for this annotation

Codecov / codecov/patch

src/aleph/vm/orchestrator/cli.py#L26

Added line #L26 was not covered by tests
from .pubsub import PubSub
from .run import run_code_on_event, run_code_on_request, start_persistent_vm

Expand Down Expand Up @@ -65,7 +66,7 @@
help="set loglevel to INFO",
action="store_const",
const=logging.INFO,
default=logging.WARNING,
default=settings.LOG_LEVEL,
)
parser.add_argument(
"-vv",
Expand Down Expand Up @@ -282,7 +283,7 @@


async def run_async_db_migrations():
async_engine = create_async_engine(make_db_url(), echo=True)
async_engine = create_async_engine(make_db_url(), echo=False)

Check warning on line 286 in src/aleph/vm/orchestrator/cli.py

View check run for this annotation

Codecov / codecov/patch

src/aleph/vm/orchestrator/cli.py#L286

Added line #L286 was not covered by tests
async with async_engine.begin() as conn:
await conn.run_sync(run_db_migrations)

Expand All @@ -293,13 +294,20 @@
log_format = (
"%(relativeCreated)4f | %(levelname)s | %(message)s"
if args.profile
else "%(asctime)s | %(levelname)s | %(message)s"
else "%(asctime)s | %(levelname)s %(name)s:%(lineno)s | %(message)s"
)
# log_format = "[%(asctime)s] p%(process)s {%(pathname)s:%(lineno)d} %(levelname)s - %(message)s"

handlers = setup_handlers(args, log_format)

Check warning on line 301 in src/aleph/vm/orchestrator/cli.py

View check run for this annotation

Codecov / codecov/patch

src/aleph/vm/orchestrator/cli.py#L301

Added line #L301 was not covered by tests
logging.basicConfig(
level=args.loglevel,
format=log_format,
handlers=handlers,
)

logging.getLogger("aiosqlite").setLevel(settings.LOG_LEVEL)
logging.getLogger("sqlalchemy.engine").setLevel(settings.LOG_LEVEL)

Check warning on line 309 in src/aleph/vm/orchestrator/cli.py

View check run for this annotation

Codecov / codecov/patch

src/aleph/vm/orchestrator/cli.py#L308-L309

Added lines #L308 - L309 were not covered by tests

settings.update(
USE_JAILER=args.use_jailer,
PRINT_SYSTEM_LOGS=args.system_logs,
Expand Down
54 changes: 54 additions & 0 deletions src/aleph/vm/orchestrator/custom_logs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import contextlib
import logging
from contextvars import ContextVar

from aleph_message.models import ItemHash

from aleph.vm.models import VmExecution

# Context-local reference to the VM execution being handled, if any.
# No default value is declared, so readers must supply one (`.get(None)`), as InjectingFilter does.
# NOTE(review): nothing in this module sets this variable; presumably it is set by other modules - confirm.
ctx_current_execution: ContextVar[VmExecution | None] = ContextVar("current_execution")
# Context-local hash of the VM being handled. Set and restored by `set_vm_for_logging`.
# InjectingFilter reads this first and only falls back to `ctx_current_execution` when it is unset or falsy.
ctx_current_execution_hash: ContextVar[ItemHash | None] = ContextVar("current_execution_hash")


@contextlib.contextmanager
def set_vm_for_logging(vm_hash):
    """Expose ``vm_hash`` as the current VM hash for the duration of a ``with`` block.

    The value is stored in ``ctx_current_execution_hash`` on entry. On exit the
    variable is reset to whatever it held before, including when the body
    raises, so nested uses unwind correctly.

    Args:
        vm_hash: Hash of the VM to associate with the enclosed code.
    """
    previous_state = ctx_current_execution_hash.set(vm_hash)
    try:
        yield
    finally:
        # Restore the prior value rather than clearing it, so an outer context stays intact.
        ctx_current_execution_hash.reset(previous_state)


class InjectingFilter(logging.Filter):
    """Logging filter that tags records with the hash of the VM being handled.

    Records emitted while no VM can be identified are rejected, so a handler
    carrying this filter only outputs VM-related log lines.
    """

    # NOTE(review): Codecov reported the body of `filter` as not covered by tests.
    def filter(self, record):
        """Attach ``vm_hash`` to ``record`` and accept it, or reject it when no VM is known.

        The hash is taken from ``ctx_current_execution_hash`` when that holds a
        truthy value; otherwise from the ``vm_hash`` attribute of the execution
        stored in ``ctx_current_execution``, if there is one.

        Returns:
            True when a truthy VM hash was found (and stored on the record), False otherwise.
        """
        current_hash = ctx_current_execution_hash.get(None)
        if not current_hash:
            # Fall back to the execution object when no explicit hash is set.
            execution = ctx_current_execution.get(None)
            if execution:
                current_hash = execution.vm_hash

        if current_hash:
            record.vm_hash = current_hash
            return True
        return False

Check warning on line 39 in src/aleph/vm/orchestrator/custom_logs.py

View check run for this annotation

Codecov / codecov/patch

src/aleph/vm/orchestrator/custom_logs.py#L38-L39

Added lines #L38 - L39 were not covered by tests


def setup_handlers(args, log_format):
    """Build the two stream handlers used to split VM and non-VM log output.

    Exactly one of the returned handlers emits any given record:

    * the execution handler accepts records for which ``InjectingFilter``
      finds a VM hash, and prints that hash in front of the message;
    * the non-execution handler accepts every other record.

    The second filter is defined as the negation of the first, so a record can
    be neither printed twice nor dropped.

    Args:
        args: Parsed command-line arguments. Currently unused; kept so the
            call signature stays stable.
        log_format: Base ``%``-style logging format chosen by the caller. It is
            used as-is for non-VM records, and with ``{%(vm_hash)s} `` inserted
            before ``%(message)s`` for VM records. A falsy value falls back to
            the built-in default format.

    Returns:
        ``[non_execution_handler, execution_handler]``, suitable for
        ``logging.basicConfig(handlers=...)``.
    """
    # NOTE(review): Codecov reported the body of this function as not covered by tests.
    default_format = "%(asctime)s | %(levelname)s %(name)s:%(lineno)s | %(message)s"
    # basicConfig(format=...) does not override a formatter already set on a handler,
    # so the caller's format has to be applied here to have any effect.
    base_format = log_format or default_format

    message_field = "%(message)s"
    vm_field = "{%(vm_hash)s} "
    if message_field in base_format:
        execution_format = base_format.replace(message_field, vm_field + message_field, 1)
    else:
        # No message placeholder to anchor on: put the VM hash at the start of the line.
        execution_format = vm_field + base_format

    vm_filter = InjectingFilter()

    execution_handler = logging.StreamHandler()
    execution_handler.addFilter(vm_filter)
    execution_handler.setFormatter(logging.Formatter(execution_format))

    non_execution_handler = logging.StreamHandler()
    # Exact complement of the VM filter, so the two handlers never overlap or leave a gap.
    non_execution_handler.addFilter(lambda record: not vm_filter.filter(record))
    non_execution_handler.setFormatter(logging.Formatter(base_format))

    return [non_execution_handler, execution_handler]

Check warning on line 54 in src/aleph/vm/orchestrator/custom_logs.py

View check run for this annotation

Codecov / codecov/patch

src/aleph/vm/orchestrator/custom_logs.py#L54

Added line #L54 was not covered by tests
2 changes: 1 addition & 1 deletion src/aleph/vm/orchestrator/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@

def setup_engine():
global AsyncSessionMaker
engine = create_async_engine(make_db_url(), echo=True)
engine = create_async_engine(make_db_url(), echo=False)
AsyncSessionMaker = async_sessionmaker(engine, expire_on_commit=False, class_=AsyncSession)
return engine

Expand Down
8 changes: 6 additions & 2 deletions src/aleph/vm/orchestrator/views/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import binascii
import contextlib
import logging
from decimal import Decimal
from hashlib import sha256
Expand All @@ -25,6 +26,7 @@
from aleph.vm.hypervisors.firecracker.microvm import MicroVMFailedInitError
from aleph.vm.orchestrator import payment, status
from aleph.vm.orchestrator.chain import STREAM_CHAINS, ChainInfo
from aleph.vm.orchestrator.custom_logs import set_vm_for_logging
from aleph.vm.orchestrator.messages import try_get_message
from aleph.vm.orchestrator.metrics import get_execution_records
from aleph.vm.orchestrator.payment import (
Expand Down Expand Up @@ -75,7 +77,8 @@
) from e

pool: VmPool = request.app["vm_pool"]
return await run_code_on_request(message_ref, path, pool, request)
with set_vm_for_logging(vm_hash=message_ref):
return await run_code_on_request(message_ref, path, pool, request)

Check warning on line 81 in src/aleph/vm/orchestrator/views/__init__.py

View check run for this annotation

Codecov / codecov/patch

src/aleph/vm/orchestrator/views/__init__.py#L80-L81

Added lines #L80 - L81 were not covered by tests


async def run_code_from_hostname(request: web.Request) -> web.Response:
Expand Down Expand Up @@ -112,7 +115,8 @@
return HTTPNotFound(reason="Invalid message reference")

pool = request.app["vm_pool"]
return await run_code_on_request(message_ref, path, pool, request)
with set_vm_for_logging(vm_hash=message_ref):
return await run_code_on_request(message_ref, path, pool, request)

Check warning on line 119 in src/aleph/vm/orchestrator/views/__init__.py

View check run for this annotation

Codecov / codecov/patch

src/aleph/vm/orchestrator/views/__init__.py#L118-L119

Added lines #L118 - L119 were not covered by tests


def authenticate_request(request: web.Request) -> None:
Expand Down
Loading
Loading