Skip to content

Commit

Permalink
WIP: Add spans to dark storage
Browse files Browse the repository at this point in the history
Add opentelemetry.instrumentation on server and client side
Fix azure log handler overriden by uvicorn.Config
  • Loading branch information
HakonSohoel committed Dec 13, 2024
1 parent 5ae6ece commit 9d7ee1b
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 12 deletions.
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ dependencies = [
"openpyxl", # extra dependency for pandas (excel)
"opentelemetry-api",
"opentelemetry-sdk",
"opentelemetry.instrumentation.fastapi",
"opentelemetry-instrumentation-httpx",
"opentelemetry-instrumentation-threading",
"orjson",
"packaging",
Expand Down
4 changes: 4 additions & 0 deletions src/ert/dark_storage/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
from ert.dark_storage.endpoints import router as endpoints_router
from ert.dark_storage.exceptions import ErtStorageError

from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor


class JSONEncoder(json.JSONEncoder):
"""
Expand Down Expand Up @@ -107,3 +109,5 @@ async def healthcheck() -> str:


app.include_router(endpoints_router)

FastAPIInstrumentor.instrument_app(app)
53 changes: 41 additions & 12 deletions src/ert/services/_storage_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import signal
import socket
import string
import sys
import threading
import time
import warnings
Expand All @@ -21,6 +22,11 @@
from ert.shared import __file__ as ert_shared_path
from ert.shared import find_available_socket
from ert.shared.storage.command import add_parser_options
from ert.trace import get_trace_id, trace, tracer, tracer_provider

from opentelemetry.trace.span import Span

DARK_STORAGE_APP = "ert.dark_storage.app:app"


class Server(uvicorn.Server):
Expand Down Expand Up @@ -80,8 +86,8 @@ def _create_connection_info(sock: socket.socket, authtoken: str) -> dict[str, An

return connection_info


def run_server(args: argparse.Namespace | None = None, debug: bool = False) -> None:
def run_server(args: argparse.Namespace | None = None, debug: bool = False, uvicorn_config = None) -> None:
trace_id = get_trace_id()
if args is None:
args = parse_args()

Expand All @@ -102,7 +108,7 @@ def run_server(args: argparse.Namespace | None = None, debug: bool = False) -> N
# Appropriated from uvicorn.main:run
os.environ["ERT_STORAGE_NO_TOKEN"] = "1"
os.environ["ERT_STORAGE_ENS_PATH"] = os.path.abspath(args.project)
config = uvicorn.Config("ert.dark_storage.app:app", **config_args)
config = uvicorn.Config(DARK_STORAGE_APP, **config_args) if uvicorn_config is None else uvicorn_config #uvicorn.Config() resets the logging config (overriding additional handlers added to loggers like e.g. the ert_azurelogger handler added through the pluggin system
server = Server(config, json.dumps(connection_info))

logger = logging.getLogger("ert.shared.storage.info")
Expand Down Expand Up @@ -143,21 +149,44 @@ def check_parent_alive() -> bool:
os.kill(os.getpid(), signal.SIGTERM)


if __name__ == "__main__":
def main():
args = parse_args()
config_args: Dict[str, Any] = {}
with open(STORAGE_LOG_CONFIG, encoding="utf-8") as conf_file:
logging_conf = yaml.safe_load(conf_file)
logging.config.dictConfig(logging_conf)
config_args.update(log_config=logging_conf)
warnings.filterwarnings("ignore", category=DeprecationWarning)
uvicorn.config.LOGGING_CONFIG.clear()
uvicorn.config.LOGGING_CONFIG.update(logging_conf)

if args.debug:
config_args.update(reload=True, reload_dirs=[os.path.dirname(ert_shared_path)])
uvicorn_config = uvicorn.Config(DARK_STORAGE_APP, **config_args) # Need to run uvicorn.Config before entering the ErtPluginContext because uvicorn.Config overrides the configuration of existing loggers, thus removing log handlers added by ErtPluginContext

_stopped = threading.Event()
terminate_on_parent_death_thread = threading.Thread(
target=terminate_on_parent_death, args=[_stopped, 1.0]
)
with ErtPluginContext(logger=logging.getLogger()) as context:
with ErtPluginContext(logger=logging.getLogger(), trace_provider=tracer_provider) as context:
terminate_on_parent_death_thread.start()
try:
run_server(debug=False)
finally:
_stopped.set()
terminate_on_parent_death_thread.join()
with tracer.start_as_current_span(f"run_storage_server") as currentSpan:
try:
print(f"Opertation ID: {get_trace_id()}")
run_server(args, debug=False, uvicorn_config = uvicorn_config)
except BaseException as err:
print(f"Stopped with exception {err}")
finally:
_stopped.set()
terminate_on_parent_death_thread.join()
print("Closing2")



def sigterm_handler(_signo, _stack_frame):
print("handle sigterm")
sys.exit(0)

signal.signal(signal.SIGTERM, sigterm_handler)


if __name__ == "__main__":
main()
2 changes: 2 additions & 0 deletions src/ert/services/storage_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
from ert.dark_storage.client import Client, ConnInfo
from ert.services._base_service import BaseService, _Context, local_exec_args

from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor
HTTPXClientInstrumentor().instrument()

class StorageService(BaseService):
service_name = "storage"
Expand Down

0 comments on commit 9d7ee1b

Please sign in to comment.