From 032ceaae0b08f438b5353d3713c0a5fd3526b058 Mon Sep 17 00:00:00 2001 From: Dawsh Date: Sun, 13 Oct 2024 18:31:26 +0330 Subject: [PATCH] feat(xray): automatically restart on crashes (#8) --- .env.example | 2 ++ marznode/backends/xray/_runner.py | 3 +++ marznode/backends/xray/xray_backend.py | 20 +++++++++++++++++++- marznode/config.py | 8 ++++++-- marznode/storage/base.py | 4 ++++ marznode/storage/memory.py | 9 +++++++++ 6 files changed, 43 insertions(+), 3 deletions(-) diff --git a/.env.example b/.env.example index f7d0f6a..7c15d83 100644 --- a/.env.example +++ b/.env.example @@ -7,6 +7,8 @@ #XRAY_ASSETS_PATH=/usr/share/xray #XRAY_CONFIG_PATH=/etc/xray/xray_config.json #XRAY_VLESS_REALITY_FLOW=xtls-rprx-vision +#XRAY_RESTART_ON_FAILURE=False +#XRAY_RESTART_ON_FAILURE_INTERVAL=0 #HYSTERIA_ENABLED=False diff --git a/marznode/backends/xray/_runner.py b/marznode/backends/xray/_runner.py index 5d62578..2abfc0f 100644 --- a/marznode/backends/xray/_runner.py +++ b/marznode/backends/xray/_runner.py @@ -29,6 +29,7 @@ def __init__(self, executable_path: str, assets_path: str): self._snd_streams = [] self._logs_buffer = deque(maxlen=100) self._env = {"XRAY_LOCATION_ASSET": assets_path} + self.stop_event = asyncio.Event() atexit.register(lambda: self.stop() if self.running else None) @@ -108,6 +109,8 @@ async def capture_stream(stream): capture_stream(self._process.stderr), capture_stream(self._process.stdout) ) logger.warning("Xray stopped/died") + self.stop_event.set() + self.stop_event.clear() def get_logs_stm(self) -> MemoryObjectReceiveStream: new_snd_stm, new_rcv_stm = create_memory_object_stream() diff --git a/marznode/backends/xray/xray_backend.py b/marznode/backends/xray/xray_backend.py index 760acb4..bcf9a5f 100644 --- a/marznode/backends/xray/xray_backend.py +++ b/marznode/backends/xray/xray_backend.py @@ -15,6 +15,7 @@ TagNotFoundError, ) from marznode.backends.xray.api.types.account import accounts_map +from marznode.config import XRAY_RESTART_ON_FAILURE, XRAY_RESTART_ON_FAILURE_INTERVAL from marznode.models import User, Inbound from marznode.storage import BaseStorage from marznode.utils.network import find_free_port @@ -41,6 +42,7 @@ def __init__( self._storage = storage self._config_path = config_path self._restart_lock = asyncio.Lock() + asyncio.create_task(self._restart_on_failure()) @property def running(self) -> bool: @@ -64,6 +66,23 @@ def save_config(self, config: str) -> None: with open(self._config_path, "w") as f: f.write(config) + async def add_storage_users(self): + for inbound in self._inbounds: + for user in await self._storage.list_inbound_users(inbound.tag): + await self.add_user(user, inbound) + + async def _restart_on_failure(self): + while True: + await self._runner.stop_event.wait() + if self._restart_lock.locked(): + logger.debug("Xray restarting as planned") + else: + logger.debug("Xray stopped unexpectedly") + if XRAY_RESTART_ON_FAILURE: + await asyncio.sleep(XRAY_RESTART_ON_FAILURE_INTERVAL) + await self.start() + await self.add_storage_users() + async def start(self, backend_config: str | None = None): if backend_config is None: with open(self._config_path) as f: @@ -77,7 +96,6 @@ async def start(self, backend_config: str | None = None): self._inbounds = list(self._config.list_inbounds()) self._api = XrayAPI("127.0.0.1", xray_api_port) await self._runner.start(self._config) - await asyncio.sleep(0.15) def stop(self): self._runner.stop() diff --git a/marznode/config.py b/marznode/config.py index e01f314..54ff3ff 100644 --- a/marznode/config.py +++ b/marznode/config.py @@ -1,8 +1,9 @@ """loads config files from environment and env file""" +from enum import Enum + from decouple import config from dotenv import load_dotenv -from enum import Enum load_dotenv() @@ -15,7 +16,10 @@ XRAY_ASSETS_PATH = config("XRAY_ASSETS_PATH", default="/usr/share/xray") XRAY_CONFIG_PATH = config("XRAY_CONFIG_PATH", default="/etc/xray/config.json") XRAY_VLESS_REALITY_FLOW = config("XRAY_VLESS_REALITY_FLOW", default="xtls-rprx-vision") - +XRAY_RESTART_ON_FAILURE = config("XRAY_RESTART_ON_FAILURE", cast=bool, default=False) +XRAY_RESTART_ON_FAILURE_INTERVAL = config( + "XRAY_RESTART_ON_FAILURE_INTERVAL", cast=int, default=0 +) HYSTERIA_ENABLED = config("HYSTERIA_ENABLED", cast=bool, default=False) HYSTERIA_EXECUTABLE_PATH = config( diff --git a/marznode/storage/base.py b/marznode/storage/base.py index d7f8319..b94c206 100644 --- a/marznode/storage/base.py +++ b/marznode/storage/base.py @@ -51,6 +51,10 @@ async def flush_users(self) -> None: :return: nothing """ + @abstractmethod + async def list_inbound_users(self, tag: str) -> list[User]: + """returns a list of users subscribed to an inbound""" + @abstractmethod def register_inbound(self, inbound: Inbound) -> None: """ diff --git a/marznode/storage/memory.py b/marznode/storage/memory.py index 792412f..7820220 100644 --- a/marznode/storage/memory.py +++ b/marznode/storage/memory.py @@ -32,6 +32,15 @@ async def list_inbounds( # return [i for i in self.storage["inbounds"].values() if i.tag in tag] return list(self.storage["inbounds"].values()) + async def list_inbound_users(self, tag: str) -> list[User]: + users = [] + for user in self.storage["users"].values(): + for inbound in user.inbounds: + if inbound.tag == tag: + users.append(user) + break + return users + async def remove_user(self, user: User) -> None: del self.storage["users"][user.id]